* fix: thread safety for concurrent subagent delegation Five thread-safety fixes that prevent crashes and data races when running multiple subagents concurrently via delegate_task: 1. Remove redirect_stdout/stderr from delegate_tool — mutating global sys.stdout races with the spinner thread when multiple children start concurrently, causing segfaults. Children already run with quiet_mode=True so the redirect was redundant. 2. Split _run_single_child into _build_child_agent (main thread) + _run_single_child (worker thread). AIAgent construction creates httpx/SSL clients which are not thread-safe to initialize concurrently. 3. Add threading.Lock to SessionDB — subagents share the parent's SessionDB and call create_session/append_message from worker threads with no synchronization. 4. Add _active_children_lock to AIAgent — interrupt() iterates _active_children while worker threads append/remove children. 5. Add _client_cache_lock to auxiliary_client — multiple subagent threads may resolve clients concurrently via call_llm(). Based on PR #1471 by peteromallet. * feat: Honcho base_url override via config.yaml + quick command alias type Two features salvaged from PR #1576: 1. Honcho base_url override: allows pointing Hermes at a remote self-hosted Honcho deployment via config.yaml: honcho: base_url: "http://192.168.x.x:8000" When set, this overrides the Honcho SDK's environment mapping (production/local), enabling LAN/VPN Honcho deployments without requiring the server to live on localhost. Uses config.yaml instead of env var (HONCHO_URL) per project convention. 2. Quick command alias type: adds a new 'alias' quick command type that rewrites to another slash command before normal dispatch: quick_commands: sc: type: alias target: /context Supports both CLI and gateway. Arguments are forwarded to the target command. Based on PR #1576 by redhelix. 
--------- Co-authored-by: peteromallet <peteromallet@users.noreply.github.com> Co-authored-by: redhelix <redhelix@users.noreply.github.com>
173 lines
6.5 KiB
Python
173 lines
6.5 KiB
Python
"""End-to-end test simulating CLI interrupt during subagent execution.

Reproduces the exact scenario:
1. Parent agent calls delegate_task
2. Child agent is running (simulated with a slow tool)
3. User "types a message" (simulated by calling parent.interrupt from another thread)
4. Child should detect the interrupt and stop

This tests the COMPLETE path including _run_single_child, _active_children
registration, interrupt propagation, and child detection.
"""
|
|
|
|
import json
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch, PropertyMock
|
|
|
|
from tools.interrupt import set_interrupt, is_interrupted
|
|
|
|
|
|
class TestCLISubagentInterrupt(unittest.TestCase):
    """Simulate exact CLI scenario.

    A parent agent delegates to a child on a worker thread; the test thread
    then calls ``parent.interrupt(...)`` and checks that the child observes
    the interrupt and that the delegate result is reported as interrupted.
    """

    def setUp(self):
        """Reset interrupt state via ``set_interrupt(False)`` before each test."""
        set_interrupt(False)

    def tearDown(self):
        """Reset interrupt state via ``set_interrupt(False)`` after each test."""
        set_interrupt(False)

    def test_full_delegate_interrupt_flow(self):
        """Full integration: parent runs delegate_task, main thread interrupts."""
        from run_agent import AIAgent, IterationBudget
        from tools.delegate_tool import _run_single_child

        interrupt_detected = threading.Event()
        child_started = threading.Event()

        # Create a real-enough parent agent: bypass __init__ and set only the
        # attributes this test assigns explicitly.
        parent = AIAgent.__new__(AIAgent)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent._active_children_lock = threading.Lock()
        parent.quiet_mode = True
        parent.model = "test/model"
        parent.base_url = "http://localhost:1"
        parent.api_key = "test"
        parent.provider = "test"
        parent.api_mode = "chat_completions"
        parent.platform = "cli"
        parent.enabled_toolsets = ["terminal", "file"]
        parent.providers_allowed = None
        parent.providers_ignored = None
        parent.providers_order = None
        parent.provider_sort = None
        parent.max_tokens = None
        parent.reasoning_config = None
        parent.prefill_messages = None
        parent._session_db = None
        parent._delegate_depth = 0
        parent._delegate_spinner = None
        parent.tool_progress_callback = None
        parent.iteration_budget = IterationBudget(max_total=100)

        # Mock the child's run_conversation to simulate a slow operation
        # that checks _interrupt_requested like the real one does.
        def mock_child_run_conversation(user_message, **kwargs):
            child_started.set()
            # Find the child in parent._active_children
            child = parent._active_children[-1] if parent._active_children else None

            # Simulate the agent loop: poll _interrupt_requested
            for _ in range(100):  # Up to 10 seconds (100 * 0.1s)
                if child and child._interrupt_requested:
                    interrupt_detected.set()
                    return {
                        "final_response": "Interrupted!",
                        "messages": [],
                        "api_calls": 1,
                        "completed": False,
                        "interrupted": True,
                        "interrupt_message": child._interrupt_message,
                    }
                time.sleep(0.1)

            return {
                "final_response": "Finished without interrupt",
                "messages": [],
                "api_calls": 5,
                "completed": True,
                "interrupted": False,
            }

        # Single-slot lists let the worker thread hand its outcome back.
        delegate_result = [None]
        delegate_error = [None]

        # Run delegate in a thread (simulates agent_thread)
        def run_delegate():
            try:
                with patch('run_agent.AIAgent') as MockAgent:
                    mock_instance = MagicMock()
                    mock_instance._interrupt_requested = False
                    mock_instance._interrupt_message = None
                    mock_instance._active_children = []
                    mock_instance._active_children_lock = threading.Lock()
                    mock_instance.quiet_mode = True
                    mock_instance.run_conversation = mock_child_run_conversation

                    def mock_interrupt(msg=None):
                        # Store the message BEFORE raising the flag: the
                        # polling loop reads _interrupt_message as soon as it
                        # sees _interrupt_requested, so the reverse order
                        # could let it observe a stale (None) message.
                        mock_instance._interrupt_message = msg
                        mock_instance._interrupt_requested = True

                    mock_instance.interrupt = mock_interrupt
                    mock_instance.tools = []
                    MockAgent.return_value = mock_instance

                    # Register child manually (normally done by _build_child_agent)
                    parent._active_children.append(mock_instance)

                    delegate_result[0] = _run_single_child(
                        task_index=0,
                        goal="Do something slow",
                        child=mock_instance,
                        parent_agent=parent,
                    )
            except Exception as e:
                # Captured here and re-raised on the test thread so a worker
                # failure fails the test instead of vanishing.
                delegate_error[0] = e

        agent_thread = threading.Thread(target=run_delegate, daemon=True)
        agent_thread.start()

        # Wait for child to start
        self.assertTrue(child_started.wait(timeout=5), "Child never started!")

        # Now simulate user interrupt (from main/process thread)
        time.sleep(0.2)  # Give child a moment to be in its loop

        active_count = len(parent._active_children)
        print(f"Parent has {active_count} active children")
        self.assertGreaterEqual(
            active_count, 1,
            f"Expected child in _active_children, got {active_count}",
        )

        # This is what the CLI does:
        parent.interrupt("Hey stop that")

        print(f"Parent._interrupt_requested: {parent._interrupt_requested}")
        # Snapshot under the lock: the worker thread may mutate
        # _active_children while we iterate for the debug output.
        with parent._active_children_lock:
            children_snapshot = list(parent._active_children)
        for i, child in enumerate(children_snapshot):
            print(f"Child {i}._interrupt_requested: {child._interrupt_requested}")

        # Wait for child to detect interrupt
        detected = interrupt_detected.wait(timeout=3.0)

        # Wait for delegate to finish
        agent_thread.join(timeout=5)

        if delegate_error[0]:
            raise delegate_error[0]

        self.assertTrue(detected, "Child never detected the interrupt!")
        # join() with a timeout returns silently; make a hung worker explicit.
        self.assertFalse(
            agent_thread.is_alive(),
            "Delegate thread did not finish after interrupt",
        )
        result = delegate_result[0]
        self.assertIsNotNone(result, "Delegate returned no result")
        self.assertEqual(
            result["status"], "interrupted",
            f"Expected 'interrupted', got '{result['status']}'",
        )
        print(f"✓ Interrupt detected! Result: {result}")
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|