feat: implement code execution sandbox for programmatic tool calling

- Introduced a new `execute_code` tool that allows the agent to run Python scripts that call Hermes tools via RPC, reducing the number of round trips required for tool interactions.
- Added configuration options for timeout and maximum tool calls in the sandbox environment.
- Updated the toolset definitions to include the new code execution capabilities, ensuring integration across platforms.
- Implemented comprehensive tests for the code execution sandbox, covering various scenarios including tool call limits and error handling.
- Enhanced the CLI and documentation to reflect the new functionality, providing users with clear guidance on using the code execution tool.
This commit is contained in:
teknium1
2026-02-19 23:23:43 -08:00
parent 748f0b2b5f
commit 783acd712d
10 changed files with 1598 additions and 18 deletions

29
TODO.md
View File

@@ -70,7 +70,7 @@ The main agent becomes an orchestrator that delegates context-heavy tasks to sub
## 2. Interactive Clarifying Questions ❓
**Status:** Not started
**Status:** Implemented
**Priority:** Medium-High -- enables the knowledge system feedback loop
Allow the agent to present structured choices to the user when it needs clarification or feedback. Rich terminal UI in CLI mode, graceful fallback on messaging platforms.
@@ -235,7 +235,7 @@ Automatic filesystem snapshots after each agent loop iteration so the user can r
## 9. Programmatic Tool Calling (Code-Mediated Tool Use) 🧬
**Status:** Not started
**Status:** Implemented (MVP) ✅
**Priority:** High -- potentially the single biggest efficiency win for agent loops
Instead of the LLM making one tool call, reading the result, deciding what to do next, making another tool call (N round trips), the LLM writes a Python script that calls multiple tools, processes results, branches on conditions, and returns a final summary -- all in one turn.
@@ -596,24 +596,21 @@ This goes in the tool description:
- **Memory System.** MEMORY.md + USER.md, bounded, system prompt injection, `memory` tool.
- **Agent-Managed Skills.** `skill_manage` tool (create/patch/edit/delete/write_file/remove_file), unified `~/.hermes/skills/` dir, manifest-based sync.
- **SQLite State Store & Session Search.** `~/.hermes/state.db` with sessions, messages, FTS5 search, `session_search` tool.
- **Interactive Clarifying Questions.** `clarify` tool with arrow-key selection UI in CLI, configurable timeout, CLI-only.
- **Programmatic Tool Calling.** `execute_code` tool -- sandbox child process with UDS RPC bridge to 7 tools (`web_search`, `web_extract`, `read_file`, `write_file`, `search`, `patch`, `terminal`). Configurable timeout and tool call limits via `config.yaml`.
### Tier 1: Next Up
1. Interactive Clarifying Questions -- #2
2. Programmatic Tool Calling -- #9
1. Subagent Architecture -- #1
2. MCP Support -- #6
### Tier 2: Scaling & Ecosystem
### Tier 2: Quality of Life
3. Subagent Architecture -- #1
4. MCP Support -- #6
3. Local Browser Control via CDP -- #3
4. Plugin/Extension System -- #5
### Tier 3: Quality of Life
### Tier 3: Nice to Have
5. Local Browser Control via CDP -- #3
6. Plugin/Extension System -- #5
### Tier 4: Nice to Have
7. Session Branching / Checkpoints -- #7
8. Filesystem Checkpointing / Rollback -- #8
9. Signal Integration -- #4
5. Session Branching / Checkpoints -- #7
6. Filesystem Checkpointing / Rollback -- #8
7. Signal Integration -- #4

4
cli.py
View File

@@ -135,6 +135,10 @@ def load_cli_config() -> Dict[str, Any]:
"clarify": {
"timeout": 120, # Seconds to wait for a clarify answer before auto-proceeding
},
"code_execution": {
"timeout": 120, # Max seconds a sandbox script can run before being killed
"max_tool_calls": 50, # Max RPC tool calls per execution
},
}
# Track whether the config file explicitly set terminal config.

19
exprted.jsonl Normal file

File diff suppressed because one or more lines are too long

View File

@@ -95,6 +95,8 @@ from tools.memory_tool import memory_tool, check_memory_requirements, MEMORY_SCH
from tools.session_search_tool import session_search, check_session_search_requirements, SESSION_SEARCH_SCHEMA
# Clarifying questions tool
from tools.clarify_tool import clarify_tool, check_clarify_requirements, CLARIFY_SCHEMA
# Code execution sandbox (programmatic tool calling)
from tools.code_execution_tool import execute_code, check_sandbox_requirements, EXECUTE_CODE_SCHEMA
from toolsets import (
get_toolset, resolve_toolset, resolve_multiple_toolsets,
get_all_toolsets, get_toolset_names, validate_toolset,
@@ -212,6 +214,13 @@ TOOLSET_REQUIREMENTS = {
"setup_url": None,
"tools": ["clarify"],
},
"code_execution": {
"name": "Code Execution Sandbox",
"env_vars": [], # Uses stdlib only (subprocess, socket), no external deps
"check_fn": check_sandbox_requirements,
"setup_url": None,
"tools": ["execute_code"],
},
}
@@ -1005,6 +1014,13 @@ def get_clarify_tool_definitions() -> List[Dict[str, Any]]:
return [{"type": "function", "function": CLARIFY_SCHEMA}]
def get_execute_code_tool_definitions() -> List[Dict[str, Any]]:
"""
Get tool definitions for the code execution sandbox (programmatic tool calling).
"""
return [{"type": "function", "function": EXECUTE_CODE_SCHEMA}]
def get_send_message_tool_definitions():
"""Tool definitions for cross-channel messaging."""
return [
@@ -1174,6 +1190,10 @@ def get_all_tool_names() -> List[str]:
if check_clarify_requirements():
tool_names.extend(["clarify"])
# Code execution sandbox (programmatic tool calling)
if check_sandbox_requirements():
tool_names.extend(["execute_code"])
# Cross-channel messaging (always available on messaging platforms)
tool_names.extend(["send_message"])
@@ -1236,6 +1256,10 @@ TOOL_TO_TOOLSET_MAP = {
"memory": "memory_tools",
# Session history search
"session_search": "session_search_tools",
# Clarifying questions
"clarify": "clarify_tools",
# Code execution sandbox
"execute_code": "code_execution_tools",
}
@@ -1252,6 +1276,11 @@ def get_toolset_for_tool(tool_name: str) -> str:
return TOOL_TO_TOOLSET_MAP.get(tool_name, "unknown")
# Stores the resolved tool name list from the most recent get_tool_definitions()
# call, so execute_code can determine which tools are available in this session.
_last_resolved_tool_names: Optional[List[str]] = None
def get_tool_definitions(
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None,
@@ -1364,6 +1393,11 @@ def get_tool_definitions(
for tool in get_clarify_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
# Code execution sandbox (programmatic tool calling)
if check_sandbox_requirements():
for tool in get_execute_code_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
# Cross-channel messaging (always available on messaging platforms)
for tool in get_send_message_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
@@ -1491,6 +1525,10 @@ def get_tool_definitions(
else:
print("🛠️ No tools selected (all filtered out or unavailable)")
# Store resolved names so execute_code knows what's available in this session
global _last_resolved_tool_names
_last_resolved_tool_names = [t["function"]["name"] for t in filtered_tools]
return filtered_tools
def handle_web_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
@@ -2239,6 +2277,15 @@ def handle_function_call(
elif function_name in ["read_file", "write_file", "patch", "search"]:
return handle_file_function_call(function_name, function_args, task_id)
# Route code execution sandbox (programmatic tool calling)
elif function_name == "execute_code":
code = function_args.get("code", "")
return execute_code(
code=code,
task_id=task_id,
enabled_tools=_last_resolved_tool_names,
)
# Route text-to-speech tools
elif function_name in ["text_to_speech"]:
return handle_tts_function_call(function_name, function_args)
@@ -2367,6 +2414,12 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]:
"tools": ["clarify"],
"description": "Clarifying questions: ask the user multiple-choice or open-ended questions",
"requirements": []
},
"code_execution_tools": {
"available": check_sandbox_requirements(),
"tools": ["execute_code"],
"description": "Code execution sandbox: run Python scripts that call tools programmatically",
"requirements": ["Linux or macOS (Unix domain sockets)"]
}
}
@@ -2389,7 +2442,8 @@ def check_toolset_requirements() -> Dict[str, bool]:
"browser_tools": check_browser_requirements(),
"cronjob_tools": check_cronjob_requirements(),
"file_tools": check_file_requirements(),
"tts_tools": check_tts_requirements()
"tts_tools": check_tts_requirements(),
"code_execution_tools": check_sandbox_requirements(),
}
if __name__ == "__main__":

View File

@@ -1594,6 +1594,12 @@ class AIAgent:
detail = rl.get(tool_name, tool_name.replace("rl_", ""))
return f"┊ 🧪 rl {detail} {dur}"
# ── Code Execution Sandbox ──
if tool_name == "execute_code":
code = args.get("code", "")
first_line = code.strip().split("\n")[0] if code.strip() else ""
return f"┊ 🐍 exec {_trunc(first_line, 35)} {dur}"
# ── Fallback ──
preview = _build_tool_preview(tool_name, args) or ""
return f"┊ ⚡ {tool_name[:9]:9} {_trunc(preview, 35)} {dur}"
@@ -2970,7 +2976,7 @@ class AIAgent:
'skills_list': '📚', 'skill_view': '📚',
'schedule_cronjob': '', 'list_cronjobs': '', 'remove_cronjob': '',
'send_message': '📨', 'todo': '📋', 'memory': '🧠', 'session_search': '🔍',
'clarify': '',
'clarify': '', 'execute_code': '🐍',
}
emoji = tool_emoji_map.get(function_name, '')
preview = _build_tool_preview(function_name, function_args) or function_name

682
session_viewer.html Normal file
View File

@@ -0,0 +1,682 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Hermes Agent - Session Viewer</title>
<style>
:root {
--bg: #0d1117;
--surface: #161b22;
--surface2: #1c2333;
--border: #30363d;
--text: #e6edf3;
--text-muted: #8b949e;
--accent: #58a6ff;
--accent-dim: #1f3a5f;
--user: #da8ee7;
--user-bg: #2d1b3d;
--assistant: #58a6ff;
--assistant-bg: #152238;
--tool: #3fb950;
--tool-bg: #12261e;
--system: #d29922;
--system-bg: #2a2000;
--error: #f85149;
--meta: #768390;
--radius: 10px;
--font-mono: 'SF Mono', 'Cascadia Code', 'Fira Code', 'JetBrains Mono', monospace;
--font-sans: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Noto Sans', Helvetica, Arial, sans-serif;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
background: var(--bg);
color: var(--text);
font-family: var(--font-sans);
font-size: 14px;
line-height: 1.6;
display: flex;
height: 100vh;
overflow: hidden;
}
/* Sidebar */
#sidebar {
width: 340px;
min-width: 340px;
background: var(--surface);
border-right: 1px solid var(--border);
display: flex;
flex-direction: column;
overflow: hidden;
}
#sidebar-header {
padding: 20px;
border-bottom: 1px solid var(--border);
flex-shrink: 0;
}
#sidebar-header h1 {
font-size: 18px;
font-weight: 700;
color: var(--accent);
margin-bottom: 4px;
letter-spacing: -0.3px;
}
#sidebar-header p {
color: var(--text-muted);
font-size: 12px;
}
#file-picker {
padding: 12px 20px;
border-bottom: 1px solid var(--border);
flex-shrink: 0;
}
#file-picker label {
display: block;
padding: 10px 16px;
background: var(--accent-dim);
border: 1px dashed var(--accent);
border-radius: var(--radius);
text-align: center;
cursor: pointer;
color: var(--accent);
font-size: 13px;
font-weight: 500;
transition: all 0.15s;
}
#file-picker label:hover {
background: #1a4478;
}
#file-picker input { display: none; }
#session-list {
flex: 1;
overflow-y: auto;
padding: 8px;
}
.session-item {
padding: 12px 14px;
border-radius: 8px;
cursor: pointer;
transition: background 0.12s;
margin-bottom: 2px;
}
.session-item:hover { background: var(--surface2); }
.session-item.active { background: var(--accent-dim); border: 1px solid var(--accent); }
.session-item .session-title {
font-weight: 600;
font-size: 13px;
color: var(--text);
margin-bottom: 3px;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.session-item .session-meta {
display: flex;
gap: 10px;
font-size: 11px;
color: var(--text-muted);
}
.session-item .session-meta .badge {
display: inline-block;
padding: 1px 6px;
border-radius: 4px;
font-size: 10px;
font-weight: 600;
text-transform: uppercase;
}
.badge-cli { background: #1a3a2a; color: #3fb950; }
.badge-telegram { background: #1a2a3a; color: #58a6ff; }
.badge-discord { background: #2a1a3a; color: #bc8cff; }
/* Main area */
#main {
flex: 1;
display: flex;
flex-direction: column;
overflow: hidden;
}
#session-header {
padding: 16px 24px;
border-bottom: 1px solid var(--border);
background: var(--surface);
flex-shrink: 0;
}
#session-header h2 {
font-size: 16px;
font-weight: 600;
margin-bottom: 6px;
}
#session-header .meta-row {
display: flex;
gap: 20px;
flex-wrap: wrap;
font-size: 12px;
color: var(--text-muted);
}
#session-header .meta-row span { display: flex; align-items: center; gap: 4px; }
#messages-container {
flex: 1;
overflow-y: auto;
padding: 20px 24px;
}
/* Welcome state */
#welcome {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100%;
color: var(--text-muted);
text-align: center;
gap: 12px;
}
#welcome .icon { font-size: 48px; opacity: 0.3; }
#welcome h3 { font-size: 18px; color: var(--text); font-weight: 600; }
/* Messages */
.message {
margin-bottom: 16px;
border-radius: var(--radius);
overflow: hidden;
border: 1px solid var(--border);
}
.message-header {
display: flex;
align-items: center;
gap: 8px;
padding: 8px 14px;
font-size: 12px;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.5px;
}
.message-body {
padding: 12px 16px;
white-space: pre-wrap;
word-wrap: break-word;
font-size: 13.5px;
line-height: 1.65;
}
.msg-user .message-header { background: var(--user-bg); color: var(--user); }
.msg-user .message-body { background: #1e1228; }
.msg-user { border-color: #3d2650; }
.msg-assistant .message-header { background: var(--assistant-bg); color: var(--assistant); }
.msg-assistant .message-body { background: #0f1a2e; }
.msg-assistant { border-color: #1e3a5f; }
.msg-tool .message-header { background: var(--tool-bg); color: var(--tool); }
.msg-tool .message-body { background: #0c1a14; font-family: var(--font-mono); font-size: 12px; }
.msg-tool { border-color: #1a3525; }
.msg-session_meta .message-header { background: var(--system-bg); color: var(--system); }
.msg-session_meta .message-body { background: #1a1800; }
.msg-session_meta { border-color: #3a3000; }
.msg-system .message-header { background: var(--system-bg); color: var(--system); }
.msg-system .message-body { background: #1a1800; }
.msg-system { border-color: #3a3000; }
.tool-calls-section {
margin-top: 8px;
border-top: 1px solid var(--border);
padding-top: 8px;
}
.tool-call-item {
background: var(--surface2);
border: 1px solid var(--border);
border-radius: 6px;
margin-bottom: 6px;
overflow: hidden;
}
.tool-call-name {
padding: 6px 10px;
font-family: var(--font-mono);
font-size: 12px;
font-weight: 600;
color: var(--tool);
background: var(--tool-bg);
border-bottom: 1px solid var(--border);
}
.tool-call-args {
padding: 8px 10px;
font-family: var(--font-mono);
font-size: 11px;
white-space: pre-wrap;
word-break: break-all;
color: var(--text-muted);
max-height: 300px;
overflow-y: auto;
}
/* System prompt collapsible */
.system-prompt-toggle {
padding: 10px 16px;
background: var(--surface2);
border: 1px solid var(--border);
border-radius: var(--radius);
margin-bottom: 16px;
cursor: pointer;
user-select: none;
}
.system-prompt-toggle summary {
font-size: 12px;
font-weight: 600;
color: var(--system);
text-transform: uppercase;
letter-spacing: 0.5px;
list-style: none;
display: flex;
align-items: center;
gap: 6px;
}
.system-prompt-toggle summary::before {
content: '\25B6';
font-size: 10px;
transition: transform 0.15s;
}
.system-prompt-toggle[open] summary::before {
transform: rotate(90deg);
}
.system-prompt-content {
margin-top: 10px;
padding: 12px;
background: var(--bg);
border-radius: 6px;
font-size: 12px;
white-space: pre-wrap;
word-wrap: break-word;
color: var(--text-muted);
max-height: 400px;
overflow-y: auto;
line-height: 1.5;
}
.timestamp {
font-size: 11px;
color: var(--meta);
font-family: var(--font-mono);
}
.tool-result-truncated {
max-height: 400px;
overflow-y: auto;
}
/* Scrollbar styling */
::-webkit-scrollbar { width: 8px; }
::-webkit-scrollbar-track { background: transparent; }
::-webkit-scrollbar-thumb { background: var(--border); border-radius: 4px; }
::-webkit-scrollbar-thumb:hover { background: #484f58; }
.no-content { color: var(--text-muted); font-style: italic; font-size: 12px; }
.reasoning-block {
margin-top: 8px;
padding: 8px 12px;
background: #1a1a2e;
border: 1px solid #2a2a4e;
border-radius: 6px;
font-size: 12px;
color: #a0a0d0;
white-space: pre-wrap;
max-height: 200px;
overflow-y: auto;
}
.reasoning-label {
font-size: 10px;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.5px;
color: #7070b0;
margin-bottom: 4px;
}
.session-divider {
display: flex;
align-items: center;
gap: 12px;
margin: 24px 0;
color: var(--text-muted);
font-size: 11px;
text-transform: uppercase;
letter-spacing: 1px;
}
.session-divider::before, .session-divider::after {
content: '';
flex: 1;
height: 1px;
background: var(--border);
}
.stats-bar {
display: flex;
gap: 16px;
padding: 8px 14px;
background: var(--surface2);
border-radius: 6px;
margin-bottom: 16px;
font-size: 12px;
}
.stats-bar .stat { display: flex; align-items: center; gap: 4px; }
.stats-bar .stat-label { color: var(--text-muted); }
.stats-bar .stat-value { color: var(--text); font-weight: 600; font-family: var(--font-mono); }
</style>
</head>
<body>
<div id="sidebar">
<div id="sidebar-header">
<h1>Hermes Agent</h1>
<p>Session Transcript Viewer</p>
</div>
<div id="file-picker">
<label for="jsonl-input">Load .jsonl file</label>
<input type="file" id="jsonl-input" accept=".jsonl,.json,.txt">
</div>
<div id="session-list"></div>
</div>
<div id="main">
<div id="session-header" style="display:none"></div>
<div id="messages-container">
<div id="welcome">
<div class="icon">&#x2699;</div>
<h3>Load a session file</h3>
<p>Select a .jsonl file from the sidebar to view exported Hermes Agent sessions.</p>
</div>
</div>
</div>
<script>
const sessions = [];
let activeIdx = -1;
document.getElementById('jsonl-input').addEventListener('change', e => {
const file = e.target.files[0];
if (!file) return;
const reader = new FileReader();
reader.onload = ev => {
sessions.length = 0;
const lines = ev.target.result.split('\n').filter(l => l.trim());
for (const line of lines) {
try { sessions.push(JSON.parse(line)); } catch {}
}
renderSessionList();
if (sessions.length > 0) selectSession(0);
document.querySelector('#sidebar-header p').textContent = `${sessions.length} sessions loaded from ${file.name}`;
};
reader.readAsText(file);
});
function renderSessionList() {
const list = document.getElementById('session-list');
list.innerHTML = '';
sessions.forEach((s, i) => {
const firstUserMsg = (s.messages || []).find(m => m.role === 'user');
const preview = firstUserMsg
? firstUserMsg.content.substring(0, 80).replace(/\n/g, ' ')
: '(no messages)';
const dt = s.started_at ? new Date(s.started_at * 1000) : null;
const dateStr = dt ? dt.toLocaleString('en-US', { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }) : '';
const div = document.createElement('div');
div.className = 'session-item' + (i === activeIdx ? ' active' : '');
div.onclick = () => selectSession(i);
div.innerHTML = `
<div class="session-title">${esc(preview)}</div>
<div class="session-meta">
<span class="badge badge-${s.source || 'cli'}">${s.source || 'cli'}</span>
<span>${dateStr}</span>
<span>${s.message_count || 0} msgs</span>
</div>
`;
list.appendChild(div);
});
}
function selectSession(idx) {
activeIdx = idx;
const s = sessions[idx];
document.querySelectorAll('.session-item').forEach((el, i) => {
el.classList.toggle('active', i === idx);
});
const header = document.getElementById('session-header');
header.style.display = 'block';
const dt = s.started_at ? new Date(s.started_at * 1000) : null;
const endDt = s.ended_at ? new Date(s.ended_at * 1000) : null;
const duration = s.started_at && s.ended_at
? formatDuration(s.ended_at - s.started_at)
: 'unknown';
header.innerHTML = `
<h2>Session ${esc(s.id)}</h2>
<div class="meta-row">
<span>&#x1F4E1; ${esc(s.source || 'cli')}</span>
<span>&#x1F916; ${esc(s.model || 'unknown')}</span>
<span>&#x1F4AC; ${s.message_count || 0} messages</span>
<span>&#x1F527; ${s.tool_call_count || 0} tool calls</span>
<span>&#x23F1; ${duration}</span>
${s.end_reason ? `<span>&#x1F3C1; ${esc(s.end_reason)}</span>` : ''}
${dt ? `<span>&#x1F4C5; ${dt.toLocaleString()}</span>` : ''}
</div>
`;
renderMessages(s);
}
function renderMessages(session) {
const container = document.getElementById('messages-container');
container.innerHTML = '';
// System prompt (collapsible)
if (session.system_prompt) {
const details = document.createElement('details');
details.className = 'system-prompt-toggle';
details.innerHTML = `
<summary>System Prompt (${(session.system_prompt.length / 1024).toFixed(1)}KB)</summary>
<div class="system-prompt-content">${esc(session.system_prompt)}</div>
`;
container.appendChild(details);
}
// Stats bar
const stats = document.createElement('div');
stats.className = 'stats-bar';
stats.innerHTML = `
<div class="stat"><span class="stat-label">Messages:</span><span class="stat-value">${session.message_count || 0}</span></div>
<div class="stat"><span class="stat-label">Tool Calls:</span><span class="stat-value">${session.tool_call_count || 0}</span></div>
<div class="stat"><span class="stat-label">Source:</span><span class="stat-value">${esc(session.source || 'cli')}</span></div>
${session.user_id ? `<div class="stat"><span class="stat-label">User ID:</span><span class="stat-value">${esc(session.user_id)}</span></div>` : ''}
`;
container.appendChild(stats);
const messages = session.messages || [];
for (const msg of messages) {
const el = renderMessage(msg);
container.appendChild(el);
}
container.scrollTop = 0;
}
function renderMessage(msg) {
const div = document.createElement('div');
const role = msg.role || 'unknown';
div.className = `message msg-${role}`;
const roleIcon = {
user: '&#x1F464;',
assistant: '&#x1F916;',
tool: '&#x1F527;',
session_meta: '&#x2699;',
system: '&#x1F4CB;'
}[role] || '&#x2753;';
const ts = msg.timestamp ? new Date(msg.timestamp * 1000).toLocaleTimeString() : '';
const toolName = msg.tool_name ? ` (${msg.tool_name})` : '';
let headerExtra = '';
if (msg.tool_call_id && role === 'tool') {
headerExtra = ` &mdash; <span style="opacity:0.7;font-size:10px;text-transform:none;letter-spacing:0">${esc(msg.tool_call_id.substring(0, 24))}...</span>`;
}
div.innerHTML = `<div class="message-header">
<span>${roleIcon}</span>
<span>${role}${toolName}</span>
${headerExtra}
<span style="margin-left:auto" class="timestamp">${ts}</span>
</div>`;
const body = document.createElement('div');
body.className = 'message-body';
// Content
if (msg.content) {
let text = msg.content;
// Try to detect if content is a JSON string and pretty-print it
if (role === 'tool' && text.startsWith('{')) {
try {
const parsed = JSON.parse(text);
text = JSON.stringify(parsed, null, 2);
} catch {}
}
const contentDiv = document.createElement('div');
if (role === 'tool') {
contentDiv.className = 'tool-result-truncated';
}
contentDiv.textContent = text;
body.appendChild(contentDiv);
} else if (role !== 'session_meta' && !msg.tool_calls) {
const empty = document.createElement('span');
empty.className = 'no-content';
empty.textContent = '(no text content)';
body.appendChild(empty);
}
// Reasoning
if (msg.reasoning) {
const rBlock = document.createElement('div');
rBlock.innerHTML = `<div class="reasoning-label">Reasoning</div>`;
const rContent = document.createElement('div');
rContent.className = 'reasoning-block';
rContent.textContent = msg.reasoning;
rBlock.appendChild(rContent);
body.appendChild(rBlock);
}
// Tool calls
if (msg.tool_calls && msg.tool_calls.length > 0) {
const tcSection = document.createElement('div');
tcSection.className = 'tool-calls-section';
const label = document.createElement('div');
label.style.cssText = 'font-size:11px;font-weight:700;color:var(--tool);text-transform:uppercase;letter-spacing:0.5px;margin-bottom:6px;';
label.textContent = `Tool Calls (${msg.tool_calls.length})`;
tcSection.appendChild(label);
for (const tc of msg.tool_calls) {
const fn = tc.function || {};
const tcItem = document.createElement('div');
tcItem.className = 'tool-call-item';
const nameDiv = document.createElement('div');
nameDiv.className = 'tool-call-name';
nameDiv.textContent = fn.name || 'unknown';
tcItem.appendChild(nameDiv);
if (fn.arguments) {
const argsDiv = document.createElement('div');
argsDiv.className = 'tool-call-args';
let argsText = fn.arguments;
try {
argsText = JSON.stringify(JSON.parse(fn.arguments), null, 2);
} catch {}
argsDiv.textContent = argsText;
tcItem.appendChild(argsDiv);
}
tcSection.appendChild(tcItem);
}
body.appendChild(tcSection);
}
div.appendChild(body);
return div;
}
function esc(str) {
if (!str) return '';
const d = document.createElement('div');
d.textContent = str;
return d.innerHTML;
}
function formatDuration(seconds) {
if (seconds < 60) return `${Math.round(seconds)}s`;
if (seconds < 3600) return `${Math.floor(seconds / 60)}m ${Math.round(seconds % 60)}s`;
const h = Math.floor(seconds / 3600);
const m = Math.floor((seconds % 3600) / 60);
return `${h}h ${m}m`;
}
// Auto-load if file is in same directory (for local dev)
window.addEventListener('DOMContentLoaded', () => {
fetch('exprted.jsonl')
.then(r => { if (!r.ok) throw new Error(); return r.text(); })
.then(text => {
const lines = text.split('\n').filter(l => l.trim());
for (const line of lines) {
try { sessions.push(JSON.parse(line)); } catch {}
}
if (sessions.length) {
renderSessionList();
selectSession(sessions.length - 1);
document.querySelector('#sidebar-header p').textContent = `${sessions.length} sessions loaded`;
}
})
.catch(() => {});
});
</script>
</body>
</html>

View File

@@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
Tests for the code execution sandbox (programmatic tool calling).
These tests monkeypatch handle_function_call so they don't require API keys
or a running terminal backend. They verify the core sandbox mechanics:
UDS socket lifecycle, hermes_tools generation, timeout enforcement,
output capping, tool call counting, and error propagation.
Run with: python -m pytest tests/test_code_execution.py -v
or: python tests/test_code_execution.py
"""
import json
import os
import sys
import time
import unittest
from unittest.mock import patch
# Ensure the project root is on the path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from tools.code_execution_tool import (
SANDBOX_ALLOWED_TOOLS,
execute_code,
generate_hermes_tools_module,
check_sandbox_requirements,
EXECUTE_CODE_SCHEMA,
)
def _mock_handle_function_call(function_name, function_args, task_id=None, user_task=None):
"""Mock dispatcher that returns canned responses for each tool."""
if function_name == "terminal":
cmd = function_args.get("command", "")
return json.dumps({"output": f"mock output for: {cmd}", "exit_code": 0})
if function_name == "web_search":
return json.dumps({"results": [{"url": "https://example.com", "title": "Example", "description": "A test result"}]})
if function_name == "read_file":
return json.dumps({"content": "line 1\nline 2\nline 3\n", "total_lines": 3})
if function_name == "write_file":
return json.dumps({"status": "ok", "path": function_args.get("path", "")})
if function_name == "search":
return json.dumps({"matches": [{"file": "test.py", "line": 1, "text": "match"}]})
if function_name == "patch":
return json.dumps({"status": "ok", "replacements": 1})
if function_name == "web_extract":
return json.dumps("# Extracted content\nSome text from the page.")
return json.dumps({"error": f"Unknown tool in mock: {function_name}"})
class TestSandboxRequirements(unittest.TestCase):
def test_available_on_posix(self):
if sys.platform != "win32":
self.assertTrue(check_sandbox_requirements())
def test_schema_is_valid(self):
self.assertEqual(EXECUTE_CODE_SCHEMA["name"], "execute_code")
self.assertIn("code", EXECUTE_CODE_SCHEMA["parameters"]["properties"])
self.assertIn("code", EXECUTE_CODE_SCHEMA["parameters"]["required"])
class TestHermesToolsGeneration(unittest.TestCase):
def test_generates_all_allowed_tools(self):
src = generate_hermes_tools_module(list(SANDBOX_ALLOWED_TOOLS))
for tool in SANDBOX_ALLOWED_TOOLS:
self.assertIn(f"def {tool}(", src)
def test_generates_subset(self):
src = generate_hermes_tools_module(["terminal", "web_search"])
self.assertIn("def terminal(", src)
self.assertIn("def web_search(", src)
self.assertNotIn("def read_file(", src)
def test_empty_list_generates_nothing(self):
src = generate_hermes_tools_module([])
self.assertNotIn("def terminal(", src)
self.assertIn("def _call(", src) # infrastructure still present
def test_non_allowed_tools_ignored(self):
src = generate_hermes_tools_module(["vision_analyze", "terminal"])
self.assertIn("def terminal(", src)
self.assertNotIn("def vision_analyze(", src)
def test_rpc_infrastructure_present(self):
src = generate_hermes_tools_module(["terminal"])
self.assertIn("HERMES_RPC_SOCKET", src)
self.assertIn("AF_UNIX", src)
self.assertIn("def _connect(", src)
self.assertIn("def _call(", src)
@unittest.skipIf(sys.platform == "win32", "UDS not available on Windows")
class TestExecuteCode(unittest.TestCase):
"""Integration tests using the mock dispatcher."""
def _run(self, code, enabled_tools=None):
"""Helper: run code with mocked handle_function_call."""
with patch("tools.code_execution_tool._rpc_server_loop") as mock_rpc:
# Use real execution but mock the tool dispatcher
pass
# Actually run with full integration, mocking at the model_tools level
with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
result = execute_code(
code=code,
task_id="test-task",
enabled_tools=enabled_tools or list(SANDBOX_ALLOWED_TOOLS),
)
return json.loads(result)
def test_basic_print(self):
"""Script that just prints -- no tool calls."""
result = self._run('print("hello world")')
self.assertEqual(result["status"], "success")
self.assertIn("hello world", result["output"])
self.assertEqual(result["tool_calls_made"], 0)
def test_single_tool_call(self):
"""Script calls terminal and prints the result."""
code = """
from hermes_tools import terminal
result = terminal("echo hello")
print(result.get("output", ""))
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
self.assertIn("mock output for: echo hello", result["output"])
self.assertEqual(result["tool_calls_made"], 1)
def test_multi_tool_chain(self):
"""Script calls multiple tools sequentially."""
code = """
from hermes_tools import terminal, read_file
r1 = terminal("ls")
r2 = read_file("test.py")
print(f"terminal: {r1['output'][:20]}")
print(f"file lines: {r2['total_lines']}")
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
self.assertEqual(result["tool_calls_made"], 2)
def test_syntax_error(self):
"""Script with a syntax error returns error status."""
result = self._run("def broken(")
self.assertEqual(result["status"], "error")
self.assertIn("SyntaxError", result.get("error", "") + result.get("output", ""))
def test_runtime_exception(self):
"""Script with a runtime error returns error status."""
result = self._run("raise ValueError('test error')")
self.assertEqual(result["status"], "error")
def test_excluded_tool_returns_error(self):
"""Script calling a tool not in the allow-list gets an error from RPC."""
code = """
from hermes_tools import terminal
result = terminal("echo hi")
print(result)
"""
# Only enable web_search -- terminal should be excluded
result = self._run(code, enabled_tools=["web_search"])
# terminal won't be in hermes_tools.py, so import fails
self.assertEqual(result["status"], "error")
def test_empty_code(self):
"""Empty code string returns an error."""
result = json.loads(execute_code("", task_id="test"))
self.assertIn("error", result)
def test_output_captured(self):
"""Multiple print statements are captured in order."""
code = """
for i in range(5):
print(f"line {i}")
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
for i in range(5):
self.assertIn(f"line {i}", result["output"])
def test_stderr_on_error(self):
"""Traceback from stderr is included in the response."""
code = """
import sys
print("before error")
raise RuntimeError("deliberate crash")
"""
result = self._run(code)
self.assertEqual(result["status"], "error")
self.assertIn("before error", result["output"])
self.assertIn("RuntimeError", result.get("error", "") + result.get("output", ""))
def test_timeout_enforcement(self):
"""Script that sleeps too long is killed."""
code = "import time; time.sleep(999)"
with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
# Override config to use a very short timeout
with patch("tools.code_execution_tool._load_config", return_value={"timeout": 2, "max_tool_calls": 50}):
result = json.loads(execute_code(
code=code,
task_id="test-task",
enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
))
self.assertEqual(result["status"], "timeout")
self.assertIn("timed out", result.get("error", ""))
def test_web_search_tool(self):
"""Script calls web_search and processes results."""
code = """
from hermes_tools import web_search
results = web_search("test query")
print(f"Found {len(results.get('results', []))} results")
"""
result = self._run(code)
self.assertEqual(result["status"], "success")
self.assertIn("Found 1 results", result["output"])
if __name__ == "__main__":
unittest.main()

View File

@@ -149,6 +149,13 @@ from .clarify_tool import (
CLARIFY_SCHEMA,
)
# Code execution sandbox (programmatic tool calling)
from .code_execution_tool import (
execute_code,
check_sandbox_requirements,
EXECUTE_CODE_SCHEMA,
)
# File tools have no external requirements - they use the terminal backend
def check_file_requirements():
"""File tools only require terminal backend to be available."""
@@ -250,5 +257,9 @@ __all__ = [
'clarify_tool',
'check_clarify_requirements',
'CLARIFY_SCHEMA',
# Code execution sandbox
'execute_code',
'check_sandbox_requirements',
'EXECUTE_CODE_SCHEMA',
]

View File

@@ -0,0 +1,569 @@
#!/usr/bin/env python3
"""
Code Execution Tool -- Programmatic Tool Calling (PTC)
Lets the LLM write a Python script that calls Hermes tools via RPC,
collapsing multi-step tool chains into a single inference turn.
Architecture:
1. Parent generates a `hermes_tools.py` stub module with RPC functions
2. Parent opens a Unix domain socket and starts an RPC listener thread
3. Parent spawns a child process that runs the LLM's script
4. When the script calls a tool function, the call travels over the UDS
back to the parent, which dispatches through handle_function_call
5. Only the script's stdout is returned to the LLM; intermediate tool
results never enter the context window
Platform: Linux / macOS only (Unix domain sockets). Disabled on Windows.
"""
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
import uuid
from typing import Any, Dict, List, Optional
# Availability gate: UDS requires a POSIX OS
SANDBOX_AVAILABLE = sys.platform != "win32"
# The 7 tools allowed inside the sandbox. The intersection of this list
# and the session's enabled tools determines which stubs are generated.
SANDBOX_ALLOWED_TOOLS = frozenset([
"web_search",
"web_extract",
"read_file",
"write_file",
"search",
"patch",
"terminal",
])
# Resource limit defaults (overridable via config.yaml → code_execution.*)
DEFAULT_TIMEOUT = 120 # seconds
DEFAULT_MAX_TOOL_CALLS = 50
MAX_STDOUT_BYTES = 50_000 # 50 KB
MAX_STDERR_BYTES = 10_000 # 10 KB
def check_sandbox_requirements() -> bool:
"""Code execution sandbox requires a POSIX OS for Unix domain sockets."""
return SANDBOX_AVAILABLE
# ---------------------------------------------------------------------------
# hermes_tools.py code generator
# ---------------------------------------------------------------------------
# Per-tool stub templates: (function_name, signature, docstring, args_dict_expr)
# The args_dict_expr builds the JSON payload sent over the RPC socket.
_TOOL_STUBS = {
"web_search": (
"web_search",
"query: str, limit: int = 5",
'"""Search the web. Returns dict with "results" list of {url, title, description}."""',
'{"query": query, "limit": limit}',
),
"web_extract": (
"web_extract",
"urls: list",
'"""Extract content from URLs. Returns markdown text."""',
'{"urls": urls}',
),
"read_file": (
"read_file",
"path: str, offset: int = 1, limit: int = 500",
'"""Read a file (1-indexed lines). Returns dict with "content" and "total_lines"."""',
'{"path": path, "offset": offset, "limit": limit}',
),
"write_file": (
"write_file",
"path: str, content: str",
'"""Write content to a file (always overwrites). Returns dict with status."""',
'{"path": path, "content": content}',
),
"search": (
"search",
'pattern: str, target: str = "content", path: str = ".", file_glob: str = None, limit: int = 50',
'"""Search file contents (target="content") or find files (target="files"). Returns dict with "matches"."""',
'{"pattern": pattern, "target": target, "path": path, "file_glob": file_glob, "limit": limit}',
),
"patch": (
"patch",
"path: str, old_string: str, new_string: str, replace_all: bool = False",
'"""Replace old_string with new_string in a file. Returns dict with status."""',
'{"path": path, "old_string": old_string, "new_string": new_string, "replace_all": replace_all}',
),
"terminal": (
"terminal",
"command: str, timeout: int = None, workdir: str = None",
'"""Run a shell command (foreground only). Returns dict with "output" and "exit_code"."""',
'{"command": command, "timeout": timeout, "workdir": workdir}',
),
}
def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
"""
Build the source code for the hermes_tools.py stub module.
Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs.
"""
tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools))
stub_functions = []
export_names = []
for tool_name in tools_to_generate:
if tool_name not in _TOOL_STUBS:
continue
func_name, sig, doc, args_expr = _TOOL_STUBS[tool_name]
stub_functions.append(
f"def {func_name}({sig}):\n"
f" {doc}\n"
f" return _call({func_name!r}, {args_expr})\n"
)
export_names.append(func_name)
header = '''\
"""Auto-generated Hermes tools RPC stubs."""
import json, os, socket
_sock = None
def _connect():
global _sock
if _sock is None:
_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
_sock.connect(os.environ["HERMES_RPC_SOCKET"])
_sock.settimeout(300)
return _sock
def _call(tool_name, args):
"""Send a tool call to the parent process and return the parsed result."""
conn = _connect()
request = json.dumps({"tool": tool_name, "args": args}) + "\\n"
conn.sendall(request.encode())
buf = b""
while True:
chunk = conn.recv(65536)
if not chunk:
raise RuntimeError("Agent process disconnected")
buf += chunk
if buf.endswith(b"\\n"):
break
raw = buf.decode().strip()
result = json.loads(raw)
if isinstance(result, str):
try:
return json.loads(result)
except (json.JSONDecodeError, TypeError):
return result
return result
'''
return header + "\n".join(stub_functions)
# ---------------------------------------------------------------------------
# RPC server (runs in a thread inside the parent process)
# ---------------------------------------------------------------------------
# Terminal parameters that must not be used from ephemeral sandbox scripts
_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty"}
def _rpc_server_loop(
server_sock: socket.socket,
task_id: str,
tool_call_log: list,
tool_call_counter: list, # mutable [int] so the thread can increment
max_tool_calls: int,
allowed_tools: frozenset,
):
"""
Accept one client connection and dispatch tool-call requests until
the client disconnects or the call limit is reached.
"""
from model_tools import handle_function_call
conn = None
try:
server_sock.settimeout(5)
conn, _ = server_sock.accept()
conn.settimeout(300)
buf = b""
while True:
try:
chunk = conn.recv(65536)
except socket.timeout:
break
if not chunk:
break
buf += chunk
# Process all complete newline-delimited messages in the buffer
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
line = line.strip()
if not line:
continue
call_start = time.monotonic()
try:
request = json.loads(line.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
resp = json.dumps({"error": f"Invalid RPC request: {exc}"})
conn.sendall((resp + "\n").encode())
continue
tool_name = request.get("tool", "")
tool_args = request.get("args", {})
# Enforce the allow-list
if tool_name not in allowed_tools:
available = ", ".join(sorted(allowed_tools))
resp = json.dumps({
"error": (
f"Tool '{tool_name}' is not available in execute_code. "
f"Available: {available}"
)
})
conn.sendall((resp + "\n").encode())
continue
# Enforce tool call limit
if tool_call_counter[0] >= max_tool_calls:
resp = json.dumps({
"error": (
f"Tool call limit reached ({max_tool_calls}). "
"No more tool calls allowed in this execution."
)
})
conn.sendall((resp + "\n").encode())
continue
# Strip forbidden terminal parameters
if tool_name == "terminal" and isinstance(tool_args, dict):
for param in _TERMINAL_BLOCKED_PARAMS:
tool_args.pop(param, None)
# Dispatch through the standard tool handler
try:
result = handle_function_call(
tool_name, tool_args, task_id=task_id
)
except Exception as exc:
result = json.dumps({"error": str(exc)})
tool_call_counter[0] += 1
call_duration = time.monotonic() - call_start
# Log for observability
args_preview = str(tool_args)[:80]
tool_call_log.append({
"tool": tool_name,
"args_preview": args_preview,
"duration": round(call_duration, 2),
})
conn.sendall((result + "\n").encode())
except socket.timeout:
pass
except OSError:
pass
finally:
if conn:
try:
conn.close()
except OSError:
pass
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def execute_code(
code: str,
task_id: Optional[str] = None,
enabled_tools: Optional[List[str]] = None,
) -> str:
"""
Run a Python script in a sandboxed child process with RPC access
to a subset of Hermes tools.
Args:
code: Python source code to execute.
task_id: Session task ID for tool isolation (terminal env, etc.).
enabled_tools: Tool names enabled in the current session. The sandbox
gets the intersection with SANDBOX_ALLOWED_TOOLS.
Returns:
JSON string with execution results.
"""
if not SANDBOX_AVAILABLE:
return json.dumps({
"error": "execute_code is not available on Windows. Use normal tool calls instead."
})
if not code or not code.strip():
return json.dumps({"error": "No code provided."})
# Import interrupt event from terminal_tool (cooperative cancellation)
from tools.terminal_tool import _interrupt_event
# Resolve config
_cfg = _load_config()
timeout = _cfg.get("timeout", DEFAULT_TIMEOUT)
max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS)
# Determine which tools the sandbox can call
session_tools = set(enabled_tools) if enabled_tools else set()
sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools)
if not sandbox_tools:
sandbox_tools = SANDBOX_ALLOWED_TOOLS
# --- Set up temp directory with hermes_tools.py and script.py ---
tmpdir = tempfile.mkdtemp(prefix="hermes_sandbox_")
sock_path = f"/tmp/hermes_rpc_{uuid.uuid4().hex}.sock"
tool_call_log: list = []
tool_call_counter = [0] # mutable so the RPC thread can increment
exec_start = time.monotonic()
try:
# Write the auto-generated hermes_tools module
tools_src = generate_hermes_tools_module(
list(sandbox_tools) if enabled_tools else list(SANDBOX_ALLOWED_TOOLS)
)
with open(os.path.join(tmpdir, "hermes_tools.py"), "w") as f:
f.write(tools_src)
# Write the user's script
with open(os.path.join(tmpdir, "script.py"), "w") as f:
f.write(code)
# --- Start UDS server ---
server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_sock.bind(sock_path)
server_sock.listen(1)
rpc_thread = threading.Thread(
target=_rpc_server_loop,
args=(
server_sock, task_id, tool_call_log,
tool_call_counter, max_tool_calls, sandbox_tools,
),
daemon=True,
)
rpc_thread.start()
# --- Spawn child process ---
child_env = os.environ.copy()
child_env["HERMES_RPC_SOCKET"] = sock_path
child_env["PYTHONDONTWRITEBYTECODE"] = "1"
proc = subprocess.Popen(
[sys.executable, "script.py"],
cwd=tmpdir,
env=child_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
preexec_fn=os.setsid,
)
# --- Poll loop: watch for exit, timeout, and interrupt ---
deadline = time.monotonic() + timeout
stdout_chunks: list = []
stderr_chunks: list = []
# Background readers to avoid pipe buffer deadlocks
def _drain(pipe, chunks, max_bytes):
total = 0
try:
while True:
data = pipe.read(4096)
if not data:
break
if total < max_bytes:
keep = max_bytes - total
chunks.append(data[:keep])
total += len(data)
except (ValueError, OSError):
pass
stdout_reader = threading.Thread(
target=_drain, args=(proc.stdout, stdout_chunks, MAX_STDOUT_BYTES), daemon=True
)
stderr_reader = threading.Thread(
target=_drain, args=(proc.stderr, stderr_chunks, MAX_STDERR_BYTES), daemon=True
)
stdout_reader.start()
stderr_reader.start()
status = "success"
while proc.poll() is None:
if _interrupt_event.is_set():
_kill_process_group(proc)
status = "interrupted"
break
if time.monotonic() > deadline:
_kill_process_group(proc, escalate=True)
status = "timeout"
break
time.sleep(0.2)
# Wait for readers to finish draining
stdout_reader.join(timeout=3)
stderr_reader.join(timeout=3)
stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
# Truncation notice
if len(stdout_text) >= MAX_STDOUT_BYTES:
stdout_text = stdout_text[:MAX_STDOUT_BYTES] + "\n[output truncated at 50KB]"
exit_code = proc.returncode if proc.returncode is not None else -1
duration = round(time.monotonic() - exec_start, 2)
# Wait for RPC thread to finish
server_sock.close()
rpc_thread.join(timeout=3)
# Build response
result: Dict[str, Any] = {
"status": status,
"output": stdout_text,
"tool_calls_made": tool_call_counter[0],
"duration_seconds": duration,
}
if status == "timeout":
result["error"] = f"Script timed out after {timeout}s and was killed."
elif status == "interrupted":
result["output"] = stdout_text + "\n[execution interrupted — user sent a new message]"
elif exit_code != 0:
result["status"] = "error"
result["error"] = stderr_text or f"Script exited with code {exit_code}"
# Include stderr in output so the LLM sees the traceback
if stderr_text:
result["output"] = stdout_text + "\n--- stderr ---\n" + stderr_text
return json.dumps(result, ensure_ascii=False)
except Exception as exc:
duration = round(time.monotonic() - exec_start, 2)
logging.exception("execute_code failed")
return json.dumps({
"status": "error",
"error": str(exc),
"tool_calls_made": tool_call_counter[0],
"duration_seconds": duration,
}, ensure_ascii=False)
finally:
# Cleanup temp dir and socket
try:
import shutil
shutil.rmtree(tmpdir, ignore_errors=True)
except Exception:
pass
try:
os.unlink(sock_path)
except OSError:
pass
def _kill_process_group(proc, escalate: bool = False):
"""Kill the child and its entire process group."""
try:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
except (ProcessLookupError, PermissionError):
try:
proc.kill()
except Exception:
pass
if escalate:
# Give the process 5s to exit after SIGTERM, then SIGKILL
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except (ProcessLookupError, PermissionError):
try:
proc.kill()
except Exception:
pass
def _load_config() -> dict:
"""Load code_execution config from CLI_CONFIG if available."""
try:
from cli import CLI_CONFIG
return CLI_CONFIG.get("code_execution", {})
except Exception:
return {}
# ---------------------------------------------------------------------------
# OpenAI Function-Calling Schema
# ---------------------------------------------------------------------------
EXECUTE_CODE_SCHEMA = {
"name": "execute_code",
"description": (
"Run a Python script that can call Hermes tools programmatically. "
"Use this when you need 3+ tool calls with processing logic between them, "
"need to filter/reduce large tool outputs before they enter your context, "
"need conditional branching (if X then Y else Z), or need to loop "
"(fetch N pages, process N files, retry on failure).\n\n"
"Use normal tool calls instead when: single tool call with no processing, "
"you need to see the full result and apply complex reasoning, "
"or the task requires interactive user input.\n\n"
"Available via `from hermes_tools import ...`:\n\n"
" web_search(query: str, limit: int = 5) -> dict\n"
" Returns {\"results\": [{\"url\", \"title\", \"description\"}, ...]}\n"
" web_extract(urls: list[str]) -> str\n"
" Returns extracted page content as markdown text\n"
" read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n"
" Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}\n"
" write_file(path: str, content: str) -> dict\n"
" Always overwrites the entire file.\n"
" search(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n"
" target: \"content\" (grep) or \"files\" (find). Returns {\"matches\": [...]}\n"
" patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n"
" Replaces old_string with new_string in the file.\n"
" terminal(command: str, timeout=None, workdir=None) -> dict\n"
" Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}\n\n"
"Print your final result to stdout. Use Python stdlib (json, re, math, csv, "
"datetime, collections, etc.) for processing between tool calls."
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": (
"Python code to execute. Import tools with "
"`from hermes_tools import web_search, terminal, ...` "
"and print your final result to stdout."
),
},
},
"required": ["code"],
},
}

View File

@@ -138,6 +138,12 @@ TOOLSETS = {
"includes": []
},
"code_execution": {
"description": "Run Python scripts that call tools programmatically (reduces LLM round trips)",
"tools": ["execute_code"],
"includes": []
},
# Scenario-specific toolsets
@@ -189,6 +195,8 @@ TOOLSETS = {
"session_search",
# Clarifying questions
"clarify",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Cronjob management (CLI-only)
"schedule_cronjob", "list_cronjobs", "remove_cronjob"
],
@@ -227,6 +235,8 @@ TOOLSETS = {
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
@@ -263,6 +273,8 @@ TOOLSETS = {
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
@@ -299,6 +311,8 @@ TOOLSETS = {
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Cronjob management
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
@@ -335,6 +349,8 @@ TOOLSETS = {
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging