From dd69f16c3e06a069d52a4ee8d44963ea2dcd8dbd Mon Sep 17 00:00:00 2001 From: lila <137614867+cutepawss@users.noreply.github.com> Date: Sun, 1 Mar 2026 10:18:00 +0300 Subject: [PATCH] feat(gateway): expose subagent tool calls and thinking to user (fixes #169) (#186) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When subagents run via delegate_task, the user now sees real-time progress instead of silence: CLI: tree-view activity lines print above the delegation spinner 🔀 Delegating: research quantum computing ├─ 💭 "I'll search for papers first..." ├─ 🔍 web_search "quantum computing" ├─ 📖 read_file "paper.pdf" └─ ⠹ working... (18.2s) Gateway (Telegram/Discord): batched progress summaries sent every 5 tool calls to avoid message spam. Remaining tools flushed on subagent completion. Changes: - agent/display.py: add KawaiiSpinner.print_above() to print status lines above an active spinner without disrupting animation. Uses captured stdout (self._out) so it works inside the child's redirect_stdout(devnull). - tools/delegate_tool.py: add _build_child_progress_callback() that creates a per-child callback relaying tool calls and thinking events to the parent's spinner (CLI) or progress queue (gateway). Each child gets its own callback instance, so parallel subagents don't share state. Includes _flush() for gateway batch completion. - run_agent.py: fire tool_progress_callback with '_thinking' event when the model produces text content. Guarded by _delegate_depth > 0 so only subagents fire this (prevents gateway spam from main agent). REASONING_SCRATCHPAD/think/ reasoning XML tags are stripped before display. Tests: 21 new tests covering print_above, callback builder, thinking relay, SCRATCHPAD filtering, batching, flush, thread isolation, delegate_depth guard, and prefix handling. --- agent/display.py | 15 ++ run_agent.py | 18 ++ tests/agent/test_subagent_progress.py | 367 ++++++++++++++++++++++++++ tools/delegate_tool.py | 111 ++++++-- 4 files changed, 489 insertions(+), 22 deletions(-) create mode 100644 tests/agent/test_subagent_progress.py diff --git a/agent/display.py b/agent/display.py index 6f65c5d35..19acc67d9 100644 --- a/agent/display.py +++ b/agent/display.py @@ -199,6 +199,21 @@ class KawaiiSpinner: def update_text(self, new_message: str): self.message = new_message + def print_above(self, text: str): + """Print a line above the spinner without disrupting animation. + + Clears the current spinner line, prints the text, and lets the + next animation tick redraw the spinner on the line below. + Thread-safe: uses the captured stdout reference (self._out). + Works inside redirect_stdout(devnull) because _write bypasses + sys.stdout and writes to the stdout captured at spinner creation. + """ + if not self.running: + self._write(f" {text}", flush=True) + return + # Clear spinner line, print text above, spinner redraws on next tick + self._write(f"\r\033[K {text}", flush=True) + def stop(self, final_message: str = None): self.running = False if self.thread: diff --git a/run_agent.py b/run_agent.py index 8cd3b157b..32b69489f 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3312,6 +3312,24 @@ class AIAgent: # Handle assistant response if assistant_message.content and not self.quiet_mode: print(f"{self.log_prefix}🤖 Assistant: {assistant_message.content[:100]}{'...' if len(assistant_message.content) > 100 else ''}") + + # Notify progress callback of model's thinking (used by subagent + # delegation to relay the child's reasoning to the parent display). + # Guard: only fire for subagents (_delegate_depth >= 1) to avoid + # spamming gateway platforms with the main agent's every thought. + if (assistant_message.content and self.tool_progress_callback + and getattr(self, '_delegate_depth', 0) > 0): + _think_text = assistant_message.content.strip() + # Strip reasoning XML tags that shouldn't leak to parent display + _think_text = re.sub( + r'', '', _think_text + ).strip() + first_line = _think_text.split('\n')[0][:80] if _think_text else "" + if first_line: + try: + self.tool_progress_callback("_thinking", first_line) + except Exception: + pass # Check for incomplete (opened but never closed) # This means the model ran out of output tokens mid-reasoning — retry up to 2 times diff --git a/tests/agent/test_subagent_progress.py b/tests/agent/test_subagent_progress.py new file mode 100644 index 000000000..0ff4fcb83 --- /dev/null +++ b/tests/agent/test_subagent_progress.py @@ -0,0 +1,367 @@ +""" +Tests for subagent progress relay (issue #169). + +Verifies that: +- KawaiiSpinner.print_above() works with and without active spinner +- _build_child_progress_callback handles CLI/gateway/no-display paths +- Thinking events are relayed correctly +- Parallel callbacks don't share state +""" + +import io +import sys +import time +import threading +import pytest +from unittest.mock import MagicMock, patch + +from agent.display import KawaiiSpinner +from tools.delegate_tool import _build_child_progress_callback + + +# ========================================================================= +# KawaiiSpinner.print_above tests +# ========================================================================= + +class TestPrintAbove: + """Tests for KawaiiSpinner.print_above method.""" + + def test_print_above_without_spinner_running(self): + """print_above should write to stdout even when spinner is not running.""" + buf = io.StringIO() + spinner = KawaiiSpinner("test") + spinner._out = buf # Redirect to buffer + + spinner.print_above("hello world") + output = buf.getvalue() + assert "hello world" in output + + def test_print_above_with_spinner_running(self): + """print_above should clear spinner line and print text.""" + buf = io.StringIO() + spinner = KawaiiSpinner("test") + spinner._out = buf + spinner.running = True # Pretend spinner is running (don't start thread) + + spinner.print_above("tool line") + output = buf.getvalue() + assert "tool line" in output + assert "\r\033[K" in output # Should start with line clear + + def test_print_above_uses_captured_stdout(self): + """print_above should use self._out, not sys.stdout. + This ensures it works inside redirect_stdout(devnull).""" + buf = io.StringIO() + spinner = KawaiiSpinner("test") + spinner._out = buf + + # Simulate redirect_stdout(devnull) + old_stdout = sys.stdout + sys.stdout = io.StringIO() + try: + spinner.print_above("should go to buf") + finally: + sys.stdout = old_stdout + + assert "should go to buf" in buf.getvalue() + + +# ========================================================================= +# _build_child_progress_callback tests +# ========================================================================= + +class TestBuildChildProgressCallback: + """Tests for child progress callback builder.""" + + def test_returns_none_when_no_display(self): + """Should return None when parent has no spinner or callback.""" + parent = MagicMock() + parent._delegate_spinner = None + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + assert cb is None + + def test_cli_spinner_tool_event(self): + """Should print tool line above spinner for CLI path.""" + buf = io.StringIO() + spinner = KawaiiSpinner("delegating") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + assert cb is not None + + cb("web_search", "quantum computing") + output = buf.getvalue() + assert "web_search" in output + assert "quantum computing" in output + assert "├─" in output + + def test_cli_spinner_thinking_event(self): + """Should print thinking line above spinner for CLI path.""" + buf = io.StringIO() + spinner = KawaiiSpinner("delegating") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + cb("_thinking", "I'll search for papers first") + + output = buf.getvalue() + assert "💭" in output + assert "search for papers" in output + + def test_gateway_batched_progress(self): + """Gateway path should batch tool calls and flush at BATCH_SIZE.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + + # Send 4 tool calls — shouldn't flush yet (BATCH_SIZE = 5) + for i in range(4): + cb(f"tool_{i}", f"arg_{i}") + parent_cb.assert_not_called() + + # 5th call should trigger flush + cb("tool_4", "arg_4") + parent_cb.assert_called_once() + call_args = parent_cb.call_args + assert "tool_0" in call_args[0][1] + assert "tool_4" in call_args[0][1] + + def test_thinking_not_relayed_to_gateway(self): + """Thinking events should NOT be sent to gateway (too noisy).""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + cb("_thinking", "some reasoning text") + + parent_cb.assert_not_called() + + def test_parallel_callbacks_independent(self): + """Each child's callback should have independent batch state.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb0 = _build_child_progress_callback(0, parent) + cb1 = _build_child_progress_callback(1, parent) + + # Send 3 calls to each — neither should flush (batch size = 5) + for i in range(3): + cb0(f"tool_{i}") + cb1(f"other_{i}") + + parent_cb.assert_not_called() + + def test_task_index_prefix_in_output(self): + """Multi-task mode should show task index prefix.""" + buf = io.StringIO() + spinner = KawaiiSpinner("delegating") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + # task_index > 0 should add prefix + cb = _build_child_progress_callback(2, parent) + cb("web_search", "test") + + output = buf.getvalue() + assert "[2]" in output + + def test_task_index_zero_no_prefix(self): + """Single task (index 0) should not show index prefix.""" + buf = io.StringIO() + spinner = KawaiiSpinner("delegating") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + cb("web_search", "test") + + output = buf.getvalue() + assert "[0]" not in output + + +# ========================================================================= +# Integration: thinking callback in run_agent.py +# ========================================================================= + +class TestThinkingCallback: + """Tests for the _thinking callback in AIAgent conversation loop.""" + + def _simulate_thinking_callback(self, content, callback, delegate_depth=1): + """Simulate the exact code path from run_agent.py for the thinking callback. + + delegate_depth: simulates self._delegate_depth. + 0 = main agent (should NOT fire), >=1 = subagent (should fire). + """ + import re + if (content and callback and delegate_depth > 0): + _think_text = content.strip() + _think_text = re.sub( + r'', '', _think_text + ).strip() + first_line = _think_text.split('\n')[0][:80] if _think_text else "" + if first_line: + try: + callback("_thinking", first_line) + except Exception: + pass + + def test_thinking_callback_fires_on_content(self): + """tool_progress_callback should receive _thinking event + when assistant message has content.""" + calls = [] + self._simulate_thinking_callback( + "I'll research quantum computing first, then summarize.", + lambda name, preview=None: calls.append((name, preview)) + ) + assert len(calls) == 1 + assert calls[0][0] == "_thinking" + assert "quantum computing" in calls[0][1] + + def test_thinking_callback_skipped_when_no_content(self): + """Should not fire when assistant has no content.""" + calls = [] + self._simulate_thinking_callback( + None, + lambda name, preview=None: calls.append((name, preview)) + ) + assert len(calls) == 0 + + def test_thinking_callback_truncates_long_content(self): + """Should truncate long content to 80 chars.""" + calls = [] + self._simulate_thinking_callback( + "A" * 200 + "\nSecond line should be ignored", + lambda name, preview=None: calls.append((name, preview)) + ) + assert len(calls) == 1 + assert len(calls[0][1]) == 80 + + def test_thinking_callback_skipped_for_main_agent(self): + """Main agent (delegate_depth=0) should NOT fire thinking events. + This prevents gateway spam on Telegram/Discord.""" + calls = [] + self._simulate_thinking_callback( + "I'll help you with that request.", + lambda name, preview=None: calls.append((name, preview)), + delegate_depth=0, + ) + assert len(calls) == 0 + + def test_thinking_callback_strips_reasoning_scratchpad(self): + """REASONING_SCRATCHPAD tags should be stripped before display.""" + calls = [] + self._simulate_thinking_callback( + "I need to analyze this carefully", + lambda name, preview=None: calls.append((name, preview)) + ) + assert len(calls) == 1 + assert "" not in calls[0][1] + assert "analyze this carefully" in calls[0][1] + + def test_thinking_callback_strips_think_tags(self): + """ tags should be stripped before display.""" + calls = [] + self._simulate_thinking_callback( + "Let me think about this problem", + lambda name, preview=None: calls.append((name, preview)) + ) + assert len(calls) == 1 + assert "" not in calls[0][1] + assert "think about this problem" in calls[0][1] + + def test_thinking_callback_empty_after_strip(self): + """Should not fire when content is only XML tags.""" + calls = [] + self._simulate_thinking_callback( + "", + lambda name, preview=None: calls.append((name, preview)) + ) + assert len(calls) == 0 + + +# ========================================================================= +# Gateway batch flush tests +# ========================================================================= + +class TestBatchFlush: + """Tests for gateway batch flush on subagent completion.""" + + def test_flush_sends_remaining_batch(self): + """_flush should send remaining tool names to gateway.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + + # Send 3 tools (below batch size of 5) + cb("web_search", "query1") + cb("read_file", "file.txt") + cb("write_file", "out.txt") + parent_cb.assert_not_called() + + # Flush should send the remaining 3 + cb._flush() + parent_cb.assert_called_once() + summary = parent_cb.call_args[0][1] + assert "web_search" in summary + assert "write_file" in summary + + def test_flush_noop_when_batch_empty(self): + """_flush should not send anything when batch is empty.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + cb._flush() + parent_cb.assert_not_called() + + def test_flush_noop_when_no_parent_callback(self): + """_flush should not crash when there's no parent callback.""" + buf = io.StringIO() + spinner = KawaiiSpinner("test") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + cb("web_search", "test") + cb._flush() # Should not crash + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) + diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index 77659d3c2..1f5c64b5b 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -77,6 +77,84 @@ def _strip_blocked_tools(toolsets: List[str]) -> List[str]: return [t for t in toolsets if t not in blocked_toolset_names] +def _build_child_progress_callback(task_index: int, parent_agent) -> Optional[callable]: + """Build a callback that relays child agent tool calls to the parent display. + + Two display paths: + CLI: prints tree-view lines above the parent's delegation spinner + Gateway: batches tool names and relays to parent's progress callback + + Returns None if no display mechanism is available, in which case the + child agent runs with no progress callback (identical to current behavior). + """ + spinner = getattr(parent_agent, '_delegate_spinner', None) + parent_cb = getattr(parent_agent, 'tool_progress_callback', None) + + if not spinner and not parent_cb: + return None # No display → no callback → zero behavior change + + prefix = f"[{task_index}] " if task_index > 0 else "" + + # Gateway: batch tool names, flush periodically + _BATCH_SIZE = 5 + _batch: List[str] = [] + + def _callback(tool_name: str, preview: str = None): + # Special "_thinking" event: model produced text content (reasoning) + if tool_name == "_thinking": + if spinner: + short = (preview[:55] + "...") if preview and len(preview) > 55 else (preview or "") + try: + spinner.print_above(f" {prefix}├─ 💭 \"{short}\"") + except Exception: + pass + # Don't relay thinking to gateway (too noisy for chat) + return + + # Regular tool call event + if spinner: + short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "") + tool_emojis = { + "terminal": "💻", "web_search": "🔍", "web_extract": "📄", + "read_file": "📖", "write_file": "✍️", "patch": "🔧", + "search_files": "🔎", "list_directory": "📂", + "browser_navigate": "🌐", "browser_click": "👆", + "text_to_speech": "🔊", "image_generate": "🎨", + "vision_analyze": "👁️", "process": "⚙️", + } + emoji = tool_emojis.get(tool_name, "⚡") + line = f" {prefix}├─ {emoji} {tool_name}" + if short: + line += f" \"{short}\"" + try: + spinner.print_above(line) + except Exception: + pass + + if parent_cb: + _batch.append(tool_name) + if len(_batch) >= _BATCH_SIZE: + summary = ", ".join(_batch) + try: + parent_cb("subagent_progress", f"🔀 {prefix}{summary}") + except Exception: + pass + _batch.clear() + + def _flush(): + """Flush remaining batched tool names to gateway on completion.""" + if parent_cb and _batch: + summary = ", ".join(_batch) + try: + parent_cb("subagent_progress", f"🔀 {prefix}{summary}") + except Exception: + pass + _batch.clear() + + _callback._flush = _flush + return _callback + + def _run_single_child( task_index: int, goal: str, @@ -98,33 +176,15 @@ def _run_single_child( child_prompt = _build_child_system_prompt(goal, context) - # Build a progress callback that surfaces subagent tool activity. - # CLI: updates the parent's delegate spinner text. - # Gateway: forwards to the parent's progress callback (feeds message queue). - parent_progress_cb = getattr(parent_agent, 'tool_progress_callback', None) - def _child_progress(tool_name: str, preview: str = None): - tag = f"[subagent-{task_index+1}] {tool_name}" - # Update CLI spinner - spinner = getattr(parent_agent, '_delegate_spinner', None) - if spinner: - detail = f'"{preview}"' if preview else "" - try: - spinner.update_text(f"🔀 {tag} {detail}") - except Exception: - pass - # Forward to gateway progress queue - if parent_progress_cb: - try: - parent_progress_cb(tag, preview) - except Exception: - pass - try: # Extract parent's API key so subagents inherit auth (e.g. Nous Portal). parent_api_key = getattr(parent_agent, "api_key", None) if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"): parent_api_key = parent_agent._client_kwargs.get("api_key") + # Build progress callback to relay tool calls to parent display + child_progress_cb = _build_child_progress_callback(task_index, parent_agent) + child = AIAgent( base_url=parent_agent.base_url, api_key=parent_api_key, @@ -145,7 +205,7 @@ def _run_single_child( providers_ignored=parent_agent.providers_ignored, providers_order=parent_agent.providers_order, provider_sort=parent_agent.provider_sort, - tool_progress_callback=_child_progress, + tool_progress_callback=child_progress_cb, ) # Set delegation depth so children can't spawn grandchildren @@ -160,6 +220,13 @@ def _run_single_child( with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull): result = child.run_conversation(user_message=goal) + # Flush any remaining batched progress to gateway + if child_progress_cb and hasattr(child_progress_cb, '_flush'): + try: + child_progress_cb._flush() + except Exception: + pass + duration = round(time.monotonic() - child_start, 2) summary = result.get("final_response") or ""