feat(gateway): expose subagent tool calls and thinking to user (fixes #169) (#186)

When subagents run via delegate_task, the user now sees real-time
progress instead of silence:

CLI: tree-view activity lines print above the delegation spinner
  🔀 Delegating: research quantum computing
     ├─ 💭 "I'll search for papers first..."
     ├─ 🔍 web_search  "quantum computing"
     ├─ 📖 read_file  "paper.pdf"
     └─ ⠹ working... (18.2s)

Gateway (Telegram/Discord): batched progress summaries sent every
5 tool calls to avoid message spam. Remaining tools flushed on
subagent completion.

Changes:
- agent/display.py: add KawaiiSpinner.print_above() to print
  status lines above an active spinner without disrupting animation.
  Uses captured stdout (self._out) so it works inside the child's
  redirect_stdout(devnull).

- tools/delegate_tool.py: add _build_child_progress_callback()
  that creates a per-child callback relaying tool calls and
  thinking events to the parent's spinner (CLI) or progress
  queue (gateway). Each child gets its own callback instance,
  so parallel subagents don't share state. Includes _flush()
  for gateway batch completion.

- run_agent.py: fire tool_progress_callback with '_thinking'
  event when the model produces text content. Guarded by
  _delegate_depth > 0 so only subagents fire this (prevents
  gateway spam from main agent). REASONING_SCRATCHPAD/think/
  reasoning XML tags are stripped before display.

Tests: 21 new tests covering print_above, callback builder,
thinking relay, SCRATCHPAD filtering, batching, flush, thread
isolation, delegate_depth guard, and prefix handling.
This commit is contained in:
lila
2026-03-01 10:18:00 +03:00
committed by GitHub
parent 1db5598294
commit dd69f16c3e
4 changed files with 489 additions and 22 deletions

View File

@@ -199,6 +199,21 @@ class KawaiiSpinner:
def update_text(self, new_message: str):
self.message = new_message
def print_above(self, text: str):
"""Print a line above the spinner without disrupting animation.
Clears the current spinner line, prints the text, and lets the
next animation tick redraw the spinner on the line below.
Thread-safe: uses the captured stdout reference (self._out).
Works inside redirect_stdout(devnull) because _write bypasses
sys.stdout and writes to the stdout captured at spinner creation.
"""
if not self.running:
self._write(f" {text}", flush=True)
return
# Clear spinner line, print text above, spinner redraws on next tick
self._write(f"\r\033[K {text}", flush=True)
def stop(self, final_message: str = None):
self.running = False
if self.thread:

View File

@@ -3312,6 +3312,24 @@ class AIAgent:
# Handle assistant response
if assistant_message.content and not self.quiet_mode:
print(f"{self.log_prefix}🤖 Assistant: {assistant_message.content[:100]}{'...' if len(assistant_message.content) > 100 else ''}")
# Notify progress callback of model's thinking (used by subagent
# delegation to relay the child's reasoning to the parent display).
# Guard: only fire for subagents (_delegate_depth >= 1) to avoid
# spamming gateway platforms with the main agent's every thought.
if (assistant_message.content and self.tool_progress_callback
and getattr(self, '_delegate_depth', 0) > 0):
_think_text = assistant_message.content.strip()
# Strip reasoning XML tags that shouldn't leak to parent display
_think_text = re.sub(
r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text
).strip()
first_line = _think_text.split('\n')[0][:80] if _think_text else ""
if first_line:
try:
self.tool_progress_callback("_thinking", first_line)
except Exception:
pass
# Check for incomplete <REASONING_SCRATCHPAD> (opened but never closed)
# This means the model ran out of output tokens mid-reasoning — retry up to 2 times

View File

@@ -0,0 +1,367 @@
"""
Tests for subagent progress relay (issue #169).
Verifies that:
- KawaiiSpinner.print_above() works with and without active spinner
- _build_child_progress_callback handles CLI/gateway/no-display paths
- Thinking events are relayed correctly
- Parallel callbacks don't share state
"""
import io
import sys
import time
import threading
import pytest
from unittest.mock import MagicMock, patch
from agent.display import KawaiiSpinner
from tools.delegate_tool import _build_child_progress_callback
# =========================================================================
# KawaiiSpinner.print_above tests
# =========================================================================
class TestPrintAbove:
"""Tests for KawaiiSpinner.print_above method."""
def test_print_above_without_spinner_running(self):
"""print_above should write to stdout even when spinner is not running."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf # Redirect to buffer
spinner.print_above("hello world")
output = buf.getvalue()
assert "hello world" in output
def test_print_above_with_spinner_running(self):
"""print_above should clear spinner line and print text."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
spinner.running = True # Pretend spinner is running (don't start thread)
spinner.print_above("tool line")
output = buf.getvalue()
assert "tool line" in output
assert "\r\033[K" in output # Should start with line clear
def test_print_above_uses_captured_stdout(self):
"""print_above should use self._out, not sys.stdout.
This ensures it works inside redirect_stdout(devnull)."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
# Simulate redirect_stdout(devnull)
old_stdout = sys.stdout
sys.stdout = io.StringIO()
try:
spinner.print_above("should go to buf")
finally:
sys.stdout = old_stdout
assert "should go to buf" in buf.getvalue()
# =========================================================================
# _build_child_progress_callback tests
# =========================================================================
class TestBuildChildProgressCallback:
"""Tests for child progress callback builder."""
def test_returns_none_when_no_display(self):
"""Should return None when parent has no spinner or callback."""
parent = MagicMock()
parent._delegate_spinner = None
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
assert cb is None
def test_cli_spinner_tool_event(self):
"""Should print tool line above spinner for CLI path."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
assert cb is not None
cb("web_search", "quantum computing")
output = buf.getvalue()
assert "web_search" in output
assert "quantum computing" in output
assert "├─" in output
def test_cli_spinner_thinking_event(self):
"""Should print thinking line above spinner for CLI path."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("_thinking", "I'll search for papers first")
output = buf.getvalue()
assert "💭" in output
assert "search for papers" in output
def test_gateway_batched_progress(self):
"""Gateway path should batch tool calls and flush at BATCH_SIZE."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
# Send 4 tool calls — shouldn't flush yet (BATCH_SIZE = 5)
for i in range(4):
cb(f"tool_{i}", f"arg_{i}")
parent_cb.assert_not_called()
# 5th call should trigger flush
cb("tool_4", "arg_4")
parent_cb.assert_called_once()
call_args = parent_cb.call_args
assert "tool_0" in call_args[0][1]
assert "tool_4" in call_args[0][1]
def test_thinking_not_relayed_to_gateway(self):
"""Thinking events should NOT be sent to gateway (too noisy)."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
cb("_thinking", "some reasoning text")
parent_cb.assert_not_called()
def test_parallel_callbacks_independent(self):
"""Each child's callback should have independent batch state."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb0 = _build_child_progress_callback(0, parent)
cb1 = _build_child_progress_callback(1, parent)
# Send 3 calls to each — neither should flush (batch size = 5)
for i in range(3):
cb0(f"tool_{i}")
cb1(f"other_{i}")
parent_cb.assert_not_called()
def test_task_index_prefix_in_output(self):
"""Multi-task mode should show task index prefix."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
# task_index > 0 should add prefix
cb = _build_child_progress_callback(2, parent)
cb("web_search", "test")
output = buf.getvalue()
assert "[2]" in output
def test_task_index_zero_no_prefix(self):
"""Single task (index 0) should not show index prefix."""
buf = io.StringIO()
spinner = KawaiiSpinner("delegating")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("web_search", "test")
output = buf.getvalue()
assert "[0]" not in output
# =========================================================================
# Integration: thinking callback in run_agent.py
# =========================================================================
class TestThinkingCallback:
"""Tests for the _thinking callback in AIAgent conversation loop."""
def _simulate_thinking_callback(self, content, callback, delegate_depth=1):
"""Simulate the exact code path from run_agent.py for the thinking callback.
delegate_depth: simulates self._delegate_depth.
0 = main agent (should NOT fire), >=1 = subagent (should fire).
"""
import re
if (content and callback and delegate_depth > 0):
_think_text = content.strip()
_think_text = re.sub(
r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text
).strip()
first_line = _think_text.split('\n')[0][:80] if _think_text else ""
if first_line:
try:
callback("_thinking", first_line)
except Exception:
pass
def test_thinking_callback_fires_on_content(self):
"""tool_progress_callback should receive _thinking event
when assistant message has content."""
calls = []
self._simulate_thinking_callback(
"I'll research quantum computing first, then summarize.",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert calls[0][0] == "_thinking"
assert "quantum computing" in calls[0][1]
def test_thinking_callback_skipped_when_no_content(self):
"""Should not fire when assistant has no content."""
calls = []
self._simulate_thinking_callback(
None,
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 0
def test_thinking_callback_truncates_long_content(self):
"""Should truncate long content to 80 chars."""
calls = []
self._simulate_thinking_callback(
"A" * 200 + "\nSecond line should be ignored",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert len(calls[0][1]) == 80
def test_thinking_callback_skipped_for_main_agent(self):
"""Main agent (delegate_depth=0) should NOT fire thinking events.
This prevents gateway spam on Telegram/Discord."""
calls = []
self._simulate_thinking_callback(
"I'll help you with that request.",
lambda name, preview=None: calls.append((name, preview)),
delegate_depth=0,
)
assert len(calls) == 0
def test_thinking_callback_strips_reasoning_scratchpad(self):
"""REASONING_SCRATCHPAD tags should be stripped before display."""
calls = []
self._simulate_thinking_callback(
"<REASONING_SCRATCHPAD>I need to analyze this carefully</REASONING_SCRATCHPAD>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert "<REASONING_SCRATCHPAD>" not in calls[0][1]
assert "analyze this carefully" in calls[0][1]
def test_thinking_callback_strips_think_tags(self):
"""<think> tags should be stripped before display."""
calls = []
self._simulate_thinking_callback(
"<think>Let me think about this problem</think>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 1
assert "<think>" not in calls[0][1]
assert "think about this problem" in calls[0][1]
def test_thinking_callback_empty_after_strip(self):
"""Should not fire when content is only XML tags."""
calls = []
self._simulate_thinking_callback(
"<REASONING_SCRATCHPAD></REASONING_SCRATCHPAD>",
lambda name, preview=None: calls.append((name, preview))
)
assert len(calls) == 0
# =========================================================================
# Gateway batch flush tests
# =========================================================================
class TestBatchFlush:
"""Tests for gateway batch flush on subagent completion."""
def test_flush_sends_remaining_batch(self):
"""_flush should send remaining tool names to gateway."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
# Send 3 tools (below batch size of 5)
cb("web_search", "query1")
cb("read_file", "file.txt")
cb("write_file", "out.txt")
parent_cb.assert_not_called()
# Flush should send the remaining 3
cb._flush()
parent_cb.assert_called_once()
summary = parent_cb.call_args[0][1]
assert "web_search" in summary
assert "write_file" in summary
def test_flush_noop_when_batch_empty(self):
"""_flush should not send anything when batch is empty."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
parent.tool_progress_callback = parent_cb
cb = _build_child_progress_callback(0, parent)
cb._flush()
parent_cb.assert_not_called()
def test_flush_noop_when_no_parent_callback(self):
"""_flush should not crash when there's no parent callback."""
buf = io.StringIO()
spinner = KawaiiSpinner("test")
spinner._out = buf
spinner.running = True
parent = MagicMock()
parent._delegate_spinner = spinner
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent)
cb("web_search", "test")
cb._flush() # Should not crash
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -77,6 +77,84 @@ def _strip_blocked_tools(toolsets: List[str]) -> List[str]:
return [t for t in toolsets if t not in blocked_toolset_names]
def _build_child_progress_callback(task_index: int, parent_agent) -> Optional[callable]:
"""Build a callback that relays child agent tool calls to the parent display.
Two display paths:
CLI: prints tree-view lines above the parent's delegation spinner
Gateway: batches tool names and relays to parent's progress callback
Returns None if no display mechanism is available, in which case the
child agent runs with no progress callback (identical to current behavior).
"""
spinner = getattr(parent_agent, '_delegate_spinner', None)
parent_cb = getattr(parent_agent, 'tool_progress_callback', None)
if not spinner and not parent_cb:
return None # No display → no callback → zero behavior change
prefix = f"[{task_index}] " if task_index > 0 else ""
# Gateway: batch tool names, flush periodically
_BATCH_SIZE = 5
_batch: List[str] = []
def _callback(tool_name: str, preview: str = None):
# Special "_thinking" event: model produced text content (reasoning)
if tool_name == "_thinking":
if spinner:
short = (preview[:55] + "...") if preview and len(preview) > 55 else (preview or "")
try:
spinner.print_above(f" {prefix}├─ 💭 \"{short}\"")
except Exception:
pass
# Don't relay thinking to gateway (too noisy for chat)
return
# Regular tool call event
if spinner:
short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "")
tool_emojis = {
"terminal": "💻", "web_search": "🔍", "web_extract": "📄",
"read_file": "📖", "write_file": "✍️", "patch": "🔧",
"search_files": "🔎", "list_directory": "📂",
"browser_navigate": "🌐", "browser_click": "👆",
"text_to_speech": "🔊", "image_generate": "🎨",
"vision_analyze": "👁️", "process": "⚙️",
}
emoji = tool_emojis.get(tool_name, "")
line = f" {prefix}├─ {emoji} {tool_name}"
if short:
line += f" \"{short}\""
try:
spinner.print_above(line)
except Exception:
pass
if parent_cb:
_batch.append(tool_name)
if len(_batch) >= _BATCH_SIZE:
summary = ", ".join(_batch)
try:
parent_cb("subagent_progress", f"🔀 {prefix}{summary}")
except Exception:
pass
_batch.clear()
def _flush():
"""Flush remaining batched tool names to gateway on completion."""
if parent_cb and _batch:
summary = ", ".join(_batch)
try:
parent_cb("subagent_progress", f"🔀 {prefix}{summary}")
except Exception:
pass
_batch.clear()
_callback._flush = _flush
return _callback
def _run_single_child(
task_index: int,
goal: str,
@@ -98,33 +176,15 @@ def _run_single_child(
child_prompt = _build_child_system_prompt(goal, context)
# Build a progress callback that surfaces subagent tool activity.
# CLI: updates the parent's delegate spinner text.
# Gateway: forwards to the parent's progress callback (feeds message queue).
parent_progress_cb = getattr(parent_agent, 'tool_progress_callback', None)
def _child_progress(tool_name: str, preview: str = None):
tag = f"[subagent-{task_index+1}] {tool_name}"
# Update CLI spinner
spinner = getattr(parent_agent, '_delegate_spinner', None)
if spinner:
detail = f'"{preview}"' if preview else ""
try:
spinner.update_text(f"🔀 {tag} {detail}")
except Exception:
pass
# Forward to gateway progress queue
if parent_progress_cb:
try:
parent_progress_cb(tag, preview)
except Exception:
pass
try:
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
parent_api_key = getattr(parent_agent, "api_key", None)
if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
parent_api_key = parent_agent._client_kwargs.get("api_key")
# Build progress callback to relay tool calls to parent display
child_progress_cb = _build_child_progress_callback(task_index, parent_agent)
child = AIAgent(
base_url=parent_agent.base_url,
api_key=parent_api_key,
@@ -145,7 +205,7 @@ def _run_single_child(
providers_ignored=parent_agent.providers_ignored,
providers_order=parent_agent.providers_order,
provider_sort=parent_agent.provider_sort,
tool_progress_callback=_child_progress,
tool_progress_callback=child_progress_cb,
)
# Set delegation depth so children can't spawn grandchildren
@@ -160,6 +220,13 @@ def _run_single_child(
with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
result = child.run_conversation(user_message=goal)
# Flush any remaining batched progress to gateway
if child_progress_cb and hasattr(child_progress_cb, '_flush'):
try:
child_progress_cb._flush()
except Exception:
pass
duration = round(time.monotonic() - child_start, 2)
summary = result.get("final_response") or ""