* fix: use session_key instead of chat_id for adapter interrupt lookups

  monitor_for_interrupt() in _run_agent was using source.chat_id to query the
  adapter's has_pending_interrupt() and get_pending_message() methods. But the
  adapter stores interrupt events under build_session_key(source), which
  produces a different string (e.g. 'agent:main:telegram:dm' vs '123456').

  This key mismatch meant the interrupt was never detected through the adapter
  path, which is the only active interrupt path for all adapter-based platforms
  (Telegram, Discord, Slack, etc.). The gateway-level interrupt path (in
  dispatch_message) is unreachable because the adapter intercepts the 2nd
  message in handle_message() before it reaches dispatch_message().

  Result: sending a new message while subagents were running had no effect —
  the interrupt was silently lost.

  Fix: replace all source.chat_id references in the interrupt-related code
  within _run_agent() with the session_key parameter, which matches the
  adapter's storage keys.

  Also adds regression tests verifying session_key vs chat_id consistency.

* debug: add file-based logging to CLI interrupt path

  Temporary instrumentation to diagnose why message-based interrupts don't seem
  to work during subagent execution. Logs to ~/.hermes/interrupt_debug.log
  (immune to redirect_stdout).

  Two log points:
  1. When Enter handler puts message into _interrupt_queue
  2. When chat() reads it and calls agent.interrupt()

  This will reveal whether the message reaches the queue and whether the
  interrupt is actually fired.
177 lines
6.9 KiB
Python
177 lines
6.9 KiB
Python
"""Test real interrupt propagation through delegate_task with actual AIAgent.

This uses a real AIAgent with mocked HTTP responses to test the complete
interrupt flow through _run_single_child → child.run_conversation().
"""
|
|
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
import unittest
|
|
from unittest.mock import MagicMock, patch, PropertyMock
|
|
|
|
from tools.interrupt import set_interrupt, is_interrupted
|
|
|
|
|
|
def _make_slow_api_response(delay=5.0):
|
|
"""Create a mock that simulates a slow API response (like a real LLM call)."""
|
|
def slow_create(**kwargs):
|
|
# Simulate a slow API call
|
|
time.sleep(delay)
|
|
# Return a simple text response (no tool calls)
|
|
resp = MagicMock()
|
|
resp.choices = [MagicMock()]
|
|
resp.choices[0].message = MagicMock()
|
|
resp.choices[0].message.content = "Done"
|
|
resp.choices[0].message.tool_calls = None
|
|
resp.choices[0].message.refusal = None
|
|
resp.choices[0].finish_reason = "stop"
|
|
resp.usage = MagicMock()
|
|
resp.usage.prompt_tokens = 100
|
|
resp.usage.completion_tokens = 10
|
|
resp.usage.total_tokens = 110
|
|
resp.usage.prompt_tokens_details = None
|
|
return resp
|
|
return slow_create
|
|
|
|
|
|
class TestRealSubagentInterrupt(unittest.TestCase):
    """Test interrupt with real AIAgent child through delegate_tool."""

    def setUp(self):
        """Clear the interrupt flag and provide a placeholder API key."""
        set_interrupt(False)
        # Keep any key the developer already exported, otherwise supply a
        # dummy one. patch.dict restores os.environ when the test finishes, so
        # the placeholder does not leak into tests that run afterwards.
        env_patch = patch.dict(
            os.environ,
            {"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY", "test-key")},
        )
        env_patch.start()
        self.addCleanup(env_patch.stop)

    def tearDown(self):
        """Clear the interrupt flag so it cannot affect later tests."""
        set_interrupt(False)

    @staticmethod
    def _make_parent_stub(agent_cls, budget_cls):
        """Return an ``agent_cls`` instance built without running ``__init__``.

        Only plain attributes are assigned; presumably these are the ones the
        delegate machinery and ``interrupt()`` read from a parent agent —
        confirm against ``tools/delegate_tool.py`` when adding new ones.
        """
        # Create a real parent agent (just enough to be a parent)
        parent = agent_cls.__new__(agent_cls)
        parent._interrupt_requested = False
        parent._interrupt_message = None
        parent._active_children = []
        parent.quiet_mode = True
        parent.model = "test/model"
        parent.base_url = "http://localhost:1"
        parent.api_key = "test"
        parent.provider = "test"
        parent.api_mode = "chat_completions"
        parent.platform = "cli"
        parent.enabled_toolsets = ["terminal", "file"]
        parent.providers_allowed = None
        parent.providers_ignored = None
        parent.providers_order = None
        parent.provider_sort = None
        parent.max_tokens = None
        parent.reasoning_config = None
        parent.prefill_messages = None
        parent._session_db = None
        parent._delegate_depth = 0
        parent._delegate_spinner = None
        parent.tool_progress_callback = None
        parent.iteration_budget = budget_cls(max_total=100)
        parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
        return parent

    def test_interrupt_child_during_api_call(self):
        """Real AIAgent child interrupted while making API call."""
        from run_agent import AIAgent, IterationBudget
        from tools.delegate_tool import _run_single_child

        parent = self._make_parent_stub(AIAgent, IterationBudget)

        child_started = threading.Event()
        # One-element lists let the worker thread hand its outcome back.
        result_holder = [None]
        error_holder = [None]

        def run_delegate():
            try:
                # Patch the OpenAI client creation inside AIAgent.__init__
                with patch('run_agent.OpenAI') as MockOpenAI:
                    mock_client = MagicMock()
                    # API call takes 5 seconds — should be interrupted before that
                    mock_client.chat.completions.create = _make_slow_api_response(delay=5.0)
                    mock_client.close = MagicMock()
                    MockOpenAI.return_value = mock_client

                    # Also need to patch the system prompt builder
                    with patch('run_agent.build_system_prompt', return_value="You are a test agent"):
                        # Signal when child starts
                        original_run = AIAgent.run_conversation

                        def patched_run(self_agent, *args, **kwargs):
                            child_started.set()
                            return original_run(self_agent, *args, **kwargs)

                        with patch.object(AIAgent, 'run_conversation', patched_run):
                            result = _run_single_child(
                                task_index=0,
                                goal="Test task",
                                context=None,
                                toolsets=["terminal"],
                                model="test/model",
                                max_iterations=5,
                                parent_agent=parent,
                                task_count=1,
                                override_provider="test",
                                override_base_url="http://localhost:1",
                                override_api_key="test",
                                override_api_mode="chat_completions",
                            )
                            result_holder[0] = result
            except Exception as e:
                # Record the failure so the main thread can re-raise it; an
                # exception in a worker thread would otherwise be invisible
                # to the test runner.
                import traceback
                traceback.print_exc()
                error_holder[0] = e

        agent_thread = threading.Thread(target=run_delegate, daemon=True)
        agent_thread.start()

        # Wait for child to start run_conversation
        started = child_started.wait(timeout=10)
        if not started:
            agent_thread.join(timeout=1)
            if error_holder[0]:
                raise error_holder[0]
            self.fail("Child never started run_conversation")

        # Give child time to enter main loop and start API call
        time.sleep(0.5)

        # If the delegate already blew up, report that instead of a misleading
        # "not registered" failure below.
        if error_holder[0]:
            raise error_holder[0]

        # Verify child is registered. Work from a snapshot so the child under
        # test is pinned before the interrupt can change the live list.
        children = list(parent._active_children)
        print(f"Active children: {len(children)}")
        self.assertGreaterEqual(len(children), 1,
                                "Child not registered in _active_children")
        child = children[0]

        # Interrupt! (simulating what CLI does)
        start = time.monotonic()
        parent.interrupt("User typed a new message")

        # Check propagation
        print(f"Child._interrupt_requested after parent.interrupt(): {child._interrupt_requested}")
        self.assertTrue(child._interrupt_requested,
                        "Interrupt did not propagate to child!")

        # Wait for delegate to finish (should be fast since interrupted)
        agent_thread.join(timeout=5)
        elapsed = time.monotonic() - start

        if error_holder[0]:
            raise error_holder[0]

        result = result_holder[0]
        self.assertIsNotNone(result, "Delegate returned no result")
        print(f"Result status: {result['status']}, elapsed: {elapsed:.2f}s")
        print(f"Full result: {result}")

        # The child should have been interrupted, not completed the full 5s API call
        self.assertLess(elapsed, 3.0,
                        f"Took {elapsed:.2f}s — interrupt was not detected quickly enough")
        self.assertEqual(result["status"], "interrupted",
                         f"Expected 'interrupted', got '{result['status']}'")
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()