From c8220e69a11e16db050f450eeebdad9bc521eac9 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 4 Apr 2026 19:05:27 -0700 Subject: [PATCH] fix: strip MEDIA: directives from streamed gateway messages (#5152) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When streaming is enabled, the GatewayStreamConsumer sends raw text chunks directly to the platform without post-processing. This causes MEDIA:/path/to/file tags and [[audio_as_voice]] directives to appear as visible text in the user's chat instead of being stripped. The non-streaming path already handles this correctly via extract_media() in base.py, but the streaming path was missing equivalent cleanup. Add _clean_for_display() to GatewayStreamConsumer that strips MEDIA: tags and internal markers before any text reaches the platform. The actual media file delivery is unaffected — _deliver_media_from_response() in gateway/run.py still extracts files from the agent's final_response (separate from the stream consumer's display text). Reported by Ao [FotM] on Discord. --- gateway/stream_consumer.py | 32 +++++ tests/gateway/test_stream_consumer.py | 179 ++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 tests/gateway/test_stream_consumer.py diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 4a3cf744a..7f4a73d0b 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -18,6 +18,7 @@ from __future__ import annotations import asyncio import logging import queue +import re import time from dataclasses import dataclass from typing import Any, Optional @@ -156,8 +157,39 @@ class GatewayStreamConsumer: except Exception as e: logger.error("Stream consumer error: %s", e) + # Pattern to strip MEDIA: tags (including optional surrounding quotes). + # Matches the simple cleanup regex used by the non-streaming path in + # gateway/platforms/base.py for post-processing. + _MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''') + + @staticmethod + def _clean_for_display(text: str) -> str: + """Strip MEDIA: directives and internal markers from text before display. + + The streaming path delivers raw text chunks that may include + ``MEDIA:`` tags and ``[[audio_as_voice]]`` directives meant for + the platform adapter's post-processing. The actual media files are + delivered separately via ``_deliver_media_from_response()`` after the + stream finishes — we just need to hide the raw directives from the + user. + """ + if "MEDIA:" not in text and "[[audio_as_voice]]" not in text: + return text + cleaned = text.replace("[[audio_as_voice]]", "") + cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned) + # Collapse excessive blank lines left behind by removed tags + cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) + # Strip trailing whitespace/newlines but preserve leading content + return cleaned.rstrip() + async def _send_or_edit(self, text: str) -> None: """Send or edit the streaming message.""" + # Strip MEDIA: directives so they don't appear as visible text. + # Media files are delivered as native attachments after the stream + # finishes (via _deliver_media_from_response in gateway/run.py). + text = self._clean_for_display(text) + if not text.strip(): + return try: if self._message_id is not None: if self._edit_supported: diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py new file mode 100644 index 000000000..1234307ca --- /dev/null +++ b/tests/gateway/test_stream_consumer.py @@ -0,0 +1,179 @@ +"""Tests for GatewayStreamConsumer — media directive stripping in streaming.""" + +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig + + +# ── _clean_for_display unit tests ──────────────────────────────────────── + + +class TestCleanForDisplay: + """Verify MEDIA: directives and internal markers are stripped from display text.""" + + def test_no_media_passthrough(self): + """Text without MEDIA: passes through unchanged.""" + text = "Here is your analysis of the image." + assert GatewayStreamConsumer._clean_for_display(text) == text + + def test_media_tag_stripped(self): + """Basic MEDIA: tag is removed.""" + text = "Here is the image\nMEDIA:/tmp/hermes/image.png" + result = GatewayStreamConsumer._clean_for_display(text) + assert "MEDIA:" not in result + assert "Here is the image" in result + + def test_media_tag_with_space(self): + """MEDIA: tag with space after colon is removed.""" + text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3" + result = GatewayStreamConsumer._clean_for_display(text) + assert "MEDIA:" not in result + assert "Audio generated" in result + + def test_media_tag_with_quotes(self): + """MEDIA: tags wrapped in quotes or backticks are removed.""" + for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]: + text = f"Result: {wrapper}" + result = GatewayStreamConsumer._clean_for_display(text) + assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}" + + def test_audio_as_voice_stripped(self): + """[[audio_as_voice]] directive is removed.""" + text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg" + result = GatewayStreamConsumer._clean_for_display(text) + assert "[[audio_as_voice]]" not in result + assert "MEDIA:" not in result + + def test_multiple_media_tags(self): + """Multiple MEDIA: tags are all removed.""" + text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg" + result = GatewayStreamConsumer._clean_for_display(text) + assert "MEDIA:" not in result + assert "Here are two files:" in result + + def test_excessive_newlines_collapsed(self): + """Blank lines left by removed tags are collapsed.""" + text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter" + result = GatewayStreamConsumer._clean_for_display(text) + # Should not have 3+ consecutive newlines + assert "\n\n\n" not in result + + def test_media_only_response(self): + """Response that is entirely MEDIA: tags returns empty/whitespace.""" + text = "MEDIA:/tmp/image.png" + result = GatewayStreamConsumer._clean_for_display(text) + assert result.strip() == "" + + def test_media_mid_sentence(self): + """MEDIA: tag embedded in prose is stripped cleanly.""" + text = "I generated this image MEDIA:/tmp/art.png for you." + result = GatewayStreamConsumer._clean_for_display(text) + assert "MEDIA:" not in result + assert "generated" in result + assert "for you." in result + + def test_preserves_non_media_colons(self): + """Normal colons and text with 'MEDIA' as a word aren't stripped.""" + text = "The media: files are stored in /tmp. Use social MEDIA carefully." + result = GatewayStreamConsumer._clean_for_display(text) + # "MEDIA:" in upper case without a path won't match \S+ (space follows) + # But "media:" is lowercase so won't match either + assert result == text + + +# ── Integration: _send_or_edit strips MEDIA: ───────────────────────────── + + +class TestSendOrEditMediaStripping: + """Verify _send_or_edit strips MEDIA: before sending to the platform.""" + + @pytest.mark.asyncio + async def test_first_send_strips_media(self): + """Initial send removes MEDIA: tags from visible text.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + adapter.send = AsyncMock(return_value=send_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer(adapter, "chat_123") + await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png") + + adapter.send.assert_called_once() + sent_text = adapter.send.call_args[1]["content"] + assert "MEDIA:" not in sent_text + assert "Here is your image" in sent_text + + @pytest.mark.asyncio + async def test_edit_strips_media(self): + """Edit call removes MEDIA: tags from visible text.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + edit_result = SimpleNamespace(success=True) + adapter.send = AsyncMock(return_value=send_result) + adapter.edit_message = AsyncMock(return_value=edit_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer(adapter, "chat_123") + # First send + await consumer._send_or_edit("Starting response...") + # Edit with MEDIA: tag + await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png") + + adapter.edit_message.assert_called_once() + edited_text = adapter.edit_message.call_args[1]["content"] + assert "MEDIA:" not in edited_text + + @pytest.mark.asyncio + async def test_media_only_skips_send(self): + """If text is entirely MEDIA: tags, the send is skipped.""" + adapter = MagicMock() + adapter.send = AsyncMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer(adapter, "chat_123") + await consumer._send_or_edit("MEDIA:/tmp/image.png") + + adapter.send.assert_not_called() + + +# ── Integration: full stream run ───────────────────────────────────────── + + +class TestStreamRunMediaStripping: + """End-to-end: deltas with MEDIA: produce clean visible text.""" + + @pytest.mark.asyncio + async def test_stream_with_media_tag(self): + """Full stream run strips MEDIA: from the final visible message.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + edit_result = SimpleNamespace(success=True) + adapter.send = AsyncMock(return_value=send_result) + adapter.edit_message = AsyncMock(return_value=edit_result) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + # Feed deltas + consumer.on_delta("Here is your generated image\n") + consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png") + consumer.finish() + + await consumer.run() + + # Verify the final text sent/edited doesn't contain MEDIA: + all_calls = [] + for call in adapter.send.call_args_list: + all_calls.append(call[1].get("content", "")) + for call in adapter.edit_message.call_args_list: + all_calls.append(call[1].get("content", "")) + + for sent_text in all_calls: + assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}" + + assert consumer.already_sent