feat: implement subagent delegation for task management

- Introduced the `delegate_task` tool, allowing the main agent to spawn child AIAgent instances with isolated context for complex tasks.
- Supported both single-task and batch processing (up to 3 concurrent tasks) to enhance task management capabilities.
- Updated configuration options for delegation, including maximum iterations and default toolsets for subagents.
- Enhanced documentation to provide clear guidance on using the delegation feature and its configuration.
- Added comprehensive tests to ensure the functionality and reliability of the delegation logic.
This commit is contained in:
teknium1
2026-02-20 03:15:53 -08:00
parent c0d412a736
commit 90e5211128
12 changed files with 822 additions and 5 deletions

View File

@@ -285,6 +285,9 @@ When `HERMES_TOOL_PROGRESS=true`, the bot sends status messages as it works:
- `💻 \`ls -la\`...` (terminal commands show the actual command)
- `🔍 web_search...`
- `📄 web_extract...`
- `🐍 execute_code...` (programmatic tool calling sandbox)
- `🔀 delegate_task...` (subagent delegation)
- `❓ clarify...` (user question, CLI-only)
Modes:
- `new`: Only when switching to a different tool (less spam)

View File

@@ -334,7 +334,7 @@ hermes --toolsets "web,terminal"
hermes --list-tools
```
**Available toolsets:** `web`, `terminal`, `file`, `browser`, `vision`, `image_gen`, `moa`, `skills`, `tts`, `todo`, `memory`, `session_search`, `cronjob`, and more.
**Available toolsets:** `web`, `terminal`, `file`, `browser`, `vision`, `image_gen`, `moa`, `skills`, `tts`, `todo`, `memory`, `session_search`, `cronjob`, `code_execution`, `delegation`, `clarify`, and more.
### 🖥️ Terminal & Process Management
@@ -682,6 +682,62 @@ How to confirm it worked.
└── .bundled_manifest # Tracks which bundled skills have been offered
```
### 🐍 Code Execution (Programmatic Tool Calling)
The `execute_code` tool lets the agent write Python scripts that call Hermes tools programmatically, collapsing multi-step workflows into a single LLM turn. The script runs in a sandboxed child process on the agent host, communicating with the parent via Unix domain socket RPC.
```bash
# The agent can write scripts like:
from hermes_tools import web_search, web_extract
results = web_search("Python 3.13 features", limit=5)
for r in results["data"]["web"]:
content = web_extract([r["url"]])
# ... filter and process ...
print(summary)
```
**Available tools in sandbox:** `web_search`, `web_extract`, `read_file`, `write_file`, `search`, `patch`, `terminal` (foreground only).
**When the agent uses this:** 3+ tool calls with processing logic between them, bulk data filtering, conditional branching, loops. The intermediate tool results never enter the context window -- only the final `print()` output comes back.
Configure via `~/.hermes/config.yaml`:
```yaml
code_execution:
timeout: 300 # Max seconds per script (default: 300)
max_tool_calls: 50 # Max tool calls per execution (default: 50)
```
### 🔀 Subagents (Task Delegation)
The `delegate_task` tool spawns child AIAgent instances with isolated context, restricted toolsets, and their own terminal sessions. Each child gets a fresh conversation and works independently -- only its final summary enters the parent's context.
**Single task:**
```
delegate_task(goal="Debug why tests fail", context="Error: assertion in test_foo.py line 42", toolsets=["terminal", "file"])
```
**Parallel batch (up to 3 concurrent):**
```
delegate_task(tasks=[
{"goal": "Research topic A", "toolsets": ["web"]},
{"goal": "Research topic B", "toolsets": ["web"]},
{"goal": "Fix the build", "toolsets": ["terminal", "file"]}
])
```
**Key properties:**
- Each subagent gets its own terminal session (separate from the parent)
- Depth limit of 2 (no grandchildren)
- Subagents cannot call: `delegate_task`, `clarify`, `memory`, `send_message`, `execute_code`
- Interrupt propagation: interrupting the parent interrupts all active children
Configure via `~/.hermes/config.yaml`:
```yaml
delegation:
max_iterations: 25 # Max turns per child (default: 25)
default_toolsets: ["terminal", "file", "web"] # Default toolsets
```
### 🤖 RL Training (Tinker + Atropos)
> **⚠️ In Development** — RL training integration is not yet functional. The tools and environments below are under active development.

View File

@@ -40,7 +40,7 @@ These four systems form a continuum of agent intelligence. They should be though
## 1. Subagent Architecture (Context Isolation) 🎯
**Status:** Not started
**Status:** Implemented
**Priority:** High -- this is foundational for scaling to complex tasks
The main agent becomes an orchestrator that delegates context-heavy tasks to subagents with isolated context. Each subagent returns a summary, keeping the orchestrator's context clean.
@@ -598,11 +598,11 @@ This goes in the tool description:
- **SQLite State Store & Session Search.** `~/.hermes/state.db` with sessions, messages, FTS5 search, `session_search` tool.
- **Interactive Clarifying Questions.** `clarify` tool with arrow-key selection UI in CLI, configurable timeout, CLI-only.
- **Programmatic Tool Calling.** `execute_code` tool -- sandbox child process with UDS RPC bridge to 7 tools (`web_search`, `web_extract`, `read_file`, `write_file`, `search`, `patch`, `terminal`). Configurable timeout and tool call limits via `config.yaml`.
- **Subagent Architecture.** `delegate_task` tool -- spawn child AIAgents with isolated context and terminal sessions. Single-task and batch (up to 3 parallel) modes via ThreadPoolExecutor. Depth limit of 2, blocked tool enforcement, interrupt propagation.
### Tier 1: Next Up
1. Subagent Architecture -- #1
2. MCP Support -- #6
1. MCP Support -- #6
### Tier 2: Quality of Life

View File

@@ -385,6 +385,24 @@ stt:
# No configuration needed - logging is always enabled.
# To disable, you would need to modify the source code.
# =============================================================================
# Code Execution Sandbox (Programmatic Tool Calling)
# =============================================================================
# The execute_code tool runs Python scripts that call Hermes tools via RPC.
# Intermediate tool results stay out of the LLM's context window.
code_execution:
timeout: 300 # Max seconds per script before kill (default: 300 = 5 min)
max_tool_calls: 50 # Max RPC tool calls per execution (default: 50)
# =============================================================================
# Subagent Delegation
# =============================================================================
# The delegate_task tool spawns child agents with isolated context.
# Supports single tasks and batch mode (up to 3 parallel).
delegation:
max_iterations: 50 # Max tool-calling turns per child (default: 25)
default_toolsets: ["terminal", "file", "web"] # Default toolsets for subagents
# =============================================================================
# Display
# =============================================================================

4
cli.py
View File

@@ -139,6 +139,10 @@ def load_cli_config() -> Dict[str, Any]:
"timeout": 300, # Max seconds a sandbox script can run before being killed (5 min)
"max_tool_calls": 50, # Max RPC tool calls per execution
},
"delegation": {
"max_iterations": 25, # Max tool-calling turns per child agent
"default_toolsets": ["terminal", "file", "web"], # Default toolsets for subagents
},
}
# Track whether the config file explicitly set terminal config.

View File

@@ -52,6 +52,9 @@ async def web_search(query: str) -> dict:
| **Session Search** | `session_search_tool.py` | `session_search` (search + summarize past conversations) |
| **Cronjob** | `cronjob_tools.py` | `schedule_cronjob`, `list_cronjobs`, `remove_cronjob` |
| **RL Training** | `rl_training_tool.py` | `rl_list_environments`, `rl_start_training`, `rl_check_status`, etc. |
| **Clarify** | `clarify_tool.py` | `clarify` (interactive multiple-choice / open-ended questions, CLI-only) |
| **Code Execution** | `code_execution_tool.py` | `execute_code` (run Python scripts that call tools via RPC sandbox) |
| **Delegation** | `delegate_tool.py` | `delegate_task` (spawn subagents with isolated context, single + parallel batch) |
## Tool Registration

View File

@@ -97,6 +97,8 @@ from tools.session_search_tool import session_search, check_session_search_requi
from tools.clarify_tool import clarify_tool, check_clarify_requirements, CLARIFY_SCHEMA
# Code execution sandbox (programmatic tool calling)
from tools.code_execution_tool import execute_code, check_sandbox_requirements, EXECUTE_CODE_SCHEMA
# Subagent delegation
from tools.delegate_tool import delegate_task, check_delegate_requirements, DELEGATE_TASK_SCHEMA
from toolsets import (
get_toolset, resolve_toolset, resolve_multiple_toolsets,
get_all_toolsets, get_toolset_names, validate_toolset,
@@ -221,6 +223,13 @@ TOOLSET_REQUIREMENTS = {
"setup_url": None,
"tools": ["execute_code"],
},
"delegation": {
"name": "Subagent Delegation",
"env_vars": [], # Uses existing AIAgent class, no external deps
"check_fn": check_delegate_requirements,
"setup_url": None,
"tools": ["delegate_task"],
},
}
@@ -1023,6 +1032,13 @@ def get_execute_code_tool_definitions() -> List[Dict[str, Any]]:
return [{"type": "function", "function": EXECUTE_CODE_SCHEMA}]
def get_delegate_tool_definitions() -> List[Dict[str, Any]]:
"""
Get tool definitions for the subagent delegation tool.
"""
return [{"type": "function", "function": DELEGATE_TASK_SCHEMA}]
def get_send_message_tool_definitions():
"""Tool definitions for cross-channel messaging."""
return [
@@ -1196,6 +1212,10 @@ def get_all_tool_names() -> List[str]:
if check_sandbox_requirements():
tool_names.extend(["execute_code"])
# Subagent delegation
if check_delegate_requirements():
tool_names.extend(["delegate_task"])
# Cross-channel messaging (always available on messaging platforms)
tool_names.extend(["send_message"])
@@ -1262,6 +1282,8 @@ TOOL_TO_TOOLSET_MAP = {
"clarify": "clarify_tools",
# Code execution sandbox
"execute_code": "code_execution_tools",
# Subagent delegation
"delegate_task": "delegation_tools",
}
@@ -1400,6 +1422,11 @@ def get_tool_definitions(
for tool in get_execute_code_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
# Subagent delegation
if check_delegate_requirements():
for tool in get_delegate_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
# Cross-channel messaging (always available on messaging platforms)
for tool in get_send_message_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
@@ -2313,6 +2340,10 @@ def handle_function_call(
elif function_name == "session_search":
return json.dumps({"error": "Session search is not available. The session database may not be initialized."})
# Delegate task -- handled by the agent loop (needs parent AIAgent instance).
elif function_name == "delegate_task":
return json.dumps({"error": "delegate_task must be handled by the agent loop"})
else:
error_msg = f"Unknown function: {function_name}"
print(f"{error_msg}")
@@ -2426,6 +2457,12 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]:
"tools": ["execute_code"],
"description": "Code execution sandbox: run Python scripts that call tools programmatically",
"requirements": ["Linux or macOS (Unix domain sockets)"]
},
"delegation_tools": {
"available": check_delegate_requirements(),
"tools": ["delegate_task"],
"description": "Subagent delegation: spawn child agents with isolated context for complex subtasks",
"requirements": []
}
}
@@ -2450,6 +2487,7 @@ def check_toolset_requirements() -> Dict[str, bool]:
"file_tools": check_file_requirements(),
"tts_tools": check_tts_requirements(),
"code_execution_tools": check_sandbox_requirements(),
"delegation_tools": check_delegate_requirements(),
}
if __name__ == "__main__":

View File

@@ -1142,6 +1142,10 @@ class AIAgent:
self._interrupt_requested = False
self._interrupt_message = None # Optional message that triggered interrupt
# Subagent delegation state
self._delegate_depth = 0 # 0 = top-level agent, incremented for children
self._active_children = [] # Running child AIAgents (for interrupt propagation)
# Store OpenRouter provider preferences
self.providers_allowed = providers_allowed
self.providers_ignored = providers_ignored
@@ -1600,6 +1604,14 @@ class AIAgent:
first_line = code.strip().split("\n")[0] if code.strip() else ""
return f"┊ 🐍 exec {_trunc(first_line, 35)} {dur}"
# ── Subagent Delegation ──
if tool_name == "delegate_task":
tasks = args.get("tasks")
if tasks and isinstance(tasks, list):
return f"┊ 🔀 delegate {len(tasks)} parallel tasks {dur}"
goal = _trunc(args.get("goal", ""), 35)
return f"┊ 🔀 delegate {goal} {dur}"
# ── Fallback ──
preview = _build_tool_preview(tool_name, args) or ""
return f"┊ ⚡ {tool_name[:9]:9} {_trunc(preview, 35)} {dur}"
@@ -2091,6 +2103,12 @@ class AIAgent:
self._interrupt_message = message
# Signal the terminal tool to kill any running subprocess immediately
_set_terminal_interrupt(True)
# Propagate interrupt to any running child agents (subagent delegation)
for child in self._active_children:
try:
child.interrupt(message)
except Exception:
pass
if not self.quiet_mode:
print(f"\n⚡ Interrupt requested" + (f": '{message[:40]}...'" if message and len(message) > 40 else f": '{message}'" if message else ""))
@@ -2957,6 +2975,21 @@ class AIAgent:
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
print(f" {self._get_cute_tool_message('clarify', function_args, tool_duration)}")
# Delegate task -- spawn child agent(s) with isolated context
elif function_name == "delegate_task":
from tools.delegate_tool import delegate_task as _delegate_task
function_result = _delegate_task(
goal=function_args.get("goal"),
context=function_args.get("context"),
toolsets=function_args.get("toolsets"),
tasks=function_args.get("tasks"),
model=function_args.get("model"),
max_iterations=function_args.get("max_iterations"),
parent_agent=self,
)
tool_duration = time.time() - tool_start_time
if self.quiet_mode:
print(f" {self._get_cute_tool_message('delegate_task', function_args, tool_duration)}")
# Execute other tools - with animated kawaii spinner in quiet mode
# The face is "alive" while the tool works, then vanishes
# and is replaced by the clean result line.
@@ -2976,7 +3009,7 @@ class AIAgent:
'skills_list': '📚', 'skill_view': '📚',
'schedule_cronjob': '', 'list_cronjobs': '', 'remove_cronjob': '',
'send_message': '📨', 'todo': '📋', 'memory': '🧠', 'session_search': '🔍',
'clarify': '', 'execute_code': '🐍',
'clarify': '', 'execute_code': '🐍', 'delegate_task': '🔀',
}
emoji = tool_emoji_map.get(function_name, '')
preview = _build_tool_preview(function_name, function_args) or function_name

237
tests/test_delegate.py Normal file
View File

@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
Tests for the subagent delegation tool.
Uses mock AIAgent instances to test the delegation logic without
requiring API keys or real LLM calls.
Run with: python -m pytest tests/test_delegate.py -v
or: python tests/test_delegate.py
"""
import json
import os
import sys
import time
import unittest
from unittest.mock import MagicMock, patch
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from tools.delegate_tool import (
DELEGATE_BLOCKED_TOOLS,
DELEGATE_TASK_SCHEMA,
MAX_CONCURRENT_CHILDREN,
MAX_DEPTH,
check_delegate_requirements,
delegate_task,
_build_child_system_prompt,
_strip_blocked_tools,
)
def _make_mock_parent(depth=0):
"""Create a mock parent agent with the fields delegate_task expects."""
parent = MagicMock()
parent.base_url = "https://openrouter.ai/api/v1"
parent.model = "anthropic/claude-sonnet-4"
parent.platform = "cli"
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent._session_db = None
parent._delegate_depth = depth
parent._active_children = []
return parent
class TestDelegateRequirements(unittest.TestCase):
def test_always_available(self):
self.assertTrue(check_delegate_requirements())
def test_schema_valid(self):
self.assertEqual(DELEGATE_TASK_SCHEMA["name"], "delegate_task")
props = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
self.assertIn("goal", props)
self.assertIn("tasks", props)
self.assertIn("context", props)
self.assertIn("toolsets", props)
self.assertIn("model", props)
self.assertIn("max_iterations", props)
self.assertEqual(props["tasks"]["maxItems"], 3)
class TestChildSystemPrompt(unittest.TestCase):
def test_goal_only(self):
prompt = _build_child_system_prompt("Fix the tests")
self.assertIn("Fix the tests", prompt)
self.assertIn("YOUR TASK", prompt)
self.assertNotIn("CONTEXT", prompt)
def test_goal_with_context(self):
prompt = _build_child_system_prompt("Fix the tests", "Error: assertion failed in test_foo.py line 42")
self.assertIn("Fix the tests", prompt)
self.assertIn("CONTEXT", prompt)
self.assertIn("assertion failed", prompt)
def test_empty_context_ignored(self):
prompt = _build_child_system_prompt("Do something", " ")
self.assertNotIn("CONTEXT", prompt)
class TestStripBlockedTools(unittest.TestCase):
def test_removes_blocked_toolsets(self):
result = _strip_blocked_tools(["terminal", "file", "delegation", "clarify", "memory", "code_execution"])
self.assertEqual(sorted(result), ["file", "terminal"])
def test_preserves_allowed_toolsets(self):
result = _strip_blocked_tools(["terminal", "file", "web", "browser"])
self.assertEqual(sorted(result), ["browser", "file", "terminal", "web"])
def test_empty_input(self):
result = _strip_blocked_tools([])
self.assertEqual(result, [])
class TestDelegateTask(unittest.TestCase):
def test_no_parent_agent(self):
result = json.loads(delegate_task(goal="test"))
self.assertIn("error", result)
self.assertIn("parent agent", result["error"])
def test_depth_limit(self):
parent = _make_mock_parent(depth=2)
result = json.loads(delegate_task(goal="test", parent_agent=parent))
self.assertIn("error", result)
self.assertIn("depth limit", result["error"].lower())
def test_no_goal_or_tasks(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(parent_agent=parent))
self.assertIn("error", result)
def test_empty_goal(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(goal=" ", parent_agent=parent))
self.assertIn("error", result)
def test_task_missing_goal(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(tasks=[{"context": "no goal here"}], parent_agent=parent))
self.assertIn("error", result)
@patch("tools.delegate_tool._run_single_child")
def test_single_task_mode(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done!", "api_calls": 3, "duration_seconds": 5.0
}
parent = _make_mock_parent()
result = json.loads(delegate_task(goal="Fix tests", context="error log...", parent_agent=parent))
self.assertIn("results", result)
self.assertEqual(len(result["results"]), 1)
self.assertEqual(result["results"][0]["status"], "completed")
self.assertEqual(result["results"][0]["summary"], "Done!")
mock_run.assert_called_once()
@patch("tools.delegate_tool._run_single_child")
def test_batch_mode(self, mock_run):
mock_run.side_effect = [
{"task_index": 0, "status": "completed", "summary": "Result A", "api_calls": 2, "duration_seconds": 3.0},
{"task_index": 1, "status": "completed", "summary": "Result B", "api_calls": 4, "duration_seconds": 6.0},
]
parent = _make_mock_parent()
tasks = [
{"goal": "Research topic A"},
{"goal": "Research topic B"},
]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
self.assertIn("results", result)
self.assertEqual(len(result["results"]), 2)
self.assertEqual(result["results"][0]["summary"], "Result A")
self.assertEqual(result["results"][1]["summary"], "Result B")
self.assertIn("total_duration_seconds", result)
@patch("tools.delegate_tool._run_single_child")
def test_batch_capped_at_3(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
tasks = [{"goal": f"Task {i}"} for i in range(5)]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
# Should only run 3 tasks (MAX_CONCURRENT_CHILDREN)
self.assertEqual(mock_run.call_count, 3)
@patch("tools.delegate_tool._run_single_child")
def test_batch_ignores_toplevel_goal(self, mock_run):
"""When tasks array is provided, top-level goal/context/toolsets are ignored."""
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
result = json.loads(delegate_task(
goal="This should be ignored",
tasks=[{"goal": "Actual task"}],
parent_agent=parent,
))
# The mock was called with the tasks array item, not the top-level goal
call_args = mock_run.call_args
self.assertEqual(call_args.kwargs.get("goal") or call_args[1].get("goal", call_args[0][1] if len(call_args[0]) > 1 else None), "Actual task")
@patch("tools.delegate_tool._run_single_child")
def test_failed_child_included_in_results(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "error",
"summary": None, "error": "Something broke",
"api_calls": 0, "duration_seconds": 0.5
}
parent = _make_mock_parent()
result = json.loads(delegate_task(goal="Break things", parent_agent=parent))
self.assertEqual(result["results"][0]["status"], "error")
self.assertIn("Something broke", result["results"][0]["error"])
def test_depth_increments(self):
"""Verify child gets parent's depth + 1."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test depth", parent_agent=parent)
self.assertEqual(mock_child._delegate_depth, 1)
def test_active_children_tracking(self):
"""Verify children are registered/unregistered for interrupt propagation."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test tracking", parent_agent=parent)
self.assertEqual(len(parent._active_children), 0)
class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self):
for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]:
self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)
def test_constants(self):
self.assertEqual(MAX_CONCURRENT_CHILDREN, 3)
self.assertEqual(MAX_DEPTH, 2)
if __name__ == "__main__":
unittest.main()

View File

@@ -156,6 +156,13 @@ from .code_execution_tool import (
EXECUTE_CODE_SCHEMA,
)
# Subagent delegation (spawn child agents with isolated context)
from .delegate_tool import (
delegate_task,
check_delegate_requirements,
DELEGATE_TASK_SCHEMA,
)
# File tools have no external requirements - they use the terminal backend
def check_file_requirements():
"""File tools only require terminal backend to be available."""
@@ -261,5 +268,9 @@ __all__ = [
'execute_code',
'check_sandbox_requirements',
'EXECUTE_CODE_SCHEMA',
# Subagent delegation
'delegate_task',
'check_delegate_requirements',
'DELEGATE_TASK_SCHEMA',
]

398
tools/delegate_tool.py Normal file
View File

@@ -0,0 +1,398 @@
#!/usr/bin/env python3
"""
Delegate Tool -- Subagent Architecture
Spawns child AIAgent instances with isolated context, restricted toolsets,
and their own terminal sessions. Supports single-task and batch (parallel)
modes. The parent blocks until all children complete.
Each child gets:
- A fresh conversation (no parent history)
- Its own task_id (own terminal session, file ops cache)
- A restricted toolset (configurable, with blocked tools always stripped)
- A focused system prompt built from the delegated goal + context
The parent's context only sees the delegation call and the summary result,
never the child's intermediate tool calls or reasoning.
"""
import contextlib
import io
import json
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
# Tools that children must never have access to
DELEGATE_BLOCKED_TOOLS = frozenset([
"delegate_task", # no recursive delegation
"clarify", # no user interaction
"memory", # no writes to shared MEMORY.md
"send_message", # no cross-platform side effects
"execute_code", # children should reason step-by-step, not write scripts
])
MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2)
DEFAULT_MAX_ITERATIONS = 25
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
def check_delegate_requirements() -> bool:
"""Delegation has no external requirements -- always available."""
return True
def _build_child_system_prompt(goal: str, context: Optional[str] = None) -> str:
"""Build a focused system prompt for a child agent."""
parts = [
"You are a focused subagent working on a specific delegated task.",
"",
f"YOUR TASK:\n{goal}",
]
if context and context.strip():
parts.append(f"\nCONTEXT:\n{context}")
parts.append(
"\nComplete this task using the tools available to you. "
"When finished, provide a clear, concise summary of:\n"
"- What you did\n"
"- What you found or accomplished\n"
"- Any files you created or modified\n"
"- Any issues encountered\n\n"
"Be thorough but concise -- your response is returned to the "
"parent agent as a summary."
)
return "\n".join(parts)
def _strip_blocked_tools(toolsets: List[str]) -> List[str]:
"""Remove toolsets that contain only blocked tools."""
blocked_toolset_names = {
"delegation", "clarify", "memory", "code_execution",
}
return [t for t in toolsets if t not in blocked_toolset_names]
def _run_single_child(
task_index: int,
goal: str,
context: Optional[str],
toolsets: Optional[List[str]],
model: Optional[str],
max_iterations: int,
parent_agent,
) -> Dict[str, Any]:
"""
Spawn and run a single child agent. Called from within a thread.
Returns a structured result dict.
"""
from run_agent import AIAgent
child_start = time.monotonic()
child_toolsets = _strip_blocked_tools(toolsets or DEFAULT_TOOLSETS)
child_prompt = _build_child_system_prompt(goal, context)
try:
child = AIAgent(
base_url=parent_agent.base_url,
model=model or parent_agent.model,
max_iterations=max_iterations,
enabled_toolsets=child_toolsets,
quiet_mode=True,
ephemeral_system_prompt=child_prompt,
log_prefix=f"[subagent-{task_index}]",
platform=parent_agent.platform,
skip_context_files=True,
skip_memory=True,
clarify_callback=None,
session_db=getattr(parent_agent, '_session_db', None),
providers_allowed=parent_agent.providers_allowed,
providers_ignored=parent_agent.providers_ignored,
providers_order=parent_agent.providers_order,
provider_sort=parent_agent.provider_sort,
)
# Set delegation depth so children can't spawn grandchildren
child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
# Register child for interrupt propagation
if hasattr(parent_agent, '_active_children'):
parent_agent._active_children.append(child)
# Run with stdout/stderr suppressed to prevent interleaved output
devnull = io.StringIO()
with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
result = child.run_conversation(user_message=goal)
duration = round(time.monotonic() - child_start, 2)
summary = result.get("final_response") or ""
completed = result.get("completed", False)
interrupted = result.get("interrupted", False)
api_calls = result.get("api_calls", 0)
if interrupted:
status = "interrupted"
elif completed and summary:
status = "completed"
else:
status = "failed"
entry: Dict[str, Any] = {
"task_index": task_index,
"status": status,
"summary": summary,
"api_calls": api_calls,
"duration_seconds": duration,
}
if status == "failed":
entry["error"] = result.get("error", "Subagent did not produce a response.")
return entry
except Exception as exc:
duration = round(time.monotonic() - child_start, 2)
logging.exception(f"[subagent-{task_index}] failed")
return {
"task_index": task_index,
"status": "error",
"summary": None,
"error": str(exc),
"api_calls": 0,
"duration_seconds": duration,
}
finally:
# Unregister child from interrupt propagation
if hasattr(parent_agent, '_active_children'):
try:
parent_agent._active_children.remove(child)
except (ValueError, UnboundLocalError):
pass
def delegate_task(
goal: Optional[str] = None,
context: Optional[str] = None,
toolsets: Optional[List[str]] = None,
tasks: Optional[List[Dict[str, Any]]] = None,
model: Optional[str] = None,
max_iterations: Optional[int] = None,
parent_agent=None,
) -> str:
"""
Spawn one or more child agents to handle delegated tasks.
Supports two modes:
- Single: provide goal (+ optional context, toolsets)
- Batch: provide tasks array [{goal, context, toolsets}, ...]
Returns JSON with results array, one entry per task.
"""
if parent_agent is None:
return json.dumps({"error": "delegate_task requires a parent agent context."})
# Depth limit
depth = getattr(parent_agent, '_delegate_depth', 0)
if depth >= MAX_DEPTH:
return json.dumps({
"error": (
f"Delegation depth limit reached ({MAX_DEPTH}). "
"Subagents cannot spawn further subagents."
)
})
# Load config
cfg = _load_config()
default_max_iter = cfg.get("max_iterations", DEFAULT_MAX_ITERATIONS)
effective_max_iter = max_iterations or default_max_iter
# Normalize to task list
if tasks and isinstance(tasks, list):
task_list = tasks[:MAX_CONCURRENT_CHILDREN]
elif goal and isinstance(goal, str) and goal.strip():
task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
else:
return json.dumps({"error": "Provide either 'goal' (single task) or 'tasks' (batch)."})
if not task_list:
return json.dumps({"error": "No tasks provided."})
# Validate each task has a goal
for i, task in enumerate(task_list):
if not task.get("goal", "").strip():
return json.dumps({"error": f"Task {i} is missing a 'goal'."})
overall_start = time.monotonic()
results = []
if len(task_list) == 1:
# Single task -- run directly (no thread pool overhead)
t = task_list[0]
result = _run_single_child(
task_index=0,
goal=t["goal"],
context=t.get("context"),
toolsets=t.get("toolsets") or toolsets,
model=model,
max_iterations=effective_max_iter,
parent_agent=parent_agent,
)
results.append(result)
else:
# Batch -- run in parallel
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor:
futures = {}
for i, t in enumerate(task_list):
future = executor.submit(
_run_single_child,
task_index=i,
goal=t["goal"],
context=t.get("context"),
toolsets=t.get("toolsets") or toolsets,
model=model,
max_iterations=effective_max_iter,
parent_agent=parent_agent,
)
futures[future] = i
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as exc:
idx = futures[future]
results.append({
"task_index": idx,
"status": "error",
"summary": None,
"error": str(exc),
"api_calls": 0,
"duration_seconds": 0,
})
# Sort by task_index so results match input order
results.sort(key=lambda r: r["task_index"])
total_duration = round(time.monotonic() - overall_start, 2)
return json.dumps({
"results": results,
"total_duration_seconds": total_duration,
}, ensure_ascii=False)
def _load_config() -> dict:
"""Load delegation config from CLI_CONFIG if available."""
try:
from cli import CLI_CONFIG
return CLI_CONFIG.get("delegation", {})
except Exception:
return {}
# ---------------------------------------------------------------------------
# OpenAI Function-Calling Schema
# ---------------------------------------------------------------------------
DELEGATE_TASK_SCHEMA = {
"name": "delegate_task",
"description": (
"Spawn one or more subagents to work on tasks in isolated contexts. "
"Each subagent gets its own conversation, terminal session, and toolset. "
"Only the final summary is returned -- intermediate tool results "
"never enter your context window.\n\n"
"TWO MODES:\n"
"1. Single task: provide 'goal' (+ optional context, toolsets)\n"
"2. Batch (parallel): provide 'tasks' array with up to 3 items. "
"All run concurrently and results are returned together.\n\n"
"WHEN TO USE delegate_task:\n"
"- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
"- Tasks that would flood your context with intermediate data\n"
"- Parallel independent workstreams (research A and B simultaneously)\n\n"
"WHEN NOT TO USE (use these instead):\n"
"- Mechanical multi-step work with no reasoning needed -> use execute_code\n"
"- Single tool call -> just call the tool directly\n"
"- Tasks needing user interaction -> subagents cannot use clarify\n\n"
"IMPORTANT:\n"
"- Subagents have NO memory of your conversation. Pass all relevant "
"info (file paths, error messages, constraints) via the 'context' field.\n"
"- Subagents CANNOT call: delegate_task, clarify, memory, send_message, "
"execute_code.\n"
"- Each subagent gets its own terminal session (separate working directory and state).\n"
"- Results are always returned as an array, one entry per task."
),
"parameters": {
"type": "object",
"properties": {
"goal": {
"type": "string",
"description": (
"What the subagent should accomplish. Be specific and "
"self-contained -- the subagent knows nothing about your "
"conversation history."
),
},
"context": {
"type": "string",
"description": (
"Background information the subagent needs: file paths, "
"error messages, project structure, constraints. The more "
"specific you are, the better the subagent performs."
),
},
"toolsets": {
"type": "array",
"items": {"type": "string"},
"description": (
"Toolsets to enable for this subagent. "
"Default: ['terminal', 'file', 'web']. "
"Common patterns: ['terminal', 'file'] for code work, "
"['web'] for research, ['terminal', 'file', 'web'] for "
"full-stack tasks."
),
},
"tasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"goal": {"type": "string", "description": "Task goal"},
"context": {"type": "string", "description": "Task-specific context"},
"toolsets": {
"type": "array",
"items": {"type": "string"},
"description": "Toolsets for this specific task",
},
},
"required": ["goal"],
},
"maxItems": 3,
"description": (
"Batch mode: up to 3 tasks to run in parallel. Each gets "
"its own subagent with isolated context and terminal session. "
"When provided, top-level goal/context/toolsets are ignored."
),
},
"model": {
"type": "string",
"description": (
"Model override for the subagent(s). Omit to use your "
"same model. Use a cheaper/faster model for simple subtasks."
),
},
"max_iterations": {
"type": "integer",
"description": (
"Max tool-calling turns per subagent (default: 25). "
"Lower for simple tasks, higher for complex ones."
),
},
},
"required": [],
},
}

View File

@@ -144,6 +144,12 @@ TOOLSETS = {
"includes": []
},
"delegation": {
"description": "Spawn subagents with isolated context for complex subtasks",
"tools": ["delegate_task"],
"includes": []
},
# Scenario-specific toolsets
@@ -197,6 +203,8 @@ TOOLSETS = {
"clarify",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management (CLI-only)
"schedule_cronjob", "list_cronjobs", "remove_cronjob"
],
@@ -237,6 +245,8 @@ TOOLSETS = {
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
@@ -275,6 +285,8 @@ TOOLSETS = {
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
@@ -313,6 +325,8 @@ TOOLSETS = {
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
@@ -351,6 +365,8 @@ TOOLSETS = {
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging