During rapid tool-calling, the model often emits 1-2 tokens before switching to tool calls. The stream consumer would create a new message with 'X ▉' (short text + cursor), and if the follow-up edit to strip the cursor was rate-limited by the platform, the cursor remained as a permanent standalone message — reported on Telegram as 'white box' artifacts. Add a minimum-content guard in _send_or_edit: when creating a new standalone message (no existing message_id), require at least 4 visible characters alongside the cursor before sending. Shorter text accumulates into the next streaming segment instead. This prevents cursor-only 'tofu' messages across all platforms without affecting normal streaming (edits to existing messages, final sends without cursor, and messages with substantial text are all unaffected). Reported by @michalkomar on X.
966 lines
38 KiB
Python
966 lines
38 KiB
Python
"""Tests for GatewayStreamConsumer — media directive stripping in streaming."""
|
|
|
|
import asyncio
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
|
|
|
|
|
|
# ── _clean_for_display unit tests ────────────────────────────────────────
|
|
|
|
|
|
class TestCleanForDisplay:
|
|
"""Verify MEDIA: directives and internal markers are stripped from display text."""
|
|
|
|
def test_no_media_passthrough(self):
|
|
"""Text without MEDIA: passes through unchanged."""
|
|
text = "Here is your analysis of the image."
|
|
assert GatewayStreamConsumer._clean_for_display(text) == text
|
|
|
|
def test_media_tag_stripped(self):
|
|
"""Basic MEDIA:<path> tag is removed."""
|
|
text = "Here is the image\nMEDIA:/tmp/hermes/image.png"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert "MEDIA:" not in result
|
|
assert "Here is the image" in result
|
|
|
|
def test_media_tag_with_space(self):
|
|
"""MEDIA: tag with space after colon is removed."""
|
|
text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert "MEDIA:" not in result
|
|
assert "Audio generated" in result
|
|
|
|
def test_media_tag_with_quotes(self):
|
|
"""MEDIA: tags wrapped in quotes or backticks are removed."""
|
|
for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]:
|
|
text = f"Result: {wrapper}"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}"
|
|
|
|
def test_audio_as_voice_stripped(self):
|
|
"""[[audio_as_voice]] directive is removed."""
|
|
text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert "[[audio_as_voice]]" not in result
|
|
assert "MEDIA:" not in result
|
|
|
|
def test_multiple_media_tags(self):
|
|
"""Multiple MEDIA: tags are all removed."""
|
|
text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert "MEDIA:" not in result
|
|
assert "Here are two files:" in result
|
|
|
|
def test_excessive_newlines_collapsed(self):
|
|
"""Blank lines left by removed tags are collapsed."""
|
|
text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
# Should not have 3+ consecutive newlines
|
|
assert "\n\n\n" not in result
|
|
|
|
def test_media_only_response(self):
|
|
"""Response that is entirely MEDIA: tags returns empty/whitespace."""
|
|
text = "MEDIA:/tmp/image.png"
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert result.strip() == ""
|
|
|
|
def test_media_mid_sentence(self):
|
|
"""MEDIA: tag embedded in prose is stripped cleanly."""
|
|
text = "I generated this image MEDIA:/tmp/art.png for you."
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
assert "MEDIA:" not in result
|
|
assert "generated" in result
|
|
assert "for you." in result
|
|
|
|
def test_preserves_non_media_colons(self):
|
|
"""Normal colons and text with 'MEDIA' as a word aren't stripped."""
|
|
text = "The media: files are stored in /tmp. Use social MEDIA carefully."
|
|
result = GatewayStreamConsumer._clean_for_display(text)
|
|
# "MEDIA:" in upper case without a path won't match \S+ (space follows)
|
|
# But "media:" is lowercase so won't match either
|
|
assert result == text
|
|
|
|
|
|
# ── Integration: _send_or_edit strips MEDIA: ─────────────────────────────
|
|
|
|
|
|
class TestSendOrEditMediaStripping:
|
|
"""Verify _send_or_edit strips MEDIA: before sending to the platform."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_first_send_strips_media(self):
|
|
"""Initial send removes MEDIA: tags from visible text."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123")
|
|
await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png")
|
|
|
|
adapter.send.assert_called_once()
|
|
sent_text = adapter.send.call_args[1]["content"]
|
|
assert "MEDIA:" not in sent_text
|
|
assert "Here is your image" in sent_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_edit_strips_media(self):
|
|
"""Edit call removes MEDIA: tags from visible text."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
edit_result = SimpleNamespace(success=True)
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.edit_message = AsyncMock(return_value=edit_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123")
|
|
# First send
|
|
await consumer._send_or_edit("Starting response...")
|
|
# Edit with MEDIA: tag
|
|
await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png")
|
|
|
|
adapter.edit_message.assert_called_once()
|
|
edited_text = adapter.edit_message.call_args[1]["content"]
|
|
assert "MEDIA:" not in edited_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_media_only_skips_send(self):
|
|
"""If text is entirely MEDIA: tags, the send is skipped."""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock()
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123")
|
|
await consumer._send_or_edit("MEDIA:/tmp/image.png")
|
|
|
|
adapter.send.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cursor_only_update_skips_send(self):
|
|
"""A bare streaming cursor should not be sent as its own message."""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock()
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(cursor=" ▉"),
|
|
)
|
|
await consumer._send_or_edit(" ▉")
|
|
|
|
adapter.send.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_short_text_with_cursor_skips_new_message(self):
|
|
"""Short text + cursor should not create a standalone new message.
|
|
|
|
During rapid tool-calling the model often emits 1-2 tokens before
|
|
switching to tool calls. Sending 'I ▉' as a new message risks
|
|
leaving the cursor permanently visible if the follow-up edit is
|
|
rate-limited. The guard should skip the first send and let the
|
|
text accumulate into the next segment.
|
|
"""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock()
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(cursor=" ▉"),
|
|
)
|
|
# No message_id yet (first send) — short text + cursor should be skipped
|
|
assert consumer._message_id is None
|
|
result = await consumer._send_or_edit("I ▉")
|
|
assert result is True
|
|
adapter.send.assert_not_called()
|
|
|
|
# 3 chars is still under the threshold
|
|
result = await consumer._send_or_edit("Hi! ▉")
|
|
assert result is True
|
|
adapter.send.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_longer_text_with_cursor_sends_new_message(self):
|
|
"""Text >= 4 visible chars + cursor should create a new message normally."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(cursor=" ▉"),
|
|
)
|
|
result = await consumer._send_or_edit("Hello ▉")
|
|
assert result is True
|
|
adapter.send.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_short_text_without_cursor_sends_normally(self):
|
|
"""Short text without cursor (e.g. final edit) should send normally."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(cursor=" ▉"),
|
|
)
|
|
# No cursor in text — even short text should be sent
|
|
result = await consumer._send_or_edit("OK")
|
|
assert result is True
|
|
adapter.send.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_short_text_cursor_edit_existing_message_allowed(self):
|
|
"""Short text + cursor editing an existing message should proceed."""
|
|
adapter = MagicMock()
|
|
edit_result = SimpleNamespace(success=True)
|
|
adapter.edit_message = AsyncMock(return_value=edit_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(cursor=" ▉"),
|
|
)
|
|
consumer._message_id = "msg_1" # Existing message — guard should not fire
|
|
consumer._last_sent_text = ""
|
|
result = await consumer._send_or_edit("I ▉")
|
|
assert result is True
|
|
adapter.edit_message.assert_called_once()
|
|
|
|
|
|
# ── Integration: full stream run ─────────────────────────────────────────
|
|
|
|
|
|
class TestStreamRunMediaStripping:
|
|
"""End-to-end: deltas with MEDIA: produce clean visible text."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stream_with_media_tag(self):
|
|
"""Full stream run strips MEDIA: from the final visible message."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
edit_result = SimpleNamespace(success=True)
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.edit_message = AsyncMock(return_value=edit_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
# Feed deltas
|
|
consumer.on_delta("Here is your generated image\n")
|
|
consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
# Verify the final text sent/edited doesn't contain MEDIA:
|
|
all_calls = []
|
|
for call in adapter.send.call_args_list:
|
|
all_calls.append(call[1].get("content", ""))
|
|
for call in adapter.edit_message.call_args_list:
|
|
all_calls.append(call[1].get("content", ""))
|
|
|
|
for sent_text in all_calls:
|
|
assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}"
|
|
|
|
assert consumer.already_sent
|
|
|
|
|
|
# ── Segment break (tool boundary) tests ──────────────────────────────────
|
|
|
|
|
|
class TestSegmentBreakOnToolBoundary:
|
|
"""Verify that on_delta(None) finalizes the current message and starts a
|
|
new one so the final response appears below tool-progress messages."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_segment_break_creates_new_message(self):
|
|
"""After a None boundary, next text creates a fresh message."""
|
|
adapter = MagicMock()
|
|
send_result_1 = SimpleNamespace(success=True, message_id="msg_1")
|
|
send_result_2 = SimpleNamespace(success=True, message_id="msg_2")
|
|
edit_result = SimpleNamespace(success=True)
|
|
adapter.send = AsyncMock(side_effect=[send_result_1, send_result_2])
|
|
adapter.edit_message = AsyncMock(return_value=edit_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
# Phase 1: intermediate text before tool calls
|
|
consumer.on_delta("Let me search for that...")
|
|
# Tool boundary — model is about to call tools
|
|
consumer.on_delta(None)
|
|
# Phase 2: final response text after tools finished
|
|
consumer.on_delta("Here are the results.")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
# Should have sent TWO separate messages (two adapter.send calls),
|
|
# not just edited the first one.
|
|
assert adapter.send.call_count == 2
|
|
first_text = adapter.send.call_args_list[0][1]["content"]
|
|
second_text = adapter.send.call_args_list[1][1]["content"]
|
|
assert "search" in first_text
|
|
assert "results" in second_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_segment_break_no_text_before(self):
|
|
"""A None boundary with no preceding text is a no-op."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
# No text before the boundary — model went straight to tool calls
|
|
consumer.on_delta(None)
|
|
consumer.on_delta("Final answer.")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
# Only one send call (the final answer)
|
|
assert adapter.send.call_count == 1
|
|
assert "Final answer" in adapter.send.call_args_list[0][1]["content"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_segment_break_removes_cursor(self):
|
|
"""The finalized segment message should not have a cursor."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
edit_result = SimpleNamespace(success=True)
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.edit_message = AsyncMock(return_value=edit_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Thinking...")
|
|
consumer.on_delta(None)
|
|
consumer.on_delta("Done.")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
# The first segment should have been finalized without cursor.
|
|
# Check all edit_message calls + the initial send for the first segment.
|
|
# The last state of msg_1 should NOT have the cursor.
|
|
all_texts = []
|
|
for call in adapter.send.call_args_list:
|
|
all_texts.append(call[1].get("content", ""))
|
|
for call in adapter.edit_message.call_args_list:
|
|
all_texts.append(call[1].get("content", ""))
|
|
|
|
# Find the text(s) that contain "Thinking" — the finalized version
|
|
# should not have the cursor.
|
|
thinking_texts = [t for t in all_texts if "Thinking" in t]
|
|
assert thinking_texts, "Expected at least one message with 'Thinking'"
|
|
# The LAST occurrence is the finalized version
|
|
assert "▉" not in thinking_texts[-1], (
|
|
f"Cursor found in finalized segment: {thinking_texts[-1]!r}"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_segment_breaks(self):
|
|
"""Multiple tool boundaries create multiple message segments."""
|
|
adapter = MagicMock()
|
|
msg_counter = iter(["msg_1", "msg_2", "msg_3"])
|
|
adapter.send = AsyncMock(
|
|
side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter))
|
|
)
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Phase 1")
|
|
consumer.on_delta(None) # tool boundary
|
|
consumer.on_delta("Phase 2")
|
|
consumer.on_delta(None) # another tool boundary
|
|
consumer.on_delta("Phase 3")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
# Three separate messages
|
|
assert adapter.send.call_count == 3
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_already_sent_stays_true_after_segment(self):
|
|
"""already_sent remains True after a segment break."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id="msg_1")
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Text")
|
|
consumer.on_delta(None)
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
assert consumer.already_sent
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_edit_failure_sends_only_unsent_tail_at_finish(self):
|
|
"""If an edit fails mid-stream, send only the missing tail once at finish."""
|
|
adapter = MagicMock()
|
|
send_results = [
|
|
SimpleNamespace(success=True, message_id="msg_1"),
|
|
SimpleNamespace(success=True, message_id="msg_2"),
|
|
]
|
|
adapter.send = AsyncMock(side_effect=send_results)
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Hello")
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
consumer.on_delta(" world")
|
|
await asyncio.sleep(0.08)
|
|
consumer.finish()
|
|
await task
|
|
|
|
assert adapter.send.call_count == 2
|
|
first_text = adapter.send.call_args_list[0][1]["content"]
|
|
second_text = adapter.send.call_args_list[1][1]["content"]
|
|
assert "Hello" in first_text
|
|
assert second_text.strip() == "world"
|
|
assert consumer.already_sent
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_segment_break_clears_failed_edit_fallback_state(self):
|
|
"""A tool boundary after edit failure must not duplicate the next segment."""
|
|
adapter = MagicMock()
|
|
send_results = [
|
|
SimpleNamespace(success=True, message_id="msg_1"),
|
|
SimpleNamespace(success=True, message_id="msg_2"),
|
|
]
|
|
adapter.send = AsyncMock(side_effect=send_results)
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Hello")
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
consumer.on_delta(" world")
|
|
await asyncio.sleep(0.08)
|
|
consumer.on_delta(None)
|
|
consumer.on_delta("Next segment")
|
|
consumer.finish()
|
|
await task
|
|
|
|
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
|
assert sent_texts == ["Hello ▉", "Next segment"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_message_id_enters_fallback_mode(self):
|
|
"""Platform returns success but no message_id (Signal) — must not
|
|
re-send on every delta. Should enter fallback mode and send only
|
|
the continuation at finish."""
|
|
adapter = MagicMock()
|
|
# First send succeeds but returns no message_id (Signal behavior)
|
|
send_result_no_id = SimpleNamespace(success=True, message_id=None)
|
|
# Fallback final send succeeds
|
|
send_result_final = SimpleNamespace(success=True, message_id="msg_final")
|
|
adapter.send = AsyncMock(side_effect=[send_result_no_id, send_result_final])
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Hello")
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
consumer.on_delta(" world, this is a longer response.")
|
|
await asyncio.sleep(0.08)
|
|
consumer.finish()
|
|
await task
|
|
|
|
# Should send exactly 2 messages: initial chunk + fallback continuation
|
|
# NOT one message per delta
|
|
assert adapter.send.call_count == 2
|
|
assert consumer.already_sent
|
|
# edit_message should NOT have been called (no valid message_id to edit)
|
|
adapter.edit_message.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_message_id_single_delta_marks_already_sent(self):
|
|
"""When the entire response fits in one delta and platform returns no
|
|
message_id, already_sent must still be True to prevent the gateway
|
|
from re-sending the full response."""
|
|
adapter = MagicMock()
|
|
send_result = SimpleNamespace(success=True, message_id=None)
|
|
adapter.send = AsyncMock(return_value=send_result)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
consumer.on_delta("Short response.")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
assert consumer.already_sent
|
|
# Only one send call (the initial message)
|
|
assert adapter.send.call_count == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_message_id_segment_breaks_do_not_resend(self):
|
|
"""On a platform that never returns a message_id (e.g. webhook with
|
|
github_comment delivery), tool-call segment breaks must NOT trigger
|
|
a new adapter.send() per boundary. The fix: _message_id == '__no_edit__'
|
|
suppresses the reset so all text accumulates and is sent once."""
|
|
adapter = MagicMock()
|
|
# No message_id on first send, then one more for the fallback final
|
|
adapter.send = AsyncMock(side_effect=[
|
|
SimpleNamespace(success=True, message_id=None),
|
|
SimpleNamespace(success=True, message_id=None),
|
|
])
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
# Simulate: text → tool boundary → text → tool boundary → text (3 segments)
|
|
consumer.on_delta("Phase 1 text")
|
|
consumer.on_delta(None) # tool call boundary
|
|
consumer.on_delta("Phase 2 text")
|
|
consumer.on_delta(None) # another tool call boundary
|
|
consumer.on_delta("Phase 3 text")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
# Before the fix this would post 3 comments (one per segment).
|
|
# After the fix: only the initial partial + one fallback-final continuation.
|
|
assert adapter.send.call_count == 2, (
|
|
f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}"
|
|
)
|
|
assert consumer.already_sent
|
|
# The continuation must contain the text from segments 2 and 3
|
|
final_text = adapter.send.call_args_list[1][1]["content"]
|
|
assert "Phase 2" in final_text
|
|
assert "Phase 3" in final_text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
|
|
"""Long continuation tails should be chunked when fallback final-send runs."""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(side_effect=[
|
|
SimpleNamespace(success=True, message_id="msg_1"),
|
|
SimpleNamespace(success=True, message_id="msg_2"),
|
|
SimpleNamespace(success=True, message_id="msg_3"),
|
|
])
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6"))
|
|
adapter.MAX_MESSAGE_LENGTH = 610
|
|
|
|
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉")
|
|
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
|
|
|
prefix = "Hello world"
|
|
tail = "x" * 620
|
|
consumer.on_delta(prefix)
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
consumer.on_delta(tail)
|
|
await asyncio.sleep(0.08)
|
|
consumer.finish()
|
|
await task
|
|
|
|
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
|
assert len(sent_texts) == 3
|
|
assert sent_texts[0].startswith(prefix)
|
|
assert sum(len(t) for t in sent_texts[1:]) == len(tail)
|
|
|
|
|
|
class TestInterimCommentaryMessages:
|
|
@pytest.mark.asyncio
|
|
async def test_commentary_message_stays_separate_from_final_stream(self):
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(side_effect=[
|
|
SimpleNamespace(success=True, message_id="msg_1"),
|
|
SimpleNamespace(success=True, message_id="msg_2"),
|
|
])
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
|
)
|
|
|
|
consumer.on_commentary("I'll inspect the repository first.")
|
|
consumer.on_delta("Done.")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
|
assert sent_texts == ["I'll inspect the repository first.", "Done."]
|
|
assert consumer.final_response_sent is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_failed_final_send_does_not_mark_final_response_sent(self):
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(return_value=SimpleNamespace(success=False, message_id=None))
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
|
)
|
|
|
|
consumer.on_delta("Done.")
|
|
consumer.finish()
|
|
|
|
await consumer.run()
|
|
|
|
assert consumer.final_response_sent is False
|
|
assert consumer.already_sent is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_success_without_message_id_marks_visible_and_sends_only_tail(self):
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(side_effect=[
|
|
SimpleNamespace(success=True, message_id=None),
|
|
SimpleNamespace(success=True, message_id=None),
|
|
])
|
|
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉"),
|
|
)
|
|
|
|
consumer.on_delta("Hello")
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
consumer.on_delta(" world")
|
|
await asyncio.sleep(0.08)
|
|
consumer.finish()
|
|
await task
|
|
|
|
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
|
assert sent_texts == ["Hello ▉", "world"]
|
|
assert consumer.already_sent is True
|
|
assert consumer.final_response_sent is True
|
|
|
|
|
|
class TestCancelledConsumerSetsFlags:
|
|
"""Cancellation must set final_response_sent when already_sent is True.
|
|
|
|
The 5-second stream_task timeout in gateway/run.py can cancel the
|
|
consumer while it's still processing. If final_response_sent stays
|
|
False, the gateway falls through to the normal send path and the
|
|
user sees a duplicate message.
|
|
"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cancelled_with_already_sent_marks_final_response_sent(self):
|
|
"""Cancelling after content was sent should set final_response_sent."""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(
|
|
return_value=SimpleNamespace(success=True, message_id="msg_1")
|
|
)
|
|
adapter.edit_message = AsyncMock(
|
|
return_value=SimpleNamespace(success=True)
|
|
)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
|
)
|
|
|
|
# Stream some text — the consumer sends it and sets already_sent
|
|
consumer.on_delta("Hello world")
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
|
|
assert consumer.already_sent is True
|
|
|
|
# Cancel the task (simulates the 5-second timeout in gateway)
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# The fix: final_response_sent should be True even though _DONE
|
|
# was never processed, preventing a duplicate message.
|
|
assert consumer.final_response_sent is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cancelled_without_any_sends_does_not_mark_final(self):
|
|
"""Cancelling before anything was sent should NOT set final_response_sent."""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(
|
|
return_value=SimpleNamespace(success=False, message_id=None)
|
|
)
|
|
adapter.edit_message = AsyncMock(
|
|
return_value=SimpleNamespace(success=True)
|
|
)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_123",
|
|
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
|
)
|
|
|
|
# Send fails — already_sent stays False
|
|
consumer.on_delta("x")
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.08)
|
|
|
|
assert consumer.already_sent is False
|
|
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Without a successful send, final_response_sent should stay False
|
|
# so the normal gateway send path can deliver the response.
|
|
assert consumer.final_response_sent is False
|
|
|
|
|
|
# ── Think-block filtering unit tests ─────────────────────────────────────
|
|
|
|
|
|
def _make_consumer() -> GatewayStreamConsumer:
|
|
"""Create a bare consumer for unit-testing the filter (no adapter needed)."""
|
|
adapter = MagicMock()
|
|
return GatewayStreamConsumer(adapter, "chat_test")
|
|
|
|
|
|
class TestFilterAndAccumulate:
|
|
"""Unit tests for _filter_and_accumulate think-block suppression."""
|
|
|
|
def test_plain_text_passes_through(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("Hello world")
|
|
assert c._accumulated == "Hello world"
|
|
|
|
def test_complete_think_block_stripped(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<think>internal reasoning</think>Answer here")
|
|
assert c._accumulated == "Answer here"
|
|
|
|
def test_think_block_in_middle(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("Prefix\n<think>reasoning</think>\nSuffix")
|
|
assert c._accumulated == "Prefix\n\nSuffix"
|
|
|
|
def test_think_block_split_across_deltas(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<think>start of")
|
|
c._filter_and_accumulate(" reasoning</think>visible text")
|
|
assert c._accumulated == "visible text"
|
|
|
|
def test_opening_tag_split_across_deltas(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<thi")
|
|
# Partial tag held back
|
|
assert c._accumulated == ""
|
|
c._filter_and_accumulate("nk>hidden</think>shown")
|
|
assert c._accumulated == "shown"
|
|
|
|
def test_closing_tag_split_across_deltas(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<think>hidden</thi")
|
|
assert c._accumulated == ""
|
|
c._filter_and_accumulate("nk>shown")
|
|
assert c._accumulated == "shown"
|
|
|
|
def test_multiple_think_blocks(self):
|
|
c = _make_consumer()
|
|
# Consecutive blocks with no text between them — both stripped
|
|
c._filter_and_accumulate(
|
|
"<think>block1</think><think>block2</think>visible"
|
|
)
|
|
assert c._accumulated == "visible"
|
|
|
|
def test_multiple_think_blocks_with_text_between(self):
|
|
"""Think tag after non-whitespace is NOT a boundary (prose safety)."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate(
|
|
"<think>block1</think>A<think>block2</think>B"
|
|
)
|
|
# Second <think> follows 'A' (not a block boundary) — treated as prose
|
|
assert "A" in c._accumulated
|
|
assert "B" in c._accumulated
|
|
|
|
def test_thinking_tag_variant(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<thinking>deep thought</thinking>Result")
|
|
assert c._accumulated == "Result"
|
|
|
|
def test_thought_tag_variant(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<thought>Gemma style</thought>Output")
|
|
assert c._accumulated == "Output"
|
|
|
|
def test_reasoning_scratchpad_variant(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate(
|
|
"<REASONING_SCRATCHPAD>long plan</REASONING_SCRATCHPAD>Done"
|
|
)
|
|
assert c._accumulated == "Done"
|
|
|
|
def test_case_insensitive_THINKING(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<THINKING>caps</THINKING>answer")
|
|
assert c._accumulated == "answer"
|
|
|
|
def test_prose_mention_not_stripped(self):
|
|
"""<think> mentioned mid-line in prose should NOT trigger filtering."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("The <think> tag is used for reasoning")
|
|
assert "<think>" in c._accumulated
|
|
assert "used for reasoning" in c._accumulated
|
|
|
|
def test_prose_mention_after_text(self):
|
|
"""<think> after non-whitespace on same line is not a block boundary."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("Try using <think>some content</think> tags")
|
|
assert "<think>" in c._accumulated
|
|
|
|
def test_think_at_line_start_is_stripped(self):
|
|
"""<think> at start of a new line IS a block boundary."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("Previous line\n<think>reasoning</think>Next")
|
|
assert "Previous line\nNext" == c._accumulated
|
|
|
|
def test_think_with_only_whitespace_before(self):
|
|
"""<think> preceded by only whitespace on its line is a boundary."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate(" <think>hidden</think>visible")
|
|
# Leading whitespace before the tag is emitted, then block is stripped
|
|
assert c._accumulated == " visible"
|
|
|
|
def test_flush_think_buffer_on_non_tag(self):
|
|
"""Partial tag that turns out not to be a tag is flushed."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<thi")
|
|
assert c._accumulated == ""
|
|
# Flush explicitly (simulates stream end)
|
|
c._flush_think_buffer()
|
|
assert c._accumulated == "<thi"
|
|
|
|
def test_flush_think_buffer_when_inside_block(self):
|
|
"""Flush while inside a think block does NOT emit buffered content."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<think>still thinking")
|
|
c._flush_think_buffer()
|
|
assert c._accumulated == ""
|
|
|
|
def test_unclosed_think_block_suppresses(self):
|
|
"""An unclosed <think> suppresses all subsequent content."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("Before\n<think>reasoning that never ends...")
|
|
assert c._accumulated == "Before\n"
|
|
|
|
def test_multiline_think_block(self):
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate(
|
|
"<think>\nLine 1\nLine 2\nLine 3\n</think>Final answer"
|
|
)
|
|
assert c._accumulated == "Final answer"
|
|
|
|
def test_segment_reset_preserves_think_state(self):
|
|
"""_reset_segment_state should NOT clear think-block filter state."""
|
|
c = _make_consumer()
|
|
c._filter_and_accumulate("<think>start")
|
|
c._reset_segment_state()
|
|
# Still inside think block — subsequent text should be suppressed
|
|
c._filter_and_accumulate("still hidden</think>visible")
|
|
assert c._accumulated == "visible"
|
|
|
|
|
|
class TestFilterAndAccumulateIntegration:
|
|
"""Integration: verify think blocks don't leak through the full run() path."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_block_not_sent_to_platform(self):
|
|
"""Think blocks should be filtered before platform edit."""
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock(
|
|
return_value=SimpleNamespace(success=True, message_id="msg_1")
|
|
)
|
|
adapter.edit_message = AsyncMock(
|
|
return_value=SimpleNamespace(success=True)
|
|
)
|
|
adapter.MAX_MESSAGE_LENGTH = 4096
|
|
|
|
consumer = GatewayStreamConsumer(
|
|
adapter,
|
|
"chat_test",
|
|
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
|
|
)
|
|
|
|
# Simulate streaming: think block then visible text
|
|
consumer.on_delta("<think>deep reasoning here</think>")
|
|
consumer.on_delta("The answer is 42.")
|
|
consumer.finish()
|
|
|
|
task = asyncio.create_task(consumer.run())
|
|
await asyncio.sleep(0.15)
|
|
|
|
# The final text sent to the platform should NOT contain <think>
|
|
all_calls = list(adapter.send.call_args_list) + list(
|
|
adapter.edit_message.call_args_list
|
|
)
|
|
for call in all_calls:
|
|
args, kwargs = call
|
|
content = kwargs.get("content") or (args[0] if args else "")
|
|
assert "<think>" not in content, f"Think tag leaked: {content}"
|
|
assert "deep reasoning" not in content
|
|
|
|
try:
|
|
task.cancel()
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|