feat(delegate): add observability metadata to subagent results (#1175)
* fix: Home Assistant event filtering now closed by default

  Previously, when no watch_domains or watch_entities were configured, ALL
  state_changed events passed through to the agent, causing users to be
  flooded with notifications for every HA entity change. Now events are
  dropped by default unless the user explicitly configures:

  - watch_domains: list of domains to monitor (e.g. climate, light)
  - watch_entities: list of specific entity IDs to monitor
  - watch_all: true (new option — opt-in to receive all events)

  A warning is logged at connect time if no filters are configured, guiding
  users to set up their HA platform config.

  All 49 gateway HA tests + 52 HA tool tests pass.

* docs: update Home Assistant integration documentation

  - homeassistant.md: Fix event filtering docs to reflect closed-by-default
    behavior. Add watch_all option. Replace Python dict config example with
    YAML. Fix defaults table (was incorrectly showing 'all'). Add required
    configuration warning admonition.
  - environment-variables.md: Add HASS_TOKEN and HASS_URL to Messaging section.
  - messaging/index.md: Add Home Assistant to description, architecture
    diagram, platform toolsets table, and Next Steps links.

* fix(terminal): strip provider env vars from background and PTY subprocesses

  Extends the env var blocklist from #1157 to also cover the two remaining
  leaky paths in process_registry.py:

  - spawn_local() PTY path (line 156)
  - spawn_local() background Popen path (line 197)

  Both were still using raw os.environ, leaking provider vars to background
  processes and interactive PTY sessions. Now uses the same dynamic
  _HERMES_PROVIDER_ENV_BLOCKLIST from local.py. Explicit env_vars passed to
  spawn_local() still override the blocklist, matching the existing behavior
  for callers that intentionally need these.

  Gap identified by PR #1004 (@PeterFile).

* feat(delegate): add observability metadata to subagent results

  Enrich delegate_task results with metadata from the child AIAgent:

  - model: which model the child used
  - exit_reason: completed | interrupted | max_iterations
  - tokens.input / tokens.output: token counts
  - tool_trace: per-tool-call trace with byte sizes and ok/error status

  Tool trace uses tool_call_id matching to correctly pair parallel tool
  calls with their results, with a fallback for messages without IDs.

  Cherry-picked from PR #872 by @omerkaz, with fixes:

  - Fixed parallel tool call trace pairing (was always updating last entry)
  - Removed redundant 'iterations' field (identical to existing 'api_calls')
  - Added test for parallel tool call trace correctness

  Co-authored-by: omerkaz <omerkaz@users.noreply.github.com>

---------

Co-authored-by: omerkaz <omerkaz@users.noreply.github.com>
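For illustration, the closed-by-default filters from the first commit might be configured like this in the HA platform config. The watch_domains / watch_entities / watch_all keys come from this change; the surrounding nesting is an assumption, so treat this as a sketch rather than copy-paste config:

    # Hypothetical YAML sketch: key names come from this commit,
    # the surrounding structure depends on your messaging config layout.
    homeassistant:
      watch_domains:          # only state_changed events from these domains pass
        - climate
        - light
      # watch_entities:       # or monitor specific entity IDs instead
      #   - sensor.front_door
      # watch_all: true       # opt in to ALL events (the old default behavior)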
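The env var fix in the third commit is easiest to picture as a filter over os.environ. A minimal sketch, assuming a blocklist set like _HERMES_PROVIDER_ENV_BLOCKLIST; the helper name and signature here are illustrative, not the actual process_registry.py API:

    import os

    # Illustrative only: drop provider vars from the child env unless the
    # caller explicitly passes them back in via env_vars (overrides win).
    def build_subprocess_env(
        blocklist: set[str],
        env_vars: dict[str, str] | None = None,
    ) -> dict[str, str]:
        env = {k: v for k, v in os.environ.items() if k not in blocklist}
        if env_vars:
            env.update(env_vars)
        return env

Both the PTY path and the background Popen path would then receive this filtered dict instead of raw os.environ.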
@@ -246,6 +246,169 @@ class TestDelegateTask(unittest.TestCase):
        self.assertEqual(kwargs["api_mode"], parent.api_mode)


class TestDelegateObservability(unittest.TestCase):
    """Tests for enriched metadata returned by _run_single_child."""

    def test_observability_fields_present(self):
        """Completed child should return tool_trace, tokens, model, exit_reason."""
        parent = _make_mock_parent(depth=0)

        with patch("run_agent.AIAgent") as MockAgent:
            mock_child = MagicMock()
            mock_child.model = "claude-sonnet-4-6"
            mock_child.session_prompt_tokens = 5000
            mock_child.session_completion_tokens = 1200
            mock_child.run_conversation.return_value = {
                "final_response": "done",
                "completed": True,
                "interrupted": False,
                "api_calls": 3,
                "messages": [
                    {"role": "user", "content": "do something"},
                    {"role": "assistant", "tool_calls": [
                        {"id": "tc_1", "function": {"name": "web_search", "arguments": '{"query": "test"}'}}
                    ]},
                    {"role": "tool", "tool_call_id": "tc_1", "content": '{"results": [1,2,3]}'},
                    {"role": "assistant", "content": "done"},
                ],
            }
            MockAgent.return_value = mock_child

            result = json.loads(delegate_task(goal="Test observability", parent_agent=parent))
            entry = result["results"][0]

            # Core observability fields
            self.assertEqual(entry["model"], "claude-sonnet-4-6")
            self.assertEqual(entry["exit_reason"], "completed")
            self.assertEqual(entry["tokens"]["input"], 5000)
            self.assertEqual(entry["tokens"]["output"], 1200)

            # Tool trace
            self.assertEqual(len(entry["tool_trace"]), 1)
            self.assertEqual(entry["tool_trace"][0]["tool"], "web_search")
            self.assertIn("args_bytes", entry["tool_trace"][0])
            self.assertIn("result_bytes", entry["tool_trace"][0])
            self.assertEqual(entry["tool_trace"][0]["status"], "ok")

    def test_tool_trace_detects_error(self):
        """Tool results containing 'error' should be marked as error status."""
        parent = _make_mock_parent(depth=0)

        with patch("run_agent.AIAgent") as MockAgent:
            mock_child = MagicMock()
            mock_child.model = "claude-sonnet-4-6"
            mock_child.session_prompt_tokens = 0
            mock_child.session_completion_tokens = 0
            mock_child.run_conversation.return_value = {
                "final_response": "failed",
                "completed": True,
                "interrupted": False,
                "api_calls": 1,
                "messages": [
                    {"role": "assistant", "tool_calls": [
                        {"id": "tc_1", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}}
                    ]},
                    {"role": "tool", "tool_call_id": "tc_1", "content": "Error: command not found"},
                ],
            }
            MockAgent.return_value = mock_child

            result = json.loads(delegate_task(goal="Test error trace", parent_agent=parent))
            trace = result["results"][0]["tool_trace"]
            self.assertEqual(trace[0]["status"], "error")

    def test_parallel_tool_calls_paired_correctly(self):
        """Parallel tool calls should each get their own result via tool_call_id matching."""
        parent = _make_mock_parent(depth=0)

        with patch("run_agent.AIAgent") as MockAgent:
            mock_child = MagicMock()
            mock_child.model = "claude-sonnet-4-6"
            mock_child.session_prompt_tokens = 3000
            mock_child.session_completion_tokens = 800
            mock_child.run_conversation.return_value = {
                "final_response": "done",
                "completed": True,
                "interrupted": False,
                "api_calls": 1,
                "messages": [
                    {"role": "assistant", "tool_calls": [
                        {"id": "tc_a", "function": {"name": "web_search", "arguments": '{"q": "a"}'}},
                        {"id": "tc_b", "function": {"name": "web_search", "arguments": '{"q": "b"}'}},
                        {"id": "tc_c", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}},
                    ]},
                    {"role": "tool", "tool_call_id": "tc_a", "content": '{"ok": true}'},
                    {"role": "tool", "tool_call_id": "tc_b", "content": "Error: rate limited"},
                    {"role": "tool", "tool_call_id": "tc_c", "content": "file1.txt\nfile2.txt"},
                    {"role": "assistant", "content": "done"},
                ],
            }
            MockAgent.return_value = mock_child

            result = json.loads(delegate_task(goal="Test parallel", parent_agent=parent))
            trace = result["results"][0]["tool_trace"]

            # All three tool calls should have results
            self.assertEqual(len(trace), 3)

            # First: web_search → ok
            self.assertEqual(trace[0]["tool"], "web_search")
            self.assertEqual(trace[0]["status"], "ok")
            self.assertIn("result_bytes", trace[0])

            # Second: web_search → error
            self.assertEqual(trace[1]["tool"], "web_search")
            self.assertEqual(trace[1]["status"], "error")
            self.assertIn("result_bytes", trace[1])

            # Third: terminal → ok
            self.assertEqual(trace[2]["tool"], "terminal")
            self.assertEqual(trace[2]["status"], "ok")
            self.assertIn("result_bytes", trace[2])

    def test_exit_reason_interrupted(self):
        """Interrupted child should report exit_reason='interrupted'."""
        parent = _make_mock_parent(depth=0)

        with patch("run_agent.AIAgent") as MockAgent:
            mock_child = MagicMock()
            mock_child.model = "claude-sonnet-4-6"
            mock_child.session_prompt_tokens = 0
            mock_child.session_completion_tokens = 0
            mock_child.run_conversation.return_value = {
                "final_response": "",
                "completed": False,
                "interrupted": True,
                "api_calls": 2,
                "messages": [],
            }
            MockAgent.return_value = mock_child

            result = json.loads(delegate_task(goal="Test interrupt", parent_agent=parent))
            self.assertEqual(result["results"][0]["exit_reason"], "interrupted")

    def test_exit_reason_max_iterations(self):
        """Child that didn't complete and wasn't interrupted hit max_iterations."""
        parent = _make_mock_parent(depth=0)

        with patch("run_agent.AIAgent") as MockAgent:
            mock_child = MagicMock()
            mock_child.model = "claude-sonnet-4-6"
            mock_child.session_prompt_tokens = 0
            mock_child.session_completion_tokens = 0
            mock_child.run_conversation.return_value = {
                "final_response": "",
                "completed": False,
                "interrupted": False,
                "api_calls": 50,
                "messages": [],
            }
            MockAgent.return_value = mock_child

            result = json.loads(delegate_task(goal="Test max iter", parent_agent=parent))
            self.assertEqual(result["results"][0]["exit_reason"], "max_iterations")


class TestBlockedTools(unittest.TestCase):
    def test_blocked_tools_constant(self):
        for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]:
@@ -276,12 +276,70 @@ def _run_single_child(
    else:
        status = "failed"

    # Build tool trace from conversation messages (already in memory).
    # Uses tool_call_id to correctly pair parallel tool calls with results.
    tool_trace: list[Dict[str, Any]] = []
    trace_by_id: Dict[str, Dict[str, Any]] = {}
    messages = result.get("messages") or []
    if isinstance(messages, list):
        for msg in messages:
            if not isinstance(msg, dict):
                continue
            if msg.get("role") == "assistant":
                for tc in (msg.get("tool_calls") or []):
                    fn = tc.get("function", {})
                    entry_t = {
                        "tool": fn.get("name", "unknown"),
                        "args_bytes": len(fn.get("arguments", "")),
                    }
                    tool_trace.append(entry_t)
                    tc_id = tc.get("id")
                    if tc_id:
                        trace_by_id[tc_id] = entry_t
            elif msg.get("role") == "tool":
                content = msg.get("content", "")
                is_error = bool(
                    content and "error" in content[:80].lower()
                )
                result_meta = {
                    "result_bytes": len(content),
                    "status": "error" if is_error else "ok",
                }
                # Match by tool_call_id for parallel calls
                tc_id = msg.get("tool_call_id")
                target = trace_by_id.get(tc_id) if tc_id else None
                if target is not None:
                    target.update(result_meta)
                elif tool_trace:
                    # Fallback for messages without tool_call_id
                    tool_trace[-1].update(result_meta)

    # Determine exit reason
    if interrupted:
        exit_reason = "interrupted"
    elif completed:
        exit_reason = "completed"
    else:
        exit_reason = "max_iterations"

    # Extract token counts (safe for mock objects)
    _input_tokens = getattr(child, "session_prompt_tokens", 0)
    _output_tokens = getattr(child, "session_completion_tokens", 0)
    _model = getattr(child, "model", None)

    entry: Dict[str, Any] = {
        "task_index": task_index,
        "status": status,
        "summary": summary,
        "api_calls": api_calls,
        "duration_seconds": duration,
        "model": _model if isinstance(_model, str) else None,
        "exit_reason": exit_reason,
        "tokens": {
            "input": _input_tokens if isinstance(_input_tokens, (int, float)) else 0,
            "output": _output_tokens if isinstance(_output_tokens, (int, float)) else 0,
        },
        "tool_trace": tool_trace,
    }
    if status == "failed":
        entry["error"] = result.get("error", "Subagent did not produce a response.")
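Putting the pieces together, one enriched results[] entry for a successful child run looks roughly like the following. Values are illustrative, taken from the first test above; the success-path value of "status" is set earlier in _run_single_child and not shown in this hunk:

    # Illustrative shape of one enriched results[] entry (values made up):
    entry = {
        "task_index": 0,
        "status": "...",                # success-path value set outside this hunk
        "summary": "done",
        "api_calls": 3,
        "duration_seconds": 1.2,        # illustrative
        "model": "claude-sonnet-4-6",
        "exit_reason": "completed",     # or "interrupted" / "max_iterations"
        "tokens": {"input": 5000, "output": 1200},
        "tool_trace": [
            {"tool": "web_search", "args_bytes": 17, "result_bytes": 20, "status": "ok"},
        ],
    }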