"""Tests for GatewayStreamConsumer — media directive stripping in streaming.""" import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig # ── _clean_for_display unit tests ──────────────────────────────────────── class TestCleanForDisplay: """Verify MEDIA: directives and internal markers are stripped from display text.""" def test_no_media_passthrough(self): """Text without MEDIA: passes through unchanged.""" text = "Here is your analysis of the image." assert GatewayStreamConsumer._clean_for_display(text) == text def test_media_tag_stripped(self): """Basic MEDIA: tag is removed.""" text = "Here is the image\nMEDIA:/tmp/hermes/image.png" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "Here is the image" in result def test_media_tag_with_space(self): """MEDIA: tag with space after colon is removed.""" text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "Audio generated" in result def test_media_tag_with_quotes(self): """MEDIA: tags wrapped in quotes or backticks are removed.""" for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]: text = f"Result: {wrapper}" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}" def test_audio_as_voice_stripped(self): """[[audio_as_voice]] directive is removed.""" text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg" result = GatewayStreamConsumer._clean_for_display(text) assert "[[audio_as_voice]]" not in result assert "MEDIA:" not in result def test_multiple_media_tags(self): """Multiple MEDIA: tags are all removed.""" text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "Here are two files:" in result def test_excessive_newlines_collapsed(self): """Blank lines left by removed tags are collapsed.""" text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter" result = GatewayStreamConsumer._clean_for_display(text) # Should not have 3+ consecutive newlines assert "\n\n\n" not in result def test_media_only_response(self): """Response that is entirely MEDIA: tags returns empty/whitespace.""" text = "MEDIA:/tmp/image.png" result = GatewayStreamConsumer._clean_for_display(text) assert result.strip() == "" def test_media_mid_sentence(self): """MEDIA: tag embedded in prose is stripped cleanly.""" text = "I generated this image MEDIA:/tmp/art.png for you." result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "generated" in result assert "for you." in result def test_preserves_non_media_colons(self): """Normal colons and text with 'MEDIA' as a word aren't stripped.""" text = "The media: files are stored in /tmp. Use social MEDIA carefully." result = GatewayStreamConsumer._clean_for_display(text) # "MEDIA:" in upper case without a path won't match \S+ (space follows) # But "media:" is lowercase so won't match either assert result == text # ── Integration: _send_or_edit strips MEDIA: ───────────────────────────── class TestSendOrEditMediaStripping: """Verify _send_or_edit strips MEDIA: before sending to the platform.""" @pytest.mark.asyncio async def test_first_send_strips_media(self): """Initial send removes MEDIA: tags from visible text.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") adapter.send = AsyncMock(return_value=send_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer(adapter, "chat_123") await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png") adapter.send.assert_called_once() sent_text = adapter.send.call_args[1]["content"] assert "MEDIA:" not in sent_text assert "Here is your image" in sent_text @pytest.mark.asyncio async def test_edit_strips_media(self): """Edit call removes MEDIA: tags from visible text.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") edit_result = SimpleNamespace(success=True) adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer(adapter, "chat_123") # First send await consumer._send_or_edit("Starting response...") # Edit with MEDIA: tag await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png") adapter.edit_message.assert_called_once() edited_text = adapter.edit_message.call_args[1]["content"] assert "MEDIA:" not in edited_text @pytest.mark.asyncio async def test_media_only_skips_send(self): """If text is entirely MEDIA: tags, the send is skipped.""" adapter = MagicMock() adapter.send = AsyncMock() adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer(adapter, "chat_123") await consumer._send_or_edit("MEDIA:/tmp/image.png") adapter.send.assert_not_called() @pytest.mark.asyncio async def test_cursor_only_update_skips_send(self): """A bare streaming cursor should not be sent as its own message.""" adapter = MagicMock() adapter.send = AsyncMock() adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(cursor=" ▉"), ) await consumer._send_or_edit(" ▉") adapter.send.assert_not_called() @pytest.mark.asyncio async def test_short_text_with_cursor_skips_new_message(self): """Short text + cursor should not create a standalone new message. During rapid tool-calling the model often emits 1-2 tokens before switching to tool calls. Sending 'I ▉' as a new message risks leaving the cursor permanently visible if the follow-up edit is rate-limited. The guard should skip the first send and let the text accumulate into the next segment. """ adapter = MagicMock() adapter.send = AsyncMock() adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(cursor=" ▉"), ) # No message_id yet (first send) — short text + cursor should be skipped assert consumer._message_id is None result = await consumer._send_or_edit("I ▉") assert result is True adapter.send.assert_not_called() # 3 chars is still under the threshold result = await consumer._send_or_edit("Hi! ▉") assert result is True adapter.send.assert_not_called() @pytest.mark.asyncio async def test_longer_text_with_cursor_sends_new_message(self): """Text >= 4 visible chars + cursor should create a new message normally.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") adapter.send = AsyncMock(return_value=send_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(cursor=" ▉"), ) result = await consumer._send_or_edit("Hello ▉") assert result is True adapter.send.assert_called_once() @pytest.mark.asyncio async def test_short_text_without_cursor_sends_normally(self): """Short text without cursor (e.g. final edit) should send normally.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") adapter.send = AsyncMock(return_value=send_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(cursor=" ▉"), ) # No cursor in text — even short text should be sent result = await consumer._send_or_edit("OK") assert result is True adapter.send.assert_called_once() @pytest.mark.asyncio async def test_short_text_cursor_edit_existing_message_allowed(self): """Short text + cursor editing an existing message should proceed.""" adapter = MagicMock() edit_result = SimpleNamespace(success=True) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(cursor=" ▉"), ) consumer._message_id = "msg_1" # Existing message — guard should not fire consumer._last_sent_text = "" result = await consumer._send_or_edit("I ▉") assert result is True adapter.edit_message.assert_called_once() # ── Integration: full stream run ───────────────────────────────────────── class TestStreamRunMediaStripping: """End-to-end: deltas with MEDIA: produce clean visible text.""" @pytest.mark.asyncio async def test_stream_with_media_tag(self): """Full stream run strips MEDIA: from the final visible message.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") edit_result = SimpleNamespace(success=True) adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) # Feed deltas consumer.on_delta("Here is your generated image\n") consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png") consumer.finish() await consumer.run() # Verify the final text sent/edited doesn't contain MEDIA: all_calls = [] for call in adapter.send.call_args_list: all_calls.append(call[1].get("content", "")) for call in adapter.edit_message.call_args_list: all_calls.append(call[1].get("content", "")) for sent_text in all_calls: assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}" assert consumer.already_sent # ── Segment break (tool boundary) tests ────────────────────────────────── class TestSegmentBreakOnToolBoundary: """Verify that on_delta(None) finalizes the current message and starts a new one so the final response appears below tool-progress messages.""" @pytest.mark.asyncio async def test_segment_break_creates_new_message(self): """After a None boundary, next text creates a fresh message.""" adapter = MagicMock() send_result_1 = SimpleNamespace(success=True, message_id="msg_1") send_result_2 = SimpleNamespace(success=True, message_id="msg_2") edit_result = SimpleNamespace(success=True) adapter.send = AsyncMock(side_effect=[send_result_1, send_result_2]) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) # Phase 1: intermediate text before tool calls consumer.on_delta("Let me search for that...") # Tool boundary — model is about to call tools consumer.on_delta(None) # Phase 2: final response text after tools finished consumer.on_delta("Here are the results.") consumer.finish() await consumer.run() # Should have sent TWO separate messages (two adapter.send calls), # not just edited the first one. assert adapter.send.call_count == 2 first_text = adapter.send.call_args_list[0][1]["content"] second_text = adapter.send.call_args_list[1][1]["content"] assert "search" in first_text assert "results" in second_text @pytest.mark.asyncio async def test_segment_break_no_text_before(self): """A None boundary with no preceding text is a no-op.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) # No text before the boundary — model went straight to tool calls consumer.on_delta(None) consumer.on_delta("Final answer.") consumer.finish() await consumer.run() # Only one send call (the final answer) assert adapter.send.call_count == 1 assert "Final answer" in adapter.send.call_args_list[0][1]["content"] @pytest.mark.asyncio async def test_segment_break_removes_cursor(self): """The finalized segment message should not have a cursor.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") edit_result = SimpleNamespace(success=True) adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Thinking...") consumer.on_delta(None) consumer.on_delta("Done.") consumer.finish() await consumer.run() # The first segment should have been finalized without cursor. # Check all edit_message calls + the initial send for the first segment. # The last state of msg_1 should NOT have the cursor. all_texts = [] for call in adapter.send.call_args_list: all_texts.append(call[1].get("content", "")) for call in adapter.edit_message.call_args_list: all_texts.append(call[1].get("content", "")) # Find the text(s) that contain "Thinking" — the finalized version # should not have the cursor. thinking_texts = [t for t in all_texts if "Thinking" in t] assert thinking_texts, "Expected at least one message with 'Thinking'" # The LAST occurrence is the finalized version assert "▉" not in thinking_texts[-1], ( f"Cursor found in finalized segment: {thinking_texts[-1]!r}" ) @pytest.mark.asyncio async def test_multiple_segment_breaks(self): """Multiple tool boundaries create multiple message segments.""" adapter = MagicMock() msg_counter = iter(["msg_1", "msg_2", "msg_3"]) adapter.send = AsyncMock( side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter)) ) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Phase 1") consumer.on_delta(None) # tool boundary consumer.on_delta("Phase 2") consumer.on_delta(None) # another tool boundary consumer.on_delta("Phase 3") consumer.finish() await consumer.run() # Three separate messages assert adapter.send.call_count == 3 @pytest.mark.asyncio async def test_already_sent_stays_true_after_segment(self): """already_sent remains True after a segment break.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Text") consumer.on_delta(None) consumer.finish() await consumer.run() assert consumer.already_sent @pytest.mark.asyncio async def test_edit_failure_sends_only_unsent_tail_at_finish(self): """If an edit fails mid-stream, send only the missing tail once at finish.""" adapter = MagicMock() send_results = [ SimpleNamespace(success=True, message_id="msg_1"), SimpleNamespace(success=True, message_id="msg_2"), ] adapter.send = AsyncMock(side_effect=send_results) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Hello") task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) consumer.on_delta(" world") await asyncio.sleep(0.08) consumer.finish() await task assert adapter.send.call_count == 2 first_text = adapter.send.call_args_list[0][1]["content"] second_text = adapter.send.call_args_list[1][1]["content"] assert "Hello" in first_text assert second_text.strip() == "world" assert consumer.already_sent @pytest.mark.asyncio async def test_segment_break_clears_failed_edit_fallback_state(self): """A tool boundary after edit failure must not duplicate the next segment.""" adapter = MagicMock() send_results = [ SimpleNamespace(success=True, message_id="msg_1"), SimpleNamespace(success=True, message_id="msg_2"), ] adapter.send = AsyncMock(side_effect=send_results) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Hello") task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) consumer.on_delta(" world") await asyncio.sleep(0.08) consumer.on_delta(None) consumer.on_delta("Next segment") consumer.finish() await task sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] assert sent_texts == ["Hello ▉", "Next segment"] @pytest.mark.asyncio async def test_no_message_id_enters_fallback_mode(self): """Platform returns success but no message_id (Signal) — must not re-send on every delta. Should enter fallback mode and send only the continuation at finish.""" adapter = MagicMock() # First send succeeds but returns no message_id (Signal behavior) send_result_no_id = SimpleNamespace(success=True, message_id=None) # Fallback final send succeeds send_result_final = SimpleNamespace(success=True, message_id="msg_final") adapter.send = AsyncMock(side_effect=[send_result_no_id, send_result_final]) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Hello") task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) consumer.on_delta(" world, this is a longer response.") await asyncio.sleep(0.08) consumer.finish() await task # Should send exactly 2 messages: initial chunk + fallback continuation # NOT one message per delta assert adapter.send.call_count == 2 assert consumer.already_sent # edit_message should NOT have been called (no valid message_id to edit) adapter.edit_message.assert_not_called() @pytest.mark.asyncio async def test_no_message_id_single_delta_marks_already_sent(self): """When the entire response fits in one delta and platform returns no message_id, already_sent must still be True to prevent the gateway from re-sending the full response.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id=None) adapter.send = AsyncMock(return_value=send_result) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) consumer.on_delta("Short response.") consumer.finish() await consumer.run() assert consumer.already_sent # Only one send call (the initial message) assert adapter.send.call_count == 1 @pytest.mark.asyncio async def test_no_message_id_segment_breaks_do_not_resend(self): """On a platform that never returns a message_id (e.g. webhook with github_comment delivery), tool-call segment breaks must NOT trigger a new adapter.send() per boundary. The fix: _message_id == '__no_edit__' suppresses the reset so all text accumulates and is sent once.""" adapter = MagicMock() # No message_id on first send, then one more for the fallback final adapter.send = AsyncMock(side_effect=[ SimpleNamespace(success=True, message_id=None), SimpleNamespace(success=True, message_id=None), ]) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) # Simulate: text → tool boundary → text → tool boundary → text (3 segments) consumer.on_delta("Phase 1 text") consumer.on_delta(None) # tool call boundary consumer.on_delta("Phase 2 text") consumer.on_delta(None) # another tool call boundary consumer.on_delta("Phase 3 text") consumer.finish() await consumer.run() # Before the fix this would post 3 comments (one per segment). # After the fix: only the initial partial + one fallback-final continuation. assert adapter.send.call_count == 2, ( f"Expected 2 sends (initial + fallback), got {adapter.send.call_count}" ) assert consumer.already_sent # The continuation must contain the text from segments 2 and 3 final_text = adapter.send.call_args_list[1][1]["content"] assert "Phase 2" in final_text assert "Phase 3" in final_text @pytest.mark.asyncio async def test_fallback_final_splits_long_continuation_without_dropping_text(self): """Long continuation tails should be chunked when fallback final-send runs.""" adapter = MagicMock() adapter.send = AsyncMock(side_effect=[ SimpleNamespace(success=True, message_id="msg_1"), SimpleNamespace(success=True, message_id="msg_2"), SimpleNamespace(success=True, message_id="msg_3"), ]) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=False, error="flood_control:6")) adapter.MAX_MESSAGE_LENGTH = 610 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉") consumer = GatewayStreamConsumer(adapter, "chat_123", config) prefix = "Hello world" tail = "x" * 620 consumer.on_delta(prefix) task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) consumer.on_delta(tail) await asyncio.sleep(0.08) consumer.finish() await task sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] assert len(sent_texts) == 3 assert sent_texts[0].startswith(prefix) assert sum(len(t) for t in sent_texts[1:]) == len(tail) class TestInterimCommentaryMessages: @pytest.mark.asyncio async def test_commentary_message_stays_separate_from_final_stream(self): adapter = MagicMock() adapter.send = AsyncMock(side_effect=[ SimpleNamespace(success=True, message_id="msg_1"), SimpleNamespace(success=True, message_id="msg_2"), ]) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), ) consumer.on_commentary("I'll inspect the repository first.") consumer.on_delta("Done.") consumer.finish() await consumer.run() sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] assert sent_texts == ["I'll inspect the repository first.", "Done."] assert consumer.final_response_sent is True @pytest.mark.asyncio async def test_failed_final_send_does_not_mark_final_response_sent(self): adapter = MagicMock() adapter.send = AsyncMock(return_value=SimpleNamespace(success=False, message_id=None)) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), ) consumer.on_delta("Done.") consumer.finish() await consumer.run() assert consumer.final_response_sent is False assert consumer.already_sent is False @pytest.mark.asyncio async def test_success_without_message_id_marks_visible_and_sends_only_tail(self): adapter = MagicMock() adapter.send = AsyncMock(side_effect=[ SimpleNamespace(success=True, message_id=None), SimpleNamespace(success=True, message_id=None), ]) adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor=" ▉"), ) consumer.on_delta("Hello") task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) consumer.on_delta(" world") await asyncio.sleep(0.08) consumer.finish() await task sent_texts = [call[1]["content"] for call in adapter.send.call_args_list] assert sent_texts == ["Hello ▉", "world"] assert consumer.already_sent is True assert consumer.final_response_sent is True class TestCancelledConsumerSetsFlags: """Cancellation must set final_response_sent when already_sent is True. The 5-second stream_task timeout in gateway/run.py can cancel the consumer while it's still processing. If final_response_sent stays False, the gateway falls through to the normal send path and the user sees a duplicate message. """ @pytest.mark.asyncio async def test_cancelled_with_already_sent_marks_final_response_sent(self): """Cancelling after content was sent should set final_response_sent.""" adapter = MagicMock() adapter.send = AsyncMock( return_value=SimpleNamespace(success=True, message_id="msg_1") ) adapter.edit_message = AsyncMock( return_value=SimpleNamespace(success=True) ) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), ) # Stream some text — the consumer sends it and sets already_sent consumer.on_delta("Hello world") task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) assert consumer.already_sent is True # Cancel the task (simulates the 5-second timeout in gateway) task.cancel() try: await task except asyncio.CancelledError: pass # The fix: final_response_sent should be True even though _DONE # was never processed, preventing a duplicate message. assert consumer.final_response_sent is True @pytest.mark.asyncio async def test_cancelled_without_any_sends_does_not_mark_final(self): """Cancelling before anything was sent should NOT set final_response_sent.""" adapter = MagicMock() adapter.send = AsyncMock( return_value=SimpleNamespace(success=False, message_id=None) ) adapter.edit_message = AsyncMock( return_value=SimpleNamespace(success=True) ) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_123", StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), ) # Send fails — already_sent stays False consumer.on_delta("x") task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.08) assert consumer.already_sent is False task.cancel() try: await task except asyncio.CancelledError: pass # Without a successful send, final_response_sent should stay False # so the normal gateway send path can deliver the response. assert consumer.final_response_sent is False # ── Think-block filtering unit tests ───────────────────────────────────── def _make_consumer() -> GatewayStreamConsumer: """Create a bare consumer for unit-testing the filter (no adapter needed).""" adapter = MagicMock() return GatewayStreamConsumer(adapter, "chat_test") class TestFilterAndAccumulate: """Unit tests for _filter_and_accumulate think-block suppression.""" def test_plain_text_passes_through(self): c = _make_consumer() c._filter_and_accumulate("Hello world") assert c._accumulated == "Hello world" def test_complete_think_block_stripped(self): c = _make_consumer() c._filter_and_accumulate("internal reasoningAnswer here") assert c._accumulated == "Answer here" def test_think_block_in_middle(self): c = _make_consumer() c._filter_and_accumulate("Prefix\nreasoning\nSuffix") assert c._accumulated == "Prefix\n\nSuffix" def test_think_block_split_across_deltas(self): c = _make_consumer() c._filter_and_accumulate("start of") c._filter_and_accumulate(" reasoningvisible text") assert c._accumulated == "visible text" def test_opening_tag_split_across_deltas(self): c = _make_consumer() c._filter_and_accumulate("hiddenshown") assert c._accumulated == "shown" def test_closing_tag_split_across_deltas(self): c = _make_consumer() c._filter_and_accumulate("hiddenshown") assert c._accumulated == "shown" def test_multiple_think_blocks(self): c = _make_consumer() # Consecutive blocks with no text between them — both stripped c._filter_and_accumulate( "block1block2visible" ) assert c._accumulated == "visible" def test_multiple_think_blocks_with_text_between(self): """Think tag after non-whitespace is NOT a boundary (prose safety).""" c = _make_consumer() c._filter_and_accumulate( "block1Ablock2B" ) # Second follows 'A' (not a block boundary) — treated as prose assert "A" in c._accumulated assert "B" in c._accumulated def test_thinking_tag_variant(self): c = _make_consumer() c._filter_and_accumulate("deep thoughtResult") assert c._accumulated == "Result" def test_thought_tag_variant(self): c = _make_consumer() c._filter_and_accumulate("Gemma styleOutput") assert c._accumulated == "Output" def test_reasoning_scratchpad_variant(self): c = _make_consumer() c._filter_and_accumulate( "long planDone" ) assert c._accumulated == "Done" def test_case_insensitive_THINKING(self): c = _make_consumer() c._filter_and_accumulate("capsanswer") assert c._accumulated == "answer" def test_prose_mention_not_stripped(self): """ mentioned mid-line in prose should NOT trigger filtering.""" c = _make_consumer() c._filter_and_accumulate("The tag is used for reasoning") assert "" in c._accumulated assert "used for reasoning" in c._accumulated def test_prose_mention_after_text(self): """ after non-whitespace on same line is not a block boundary.""" c = _make_consumer() c._filter_and_accumulate("Try using some content tags") assert "" in c._accumulated def test_think_at_line_start_is_stripped(self): """ at start of a new line IS a block boundary.""" c = _make_consumer() c._filter_and_accumulate("Previous line\nreasoningNext") assert "Previous line\nNext" == c._accumulated def test_think_with_only_whitespace_before(self): """ preceded by only whitespace on its line is a boundary.""" c = _make_consumer() c._filter_and_accumulate(" hiddenvisible") # Leading whitespace before the tag is emitted, then block is stripped assert c._accumulated == " visible" def test_flush_think_buffer_on_non_tag(self): """Partial tag that turns out not to be a tag is flushed.""" c = _make_consumer() c._filter_and_accumulate("still thinking") c._flush_think_buffer() assert c._accumulated == "" def test_unclosed_think_block_suppresses(self): """An unclosed suppresses all subsequent content.""" c = _make_consumer() c._filter_and_accumulate("Before\nreasoning that never ends...") assert c._accumulated == "Before\n" def test_multiline_think_block(self): c = _make_consumer() c._filter_and_accumulate( "\nLine 1\nLine 2\nLine 3\nFinal answer" ) assert c._accumulated == "Final answer" def test_segment_reset_preserves_think_state(self): """_reset_segment_state should NOT clear think-block filter state.""" c = _make_consumer() c._filter_and_accumulate("start") c._reset_segment_state() # Still inside think block — subsequent text should be suppressed c._filter_and_accumulate("still hiddenvisible") assert c._accumulated == "visible" class TestFilterAndAccumulateIntegration: """Integration: verify think blocks don't leak through the full run() path.""" @pytest.mark.asyncio async def test_think_block_not_sent_to_platform(self): """Think blocks should be filtered before platform edit.""" adapter = MagicMock() adapter.send = AsyncMock( return_value=SimpleNamespace(success=True, message_id="msg_1") ) adapter.edit_message = AsyncMock( return_value=SimpleNamespace(success=True) ) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer( adapter, "chat_test", StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), ) # Simulate streaming: think block then visible text consumer.on_delta("deep reasoning here") consumer.on_delta("The answer is 42.") consumer.finish() task = asyncio.create_task(consumer.run()) await asyncio.sleep(0.15) # The final text sent to the platform should NOT contain all_calls = list(adapter.send.call_args_list) + list( adapter.edit_message.call_args_list ) for call in all_calls: args, kwargs = call content = kwargs.get("content") or (args[0] if args else "") assert "" not in content, f"Think tag leaked: {content}" assert "deep reasoning" not in content try: task.cancel() await task except asyncio.CancelledError: pass