fix: prevent reasoning box from rendering 3x during tool-calling loops (#3405)
Two independent bugs caused the reasoning box to appear three times when the model produced reasoning + tool_calls: Bug A: _build_assistant_message() re-fired reasoning_callback with the full reasoning text even when streaming had already displayed it. The original guard only checked structured reasoning_content deltas, but reasoning also arrives via content tag extraction (<REASONING_SCRATCHPAD>/<think> tags in delta.content), which went through _fire_stream_delta not _fire_reasoning_delta. Fix: skip the callback entirely when streaming is active — both paths display reasoning during the stream. Any reasoning not shown during streaming is caught by the CLI post-response fallback. Bug B: The post-response reasoning display checked _reasoning_stream_started, but that flag was reset by _reset_stream_state() during intermediate turn boundaries (when stream_delta_callback(None) fires between tool calls). Introduced _reasoning_shown_this_turn flag that persists across the tool loop and is only reset at the start of each user turn. Live-tested in PTY: reasoning now shows exactly once per API call, no duplicates across tool-calling loops.
This commit is contained in:
@@ -472,6 +472,7 @@ class TestInlineThinkBlockExtraction(unittest.TestCase):
|
||||
agent._extract_reasoning = AIAgent._extract_reasoning.__get__(agent)
|
||||
agent.verbose_logging = False
|
||||
agent.reasoning_callback = None
|
||||
agent.stream_delta_callback = None # non-streaming by default
|
||||
return agent
|
||||
|
||||
def test_single_think_block_extracted(self):
|
||||
@@ -605,5 +606,159 @@ class TestEndToEndPipeline(unittest.TestCase):
|
||||
self.assertIsNone(result["last_reasoning"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Duplicate reasoning box prevention (Bug fix: 3 boxes for 1 reasoning)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReasoningDeltasFiredFlag(unittest.TestCase):
|
||||
"""_build_assistant_message should not re-fire reasoning_callback when
|
||||
reasoning was already streamed via _fire_reasoning_delta."""
|
||||
|
||||
def _make_agent(self):
|
||||
from run_agent import AIAgent
|
||||
agent = AIAgent.__new__(AIAgent)
|
||||
agent.reasoning_callback = None
|
||||
agent.stream_delta_callback = None
|
||||
agent._reasoning_deltas_fired = False
|
||||
agent.verbose_logging = False
|
||||
return agent
|
||||
|
||||
def test_fire_reasoning_delta_sets_flag(self):
|
||||
agent = self._make_agent()
|
||||
captured = []
|
||||
agent.reasoning_callback = lambda t: captured.append(t)
|
||||
self.assertFalse(agent._reasoning_deltas_fired)
|
||||
agent._fire_reasoning_delta("thinking...")
|
||||
self.assertTrue(agent._reasoning_deltas_fired)
|
||||
self.assertEqual(captured, ["thinking..."])
|
||||
|
||||
def test_build_assistant_message_skips_callback_when_already_streamed(self):
|
||||
"""When streaming already fired reasoning deltas, the post-stream
|
||||
_build_assistant_message should NOT re-fire the callback."""
|
||||
agent = self._make_agent()
|
||||
captured = []
|
||||
agent.reasoning_callback = lambda t: captured.append(t)
|
||||
agent.stream_delta_callback = lambda t: None # streaming is active
|
||||
|
||||
# Simulate streaming having fired reasoning
|
||||
agent._reasoning_deltas_fired = True
|
||||
|
||||
msg = SimpleNamespace(
|
||||
content="I'll merge that.",
|
||||
tool_calls=None,
|
||||
reasoning_content="Let me merge the PR.",
|
||||
reasoning=None,
|
||||
reasoning_details=None,
|
||||
)
|
||||
agent._build_assistant_message(msg, "stop")
|
||||
|
||||
# Callback should NOT have been fired again
|
||||
self.assertEqual(captured, [])
|
||||
|
||||
def test_build_assistant_message_skips_callback_when_streaming_active(self):
|
||||
"""When streaming is active, callback should NEVER fire from
|
||||
_build_assistant_message — reasoning was already displayed during the
|
||||
stream (either via reasoning_content deltas or content tag extraction).
|
||||
Any missed reasoning is caught by the CLI post-response fallback."""
|
||||
agent = self._make_agent()
|
||||
captured = []
|
||||
agent.reasoning_callback = lambda t: captured.append(t)
|
||||
agent.stream_delta_callback = lambda t: None # streaming active
|
||||
|
||||
# Even though _reasoning_deltas_fired is False (reasoning came through
|
||||
# content tags, not reasoning_content deltas), callback should not fire
|
||||
agent._reasoning_deltas_fired = False
|
||||
|
||||
msg = SimpleNamespace(
|
||||
content="I'll merge that.",
|
||||
tool_calls=None,
|
||||
reasoning_content="Let me merge the PR.",
|
||||
reasoning=None,
|
||||
reasoning_details=None,
|
||||
)
|
||||
agent._build_assistant_message(msg, "stop")
|
||||
|
||||
# Callback should NOT fire — streaming is active
|
||||
self.assertEqual(captured, [])
|
||||
|
||||
def test_build_assistant_message_fires_callback_without_streaming(self):
|
||||
"""When no streaming is active, callback always fires for structured
|
||||
reasoning."""
|
||||
agent = self._make_agent()
|
||||
captured = []
|
||||
agent.reasoning_callback = lambda t: captured.append(t)
|
||||
# No streaming
|
||||
agent.stream_delta_callback = None
|
||||
agent._reasoning_deltas_fired = False
|
||||
|
||||
msg = SimpleNamespace(
|
||||
content="I'll merge that.",
|
||||
tool_calls=None,
|
||||
reasoning_content="Let me merge the PR.",
|
||||
reasoning=None,
|
||||
reasoning_details=None,
|
||||
)
|
||||
agent._build_assistant_message(msg, "stop")
|
||||
|
||||
self.assertEqual(captured, ["Let me merge the PR."])
|
||||
|
||||
|
||||
class TestReasoningShownThisTurnFlag(unittest.TestCase):
|
||||
"""Post-response reasoning display should be suppressed when reasoning
|
||||
was already shown during streaming in a tool-calling loop."""
|
||||
|
||||
def _make_cli(self):
|
||||
from cli import HermesCLI
|
||||
cli = HermesCLI.__new__(HermesCLI)
|
||||
cli.show_reasoning = True
|
||||
cli.streaming_enabled = True
|
||||
cli._stream_box_opened = False
|
||||
cli._reasoning_box_opened = False
|
||||
cli._reasoning_stream_started = False
|
||||
cli._reasoning_shown_this_turn = False
|
||||
cli._reasoning_buf = ""
|
||||
cli._stream_buf = ""
|
||||
cli._stream_started = False
|
||||
cli._stream_text_ansi = ""
|
||||
cli._stream_prefilt = ""
|
||||
cli._in_reasoning_block = False
|
||||
cli._reasoning_preview_buf = ""
|
||||
return cli
|
||||
|
||||
@patch("cli._cprint")
|
||||
def test_streaming_reasoning_sets_turn_flag(self, mock_cprint):
|
||||
cli = self._make_cli()
|
||||
self.assertFalse(cli._reasoning_shown_this_turn)
|
||||
cli._stream_reasoning_delta("Thinking about it...")
|
||||
self.assertTrue(cli._reasoning_shown_this_turn)
|
||||
|
||||
@patch("cli._cprint")
|
||||
def test_turn_flag_survives_reset_stream_state(self, mock_cprint):
|
||||
"""_reasoning_shown_this_turn must NOT be cleared by
|
||||
_reset_stream_state (called at intermediate turn boundaries)."""
|
||||
cli = self._make_cli()
|
||||
cli._stream_reasoning_delta("Thinking...")
|
||||
self.assertTrue(cli._reasoning_shown_this_turn)
|
||||
|
||||
# Simulate intermediate turn boundary (tool call)
|
||||
cli._reset_stream_state()
|
||||
|
||||
# Flag must persist
|
||||
self.assertTrue(cli._reasoning_shown_this_turn)
|
||||
|
||||
@patch("cli._cprint")
|
||||
def test_turn_flag_cleared_before_new_turn(self, mock_cprint):
|
||||
"""The turn flag should be reset at the start of a new user turn.
|
||||
This happens outside _reset_stream_state, at the call site."""
|
||||
cli = self._make_cli()
|
||||
cli._reasoning_shown_this_turn = True
|
||||
|
||||
# Simulate new user turn setup
|
||||
cli._reset_stream_state()
|
||||
cli._reasoning_shown_this_turn = False # done by process_input
|
||||
|
||||
self.assertFalse(cli._reasoning_shown_this_turn)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user