* fix: thread safety for concurrent subagent delegation Five thread-safety fixes that prevent crashes and data races when running multiple subagents concurrently via delegate_task: 1. Remove redirect_stdout/stderr from delegate_tool — mutating global sys.stdout races with the spinner thread when multiple children start concurrently, causing segfaults. Children already run with quiet_mode=True so the redirect was redundant. 2. Split _run_single_child into _build_child_agent (main thread) + _run_single_child (worker thread). AIAgent construction creates httpx/SSL clients which are not thread-safe to initialize concurrently. 3. Add threading.Lock to SessionDB — subagents share the parent's SessionDB and call create_session/append_message from worker threads with no synchronization. 4. Add _active_children_lock to AIAgent — interrupt() iterates _active_children while worker threads append/remove children. 5. Add _client_cache_lock to auxiliary_client — multiple subagent threads may resolve clients concurrently via call_llm(). Based on PR #1471 by peteromallet. * feat: Honcho base_url override via config.yaml + quick command alias type Two features salvaged from PR #1576: 1. Honcho base_url override: allows pointing Hermes at a remote self-hosted Honcho deployment via config.yaml: honcho: base_url: "http://192.168.x.x:8000" When set, this overrides the Honcho SDK's environment mapping (production/local), enabling LAN/VPN Honcho deployments without requiring the server to live on localhost. Uses config.yaml instead of env var (HONCHO_URL) per project convention. 2. Quick command alias type: adds a new 'alias' quick command type that rewrites to another slash command before normal dispatch: quick_commands: sc: type: alias target: /context Supports both CLI and gateway. Arguments are forwarded to the target command. Based on PR #1576 by redhelix. 
--------- Co-authored-by: peteromallet <peteromallet@users.noreply.github.com> Co-authored-by: redhelix <redhelix@users.noreply.github.com>
162 lines
5.6 KiB
Python
162 lines
5.6 KiB
Python
"""Test interrupt propagation from parent to child agents.
|
|
|
|
Reproduces the CLI scenario: user sends a message while delegate_task is
|
|
running, main thread calls parent.interrupt(), child should stop.
|
|
"""
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch, PropertyMock
|
|
|
|
from tools.interrupt import set_interrupt, is_interrupted, _interrupt_event
|
|
|
|
|
|
class TestInterruptPropagationToChild(unittest.TestCase):
    """Verify interrupt propagates from parent to child agent.

    Agents are created with ``AIAgent.__new__`` so ``__init__`` never runs;
    each test sets by hand only the attributes it needs.
    """

    def setUp(self):
        # Start every test from a cleared global interrupt flag.
        set_interrupt(False)

    def tearDown(self):
        # Never let a flag set by one test leak into the next.
        set_interrupt(False)

    @staticmethod
    def _make_bare_agent(interrupt_requested=False, interrupt_message=None):
        """Return an ``AIAgent`` built without ``__init__``.

        Only the interrupt-related attributes used by these tests are set.

        Args:
            interrupt_requested: Initial value of ``_interrupt_requested``.
            interrupt_message: Initial value of ``_interrupt_message``.
        """
        # Imported lazily, as the individual tests originally did.
        from run_agent import AIAgent

        agent = AIAgent.__new__(AIAgent)
        agent._interrupt_requested = interrupt_requested
        agent._interrupt_message = interrupt_message
        agent._active_children = []
        agent._active_children_lock = threading.Lock()
        agent.quiet_mode = True
        return agent

    def test_parent_interrupt_sets_child_flag(self):
        """When parent.interrupt() is called, child._interrupt_requested should be set."""
        parent = self._make_bare_agent()
        child = self._make_bare_agent()
        parent._active_children.append(child)

        parent.interrupt("new user message")

        self.assertIs(parent._interrupt_requested, True)
        self.assertIs(child._interrupt_requested, True)
        self.assertEqual(child._interrupt_message, "new user message")
        self.assertIs(is_interrupted(), True)

    def test_child_clear_interrupt_at_start_clears_global(self):
        """child.clear_interrupt() at start of run_conversation clears the GLOBAL event.

        This is the intended behavior at startup, but verify it doesn't
        accidentally clear an interrupt intended for a running child.
        """
        child = self._make_bare_agent(
            interrupt_requested=True,
            interrupt_message="msg",
        )

        # Global is set
        set_interrupt(True)
        self.assertIs(is_interrupted(), True)

        # child.clear_interrupt() is expected to clear both the per-agent
        # flag and the global one.
        child.clear_interrupt()
        self.assertIs(child._interrupt_requested, False)
        self.assertIs(is_interrupted(), False)

    def test_interrupt_during_child_api_call_detected(self):
        """Interrupt set during _interruptible_api_call is detected within 0.5s."""
        child = self._make_bare_agent()
        child.api_mode = "chat_completions"
        child.log_prefix = ""
        child._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1234"}

        # Mock a slow API call.  It blocks on an event rather than a plain
        # sleep so the test can release it during cleanup instead of leaving
        # a thread sleeping after the test has finished.
        release_call = threading.Event()

        def slow_api_call(**kwargs):
            release_call.wait(timeout=5)  # Would take 5s normally
            return MagicMock()

        mock_client = MagicMock()
        mock_client.chat.completions.create = slow_api_call
        mock_client.close = MagicMock()
        child.client = mock_client

        # Set interrupt after 0.2s from another thread
        def set_interrupt_later():
            time.sleep(0.2)
            child.interrupt("stop!")

        t = threading.Thread(target=set_interrupt_later, daemon=True)
        t.start()

        start = time.monotonic()
        try:
            with self.assertRaises(InterruptedError):
                child._interruptible_api_call({"model": "test", "messages": []})
            elapsed = time.monotonic() - start
            # Should detect within ~0.5s (0.2s delay + 0.3s poll interval)
            self.assertLess(
                elapsed,
                1.0,
                f"Took {elapsed:.2f}s to detect interrupt (expected < 1.0s)",
            )
        finally:
            release_call.set()
            t.join(timeout=2)
            set_interrupt(False)

    def test_concurrent_interrupt_propagation(self):
        """Simulates exact CLI flow: parent runs delegate in thread, main thread interrupts."""
        parent = self._make_bare_agent()
        child = self._make_bare_agent()

        # Register child (simulating what _run_single_child does)
        parent._active_children.append(child)

        child_detected = threading.Event()
        # Lets the test stop the polling thread even when the interrupt never
        # reaches the child, so a failing run does not leave it spinning.
        stop_polling = threading.Event()

        # Simulate child running (checking flag in a loop)
        def simulate_child_loop():
            while not child._interrupt_requested:
                if stop_polling.wait(0.05):
                    return
            child_detected.set()

        child_thread = threading.Thread(target=simulate_child_loop, daemon=True)
        child_thread.start()

        try:
            # Small delay, then interrupt from "main thread"
            time.sleep(0.1)
            parent.interrupt("user typed something new")

            # The child polls every 50ms; allow up to 1s before failing.
            detected = child_detected.wait(timeout=1.0)
            self.assertTrue(detected, "Child never detected the interrupt!")
        finally:
            stop_polling.set()
            child_thread.join(timeout=1)
            set_interrupt(False)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|