fix: strip MEDIA: directives from streamed gateway messages (#5152)
When streaming is enabled, the GatewayStreamConsumer sends raw text chunks directly to the platform without post-processing. This causes MEDIA:/path/to/file tags and [[audio_as_voice]] directives to appear as visible text in the user's chat instead of being stripped. The non-streaming path already handles this correctly via extract_media() in base.py, but the streaming path was missing equivalent cleanup. Add _clean_for_display() to GatewayStreamConsumer that strips MEDIA: tags and internal markers before any text reaches the platform. The actual media file delivery is unaffected — _deliver_media_from_response() in gateway/run.py still extracts files from the agent's final_response (separate from the stream consumer's display text). Reported by Ao [FotM] on Discord.
This commit is contained in:
@@ -18,6 +18,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import queue
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
@@ -156,8 +157,39 @@ class GatewayStreamConsumer:
|
||||
except Exception as e:
|
||||
logger.error("Stream consumer error: %s", e)
|
||||
|
||||
# Pattern to strip MEDIA:<path> tags (including optional surrounding quotes).
|
||||
# Matches the simple cleanup regex used by the non-streaming path in
|
||||
# gateway/platforms/base.py for post-processing.
|
||||
_MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''')
|
||||
|
||||
@staticmethod
|
||||
def _clean_for_display(text: str) -> str:
|
||||
"""Strip MEDIA: directives and internal markers from text before display.
|
||||
|
||||
The streaming path delivers raw text chunks that may include
|
||||
``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
|
||||
the platform adapter's post-processing. The actual media files are
|
||||
delivered separately via ``_deliver_media_from_response()`` after the
|
||||
stream finishes — we just need to hide the raw directives from the
|
||||
user.
|
||||
"""
|
||||
if "MEDIA:" not in text and "[[audio_as_voice]]" not in text:
|
||||
return text
|
||||
cleaned = text.replace("[[audio_as_voice]]", "")
|
||||
cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned)
|
||||
# Collapse excessive blank lines left behind by removed tags
|
||||
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
||||
# Strip trailing whitespace/newlines but preserve leading content
|
||||
return cleaned.rstrip()
|
||||
|
||||
async def _send_or_edit(self, text: str) -> None:
|
||||
"""Send or edit the streaming message."""
|
||||
# Strip MEDIA: directives so they don't appear as visible text.
|
||||
# Media files are delivered as native attachments after the stream
|
||||
# finishes (via _deliver_media_from_response in gateway/run.py).
|
||||
text = self._clean_for_display(text)
|
||||
if not text.strip():
|
||||
return
|
||||
try:
|
||||
if self._message_id is not None:
|
||||
if self._edit_supported:
|
||||
|
||||
179
tests/gateway/test_stream_consumer.py
Normal file
179
tests/gateway/test_stream_consumer.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""Tests for GatewayStreamConsumer — media directive stripping in streaming."""
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
|
||||
|
||||
|
||||
# ── _clean_for_display unit tests ────────────────────────────────────────
|
||||
|
||||
|
||||
class TestCleanForDisplay:
|
||||
"""Verify MEDIA: directives and internal markers are stripped from display text."""
|
||||
|
||||
def test_no_media_passthrough(self):
|
||||
"""Text without MEDIA: passes through unchanged."""
|
||||
text = "Here is your analysis of the image."
|
||||
assert GatewayStreamConsumer._clean_for_display(text) == text
|
||||
|
||||
def test_media_tag_stripped(self):
|
||||
"""Basic MEDIA:<path> tag is removed."""
|
||||
text = "Here is the image\nMEDIA:/tmp/hermes/image.png"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "MEDIA:" not in result
|
||||
assert "Here is the image" in result
|
||||
|
||||
def test_media_tag_with_space(self):
|
||||
"""MEDIA: tag with space after colon is removed."""
|
||||
text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "MEDIA:" not in result
|
||||
assert "Audio generated" in result
|
||||
|
||||
def test_media_tag_with_quotes(self):
|
||||
"""MEDIA: tags wrapped in quotes or backticks are removed."""
|
||||
for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]:
|
||||
text = f"Result: {wrapper}"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}"
|
||||
|
||||
def test_audio_as_voice_stripped(self):
|
||||
"""[[audio_as_voice]] directive is removed."""
|
||||
text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "[[audio_as_voice]]" not in result
|
||||
assert "MEDIA:" not in result
|
||||
|
||||
def test_multiple_media_tags(self):
|
||||
"""Multiple MEDIA: tags are all removed."""
|
||||
text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "MEDIA:" not in result
|
||||
assert "Here are two files:" in result
|
||||
|
||||
def test_excessive_newlines_collapsed(self):
|
||||
"""Blank lines left by removed tags are collapsed."""
|
||||
text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
# Should not have 3+ consecutive newlines
|
||||
assert "\n\n\n" not in result
|
||||
|
||||
def test_media_only_response(self):
|
||||
"""Response that is entirely MEDIA: tags returns empty/whitespace."""
|
||||
text = "MEDIA:/tmp/image.png"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert result.strip() == ""
|
||||
|
||||
def test_media_mid_sentence(self):
|
||||
"""MEDIA: tag embedded in prose is stripped cleanly."""
|
||||
text = "I generated this image MEDIA:/tmp/art.png for you."
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "MEDIA:" not in result
|
||||
assert "generated" in result
|
||||
assert "for you." in result
|
||||
|
||||
def test_preserves_non_media_colons(self):
|
||||
"""Normal colons and text with 'MEDIA' as a word aren't stripped."""
|
||||
text = "The media: files are stored in /tmp. Use social MEDIA carefully."
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
# "MEDIA:" in upper case without a path won't match \S+ (space follows)
|
||||
# But "media:" is lowercase so won't match either
|
||||
assert result == text
|
||||
|
||||
|
||||
# ── Integration: _send_or_edit strips MEDIA: ─────────────────────────────
|
||||
|
||||
|
||||
class TestSendOrEditMediaStripping:
|
||||
"""Verify _send_or_edit strips MEDIA: before sending to the platform."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_send_strips_media(self):
|
||||
"""Initial send removes MEDIA: tags from visible text."""
|
||||
adapter = MagicMock()
|
||||
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
||||
adapter.send = AsyncMock(return_value=send_result)
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123")
|
||||
await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png")
|
||||
|
||||
adapter.send.assert_called_once()
|
||||
sent_text = adapter.send.call_args[1]["content"]
|
||||
assert "MEDIA:" not in sent_text
|
||||
assert "Here is your image" in sent_text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_strips_media(self):
|
||||
"""Edit call removes MEDIA: tags from visible text."""
|
||||
adapter = MagicMock()
|
||||
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
||||
edit_result = SimpleNamespace(success=True)
|
||||
adapter.send = AsyncMock(return_value=send_result)
|
||||
adapter.edit_message = AsyncMock(return_value=edit_result)
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123")
|
||||
# First send
|
||||
await consumer._send_or_edit("Starting response...")
|
||||
# Edit with MEDIA: tag
|
||||
await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png")
|
||||
|
||||
adapter.edit_message.assert_called_once()
|
||||
edited_text = adapter.edit_message.call_args[1]["content"]
|
||||
assert "MEDIA:" not in edited_text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_media_only_skips_send(self):
|
||||
"""If text is entirely MEDIA: tags, the send is skipped."""
|
||||
adapter = MagicMock()
|
||||
adapter.send = AsyncMock()
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123")
|
||||
await consumer._send_or_edit("MEDIA:/tmp/image.png")
|
||||
|
||||
adapter.send.assert_not_called()
|
||||
|
||||
|
||||
# ── Integration: full stream run ─────────────────────────────────────────
|
||||
|
||||
|
||||
class TestStreamRunMediaStripping:
|
||||
"""End-to-end: deltas with MEDIA: produce clean visible text."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_with_media_tag(self):
|
||||
"""Full stream run strips MEDIA: from the final visible message."""
|
||||
adapter = MagicMock()
|
||||
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
||||
edit_result = SimpleNamespace(success=True)
|
||||
adapter.send = AsyncMock(return_value=send_result)
|
||||
adapter.edit_message = AsyncMock(return_value=edit_result)
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
||||
|
||||
# Feed deltas
|
||||
consumer.on_delta("Here is your generated image\n")
|
||||
consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png")
|
||||
consumer.finish()
|
||||
|
||||
await consumer.run()
|
||||
|
||||
# Verify the final text sent/edited doesn't contain MEDIA:
|
||||
all_calls = []
|
||||
for call in adapter.send.call_args_list:
|
||||
all_calls.append(call[1].get("content", ""))
|
||||
for call in adapter.edit_message.call_args_list:
|
||||
all_calls.append(call[1].get("content", ""))
|
||||
|
||||
for sent_text in all_calls:
|
||||
assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}"
|
||||
|
||||
assert consumer.already_sent
|
||||
Reference in New Issue
Block a user