#!/usr/bin/env python3 """ Code Execution Tool -- Programmatic Tool Calling (PTC) Lets the LLM write a Python script that calls Hermes tools via RPC, collapsing multi-step tool chains into a single inference turn. Architecture (two transports): **Local backend (UDS):** 1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions 2. Parent opens a Unix domain socket and starts an RPC listener thread 3. Parent spawns a child process that runs the LLM's script 4. Tool calls travel over the UDS back to the parent for dispatch **Remote backends (file-based RPC):** 1. Parent generates `hermes_tools.py` with file-based RPC stubs 2. Parent ships both files to the remote environment 3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.) 4. Tool calls are written as request files; a polling thread on the parent reads them via execute_oneshot(), dispatches, and writes response files 5. The script polls for response files and continues In both cases, only the script's stdout is returned to the LLM; intermediate tool results never enter the context window. Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows. Remote execution additionally requires Python 3 in the terminal backend. """ import base64 import json import logging import os import platform import signal import socket import subprocess import sys import tempfile import threading import time import uuid _IS_WINDOWS = platform.system() == "Windows" from typing import Any, Dict, List, Optional # Availability gate: UDS requires a POSIX OS logger = logging.getLogger(__name__) SANDBOX_AVAILABLE = sys.platform != "win32" # The 7 tools allowed inside the sandbox. The intersection of this list # and the session's enabled tools determines which stubs are generated. SANDBOX_ALLOWED_TOOLS = frozenset([ "web_search", "web_extract", "read_file", "write_file", "search_files", "patch", "terminal", ]) # Resource limit defaults (overridable via config.yaml → code_execution.*) DEFAULT_TIMEOUT = 300 # 5 minutes DEFAULT_MAX_TOOL_CALLS = 50 MAX_STDOUT_BYTES = 50_000 # 50 KB MAX_STDERR_BYTES = 10_000 # 10 KB def check_sandbox_requirements() -> bool: """Code execution sandbox requires a POSIX OS for Unix domain sockets.""" return SANDBOX_AVAILABLE # --------------------------------------------------------------------------- # hermes_tools.py code generator # --------------------------------------------------------------------------- # Per-tool stub templates: (function_name, signature, docstring, args_dict_expr) # The args_dict_expr builds the JSON payload sent over the RPC socket. _TOOL_STUBS = { "web_search": ( "web_search", "query: str, limit: int = 5", '"""Search the web. Returns dict with data.web list of {url, title, description}."""', '{"query": query, "limit": limit}', ), "web_extract": ( "web_extract", "urls: list", '"""Extract content from URLs. Returns dict with results list of {url, title, content, error}."""', '{"urls": urls}', ), "read_file": ( "read_file", "path: str, offset: int = 1, limit: int = 500", '"""Read a file (1-indexed lines). Returns dict with "content" and "total_lines"."""', '{"path": path, "offset": offset, "limit": limit}', ), "write_file": ( "write_file", "path: str, content: str", '"""Write content to a file (always overwrites). Returns dict with status."""', '{"path": path, "content": content}', ), "search_files": ( "search_files", 'pattern: str, target: str = "content", path: str = ".", file_glob: str = None, limit: int = 50, offset: int = 0, output_mode: str = "content", context: int = 0', '"""Search file contents (target="content") or find files by name (target="files"). Returns dict with "matches"."""', '{"pattern": pattern, "target": target, "path": path, "file_glob": file_glob, "limit": limit, "offset": offset, "output_mode": output_mode, "context": context}', ), "patch": ( "patch", 'path: str = None, old_string: str = None, new_string: str = None, replace_all: bool = False, mode: str = "replace", patch: str = None', '"""Targeted find-and-replace (mode="replace") or V4A multi-file patches (mode="patch"). Returns dict with status."""', '{"path": path, "old_string": old_string, "new_string": new_string, "replace_all": replace_all, "mode": mode, "patch": patch}', ), "terminal": ( "terminal", "command: str, timeout: int = None, workdir: str = None", '"""Run a shell command (foreground only). Returns dict with "output" and "exit_code"."""', '{"command": command, "timeout": timeout, "workdir": workdir}', ), } def generate_hermes_tools_module(enabled_tools: List[str], transport: str = "uds") -> str: """ Build the source code for the hermes_tools.py stub module. Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs. Args: enabled_tools: Tool names enabled in the current session. transport: ``"uds"`` for Unix domain socket (local backend) or ``"file"`` for file-based RPC (remote backends). """ tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools)) stub_functions = [] export_names = [] for tool_name in tools_to_generate: if tool_name not in _TOOL_STUBS: continue func_name, sig, doc, args_expr = _TOOL_STUBS[tool_name] stub_functions.append( f"def {func_name}({sig}):\n" f" {doc}\n" f" return _call({func_name!r}, {args_expr})\n" ) export_names.append(func_name) if transport == "file": header = _FILE_TRANSPORT_HEADER else: header = _UDS_TRANSPORT_HEADER return header + "\n".join(stub_functions) # ---- Shared helpers section (embedded in both transport headers) ---------- _COMMON_HELPERS = '''\ # --------------------------------------------------------------------------- # Convenience helpers (avoid common scripting pitfalls) # --------------------------------------------------------------------------- def json_parse(text: str): """Parse JSON tolerant of control characters (strict=False). Use this instead of json.loads() when parsing output from terminal() or web_extract() that may contain raw tabs/newlines in strings.""" return json.loads(text, strict=False) def shell_quote(s: str) -> str: """Shell-escape a string for safe interpolation into commands. Use this when inserting dynamic content into terminal() commands: terminal(f"echo {shell_quote(user_input)}") """ return shlex.quote(s) def retry(fn, max_attempts=3, delay=2): """Retry a function up to max_attempts times with exponential backoff. Use for transient failures (network errors, API rate limits): result = retry(lambda: terminal("gh issue list ...")) """ last_err = None for attempt in range(max_attempts): try: return fn() except Exception as e: last_err = e if attempt < max_attempts - 1: time.sleep(delay * (2 ** attempt)) raise last_err ''' # ---- UDS transport (local backend) --------------------------------------- _UDS_TRANSPORT_HEADER = '''\ """Auto-generated Hermes tools RPC stubs.""" import json, os, socket, shlex, time _sock = None ''' + _COMMON_HELPERS + '''\ def _connect(): global _sock if _sock is None: _sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) _sock.connect(os.environ["HERMES_RPC_SOCKET"]) _sock.settimeout(300) return _sock def _call(tool_name, args): """Send a tool call to the parent process and return the parsed result.""" conn = _connect() request = json.dumps({"tool": tool_name, "args": args}) + "\\n" conn.sendall(request.encode()) buf = b"" while True: chunk = conn.recv(65536) if not chunk: raise RuntimeError("Agent process disconnected") buf += chunk if buf.endswith(b"\\n"): break raw = buf.decode().strip() result = json.loads(raw) if isinstance(result, str): try: return json.loads(result) except (json.JSONDecodeError, TypeError): return result return result ''' # ---- File-based transport (remote backends) ------------------------------- _FILE_TRANSPORT_HEADER = '''\ """Auto-generated Hermes tools RPC stubs (file-based transport).""" import json, os, shlex, time _RPC_DIR = os.environ.get("HERMES_RPC_DIR", "/tmp/hermes_rpc") _seq = 0 ''' + _COMMON_HELPERS + '''\ def _call(tool_name, args): """Send a tool call request via file-based RPC and wait for response.""" global _seq _seq += 1 seq_str = f"{_seq:06d}" req_file = os.path.join(_RPC_DIR, f"req_{seq_str}") res_file = os.path.join(_RPC_DIR, f"res_{seq_str}") # Write request atomically (write to .tmp, then rename) tmp = req_file + ".tmp" with open(tmp, "w") as f: json.dump({"tool": tool_name, "args": args, "seq": _seq}, f) os.rename(tmp, req_file) # Wait for response with adaptive polling deadline = time.monotonic() + 300 # 5-minute timeout per tool call poll_interval = 0.05 # Start at 50ms while not os.path.exists(res_file): if time.monotonic() > deadline: raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s") time.sleep(poll_interval) poll_interval = min(poll_interval * 1.2, 0.25) # Back off to 250ms with open(res_file) as f: raw = f.read() # Clean up response file try: os.unlink(res_file) except OSError: pass result = json.loads(raw) if isinstance(result, str): try: return json.loads(result) except (json.JSONDecodeError, TypeError): return result return result ''' # --------------------------------------------------------------------------- # RPC server (runs in a thread inside the parent process) # --------------------------------------------------------------------------- # Terminal parameters that must not be used from ephemeral sandbox scripts _TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty"} def _rpc_server_loop( server_sock: socket.socket, task_id: str, tool_call_log: list, tool_call_counter: list, # mutable [int] so the thread can increment max_tool_calls: int, allowed_tools: frozenset, ): """ Accept one client connection and dispatch tool-call requests until the client disconnects or the call limit is reached. """ from model_tools import handle_function_call conn = None try: server_sock.settimeout(5) conn, _ = server_sock.accept() conn.settimeout(300) buf = b"" while True: try: chunk = conn.recv(65536) except socket.timeout: break if not chunk: break buf += chunk # Process all complete newline-delimited messages in the buffer while b"\n" in buf: line, buf = buf.split(b"\n", 1) line = line.strip() if not line: continue call_start = time.monotonic() try: request = json.loads(line.decode()) except (json.JSONDecodeError, UnicodeDecodeError) as exc: resp = json.dumps({"error": f"Invalid RPC request: {exc}"}) conn.sendall((resp + "\n").encode()) continue tool_name = request.get("tool", "") tool_args = request.get("args", {}) # Enforce the allow-list if tool_name not in allowed_tools: available = ", ".join(sorted(allowed_tools)) resp = json.dumps({ "error": ( f"Tool '{tool_name}' is not available in execute_code. " f"Available: {available}" ) }) conn.sendall((resp + "\n").encode()) continue # Enforce tool call limit if tool_call_counter[0] >= max_tool_calls: resp = json.dumps({ "error": ( f"Tool call limit reached ({max_tool_calls}). " "No more tool calls allowed in this execution." ) }) conn.sendall((resp + "\n").encode()) continue # Strip forbidden terminal parameters if tool_name == "terminal" and isinstance(tool_args, dict): for param in _TERMINAL_BLOCKED_PARAMS: tool_args.pop(param, None) # Dispatch through the standard tool handler. # Suppress stdout/stderr from internal tool handlers so # their status prints don't leak into the CLI spinner. try: _real_stdout, _real_stderr = sys.stdout, sys.stderr devnull = open(os.devnull, "w") try: sys.stdout = devnull sys.stderr = devnull result = handle_function_call( tool_name, tool_args, task_id=task_id ) finally: sys.stdout, sys.stderr = _real_stdout, _real_stderr devnull.close() except Exception as exc: logger.error("Tool call failed in sandbox: %s", exc, exc_info=True) result = json.dumps({"error": str(exc)}) tool_call_counter[0] += 1 call_duration = time.monotonic() - call_start # Log for observability args_preview = str(tool_args)[:80] tool_call_log.append({ "tool": tool_name, "args_preview": args_preview, "duration": round(call_duration, 2), }) conn.sendall((result + "\n").encode()) except socket.timeout: logger.debug("RPC listener socket timeout") except OSError as e: logger.debug("RPC listener socket error: %s", e, exc_info=True) finally: if conn: try: conn.close() except OSError as e: logger.debug("RPC conn close error: %s", e) # --------------------------------------------------------------------------- # Remote execution support (file-based RPC via terminal backend) # --------------------------------------------------------------------------- def _get_or_create_env(task_id: str): """Get or create the terminal environment for *task_id*. Reuses the same environment (container/sandbox/SSH session) that the terminal and file tools use, creating one if it doesn't exist yet. Returns ``(env, env_type)`` tuple. """ from tools.terminal_tool import ( _active_environments, _env_lock, _create_environment, _get_env_config, _last_activity, _start_cleanup_thread, _creation_locks, _creation_locks_lock, _task_env_overrides, ) effective_task_id = task_id or "default" # Fast path: environment already exists with _env_lock: if effective_task_id in _active_environments: _last_activity[effective_task_id] = time.time() return _active_environments[effective_task_id], _get_env_config()["env_type"] # Slow path: create environment (same pattern as file_tools._get_file_ops) with _creation_locks_lock: if effective_task_id not in _creation_locks: _creation_locks[effective_task_id] = threading.Lock() task_lock = _creation_locks[effective_task_id] with task_lock: with _env_lock: if effective_task_id in _active_environments: _last_activity[effective_task_id] = time.time() return _active_environments[effective_task_id], _get_env_config()["env_type"] config = _get_env_config() env_type = config["env_type"] overrides = _task_env_overrides.get(effective_task_id, {}) if env_type == "docker": image = overrides.get("docker_image") or config["docker_image"] elif env_type == "singularity": image = overrides.get("singularity_image") or config["singularity_image"] elif env_type == "modal": image = overrides.get("modal_image") or config["modal_image"] elif env_type == "daytona": image = overrides.get("daytona_image") or config["daytona_image"] else: image = "" cwd = overrides.get("cwd") or config["cwd"] container_config = None if env_type in ("docker", "singularity", "modal", "daytona"): container_config = { "container_cpu": config.get("container_cpu", 1), "container_memory": config.get("container_memory", 5120), "container_disk": config.get("container_disk", 51200), "container_persistent": config.get("container_persistent", True), "docker_volumes": config.get("docker_volumes", []), } ssh_config = None if env_type == "ssh": ssh_config = { "host": config.get("ssh_host", ""), "user": config.get("ssh_user", ""), "port": config.get("ssh_port", 22), "key": config.get("ssh_key", ""), "persistent": config.get("ssh_persistent", False), } local_config = None if env_type == "local": local_config = { "persistent": config.get("local_persistent", False), } logger.info("Creating new %s environment for execute_code task %s...", env_type, effective_task_id[:8]) env = _create_environment( env_type=env_type, image=image, cwd=cwd, timeout=config["timeout"], ssh_config=ssh_config, container_config=container_config, local_config=local_config, task_id=effective_task_id, host_cwd=config.get("host_cwd"), ) with _env_lock: _active_environments[effective_task_id] = env _last_activity[effective_task_id] = time.time() _start_cleanup_thread() logger.info("%s environment ready for execute_code task %s", env_type, effective_task_id[:8]) return env, env_type def _ship_file_to_remote(env, remote_path: str, content: str) -> None: """Write *content* to *remote_path* on the remote environment. Uses ``echo … | base64 -d`` rather than stdin piping because some backends (Modal) don't reliably deliver stdin_data to chained commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single quotes are fine. """ encoded = base64.b64encode(content.encode("utf-8")).decode("ascii") env.execute_oneshot( f"echo '{encoded}' | base64 -d > {remote_path}", cwd="/", timeout=30, ) def _rpc_poll_loop( env, rpc_dir: str, task_id: str, tool_call_log: list, tool_call_counter: list, max_tool_calls: int, allowed_tools: frozenset, stop_event: threading.Event, ): """Poll the remote filesystem for tool call requests and dispatch them. Runs in a background thread. Uses ``env.execute_oneshot()`` so it can operate concurrently with the script-execution thread that holds ``env.execute()`` (important for persistent-shell backends like SSH). """ from model_tools import handle_function_call poll_interval = 0.1 # 100 ms while not stop_event.is_set(): try: # List pending request files (skip .tmp partials) ls_result = env.execute_oneshot( f"ls -1 {rpc_dir}/req_* 2>/dev/null || true", cwd="/", timeout=10, ) output = ls_result.get("output", "").strip() if not output: stop_event.wait(poll_interval) continue req_files = sorted([ f.strip() for f in output.split("\n") if f.strip() and not f.strip().endswith(".tmp") and "/req_" in f.strip() ]) for req_file in req_files: if stop_event.is_set(): break call_start = time.monotonic() # Read request read_result = env.execute_oneshot( f"cat {req_file}", cwd="/", timeout=10, ) try: request = json.loads(read_result.get("output", "")) except (json.JSONDecodeError, ValueError): logger.debug("Malformed RPC request in %s", req_file) # Remove bad request to avoid infinite retry env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5) continue tool_name = request.get("tool", "") tool_args = request.get("args", {}) seq = request.get("seq", 0) seq_str = f"{seq:06d}" res_file = f"{rpc_dir}/res_{seq_str}" # Enforce allow-list if tool_name not in allowed_tools: available = ", ".join(sorted(allowed_tools)) tool_result = json.dumps({ "error": ( f"Tool '{tool_name}' is not available in execute_code. " f"Available: {available}" ) }) # Enforce tool call limit elif tool_call_counter[0] >= max_tool_calls: tool_result = json.dumps({ "error": ( f"Tool call limit reached ({max_tool_calls}). " "No more tool calls allowed in this execution." ) }) else: # Strip forbidden terminal parameters if tool_name == "terminal" and isinstance(tool_args, dict): for param in _TERMINAL_BLOCKED_PARAMS: tool_args.pop(param, None) # Dispatch through the standard tool handler try: _real_stdout, _real_stderr = sys.stdout, sys.stderr devnull = open(os.devnull, "w") try: sys.stdout = devnull sys.stderr = devnull tool_result = handle_function_call( tool_name, tool_args, task_id=task_id ) finally: sys.stdout, sys.stderr = _real_stdout, _real_stderr devnull.close() except Exception as exc: logger.error("Tool call failed in remote sandbox: %s", exc, exc_info=True) tool_result = json.dumps({"error": str(exc)}) tool_call_counter[0] += 1 call_duration = time.monotonic() - call_start tool_call_log.append({ "tool": tool_name, "args_preview": str(tool_args)[:80], "duration": round(call_duration, 2), }) # Write response atomically (tmp + rename). # Use echo piping (not stdin_data) because Modal doesn't # reliably deliver stdin to chained commands. encoded_result = base64.b64encode( tool_result.encode("utf-8") ).decode("ascii") env.execute_oneshot( f"echo '{encoded_result}' | base64 -d > {res_file}.tmp" f" && mv {res_file}.tmp {res_file}", cwd="/", timeout=60, ) # Remove the request file env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5) except Exception as e: if not stop_event.is_set(): logger.debug("RPC poll error: %s", e, exc_info=True) if not stop_event.is_set(): stop_event.wait(poll_interval) def _execute_remote( code: str, task_id: Optional[str], enabled_tools: Optional[List[str]], ) -> str: """Run a script on the remote terminal backend via file-based RPC. The script and the generated hermes_tools.py module are shipped to the remote environment, and tool calls are proxied through a polling thread that communicates via request/response files. """ from tools.terminal_tool import _interrupt_event _cfg = _load_config() timeout = _cfg.get("timeout", DEFAULT_TIMEOUT) max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS) session_tools = set(enabled_tools) if enabled_tools else set() sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools) if not sandbox_tools: sandbox_tools = SANDBOX_ALLOWED_TOOLS effective_task_id = task_id or "default" env, env_type = _get_or_create_env(effective_task_id) sandbox_id = uuid.uuid4().hex[:12] sandbox_dir = f"/tmp/hermes_exec_{sandbox_id}" tool_call_log: list = [] tool_call_counter = [0] exec_start = time.monotonic() stop_event = threading.Event() rpc_thread = None try: # Verify Python is available on the remote py_check = env.execute_oneshot( "command -v python3 >/dev/null 2>&1 && echo OK", cwd="/", timeout=15, ) if "OK" not in py_check.get("output", ""): return json.dumps({ "status": "error", "error": ( f"Python 3 is not available in the {env_type} terminal " "environment. Install Python to use execute_code with " "remote backends." ), "tool_calls_made": 0, "duration_seconds": 0, }) # Create sandbox directory on remote env.execute_oneshot( f"mkdir -p {sandbox_dir}/rpc", cwd="/", timeout=10, ) # Generate and ship files tools_src = generate_hermes_tools_module( list(sandbox_tools), transport="file", ) _ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src) _ship_file_to_remote(env, f"{sandbox_dir}/script.py", code) # Start RPC polling thread rpc_thread = threading.Thread( target=_rpc_poll_loop, args=( env, f"{sandbox_dir}/rpc", effective_task_id, tool_call_log, tool_call_counter, max_tool_calls, sandbox_tools, stop_event, ), daemon=True, ) rpc_thread.start() # Build environment variable prefix for the script env_prefix = ( f"HERMES_RPC_DIR={sandbox_dir}/rpc " f"PYTHONDONTWRITEBYTECODE=1" ) tz = os.getenv("HERMES_TIMEZONE", "").strip() if tz: env_prefix += f" TZ={tz}" # Execute the script on the remote backend logger.info("Executing code on %s backend (task %s)...", env_type, effective_task_id[:8]) script_result = env.execute( f"cd {sandbox_dir} && {env_prefix} python3 script.py", timeout=timeout, ) stdout_text = script_result.get("output", "") exit_code = script_result.get("returncode", -1) status = "success" # Check for timeout/interrupt from the backend if exit_code == 124: status = "timeout" elif exit_code == 130: status = "interrupted" except Exception as exc: duration = round(time.monotonic() - exec_start, 2) logger.error( "execute_code remote failed after %ss with %d tool calls: %s: %s", duration, tool_call_counter[0], type(exc).__name__, exc, exc_info=True, ) return json.dumps({ "status": "error", "error": str(exc), "tool_calls_made": tool_call_counter[0], "duration_seconds": duration, }, ensure_ascii=False) finally: # Stop the polling thread stop_event.set() if rpc_thread is not None: rpc_thread.join(timeout=5) # Clean up remote sandbox dir try: env.execute_oneshot( f"rm -rf {sandbox_dir}", cwd="/", timeout=15, ) except Exception: logger.debug("Failed to clean up remote sandbox %s", sandbox_dir) duration = round(time.monotonic() - exec_start, 2) # --- Post-process output (same as local path) --- # Truncate stdout to cap if len(stdout_text) > MAX_STDOUT_BYTES: head_bytes = int(MAX_STDOUT_BYTES * 0.4) tail_bytes = MAX_STDOUT_BYTES - head_bytes head = stdout_text[:head_bytes] tail = stdout_text[-tail_bytes:] omitted = len(stdout_text) - len(head) - len(tail) stdout_text = ( head + f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted " f"out of {len(stdout_text):,} total] ...\n\n" + tail ) # Strip ANSI escape sequences from tools.ansi_strip import strip_ansi stdout_text = strip_ansi(stdout_text) # Redact secrets from agent.redact import redact_sensitive_text stdout_text = redact_sensitive_text(stdout_text) # Build response result: Dict[str, Any] = { "status": status, "output": stdout_text, "tool_calls_made": tool_call_counter[0], "duration_seconds": duration, } if status == "timeout": result["error"] = f"Script timed out after {timeout}s and was killed." elif status == "interrupted": result["output"] = ( stdout_text + "\n[execution interrupted — user sent a new message]" ) elif exit_code != 0: result["status"] = "error" result["error"] = f"Script exited with code {exit_code}" return json.dumps(result, ensure_ascii=False) # --------------------------------------------------------------------------- # Main entry point # --------------------------------------------------------------------------- def execute_code( code: str, task_id: Optional[str] = None, enabled_tools: Optional[List[str]] = None, ) -> str: """ Run a Python script in a sandboxed child process with RPC access to a subset of Hermes tools. Dispatches to the local (UDS) or remote (file-based RPC) path depending on the configured terminal backend. Args: code: Python source code to execute. task_id: Session task ID for tool isolation (terminal env, etc.). enabled_tools: Tool names enabled in the current session. The sandbox gets the intersection with SANDBOX_ALLOWED_TOOLS. Returns: JSON string with execution results. """ if not SANDBOX_AVAILABLE: return json.dumps({ "error": "execute_code is not available on Windows. Use normal tool calls instead." }) if not code or not code.strip(): return json.dumps({"error": "No code provided."}) # Dispatch: remote backends use file-based RPC, local uses UDS from tools.terminal_tool import _get_env_config env_type = _get_env_config()["env_type"] if env_type != "local": return _execute_remote(code, task_id, enabled_tools) # --- Local execution path (UDS) --- below this line is unchanged --- # Import interrupt event from terminal_tool (cooperative cancellation) from tools.terminal_tool import _interrupt_event # Resolve config _cfg = _load_config() timeout = _cfg.get("timeout", DEFAULT_TIMEOUT) max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS) # Determine which tools the sandbox can call session_tools = set(enabled_tools) if enabled_tools else set() sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools) if not sandbox_tools: sandbox_tools = SANDBOX_ALLOWED_TOOLS # --- Set up temp directory with hermes_tools.py and script.py --- tmpdir = tempfile.mkdtemp(prefix="hermes_sandbox_") # Use /tmp on macOS to avoid the long /var/folders/... path that pushes # Unix domain socket paths past the 104-byte macOS AF_UNIX limit. # On Linux, tempfile.gettempdir() already returns /tmp. _sock_tmpdir = "/tmp" if sys.platform == "darwin" else tempfile.gettempdir() sock_path = os.path.join(_sock_tmpdir, f"hermes_rpc_{uuid.uuid4().hex}.sock") tool_call_log: list = [] tool_call_counter = [0] # mutable so the RPC thread can increment exec_start = time.monotonic() server_sock = None try: # Write the auto-generated hermes_tools module # sandbox_tools is already the correct set (intersection with session # tools, or SANDBOX_ALLOWED_TOOLS as fallback — see lines above). tools_src = generate_hermes_tools_module(list(sandbox_tools)) with open(os.path.join(tmpdir, "hermes_tools.py"), "w") as f: f.write(tools_src) # Write the user's script with open(os.path.join(tmpdir, "script.py"), "w") as f: f.write(code) # --- Start UDS server --- server_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) server_sock.bind(sock_path) server_sock.listen(1) rpc_thread = threading.Thread( target=_rpc_server_loop, args=( server_sock, task_id, tool_call_log, tool_call_counter, max_tool_calls, sandbox_tools, ), daemon=True, ) rpc_thread.start() # --- Spawn child process --- # Build a minimal environment for the child. We intentionally exclude # API keys and tokens to prevent credential exfiltration from LLM- # generated scripts. The child accesses tools via RPC, not direct API. # Exception: env vars declared by loaded skills (via env_passthrough # registry) or explicitly allowed by the user in config.yaml # (terminal.env_passthrough) are passed through. _SAFE_ENV_PREFIXES = ("PATH", "HOME", "USER", "LANG", "LC_", "TERM", "TMPDIR", "TMP", "TEMP", "SHELL", "LOGNAME", "XDG_", "PYTHONPATH", "VIRTUAL_ENV", "CONDA") _SECRET_SUBSTRINGS = ("KEY", "TOKEN", "SECRET", "PASSWORD", "CREDENTIAL", "PASSWD", "AUTH") try: from tools.env_passthrough import is_env_passthrough as _is_passthrough except Exception: _is_passthrough = lambda _: False # noqa: E731 child_env = {} for k, v in os.environ.items(): # Passthrough vars (skill-declared or user-configured) always pass. if _is_passthrough(k): child_env[k] = v continue # Block vars with secret-like names. if any(s in k.upper() for s in _SECRET_SUBSTRINGS): continue # Allow vars with known safe prefixes. if any(k.startswith(p) for p in _SAFE_ENV_PREFIXES): child_env[k] = v child_env["HERMES_RPC_SOCKET"] = sock_path child_env["PYTHONDONTWRITEBYTECODE"] = "1" # Ensure the hermes-agent root is importable in the sandbox so # repo-root modules are available to child scripts. _hermes_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) _existing_pp = child_env.get("PYTHONPATH", "") child_env["PYTHONPATH"] = _hermes_root + (os.pathsep + _existing_pp if _existing_pp else "") # Inject user's configured timezone so datetime.now() in sandboxed # code reflects the correct wall-clock time. _tz_name = os.getenv("HERMES_TIMEZONE", "").strip() if _tz_name: child_env["TZ"] = _tz_name proc = subprocess.Popen( [sys.executable, "script.py"], cwd=tmpdir, env=child_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL, preexec_fn=None if _IS_WINDOWS else os.setsid, ) # --- Poll loop: watch for exit, timeout, and interrupt --- deadline = time.monotonic() + timeout stderr_chunks: list = [] # Background readers to avoid pipe buffer deadlocks. # For stdout we use a head+tail strategy: keep the first HEAD_BYTES # and a rolling window of the last TAIL_BYTES so the final print() # output is never lost. Stderr keeps head-only (errors appear early). _STDOUT_HEAD_BYTES = int(MAX_STDOUT_BYTES * 0.4) # 40% head _STDOUT_TAIL_BYTES = MAX_STDOUT_BYTES - _STDOUT_HEAD_BYTES # 60% tail def _drain(pipe, chunks, max_bytes): """Simple head-only drain (used for stderr).""" total = 0 try: while True: data = pipe.read(4096) if not data: break if total < max_bytes: keep = max_bytes - total chunks.append(data[:keep]) total += len(data) except (ValueError, OSError) as e: logger.debug("Error reading process output: %s", e, exc_info=True) stdout_total_bytes = [0] # mutable ref for total bytes seen def _drain_head_tail(pipe, head_chunks, tail_chunks, head_bytes, tail_bytes, total_ref): """Drain stdout keeping both head and tail data.""" head_collected = 0 from collections import deque tail_buf = deque() tail_collected = 0 try: while True: data = pipe.read(4096) if not data: break total_ref[0] += len(data) # Fill head buffer first if head_collected < head_bytes: keep = min(len(data), head_bytes - head_collected) head_chunks.append(data[:keep]) head_collected += keep data = data[keep:] # remaining goes to tail if not data: continue # Everything past head goes into rolling tail buffer tail_buf.append(data) tail_collected += len(data) # Evict old tail data to stay within tail_bytes budget while tail_collected > tail_bytes and tail_buf: oldest = tail_buf.popleft() tail_collected -= len(oldest) except (ValueError, OSError): pass # Transfer final tail to output list tail_chunks.extend(tail_buf) stdout_head_chunks: list = [] stdout_tail_chunks: list = [] stdout_reader = threading.Thread( target=_drain_head_tail, args=(proc.stdout, stdout_head_chunks, stdout_tail_chunks, _STDOUT_HEAD_BYTES, _STDOUT_TAIL_BYTES, stdout_total_bytes), daemon=True ) stderr_reader = threading.Thread( target=_drain, args=(proc.stderr, stderr_chunks, MAX_STDERR_BYTES), daemon=True ) stdout_reader.start() stderr_reader.start() status = "success" while proc.poll() is None: if _interrupt_event.is_set(): _kill_process_group(proc) status = "interrupted" break if time.monotonic() > deadline: _kill_process_group(proc, escalate=True) status = "timeout" break time.sleep(0.2) # Wait for readers to finish draining stdout_reader.join(timeout=3) stderr_reader.join(timeout=3) stdout_head = b"".join(stdout_head_chunks).decode("utf-8", errors="replace") stdout_tail = b"".join(stdout_tail_chunks).decode("utf-8", errors="replace") stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") # Assemble stdout with head+tail truncation total_stdout = stdout_total_bytes[0] if total_stdout > MAX_STDOUT_BYTES and stdout_tail: omitted = total_stdout - len(stdout_head) - len(stdout_tail) truncated_notice = ( f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted " f"out of {total_stdout:,} total] ...\n\n" ) stdout_text = stdout_head + truncated_notice + stdout_tail else: stdout_text = stdout_head + stdout_tail exit_code = proc.returncode if proc.returncode is not None else -1 duration = round(time.monotonic() - exec_start, 2) # Wait for RPC thread to finish server_sock.close() # break accept() so thread exits promptly server_sock = None # prevent double close in finally rpc_thread.join(timeout=3) # Strip ANSI escape sequences so the model never sees terminal # formatting — prevents it from copying escapes into file writes. from tools.ansi_strip import strip_ansi stdout_text = strip_ansi(stdout_text) stderr_text = strip_ansi(stderr_text) # Redact secrets (API keys, tokens, etc.) from sandbox output. # The sandbox env-var filter (lines 434-454) blocks os.environ access, # but scripts can still read secrets from disk (e.g. open('~/.hermes/.env')). # This ensures leaked secrets never enter the model context. from agent.redact import redact_sensitive_text stdout_text = redact_sensitive_text(stdout_text) stderr_text = redact_sensitive_text(stderr_text) # Build response result: Dict[str, Any] = { "status": status, "output": stdout_text, "tool_calls_made": tool_call_counter[0], "duration_seconds": duration, } if status == "timeout": result["error"] = f"Script timed out after {timeout}s and was killed." elif status == "interrupted": result["output"] = stdout_text + "\n[execution interrupted — user sent a new message]" elif exit_code != 0: result["status"] = "error" result["error"] = stderr_text or f"Script exited with code {exit_code}" # Include stderr in output so the LLM sees the traceback if stderr_text: result["output"] = stdout_text + "\n--- stderr ---\n" + stderr_text return json.dumps(result, ensure_ascii=False) except Exception as exc: duration = round(time.monotonic() - exec_start, 2) logger.error( "execute_code failed after %ss with %d tool calls: %s: %s", duration, tool_call_counter[0], type(exc).__name__, exc, exc_info=True, ) return json.dumps({ "status": "error", "error": str(exc), "tool_calls_made": tool_call_counter[0], "duration_seconds": duration, }, ensure_ascii=False) finally: # Cleanup temp dir and socket if server_sock is not None: try: server_sock.close() except OSError as e: logger.debug("Server socket close error: %s", e) import shutil shutil.rmtree(tmpdir, ignore_errors=True) try: os.unlink(sock_path) except OSError: pass # already cleaned up or never created def _kill_process_group(proc, escalate: bool = False): """Kill the child and its entire process group.""" try: if _IS_WINDOWS: proc.terminate() else: os.killpg(os.getpgid(proc.pid), signal.SIGTERM) except (ProcessLookupError, PermissionError) as e: logger.debug("Could not kill process group: %s", e, exc_info=True) try: proc.kill() except Exception as e2: logger.debug("Could not kill process: %s", e2, exc_info=True) if escalate: # Give the process 5s to exit after SIGTERM, then SIGKILL try: proc.wait(timeout=5) except subprocess.TimeoutExpired: try: if _IS_WINDOWS: proc.kill() else: os.killpg(os.getpgid(proc.pid), signal.SIGKILL) except (ProcessLookupError, PermissionError) as e: logger.debug("Could not kill process group with SIGKILL: %s", e, exc_info=True) try: proc.kill() except Exception as e2: logger.debug("Could not kill process: %s", e2, exc_info=True) def _load_config() -> dict: """Load code_execution config from CLI_CONFIG if available.""" try: from cli import CLI_CONFIG return CLI_CONFIG.get("code_execution", {}) except Exception: return {} # --------------------------------------------------------------------------- # OpenAI Function-Calling Schema # --------------------------------------------------------------------------- # Per-tool documentation lines for the execute_code description. # Ordered to match the canonical display order. _TOOL_DOC_LINES = [ ("web_search", " web_search(query: str, limit: int = 5) -> dict\n" " Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}"), ("web_extract", " web_extract(urls: list[str]) -> dict\n" " Returns {\"results\": [{\"url\", \"title\", \"content\", \"error\"}, ...]} where content is markdown"), ("read_file", " read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n" " Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}"), ("write_file", " write_file(path: str, content: str) -> dict\n" " Always overwrites the entire file."), ("search_files", " search_files(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n" " target: \"content\" (search inside files) or \"files\" (find files by name). Returns {\"matches\": [...]}"), ("patch", " patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n" " Replaces old_string with new_string in the file."), ("terminal", " terminal(command: str, timeout=None, workdir=None) -> dict\n" " Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}"), ] def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict: """Build the execute_code schema with description listing only enabled tools. When tools are disabled via ``hermes tools`` (e.g. web is turned off), the schema description should NOT mention web_search / web_extract — otherwise the model thinks they are available and keeps trying to use them. """ if enabled_sandbox_tools is None: enabled_sandbox_tools = SANDBOX_ALLOWED_TOOLS # Build tool documentation lines for only the enabled tools tool_lines = "\n".join( doc for name, doc in _TOOL_DOC_LINES if name in enabled_sandbox_tools ) # Build example import list from enabled tools import_examples = [n for n in ("web_search", "terminal") if n in enabled_sandbox_tools] if not import_examples: import_examples = sorted(enabled_sandbox_tools)[:2] if import_examples: import_str = ", ".join(import_examples) + ", ..." else: import_str = "..." description = ( "Run a Python script that can call Hermes tools programmatically. " "Use this when you need 3+ tool calls with processing logic between them, " "need to filter/reduce large tool outputs before they enter your context, " "need conditional branching (if X then Y else Z), or need to loop " "(fetch N pages, process N files, retry on failure).\n\n" "Use normal tool calls instead when: single tool call with no processing, " "you need to see the full result and apply complex reasoning, " "or the task requires interactive user input.\n\n" f"Available via `from hermes_tools import ...`:\n\n" f"{tool_lines}\n\n" "Limits: 5-minute timeout, 50KB stdout cap, max 50 tool calls per script. " "terminal() is foreground-only (no background or pty). " "If the session uses a cloud sandbox backend, treat it as resumable task state rather than a durable always-on machine.\n\n" "Print your final result to stdout. Use Python stdlib (json, re, math, csv, " "datetime, collections, etc.) for processing between tool calls.\n\n" "Also available (no import needed — built into hermes_tools):\n" " json_parse(text: str) — json.loads with strict=False; use for terminal() output with control chars\n" " shell_quote(s: str) — shlex.quote(); use when interpolating dynamic strings into shell commands\n" " retry(fn, max_attempts=3, delay=2) — retry with exponential backoff for transient failures" ) return { "name": "execute_code", "description": description, "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": ( "Python code to execute. Import tools with " f"`from hermes_tools import {import_str}` " "and print your final result to stdout." ), }, }, "required": ["code"], }, } # Default schema used at registration time (all sandbox tools listed) EXECUTE_CODE_SCHEMA = build_execute_code_schema() # --- Registry --- from tools.registry import registry registry.register( name="execute_code", toolset="code_execution", schema=EXECUTE_CODE_SCHEMA, handler=lambda args, **kw: execute_code( code=args.get("code", ""), task_id=kw.get("task_id"), enabled_tools=kw.get("enabled_tools")), check_fn=check_sandbox_requirements, emoji="🐍", )