feat: execute_code runs on remote terminal backends (#5088)
* feat: execute_code runs on remote terminal backends (Docker/SSH/Modal/Daytona/Singularity) When TERMINAL_ENV is not 'local', execute_code now ships the script to the remote environment and runs it there via the terminal backend -- the same container/sandbox/SSH session used by terminal() and file tools. Architecture: - Local backend: unchanged (UDS RPC, subprocess.Popen) - Remote backends: file-based RPC via execute_oneshot() polling - Script writes request files, parent polls and dispatches tool calls - Responses written atomically (tmp + rename) via base64/stdin - execute_oneshot() bypasses persistent shell lock for concurrency Changes: - tools/environments/base.py: add execute_oneshot() (delegates to execute()) - tools/environments/persistent_shell.py: override execute_oneshot() to bypass _shell_lock via _execute_oneshot(), enabling concurrent polling - tools/code_execution_tool.py: add file-based transport to generate_hermes_tools_module(), _execute_remote() with full env get-or-create, file shipping, RPC poll loop, output post-processing * fix: use _get_env_config() instead of raw TERMINAL_ENV env var Read terminal backend type through the canonical config resolution path (terminal_tool._get_env_config) instead of os.getenv directly. * fix: use echo piping instead of stdin_data for base64 writes Modal doesn't reliably deliver stdin_data to chained commands (base64 -d > file && mv), producing 0-byte files. Switch to echo 'base64' | base64 -d which works on all backends. Verified E2E on both Docker and Modal.
This commit is contained in:
@@ -5,18 +5,30 @@ Code Execution Tool -- Programmatic Tool Calling (PTC)
|
||||
Lets the LLM write a Python script that calls Hermes tools via RPC,
|
||||
collapsing multi-step tool chains into a single inference turn.
|
||||
|
||||
Architecture:
|
||||
1. Parent generates a `hermes_tools.py` stub module with RPC functions
|
||||
Architecture (two transports):
|
||||
|
||||
**Local backend (UDS):**
|
||||
1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions
|
||||
2. Parent opens a Unix domain socket and starts an RPC listener thread
|
||||
3. Parent spawns a child process that runs the LLM's script
|
||||
4. When the script calls a tool function, the call travels over the UDS
|
||||
back to the parent, which dispatches through handle_function_call
|
||||
5. Only the script's stdout is returned to the LLM; intermediate tool
|
||||
results never enter the context window
|
||||
4. Tool calls travel over the UDS back to the parent for dispatch
|
||||
|
||||
Platform: Linux / macOS only (Unix domain sockets). Disabled on Windows.
|
||||
**Remote backends (file-based RPC):**
|
||||
1. Parent generates `hermes_tools.py` with file-based RPC stubs
|
||||
2. Parent ships both files to the remote environment
|
||||
3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.)
|
||||
4. Tool calls are written as request files; a polling thread on the parent
|
||||
reads them via execute_oneshot(), dispatches, and writes response files
|
||||
5. The script polls for response files and continues
|
||||
|
||||
In both cases, only the script's stdout is returned to the LLM; intermediate
|
||||
tool results never enter the context window.
|
||||
|
||||
Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows.
|
||||
Remote execution additionally requires Python 3 in the terminal backend.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -114,11 +126,17 @@ _TOOL_STUBS = {
|
||||
}
|
||||
|
||||
|
||||
def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
|
||||
def generate_hermes_tools_module(enabled_tools: List[str],
|
||||
transport: str = "uds") -> str:
|
||||
"""
|
||||
Build the source code for the hermes_tools.py stub module.
|
||||
|
||||
Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs.
|
||||
|
||||
Args:
|
||||
enabled_tools: Tool names enabled in the current session.
|
||||
transport: ``"uds"`` for Unix domain socket (local backend) or
|
||||
``"file"`` for file-based RPC (remote backends).
|
||||
"""
|
||||
tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools))
|
||||
|
||||
@@ -135,13 +153,18 @@ def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
|
||||
)
|
||||
export_names.append(func_name)
|
||||
|
||||
header = '''\
|
||||
"""Auto-generated Hermes tools RPC stubs."""
|
||||
import json, os, socket, shlex, time
|
||||
if transport == "file":
|
||||
header = _FILE_TRANSPORT_HEADER
|
||||
else:
|
||||
header = _UDS_TRANSPORT_HEADER
|
||||
|
||||
_sock = None
|
||||
return header + "\n".join(stub_functions)
|
||||
|
||||
|
||||
# ---- Shared helpers section (embedded in both transport headers) ----------
|
||||
|
||||
_COMMON_HELPERS = '''\
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Convenience helpers (avoid common scripting pitfalls)
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -176,6 +199,17 @@ def retry(fn, max_attempts=3, delay=2):
|
||||
time.sleep(delay * (2 ** attempt))
|
||||
raise last_err
|
||||
|
||||
'''
|
||||
|
||||
# ---- UDS transport (local backend) ---------------------------------------
|
||||
|
||||
_UDS_TRANSPORT_HEADER = '''\
|
||||
"""Auto-generated Hermes tools RPC stubs."""
|
||||
import json, os, socket, shlex, time
|
||||
|
||||
_sock = None
|
||||
''' + _COMMON_HELPERS + '''\
|
||||
|
||||
def _connect():
|
||||
global _sock
|
||||
if _sock is None:
|
||||
@@ -208,7 +242,57 @@ def _call(tool_name, args):
|
||||
|
||||
'''
|
||||
|
||||
return header + "\n".join(stub_functions)
|
||||
# ---- File-based transport (remote backends) -------------------------------
|
||||
|
||||
_FILE_TRANSPORT_HEADER = '''\
|
||||
"""Auto-generated Hermes tools RPC stubs (file-based transport)."""
|
||||
import json, os, shlex, time
|
||||
|
||||
_RPC_DIR = os.environ.get("HERMES_RPC_DIR", "/tmp/hermes_rpc")
|
||||
_seq = 0
|
||||
''' + _COMMON_HELPERS + '''\
|
||||
|
||||
def _call(tool_name, args):
|
||||
"""Send a tool call request via file-based RPC and wait for response."""
|
||||
global _seq
|
||||
_seq += 1
|
||||
seq_str = f"{_seq:06d}"
|
||||
req_file = os.path.join(_RPC_DIR, f"req_{seq_str}")
|
||||
res_file = os.path.join(_RPC_DIR, f"res_{seq_str}")
|
||||
|
||||
# Write request atomically (write to .tmp, then rename)
|
||||
tmp = req_file + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
json.dump({"tool": tool_name, "args": args, "seq": _seq}, f)
|
||||
os.rename(tmp, req_file)
|
||||
|
||||
# Wait for response with adaptive polling
|
||||
deadline = time.monotonic() + 300 # 5-minute timeout per tool call
|
||||
poll_interval = 0.05 # Start at 50ms
|
||||
while not os.path.exists(res_file):
|
||||
if time.monotonic() > deadline:
|
||||
raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s")
|
||||
time.sleep(poll_interval)
|
||||
poll_interval = min(poll_interval * 1.2, 0.25) # Back off to 250ms
|
||||
|
||||
with open(res_file) as f:
|
||||
raw = f.read()
|
||||
|
||||
# Clean up response file
|
||||
try:
|
||||
os.unlink(res_file)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
result = json.loads(raw)
|
||||
if isinstance(result, str):
|
||||
try:
|
||||
return json.loads(result)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return result
|
||||
return result
|
||||
|
||||
'''
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -339,6 +423,443 @@ def _rpc_server_loop(
|
||||
logger.debug("RPC conn close error: %s", e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Remote execution support (file-based RPC via terminal backend)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _get_or_create_env(task_id: str):
|
||||
"""Get or create the terminal environment for *task_id*.
|
||||
|
||||
Reuses the same environment (container/sandbox/SSH session) that the
|
||||
terminal and file tools use, creating one if it doesn't exist yet.
|
||||
Returns ``(env, env_type)`` tuple.
|
||||
"""
|
||||
from tools.terminal_tool import (
|
||||
_active_environments, _env_lock, _create_environment,
|
||||
_get_env_config, _last_activity, _start_cleanup_thread,
|
||||
_creation_locks, _creation_locks_lock, _task_env_overrides,
|
||||
)
|
||||
|
||||
effective_task_id = task_id or "default"
|
||||
|
||||
# Fast path: environment already exists
|
||||
with _env_lock:
|
||||
if effective_task_id in _active_environments:
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
return _active_environments[effective_task_id], _get_env_config()["env_type"]
|
||||
|
||||
# Slow path: create environment (same pattern as file_tools._get_file_ops)
|
||||
with _creation_locks_lock:
|
||||
if effective_task_id not in _creation_locks:
|
||||
_creation_locks[effective_task_id] = threading.Lock()
|
||||
task_lock = _creation_locks[effective_task_id]
|
||||
|
||||
with task_lock:
|
||||
with _env_lock:
|
||||
if effective_task_id in _active_environments:
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
return _active_environments[effective_task_id], _get_env_config()["env_type"]
|
||||
|
||||
config = _get_env_config()
|
||||
env_type = config["env_type"]
|
||||
overrides = _task_env_overrides.get(effective_task_id, {})
|
||||
|
||||
if env_type == "docker":
|
||||
image = overrides.get("docker_image") or config["docker_image"]
|
||||
elif env_type == "singularity":
|
||||
image = overrides.get("singularity_image") or config["singularity_image"]
|
||||
elif env_type == "modal":
|
||||
image = overrides.get("modal_image") or config["modal_image"]
|
||||
elif env_type == "daytona":
|
||||
image = overrides.get("daytona_image") or config["daytona_image"]
|
||||
else:
|
||||
image = ""
|
||||
|
||||
cwd = overrides.get("cwd") or config["cwd"]
|
||||
|
||||
container_config = None
|
||||
if env_type in ("docker", "singularity", "modal", "daytona"):
|
||||
container_config = {
|
||||
"container_cpu": config.get("container_cpu", 1),
|
||||
"container_memory": config.get("container_memory", 5120),
|
||||
"container_disk": config.get("container_disk", 51200),
|
||||
"container_persistent": config.get("container_persistent", True),
|
||||
"docker_volumes": config.get("docker_volumes", []),
|
||||
}
|
||||
|
||||
ssh_config = None
|
||||
if env_type == "ssh":
|
||||
ssh_config = {
|
||||
"host": config.get("ssh_host", ""),
|
||||
"user": config.get("ssh_user", ""),
|
||||
"port": config.get("ssh_port", 22),
|
||||
"key": config.get("ssh_key", ""),
|
||||
"persistent": config.get("ssh_persistent", False),
|
||||
}
|
||||
|
||||
local_config = None
|
||||
if env_type == "local":
|
||||
local_config = {
|
||||
"persistent": config.get("local_persistent", False),
|
||||
}
|
||||
|
||||
logger.info("Creating new %s environment for execute_code task %s...",
|
||||
env_type, effective_task_id[:8])
|
||||
env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=config["timeout"],
|
||||
ssh_config=ssh_config,
|
||||
container_config=container_config,
|
||||
local_config=local_config,
|
||||
task_id=effective_task_id,
|
||||
host_cwd=config.get("host_cwd"),
|
||||
)
|
||||
|
||||
with _env_lock:
|
||||
_active_environments[effective_task_id] = env
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
|
||||
_start_cleanup_thread()
|
||||
logger.info("%s environment ready for execute_code task %s",
|
||||
env_type, effective_task_id[:8])
|
||||
return env, env_type
|
||||
|
||||
|
||||
def _ship_file_to_remote(env, remote_path: str, content: str) -> None:
|
||||
"""Write *content* to *remote_path* on the remote environment.
|
||||
|
||||
Uses ``echo … | base64 -d`` rather than stdin piping because some
|
||||
backends (Modal) don't reliably deliver stdin_data to chained
|
||||
commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single
|
||||
quotes are fine.
|
||||
"""
|
||||
encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
||||
env.execute_oneshot(
|
||||
f"echo '{encoded}' | base64 -d > {remote_path}",
|
||||
cwd="/",
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
|
||||
def _rpc_poll_loop(
|
||||
env,
|
||||
rpc_dir: str,
|
||||
task_id: str,
|
||||
tool_call_log: list,
|
||||
tool_call_counter: list,
|
||||
max_tool_calls: int,
|
||||
allowed_tools: frozenset,
|
||||
stop_event: threading.Event,
|
||||
):
|
||||
"""Poll the remote filesystem for tool call requests and dispatch them.
|
||||
|
||||
Runs in a background thread. Uses ``env.execute_oneshot()`` so it can
|
||||
operate concurrently with the script-execution thread that holds
|
||||
``env.execute()`` (important for persistent-shell backends like SSH).
|
||||
"""
|
||||
from model_tools import handle_function_call
|
||||
|
||||
poll_interval = 0.1 # 100 ms
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
# List pending request files (skip .tmp partials)
|
||||
ls_result = env.execute_oneshot(
|
||||
f"ls -1 {rpc_dir}/req_* 2>/dev/null || true",
|
||||
cwd="/",
|
||||
timeout=10,
|
||||
)
|
||||
output = ls_result.get("output", "").strip()
|
||||
if not output:
|
||||
stop_event.wait(poll_interval)
|
||||
continue
|
||||
|
||||
req_files = sorted([
|
||||
f.strip() for f in output.split("\n")
|
||||
if f.strip()
|
||||
and not f.strip().endswith(".tmp")
|
||||
and "/req_" in f.strip()
|
||||
])
|
||||
|
||||
for req_file in req_files:
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
call_start = time.monotonic()
|
||||
|
||||
# Read request
|
||||
read_result = env.execute_oneshot(
|
||||
f"cat {req_file}",
|
||||
cwd="/",
|
||||
timeout=10,
|
||||
)
|
||||
try:
|
||||
request = json.loads(read_result.get("output", ""))
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
logger.debug("Malformed RPC request in %s", req_file)
|
||||
# Remove bad request to avoid infinite retry
|
||||
env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5)
|
||||
continue
|
||||
|
||||
tool_name = request.get("tool", "")
|
||||
tool_args = request.get("args", {})
|
||||
seq = request.get("seq", 0)
|
||||
seq_str = f"{seq:06d}"
|
||||
res_file = f"{rpc_dir}/res_{seq_str}"
|
||||
|
||||
# Enforce allow-list
|
||||
if tool_name not in allowed_tools:
|
||||
available = ", ".join(sorted(allowed_tools))
|
||||
tool_result = json.dumps({
|
||||
"error": (
|
||||
f"Tool '{tool_name}' is not available in execute_code. "
|
||||
f"Available: {available}"
|
||||
)
|
||||
})
|
||||
# Enforce tool call limit
|
||||
elif tool_call_counter[0] >= max_tool_calls:
|
||||
tool_result = json.dumps({
|
||||
"error": (
|
||||
f"Tool call limit reached ({max_tool_calls}). "
|
||||
"No more tool calls allowed in this execution."
|
||||
)
|
||||
})
|
||||
else:
|
||||
# Strip forbidden terminal parameters
|
||||
if tool_name == "terminal" and isinstance(tool_args, dict):
|
||||
for param in _TERMINAL_BLOCKED_PARAMS:
|
||||
tool_args.pop(param, None)
|
||||
|
||||
# Dispatch through the standard tool handler
|
||||
try:
|
||||
_real_stdout, _real_stderr = sys.stdout, sys.stderr
|
||||
devnull = open(os.devnull, "w")
|
||||
try:
|
||||
sys.stdout = devnull
|
||||
sys.stderr = devnull
|
||||
tool_result = handle_function_call(
|
||||
tool_name, tool_args, task_id=task_id
|
||||
)
|
||||
finally:
|
||||
sys.stdout, sys.stderr = _real_stdout, _real_stderr
|
||||
devnull.close()
|
||||
except Exception as exc:
|
||||
logger.error("Tool call failed in remote sandbox: %s",
|
||||
exc, exc_info=True)
|
||||
tool_result = json.dumps({"error": str(exc)})
|
||||
|
||||
tool_call_counter[0] += 1
|
||||
call_duration = time.monotonic() - call_start
|
||||
tool_call_log.append({
|
||||
"tool": tool_name,
|
||||
"args_preview": str(tool_args)[:80],
|
||||
"duration": round(call_duration, 2),
|
||||
})
|
||||
|
||||
# Write response atomically (tmp + rename).
|
||||
# Use echo piping (not stdin_data) because Modal doesn't
|
||||
# reliably deliver stdin to chained commands.
|
||||
encoded_result = base64.b64encode(
|
||||
tool_result.encode("utf-8")
|
||||
).decode("ascii")
|
||||
env.execute_oneshot(
|
||||
f"echo '{encoded_result}' | base64 -d > {res_file}.tmp"
|
||||
f" && mv {res_file}.tmp {res_file}",
|
||||
cwd="/",
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
# Remove the request file
|
||||
env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5)
|
||||
|
||||
except Exception as e:
|
||||
if not stop_event.is_set():
|
||||
logger.debug("RPC poll error: %s", e, exc_info=True)
|
||||
|
||||
if not stop_event.is_set():
|
||||
stop_event.wait(poll_interval)
|
||||
|
||||
|
||||
def _execute_remote(
|
||||
code: str,
|
||||
task_id: Optional[str],
|
||||
enabled_tools: Optional[List[str]],
|
||||
) -> str:
|
||||
"""Run a script on the remote terminal backend via file-based RPC.
|
||||
|
||||
The script and the generated hermes_tools.py module are shipped to
|
||||
the remote environment, and tool calls are proxied through a polling
|
||||
thread that communicates via request/response files.
|
||||
"""
|
||||
from tools.terminal_tool import _interrupt_event
|
||||
|
||||
_cfg = _load_config()
|
||||
timeout = _cfg.get("timeout", DEFAULT_TIMEOUT)
|
||||
max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS)
|
||||
|
||||
session_tools = set(enabled_tools) if enabled_tools else set()
|
||||
sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools)
|
||||
if not sandbox_tools:
|
||||
sandbox_tools = SANDBOX_ALLOWED_TOOLS
|
||||
|
||||
effective_task_id = task_id or "default"
|
||||
env, env_type = _get_or_create_env(effective_task_id)
|
||||
|
||||
sandbox_id = uuid.uuid4().hex[:12]
|
||||
sandbox_dir = f"/tmp/hermes_exec_{sandbox_id}"
|
||||
|
||||
tool_call_log: list = []
|
||||
tool_call_counter = [0]
|
||||
exec_start = time.monotonic()
|
||||
stop_event = threading.Event()
|
||||
rpc_thread = None
|
||||
|
||||
try:
|
||||
# Verify Python is available on the remote
|
||||
py_check = env.execute_oneshot(
|
||||
"command -v python3 >/dev/null 2>&1 && echo OK",
|
||||
cwd="/", timeout=15,
|
||||
)
|
||||
if "OK" not in py_check.get("output", ""):
|
||||
return json.dumps({
|
||||
"status": "error",
|
||||
"error": (
|
||||
f"Python 3 is not available in the {env_type} terminal "
|
||||
"environment. Install Python to use execute_code with "
|
||||
"remote backends."
|
||||
),
|
||||
"tool_calls_made": 0,
|
||||
"duration_seconds": 0,
|
||||
})
|
||||
|
||||
# Create sandbox directory on remote
|
||||
env.execute_oneshot(
|
||||
f"mkdir -p {sandbox_dir}/rpc", cwd="/", timeout=10,
|
||||
)
|
||||
|
||||
# Generate and ship files
|
||||
tools_src = generate_hermes_tools_module(
|
||||
list(sandbox_tools), transport="file",
|
||||
)
|
||||
_ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src)
|
||||
_ship_file_to_remote(env, f"{sandbox_dir}/script.py", code)
|
||||
|
||||
# Start RPC polling thread
|
||||
rpc_thread = threading.Thread(
|
||||
target=_rpc_poll_loop,
|
||||
args=(
|
||||
env, f"{sandbox_dir}/rpc", effective_task_id,
|
||||
tool_call_log, tool_call_counter, max_tool_calls,
|
||||
sandbox_tools, stop_event,
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
rpc_thread.start()
|
||||
|
||||
# Build environment variable prefix for the script
|
||||
env_prefix = (
|
||||
f"HERMES_RPC_DIR={sandbox_dir}/rpc "
|
||||
f"PYTHONDONTWRITEBYTECODE=1"
|
||||
)
|
||||
tz = os.getenv("HERMES_TIMEZONE", "").strip()
|
||||
if tz:
|
||||
env_prefix += f" TZ={tz}"
|
||||
|
||||
# Execute the script on the remote backend
|
||||
logger.info("Executing code on %s backend (task %s)...",
|
||||
env_type, effective_task_id[:8])
|
||||
script_result = env.execute(
|
||||
f"cd {sandbox_dir} && {env_prefix} python3 script.py",
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
stdout_text = script_result.get("output", "")
|
||||
exit_code = script_result.get("returncode", -1)
|
||||
status = "success"
|
||||
|
||||
# Check for timeout/interrupt from the backend
|
||||
if exit_code == 124:
|
||||
status = "timeout"
|
||||
elif exit_code == 130:
|
||||
status = "interrupted"
|
||||
|
||||
except Exception as exc:
|
||||
duration = round(time.monotonic() - exec_start, 2)
|
||||
logger.error(
|
||||
"execute_code remote failed after %ss with %d tool calls: %s: %s",
|
||||
duration, tool_call_counter[0], type(exc).__name__, exc,
|
||||
exc_info=True,
|
||||
)
|
||||
return json.dumps({
|
||||
"status": "error",
|
||||
"error": str(exc),
|
||||
"tool_calls_made": tool_call_counter[0],
|
||||
"duration_seconds": duration,
|
||||
}, ensure_ascii=False)
|
||||
|
||||
finally:
|
||||
# Stop the polling thread
|
||||
stop_event.set()
|
||||
if rpc_thread is not None:
|
||||
rpc_thread.join(timeout=5)
|
||||
|
||||
# Clean up remote sandbox dir
|
||||
try:
|
||||
env.execute_oneshot(
|
||||
f"rm -rf {sandbox_dir}", cwd="/", timeout=15,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to clean up remote sandbox %s", sandbox_dir)
|
||||
|
||||
duration = round(time.monotonic() - exec_start, 2)
|
||||
|
||||
# --- Post-process output (same as local path) ---
|
||||
|
||||
# Truncate stdout to cap
|
||||
if len(stdout_text) > MAX_STDOUT_BYTES:
|
||||
head_bytes = int(MAX_STDOUT_BYTES * 0.4)
|
||||
tail_bytes = MAX_STDOUT_BYTES - head_bytes
|
||||
head = stdout_text[:head_bytes]
|
||||
tail = stdout_text[-tail_bytes:]
|
||||
omitted = len(stdout_text) - len(head) - len(tail)
|
||||
stdout_text = (
|
||||
head
|
||||
+ f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
|
||||
f"out of {len(stdout_text):,} total] ...\n\n"
|
||||
+ tail
|
||||
)
|
||||
|
||||
# Strip ANSI escape sequences
|
||||
from tools.ansi_strip import strip_ansi
|
||||
stdout_text = strip_ansi(stdout_text)
|
||||
|
||||
# Redact secrets
|
||||
from agent.redact import redact_sensitive_text
|
||||
stdout_text = redact_sensitive_text(stdout_text)
|
||||
|
||||
# Build response
|
||||
result: Dict[str, Any] = {
|
||||
"status": status,
|
||||
"output": stdout_text,
|
||||
"tool_calls_made": tool_call_counter[0],
|
||||
"duration_seconds": duration,
|
||||
}
|
||||
|
||||
if status == "timeout":
|
||||
result["error"] = f"Script timed out after {timeout}s and was killed."
|
||||
elif status == "interrupted":
|
||||
result["output"] = (
|
||||
stdout_text + "\n[execution interrupted — user sent a new message]"
|
||||
)
|
||||
elif exit_code != 0:
|
||||
result["status"] = "error"
|
||||
result["error"] = f"Script exited with code {exit_code}"
|
||||
|
||||
return json.dumps(result, ensure_ascii=False)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -352,6 +873,9 @@ def execute_code(
|
||||
Run a Python script in a sandboxed child process with RPC access
|
||||
to a subset of Hermes tools.
|
||||
|
||||
Dispatches to the local (UDS) or remote (file-based RPC) path
|
||||
depending on the configured terminal backend.
|
||||
|
||||
Args:
|
||||
code: Python source code to execute.
|
||||
task_id: Session task ID for tool isolation (terminal env, etc.).
|
||||
@@ -369,6 +893,14 @@ def execute_code(
|
||||
if not code or not code.strip():
|
||||
return json.dumps({"error": "No code provided."})
|
||||
|
||||
# Dispatch: remote backends use file-based RPC, local uses UDS
|
||||
from tools.terminal_tool import _get_env_config
|
||||
env_type = _get_env_config()["env_type"]
|
||||
if env_type != "local":
|
||||
return _execute_remote(code, task_id, enabled_tools)
|
||||
|
||||
# --- Local execution path (UDS) --- below this line is unchanged ---
|
||||
|
||||
# Import interrupt event from terminal_tool (cooperative cancellation)
|
||||
from tools.terminal_tool import _interrupt_event
|
||||
|
||||
|
||||
@@ -91,6 +91,19 @@ class BaseEnvironment(ABC):
|
||||
kw["stdin"] = subprocess.DEVNULL
|
||||
return kw
|
||||
|
||||
def execute_oneshot(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
"""Execute a command bypassing any persistent shell.
|
||||
|
||||
Safe for concurrent use alongside a long-running execute() call.
|
||||
Backends that maintain a persistent shell (SSH, Local) override this
|
||||
to route through their oneshot path, avoiding the shell lock.
|
||||
Non-persistent backends delegate to execute().
|
||||
"""
|
||||
return self.execute(command, cwd=cwd, timeout=timeout,
|
||||
stdin_data=stdin_data)
|
||||
|
||||
def _timeout_result(self, timeout: int | None) -> dict:
|
||||
"""Standard return dict when a command times out."""
|
||||
return {
|
||||
|
||||
@@ -141,6 +141,19 @@ class PersistentShellMixin:
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
|
||||
def execute_oneshot(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
"""Always use the oneshot (non-persistent) execution path.
|
||||
|
||||
This bypasses _shell_lock so it can run concurrently with a
|
||||
long-running command in the persistent shell — used by
|
||||
execute_code's file-based RPC polling thread.
|
||||
"""
|
||||
return self._execute_oneshot(
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
if self.persistent:
|
||||
self._cleanup_persistent_shell()
|
||||
|
||||
Reference in New Issue
Block a user