Files
hermes-agent/tests/gateway/test_stream_consumer.py
Teknium 8dee82ea1e fix: stream consumer creates new message after tool boundaries (#5739)
When streaming was enabled on the gateway, the stream consumer created a
single message at the start and kept editing it as tokens arrived. Tool
progress messages were sent as separate messages below it. Since edits
don't change message position on Telegram/Matrix/Discord, the final
response ended up stuck above all tool progress messages — users had to
scroll up past potentially dozens of tool call lines to read the answer.

The agent already sends stream_delta_callback(None) at tool boundaries
(before _execute_tool_calls). The stream consumer was ignoring this
signal. Now it treats None as a segment break: finalizes the current
message (removes cursor), resets _message_id, and the next text chunk
creates a fresh message below the tool progress messages.

Timeline before:
  [msg 1: 'Let me search...' → edits → 'Here is the answer'] ← top
  [msg 2: tool progress lines]                                ← bottom

Timeline after:
  [msg 1: 'Let me search...']          ← top
  [msg 2: tool progress lines]
  [msg 3: 'Here is the answer']        ← bottom (visible)

Reported by SkyLinx on Discord.
2026-04-06 23:00:14 -07:00

327 lines
14 KiB
Python

"""Tests for GatewayStreamConsumer — media directive stripping in streaming."""
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
# ── _clean_for_display unit tests ────────────────────────────────────────
class TestCleanForDisplay:
    """Unit checks for ``GatewayStreamConsumer._clean_for_display``.

    Each case feeds a literal string through the helper and asserts that
    MEDIA: directives / internal markers are gone while ordinary prose stays.
    """

    def test_no_media_passthrough(self):
        """Input with no MEDIA: directive comes back exactly as given."""
        raw = "Here is your analysis of the image."
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert cleaned == raw

    def test_media_tag_stripped(self):
        """A plain ``MEDIA:<path>`` line disappears; the prose remains."""
        raw = "Here is the image\nMEDIA:/tmp/hermes/image.png"
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert "MEDIA:" not in cleaned
        assert "Here is the image" in cleaned

    def test_media_tag_with_space(self):
        """A space between ``MEDIA:`` and the path does not defeat stripping."""
        raw = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3"
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert "MEDIA:" not in cleaned
        assert "Audio generated" in cleaned

    def test_media_tag_with_quotes(self):
        """Backtick-, double-quote- and single-quote-wrapped tags are removed."""
        wrappers = (
            '`MEDIA:/path/file.png`',
            '"MEDIA:/path/file.png"',
            "'MEDIA:/path/file.png'",
        )
        for wrapper in wrappers:
            cleaned = GatewayStreamConsumer._clean_for_display(f"Result: {wrapper}")
            assert "MEDIA:" not in cleaned, f"Failed for wrapper: {wrapper}"

    def test_audio_as_voice_stripped(self):
        """The ``[[audio_as_voice]]`` marker is dropped along with the tag."""
        raw = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg"
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert "[[audio_as_voice]]" not in cleaned
        assert "MEDIA:" not in cleaned

    def test_multiple_media_tags(self):
        """Every MEDIA: tag is removed when several are present."""
        raw = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg"
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert "MEDIA:" not in cleaned
        assert "Here are two files:" in cleaned

    def test_excessive_newlines_collapsed(self):
        """Removing a tag must not leave three or more newlines in a row."""
        raw = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter"
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert "\n\n\n" not in cleaned

    def test_media_only_response(self):
        """Input consisting solely of a MEDIA: tag cleans to blank text."""
        raw = "MEDIA:/tmp/image.png"
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert cleaned.strip() == ""

    def test_media_mid_sentence(self):
        """A tag in the middle of a sentence is cut out; both halves survive."""
        raw = "I generated this image MEDIA:/tmp/art.png for you."
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert "MEDIA:" not in cleaned
        assert "generated" in cleaned
        assert "for you." in cleaned

    def test_preserves_non_media_colons(self):
        """Lowercase ``media:`` and the bare word ``MEDIA`` are left alone."""
        # The input has a lowercase "media:" and an uppercase "MEDIA" with no
        # colon after it; neither is a directive, so nothing may be removed.
        raw = "The media: files are stored in /tmp. Use social MEDIA carefully."
        cleaned = GatewayStreamConsumer._clean_for_display(raw)
        assert cleaned == raw
# ── Integration: _send_or_edit strips MEDIA: ─────────────────────────────
class TestSendOrEditMediaStripping:
    """Drive ``_send_or_edit`` against a mocked adapter.

    The assertions inspect the ``content`` keyword the adapter receives and
    require that it never carries a MEDIA: directive.
    """

    @pytest.mark.asyncio
    async def test_first_send_strips_media(self):
        """The first send keeps the prose and drops the MEDIA: line."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.send = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg_1")
        )
        consumer = GatewayStreamConsumer(adapter, "chat_123")

        await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png")

        adapter.send.assert_called_once()
        sent_text = adapter.send.call_args.kwargs["content"]
        assert "MEDIA:" not in sent_text
        assert "Here is your image" in sent_text

    @pytest.mark.asyncio
    async def test_edit_strips_media(self):
        """A follow-up call goes through ``edit_message`` with MEDIA: removed."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.send = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg_1")
        )
        adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        consumer = GatewayStreamConsumer(adapter, "chat_123")

        # The opening call establishes the message ...
        await consumer._send_or_edit("Starting response...")
        # ... so this one, which includes a MEDIA: tag, is expected to edit it.
        await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png")

        adapter.edit_message.assert_called_once()
        edited_text = adapter.edit_message.call_args.kwargs["content"]
        assert "MEDIA:" not in edited_text

    @pytest.mark.asyncio
    async def test_media_only_skips_send(self):
        """Text made up only of a MEDIA: tag results in no send at all."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.send = AsyncMock()
        consumer = GatewayStreamConsumer(adapter, "chat_123")

        await consumer._send_or_edit("MEDIA:/tmp/image.png")

        adapter.send.assert_not_called()
# ── Integration: full stream run ─────────────────────────────────────────
class TestStreamRunMediaStripping:
    """Full ``run()`` pass: deltas containing MEDIA: must yield clean output."""

    @pytest.mark.asyncio
    async def test_stream_with_media_tag(self):
        """No send or edit issued during the run may contain ``MEDIA:``."""
        adapter = MagicMock()
        adapter.MAX_MESSAGE_LENGTH = 4096
        adapter.send = AsyncMock(
            return_value=SimpleNamespace(success=True, message_id="msg_1")
        )
        adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        consumer = GatewayStreamConsumer(
            adapter,
            "chat_123",
            StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
        )

        # Queue the prose, then the directive, then signal end of stream.
        consumer.on_delta("Here is your generated image\n")
        consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png")
        consumer.finish()
        await consumer.run()

        # Gather the content of every send first, then every edit.
        recorded_calls = (
            adapter.send.call_args_list + adapter.edit_message.call_args_list
        )
        all_calls = [call.kwargs.get("content", "") for call in recorded_calls]
        for sent_text in all_calls:
            assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}"
        assert consumer.already_sent
# ── Segment break (tool boundary) tests ──────────────────────────────────
class TestSegmentBreakOnToolBoundary:
    """Verify that on_delta(None) finalizes the current message and starts a
    new one so the final response appears below tool-progress messages."""

    @pytest.mark.asyncio
    async def test_segment_break_creates_new_message(self):
        """After a None boundary, next text creates a fresh message."""
        adapter = MagicMock()
        send_result_1 = SimpleNamespace(success=True, message_id="msg_1")
        send_result_2 = SimpleNamespace(success=True, message_id="msg_2")
        edit_result = SimpleNamespace(success=True)
        # One distinct result per expected send, in order.
        adapter.send = AsyncMock(side_effect=[send_result_1, send_result_2])
        adapter.edit_message = AsyncMock(return_value=edit_result)
        adapter.MAX_MESSAGE_LENGTH = 4096
        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
        consumer = GatewayStreamConsumer(adapter, "chat_123", config)

        # Phase 1: intermediate text before tool calls
        consumer.on_delta("Let me search for that...")
        # Tool boundary — model is about to call tools
        consumer.on_delta(None)
        # Phase 2: final response text after tools finished
        consumer.on_delta("Here are the results.")
        consumer.finish()
        await consumer.run()

        # Should have sent TWO separate messages (two adapter.send calls),
        # not just edited the first one.
        assert adapter.send.call_count == 2
        first_text = adapter.send.call_args_list[0][1]["content"]
        second_text = adapter.send.call_args_list[1][1]["content"]
        assert "search" in first_text
        assert "results" in second_text

    @pytest.mark.asyncio
    async def test_segment_break_no_text_before(self):
        """A None boundary with no preceding text is a no-op."""
        adapter = MagicMock()
        send_result = SimpleNamespace(success=True, message_id="msg_1")
        adapter.send = AsyncMock(return_value=send_result)
        adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        adapter.MAX_MESSAGE_LENGTH = 4096
        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
        consumer = GatewayStreamConsumer(adapter, "chat_123", config)

        # No text before the boundary — model went straight to tool calls
        consumer.on_delta(None)
        consumer.on_delta("Final answer.")
        consumer.finish()
        await consumer.run()

        # Only one send call (the final answer)
        assert adapter.send.call_count == 1
        assert "Final answer" in adapter.send.call_args_list[0][1]["content"]

    @pytest.mark.asyncio
    async def test_segment_break_removes_cursor(self):
        """The finalized segment message should not have a cursor."""
        adapter = MagicMock()
        send_result = SimpleNamespace(success=True, message_id="msg_1")
        edit_result = SimpleNamespace(success=True)
        adapter.send = AsyncMock(return_value=send_result)
        adapter.edit_message = AsyncMock(return_value=edit_result)
        adapter.MAX_MESSAGE_LENGTH = 4096
        # The cursor must be a non-empty, recognisable marker: "" is a
        # substring of every string, so checking for an empty cursor could
        # never pass. It is written as an escape so the glyph survives tools
        # that drop non-ASCII characters.
        cursor_glyph = "\u2589"
        config = StreamConsumerConfig(
            edit_interval=0.01, buffer_threshold=5, cursor=f" {cursor_glyph}"
        )
        consumer = GatewayStreamConsumer(adapter, "chat_123", config)

        consumer.on_delta("Thinking...")
        consumer.on_delta(None)
        consumer.on_delta("Done.")
        consumer.finish()
        await consumer.run()

        # Collect the content of every send, then every edit, so the last
        # "Thinking" entry is the most recent state recorded for that segment.
        all_texts = []
        for call in adapter.send.call_args_list:
            all_texts.append(call[1].get("content", ""))
        for call in adapter.edit_message.call_args_list:
            all_texts.append(call[1].get("content", ""))

        # Find the text(s) that contain "Thinking" — the finalized version
        # should not have the cursor.
        thinking_texts = [t for t in all_texts if "Thinking" in t]
        assert thinking_texts, "Expected at least one message with 'Thinking'"
        # The LAST occurrence is the finalized version
        assert cursor_glyph not in thinking_texts[-1], (
            f"Cursor found in finalized segment: {thinking_texts[-1]!r}"
        )

    @pytest.mark.asyncio
    async def test_multiple_segment_breaks(self):
        """Multiple tool boundaries create multiple message segments."""
        adapter = MagicMock()
        # Each send hands back the next id, so every segment gets its own.
        msg_counter = iter(["msg_1", "msg_2", "msg_3"])
        adapter.send = AsyncMock(
            side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter))
        )
        adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        adapter.MAX_MESSAGE_LENGTH = 4096
        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
        consumer = GatewayStreamConsumer(adapter, "chat_123", config)

        consumer.on_delta("Phase 1")
        consumer.on_delta(None)  # tool boundary
        consumer.on_delta("Phase 2")
        consumer.on_delta(None)  # another tool boundary
        consumer.on_delta("Phase 3")
        consumer.finish()
        await consumer.run()

        # Three separate messages
        assert adapter.send.call_count == 3

    @pytest.mark.asyncio
    async def test_already_sent_stays_true_after_segment(self):
        """already_sent remains True after a segment break."""
        adapter = MagicMock()
        send_result = SimpleNamespace(success=True, message_id="msg_1")
        adapter.send = AsyncMock(return_value=send_result)
        adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
        adapter.MAX_MESSAGE_LENGTH = 4096
        config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
        consumer = GatewayStreamConsumer(adapter, "chat_123", config)

        # Text, then a boundary, then end of stream with nothing after it.
        consumer.on_delta("Text")
        consumer.on_delta(None)
        consumer.finish()
        await consumer.run()

        assert consumer.already_sent