def print_above(self, text: str):
    """Write *text* as a line of its own, clearing the spinner line first if needed.

    When ``self.running`` is truthy the output is prefixed with a carriage
    return followed by the ANSI erase-line sequence (ESC [ K), so the
    partially drawn spinner frame is wiped before the text is written.
    When the spinner is idle the text is written without that prefix.

    All output goes through ``self._write(..., flush=True)`` rather than
    ``print``.
    NOTE(review): ``_write`` is not visible here. The original author's note
    says it targets the stdout captured at spinner creation (``self._out``),
    which is why this is expected to work inside ``redirect_stdout`` --
    confirm against ``_write``. The animation loop is likewise expected to
    redraw the spinner on its next tick.

    Args:
        text: The line to display above the spinner.
    """
    # Only an active animation leaves a frame on the current line to erase.
    clear_prefix = "\r\033[K" if self.running else ""
    self._write(f"{clear_prefix} {text}", flush=True)
class TestPrintAbove:
    """Exercise KawaiiSpinner.print_above against an in-memory output stream."""

    def _spinner_and_sink(self):
        """Return a fresh spinner plus the StringIO assigned to its ``_out``."""
        sink = io.StringIO()
        spinner = KawaiiSpinner("test")
        spinner._out = sink  # Redirect spinner output into the buffer
        return spinner, sink

    def test_print_above_without_spinner_running(self):
        """The text must reach the output stream when the spinner is idle."""
        spinner, sink = self._spinner_and_sink()

        spinner.print_above("hello world")

        assert "hello world" in sink.getvalue()

    def test_print_above_with_spinner_running(self):
        """With ``running`` set, the clear-line sequence and the text are both written."""
        spinner, sink = self._spinner_and_sink()
        # Flip the flag by hand; no animation thread is started.
        spinner.running = True

        spinner.print_above("tool line")

        written = sink.getvalue()
        assert "tool line" in written
        # Presence of the clear-line sequence is checked; its position is not.
        assert "\r\033[K" in written

    def test_print_above_uses_captured_stdout(self):
        """Output must go to ``spinner._out`` even while ``sys.stdout`` is swapped out."""
        spinner, sink = self._spinner_and_sink()

        # Stand-in for redirect_stdout(devnull): replace sys.stdout temporarily.
        saved_stdout = sys.stdout
        sys.stdout = io.StringIO()
        try:
            spinner.print_above("should go to buf")
        finally:
            sys.stdout = saved_stdout

        assert "should go to buf" in sink.getvalue()
test_cli_spinner_thinking_event(self): + """Should print thinking line above spinner for CLI path.""" + buf = io.StringIO() + spinner = KawaiiSpinner("delegating") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + cb("_thinking", "I'll search for papers first") + + output = buf.getvalue() + assert "💭" in output + assert "search for papers" in output + + def test_gateway_batched_progress(self): + """Gateway path should batch tool calls and flush at BATCH_SIZE.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + + # Send 4 tool calls — shouldn't flush yet (BATCH_SIZE = 5) + for i in range(4): + cb(f"tool_{i}", f"arg_{i}") + parent_cb.assert_not_called() + + # 5th call should trigger flush + cb("tool_4", "arg_4") + parent_cb.assert_called_once() + call_args = parent_cb.call_args + assert "tool_0" in call_args[0][1] + assert "tool_4" in call_args[0][1] + + def test_thinking_not_relayed_to_gateway(self): + """Thinking events should NOT be sent to gateway (too noisy).""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + cb("_thinking", "some reasoning text") + + parent_cb.assert_not_called() + + def test_parallel_callbacks_independent(self): + """Each child's callback should have independent batch state.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb0 = _build_child_progress_callback(0, parent) + cb1 = _build_child_progress_callback(1, parent) + + # Send 3 calls to each — neither should flush (batch size = 5) + for i in range(3): + cb0(f"tool_{i}") + cb1(f"other_{i}") + + 
class TestThinkingCallback:
    """Tests for the _thinking callback in AIAgent conversation loop.

    The production logic lives inline in the agent loop, so these tests run
    a local copy of it (``_simulate_thinking_callback``) and check which
    ``("_thinking", preview)`` events reach a recording callback.
    """

    def _simulate_thinking_callback(self, content, callback, delegate_depth=1):
        """Replay the thinking-relay logic on *content*.

        NOTE(review): this is a hand-copied mirror of the run_agent.py code
        path; the tag pattern below must be kept in sync with it.

        Args:
            content: Assistant message text (may be ``None`` or empty).
            callback: Callable invoked as ``callback("_thinking", first_line)``.
            delegate_depth: Stand-in for ``self._delegate_depth``.
                0 = main agent (must NOT fire), >= 1 = subagent (fires).
        """
        import re  # local import: the test module does not import re at top level

        if not (content and callback and delegate_depth > 0):
            return

        _think_text = content.strip()
        # Remove opening/closing reasoning tags so they never reach the display.
        # An empty pattern here would make re.sub a no-op.
        _think_text = re.sub(
            r'</?(?:REASONING_SCRATCHPAD|think)>', '', _think_text
        ).strip()
        # Only the first line is relayed, capped at 80 characters.
        first_line = _think_text.split('\n')[0][:80] if _think_text else ""
        if first_line:
            try:
                callback("_thinking", first_line)
            except Exception:
                # Mirrors the simulated code: a failing display callback is ignored.
                pass

    def test_thinking_callback_fires_on_content(self):
        """tool_progress_callback should receive _thinking event
        when assistant message has content."""
        calls = []
        self._simulate_thinking_callback(
            "I'll research quantum computing first, then summarize.",
            lambda name, preview=None: calls.append((name, preview))
        )
        assert len(calls) == 1
        assert calls[0][0] == "_thinking"
        assert "quantum computing" in calls[0][1]

    def test_thinking_callback_skipped_when_no_content(self):
        """Should not fire when assistant has no content."""
        calls = []
        self._simulate_thinking_callback(
            None,
            lambda name, preview=None: calls.append((name, preview))
        )
        assert len(calls) == 0

    def test_thinking_callback_truncates_long_content(self):
        """Should truncate long content to 80 chars."""
        calls = []
        self._simulate_thinking_callback(
            "A" * 200 + "\nSecond line should be ignored",
            lambda name, preview=None: calls.append((name, preview))
        )
        assert len(calls) == 1
        assert len(calls[0][1]) == 80

    def test_thinking_callback_skipped_for_main_agent(self):
        """Main agent (delegate_depth=0) should NOT fire thinking events.
        This prevents gateway spam on Telegram/Discord."""
        calls = []
        self._simulate_thinking_callback(
            "I'll help you with that request.",
            lambda name, preview=None: calls.append((name, preview)),
            delegate_depth=0,
        )
        assert len(calls) == 0

    def test_thinking_callback_strips_reasoning_scratchpad(self):
        """REASONING_SCRATCHPAD tags should be stripped before display."""
        calls = []
        self._simulate_thinking_callback(
            "<REASONING_SCRATCHPAD>I need to analyze this carefully</REASONING_SCRATCHPAD>",
            lambda name, preview=None: calls.append((name, preview))
        )
        assert len(calls) == 1
        assert "<REASONING_SCRATCHPAD>" not in calls[0][1]
        assert "</REASONING_SCRATCHPAD>" not in calls[0][1]
        assert "analyze this carefully" in calls[0][1]

    def test_thinking_callback_strips_think_tags(self):
        """<think> tags should be stripped before display."""
        calls = []
        self._simulate_thinking_callback(
            "<think>Let me think about this problem</think>",
            lambda name, preview=None: calls.append((name, preview))
        )
        assert len(calls) == 1
        assert "<think>" not in calls[0][1]
        assert "</think>" not in calls[0][1]
        assert "think about this problem" in calls[0][1]

    def test_thinking_callback_empty_after_strip(self):
        """Should not fire when content is only XML tags."""
        calls = []
        self._simulate_thinking_callback(
            "<REASONING_SCRATCHPAD></REASONING_SCRATCHPAD>",
            lambda name, preview=None: calls.append((name, preview))
        )
        assert len(calls) == 0
send the remaining 3 + cb._flush() + parent_cb.assert_called_once() + summary = parent_cb.call_args[0][1] + assert "web_search" in summary + assert "write_file" in summary + + def test_flush_noop_when_batch_empty(self): + """_flush should not send anything when batch is empty.""" + parent = MagicMock() + parent._delegate_spinner = None + parent_cb = MagicMock() + parent.tool_progress_callback = parent_cb + + cb = _build_child_progress_callback(0, parent) + cb._flush() + parent_cb.assert_not_called() + + def test_flush_noop_when_no_parent_callback(self): + """_flush should not crash when there's no parent callback.""" + buf = io.StringIO() + spinner = KawaiiSpinner("test") + spinner._out = buf + spinner.running = True + + parent = MagicMock() + parent._delegate_spinner = spinner + parent.tool_progress_callback = None + + cb = _build_child_progress_callback(0, parent) + cb("web_search", "test") + cb._flush() # Should not crash + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) + diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index 77659d3c2..1f5c64b5b 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -77,6 +77,84 @@ def _strip_blocked_tools(toolsets: List[str]) -> List[str]: return [t for t in toolsets if t not in blocked_toolset_names] +def _build_child_progress_callback(task_index: int, parent_agent) -> Optional[callable]: + """Build a callback that relays child agent tool calls to the parent display. + + Two display paths: + CLI: prints tree-view lines above the parent's delegation spinner + Gateway: batches tool names and relays to parent's progress callback + + Returns None if no display mechanism is available, in which case the + child agent runs with no progress callback (identical to current behavior). 
+ """ + spinner = getattr(parent_agent, '_delegate_spinner', None) + parent_cb = getattr(parent_agent, 'tool_progress_callback', None) + + if not spinner and not parent_cb: + return None # No display → no callback → zero behavior change + + prefix = f"[{task_index}] " if task_index > 0 else "" + + # Gateway: batch tool names, flush periodically + _BATCH_SIZE = 5 + _batch: List[str] = [] + + def _callback(tool_name: str, preview: str = None): + # Special "_thinking" event: model produced text content (reasoning) + if tool_name == "_thinking": + if spinner: + short = (preview[:55] + "...") if preview and len(preview) > 55 else (preview or "") + try: + spinner.print_above(f" {prefix}├─ 💭 \"{short}\"") + except Exception: + pass + # Don't relay thinking to gateway (too noisy for chat) + return + + # Regular tool call event + if spinner: + short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "") + tool_emojis = { + "terminal": "💻", "web_search": "🔍", "web_extract": "📄", + "read_file": "📖", "write_file": "✍️", "patch": "🔧", + "search_files": "🔎", "list_directory": "📂", + "browser_navigate": "🌐", "browser_click": "👆", + "text_to_speech": "🔊", "image_generate": "🎨", + "vision_analyze": "👁️", "process": "⚙️", + } + emoji = tool_emojis.get(tool_name, "⚡") + line = f" {prefix}├─ {emoji} {tool_name}" + if short: + line += f" \"{short}\"" + try: + spinner.print_above(line) + except Exception: + pass + + if parent_cb: + _batch.append(tool_name) + if len(_batch) >= _BATCH_SIZE: + summary = ", ".join(_batch) + try: + parent_cb("subagent_progress", f"🔀 {prefix}{summary}") + except Exception: + pass + _batch.clear() + + def _flush(): + """Flush remaining batched tool names to gateway on completion.""" + if parent_cb and _batch: + summary = ", ".join(_batch) + try: + parent_cb("subagent_progress", f"🔀 {prefix}{summary}") + except Exception: + pass + _batch.clear() + + _callback._flush = _flush + return _callback + + def _run_single_child( task_index: int, 
goal: str, @@ -98,33 +176,15 @@ def _run_single_child( child_prompt = _build_child_system_prompt(goal, context) - # Build a progress callback that surfaces subagent tool activity. - # CLI: updates the parent's delegate spinner text. - # Gateway: forwards to the parent's progress callback (feeds message queue). - parent_progress_cb = getattr(parent_agent, 'tool_progress_callback', None) - def _child_progress(tool_name: str, preview: str = None): - tag = f"[subagent-{task_index+1}] {tool_name}" - # Update CLI spinner - spinner = getattr(parent_agent, '_delegate_spinner', None) - if spinner: - detail = f'"{preview}"' if preview else "" - try: - spinner.update_text(f"🔀 {tag} {detail}") - except Exception: - pass - # Forward to gateway progress queue - if parent_progress_cb: - try: - parent_progress_cb(tag, preview) - except Exception: - pass - try: # Extract parent's API key so subagents inherit auth (e.g. Nous Portal). parent_api_key = getattr(parent_agent, "api_key", None) if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"): parent_api_key = parent_agent._client_kwargs.get("api_key") + # Build progress callback to relay tool calls to parent display + child_progress_cb = _build_child_progress_callback(task_index, parent_agent) + child = AIAgent( base_url=parent_agent.base_url, api_key=parent_api_key, @@ -145,7 +205,7 @@ def _run_single_child( providers_ignored=parent_agent.providers_ignored, providers_order=parent_agent.providers_order, provider_sort=parent_agent.provider_sort, - tool_progress_callback=_child_progress, + tool_progress_callback=child_progress_cb, ) # Set delegation depth so children can't spawn grandchildren @@ -160,6 +220,13 @@ def _run_single_child( with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull): result = child.run_conversation(user_message=goal) + # Flush any remaining batched progress to gateway + if child_progress_cb and hasattr(child_progress_cb, '_flush'): + try: + child_progress_cb._flush() + 
except Exception: + pass + duration = round(time.monotonic() - child_start, 2) summary = result.get("final_response") or ""