Add background process management with process tool, wait, PTY, and stdin support

New process registry and tool for managing long-running background processes
across all terminal backends (local, Docker, Singularity, Modal, SSH).

Process Registry (tools/process_registry.py):
- ProcessSession tracking with rolling 200KB output buffer
- spawn_local() with optional PTY via ptyprocess for interactive CLIs
- spawn_via_env() for non-local backends (runs inside sandbox, never on host)
- Background reader threads per process (Popen stdout or PTY)
- wait() with timeout clamping, interrupt support, and transparent limit reporting
- JSON checkpoint to ~/.hermes/processes.json for gateway crash recovery
- Module-level singleton shared across agent loop, gateway, and RL
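
The rolling buffer behaves like a simple append-then-truncate window (an illustrative sketch; the real `ProcessSession` does this under a per-session lock):

```python
def append_output(buffer: str, chunk: str, max_chars: int = 200_000) -> str:
    """Append new output and keep only the trailing max_chars characters."""
    buffer += chunk
    if len(buffer) > max_chars:
        buffer = buffer[-max_chars:]  # oldest output falls off the front
    return buffer

buf = append_output("a" * 199_999, "xyz")  # window is now full
```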

Process Tool (model_tools.py):
- 7 actions: list, poll, log, wait, kill, write, submit
- Paired with terminal in all toolsets (CLI, messaging, RL)
- Timeout clamping with transparent notes in response
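
The clamping can be sketched as follows (mirrors the logic in `ProcessRegistry.wait()`; the 180s default tracks the `TERMINAL_TIMEOUT` fallback):

```python
def clamp_timeout(requested=None, max_timeout=180):
    """Clamp a requested wait to the configured limit; report when clamped."""
    if requested and requested > max_timeout:
        note = (f"Requested wait of {requested}s was clamped "
                f"to configured limit of {max_timeout}s")
        return max_timeout, note
    return requested or max_timeout, None

effective, note = clamp_timeout(600)  # clamped to 180, with a transparency note
```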

Terminal Tool Updates (tools/terminal_tool.py):
- Replaced nohup background mode with registry spawn (returns session_id)
- Added workdir parameter for per-command working directory
- Added check_interval parameter for gateway auto-check watchers
- Added pty parameter for interactive CLI tools (Codex, Claude Code)
- Updated TERMINAL_TOOL_DESCRIPTION with full background workflow docs
- Cleanup thread now respects active background processes (won't reap sandbox)
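
The new parameters compose like this (illustrative tool calls in the style of the docs below; values are made up):

```python
# Build in a specific directory, with gateway progress reports every 60s
terminal(command="make -j8 test", background=true, workdir="/workspace/proj", check_interval=60)
# Drive an interactive CLI under a pseudo-terminal
terminal(command="claude", background=true, pty=true)
```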

Gateway Integration (gateway/run.py, session.py, config.py):
- Session reset protection: sessions with active processes exempt from reset
- Default idle timeout increased from 2 hours to 24 hours
- from_dict fallback aligned to match (was 120, now 1440)
- session_key env var propagated to process registry for session mapping
- Crash recovery on gateway startup via checkpoint probe
- check_interval watcher: asyncio task polls process, delivers updates to platform
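
The startup probe amounts to: read the checkpoint, then test each recorded PID with signal 0 (a simplified sketch; the checkpoint schema here, a list of objects with `pid` keys, is an assumption rather than the actual on-disk format):

```python
import json
import os
from pathlib import Path

def probe_checkpoint(path: Path) -> list:
    """Return checkpoint entries whose PID is still alive (signal-0 probe)."""
    if not path.exists():
        return []
    entries = json.loads(path.read_text())
    alive = []
    for entry in entries:
        try:
            os.kill(entry["pid"], 0)  # sends no signal; raises if the PID is gone
            alive.append(entry)
        except (ProcessLookupError, PermissionError, TypeError):
            continue  # dead process, foreign process, or malformed record
    return alive
```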

RL Safety (environments/):
- tool_context.py cleanup() kills background processes on episode end
- hermes_base_env.py warns when enabled_toolsets is None (loads all tools)
- Process tool safe in RL via wait() blocking the agent loop

Also:
- Added ptyprocess as optional dependency (in pyproject.toml [pty] extra + [all])
- Fixed pre-existing bug: rl_test_inference missing from TOOL_TO_TOOLSET_MAP
- Updated AGENTS.md with process management docs and project structure
- Updated README.md terminal section with process management overview
Author: teknium1
Date: 2026-02-17 02:51:31 -08:00
Commit: 061fa70907 (parent: 48b5cfd085)
12 changed files with 1142 additions and 40 deletions


@@ -25,6 +25,7 @@ hermes-agent/
│ ├── uninstall.py # Uninstaller
│ └── cron.py # Cron job management
├── tools/ # Tool implementations
│ ├── process_registry.py # Background process management (spawn, poll, wait, kill)
│ ├── transcription_tools.py # Speech-to-text (Whisper API)
├── gateway/ # Messaging platform adapters
│ ├── pairing.py # DM pairing code system
@@ -412,6 +413,37 @@ The terminal tool includes safety checks for potentially destructive commands (e
---
## Background Process Management
The `process` tool works alongside `terminal` for managing long-running background processes:
**Starting a background process:**
```python
terminal(command="pytest -v tests/", background=true)
# Returns: {"session_id": "proc_abc123", "pid": 12345, ...}
```
**Managing it with the process tool:**
- `process(action="list")` -- show all running/recent processes
- `process(action="poll", session_id="proc_abc123")` -- check status + new output
- `process(action="log", session_id="proc_abc123")` -- full output with pagination
- `process(action="wait", session_id="proc_abc123", timeout=600)` -- block until done
- `process(action="kill", session_id="proc_abc123")` -- terminate
- `process(action="write", session_id="proc_abc123", data="y")` -- send stdin
- `process(action="submit", session_id="proc_abc123", data="yes")` -- send + Enter
**Key behaviors:**
- Background processes execute through the configured terminal backend (local/Docker/Modal/SSH/Singularity) -- never directly on the host unless `TERMINAL_ENV=local`
- The `wait` action blocks the tool call until the process finishes, times out, or is interrupted by a new user message
- PTY mode (`pty=true` on terminal) enables interactive CLI tools (Codex, Claude Code)
- In RL training, background processes are auto-killed when the episode ends (`tool_context.cleanup()`)
- In the gateway, sessions with active background processes are exempt from idle reset
- The process registry checkpoints to `~/.hermes/processes.json` for crash recovery
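Concretely, the non-local spawn wraps the command so the sandbox records its own PID and log file (a sketch of the wrapper built by `spawn_via_env`; paths follow the `/tmp/hermes_bg_*` pattern used there):

```python
def build_background_command(command: str, session_id: str) -> str:
    """Wrap a command to run detached inside the sandbox with captured output."""
    log_path = f"/tmp/hermes_bg_{session_id}.log"
    pid_path = f"/tmp/hermes_bg_{session_id}.pid"
    return (
        f"nohup bash -c '{command}' > {log_path} 2>&1 & "
        f"echo $! > {pid_path} && cat {pid_path}"  # report the in-sandbox PID
    )

wrapped = build_background_command("pytest -v", "proc_abc123")
```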
Files: `tools/process_registry.py` (registry), `model_tools.py` (tool definition + handler), `tools/terminal_tool.py` (spawn integration)
---
## Adding New Tools
Follow this strict order to maintain consistency:


@@ -219,9 +219,13 @@ When the agent tries to run a potentially dangerous command (rm -rf, chmod 777,
Reply "yes"/"y" to approve or "no"/"n" to deny. In CLI mode, the existing interactive approval prompt (once/session/always/deny) is preserved.
### 🖥️ Terminal Backend
### 🖥️ Terminal & Process Management
The terminal tool can execute commands in different environments:
The terminal tool can execute commands in different environments, with full background process management via the `process` tool:
**Background processes:** Start with `terminal(command="...", background=true)`, then use `process(action="poll/wait/log/kill/write")` to monitor, wait for completion, read output, terminate, or send input. The `wait` action blocks until the process finishes -- no polling loops needed. PTY mode (`pty=true`) enables interactive CLI tools like Codex and Claude Code.
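A typical sequence (illustrative; the session ID is whatever the terminal call returns):

```python
terminal(command="pytest -v tests/", background=true)
# Returns: {"session_id": "proc_abc123", ...}
process(action="wait", session_id="proc_abc123", timeout=600)
```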
**Execution environments:**
| Backend | Description | Use Case |
|---------|-------------|----------|


@@ -258,6 +258,11 @@ class HermesAgentBaseEnv(BaseEnv):
logger.info("Sampled toolsets from '%s': %s", config.distribution, group_toolsets)
else:
group_toolsets = config.enabled_toolsets # None means "all available"
if group_toolsets is None:
logger.warning(
"enabled_toolsets is None -- loading ALL tools including messaging. "
"Set explicit enabled_toolsets for RL training."
)
tools = get_tool_definitions(
enabled_toolsets=group_toolsets,


@@ -438,11 +438,21 @@ class ToolContext:
def cleanup(self):
"""
Release all resources (terminal VMs, browser sessions) for this rollout.
Release all resources (terminal VMs, browser sessions, background processes)
for this rollout.
Called automatically by the base environment via try/finally after
compute_reward() completes. You generally don't need to call this yourself.
"""
# Kill any background processes from this rollout (safety net)
try:
from tools.process_registry import process_registry
killed = process_registry.kill_all(task_id=self.task_id)
if killed:
logger.debug("Process cleanup for task %s: killed %d process(es)", self.task_id, killed)
except Exception as e:
logger.debug("Process cleanup for task %s: %s", self.task_id, e)
try:
cleanup_vm(self.task_id)
except Exception as e:


@@ -65,7 +65,7 @@ class SessionResetPolicy:
"""
mode: str = "both" # "daily", "idle", or "both"
at_hour: int = 4 # Hour for daily reset (0-23, local time)
idle_minutes: int = 120 # Minutes of inactivity before reset
idle_minutes: int = 1440 # Minutes of inactivity before reset (24 hours)
def to_dict(self) -> Dict[str, Any]:
return {
@@ -79,7 +79,7 @@ class SessionResetPolicy:
return cls(
mode=data.get("mode", "both"),
at_hour=data.get("at_hour", 4),
idle_minutes=data.get("idle_minutes", 120),
idle_minutes=data.get("idle_minutes", 1440),
)


@@ -72,7 +72,13 @@ class GatewayRunner:
def __init__(self, config: Optional[GatewayConfig] = None):
self.config = config or load_gateway_config()
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
self.session_store = SessionStore(self.config.sessions_dir, self.config)
# Wire process registry into session store for reset protection
from tools.process_registry import process_registry
self.session_store = SessionStore(
self.config.sessions_dir, self.config,
has_active_processes_fn=lambda key: process_registry.has_active_for_session(key),
)
self.delivery_router = DeliveryRouter(self.config)
self._running = False
self._shutdown_event = asyncio.Event()
@@ -106,6 +112,15 @@ class GatewayRunner:
# Discover and load event hooks
self.hooks.discover_and_load()
# Recover background processes from checkpoint (crash recovery)
try:
from tools.process_registry import process_registry
recovered = process_registry.recover_from_checkpoint()
if recovered:
print(f"[gateway] Recovered {recovered} background process(es) from previous run")
except Exception as e:
print(f"[gateway] Process checkpoint recovery: {e}")
connected_count = 0
# Initialize and connect each configured platform
@@ -429,6 +444,15 @@ class GatewayRunner:
"response": (response or "")[:500],
})
# Check for pending process watchers (check_interval on background processes)
try:
from tools.process_registry import process_registry
while process_registry.pending_watchers:
watcher = process_registry.pending_watchers.pop(0)
asyncio.create_task(self._run_process_watcher(watcher))
except Exception as e:
print(f"[gateway] Process watcher setup error: {e}", flush=True)
# Check if the agent encountered a dangerous command needing approval
# The terminal tool stores the last pending approval globally
try:
@@ -701,6 +725,75 @@ class GatewayRunner:
return prefix
return user_text
async def _run_process_watcher(self, watcher: dict) -> None:
"""
Periodically check a background process and push updates to the user.
Runs as an asyncio task. Stays silent when nothing changed.
Auto-removes when the process exits or is killed.
"""
from tools.process_registry import process_registry
session_id = watcher["session_id"]
interval = watcher["check_interval"]
session_key = watcher.get("session_key", "")
platform_name = watcher.get("platform", "")
chat_id = watcher.get("chat_id", "")
print(f"[gateway] Process watcher started: {session_id} (every {interval}s)", flush=True)
last_output_len = 0
while True:
await asyncio.sleep(interval)
session = process_registry.get(session_id)
if session is None:
break
current_output_len = len(session.output_buffer)
has_new_output = current_output_len > last_output_len
last_output_len = current_output_len
if session.exited:
# Process finished -- deliver final update
new_output = session.output_buffer[-1000:] if session.output_buffer else ""
message_text = (
f"[Background process {session_id} finished with exit code {session.exit_code}. "
f"Here's the final output:\n{new_output}]"
)
# Try to deliver to the originating platform
adapter = None
for p, a in self.adapters.items():
if p.value == platform_name:
adapter = a
break
if adapter and chat_id:
try:
await adapter.send(chat_id, message_text)
except Exception as e:
print(f"[gateway] Watcher delivery error: {e}", flush=True)
break
elif has_new_output:
# New output available -- deliver status update
new_output = session.output_buffer[-500:] if session.output_buffer else ""
message_text = (
f"[Background process {session_id} is still running. "
f"New output:\n{new_output}]"
)
adapter = None
for p, a in self.adapters.items():
if p.value == platform_name:
adapter = a
break
if adapter and chat_id:
try:
await adapter.send(chat_id, message_text)
except Exception as e:
print(f"[gateway] Watcher delivery error: {e}", flush=True)
print(f"[gateway] Process watcher ended: {session_id}", flush=True)
async def _run_agent(
self,
message: str,
@@ -824,6 +917,10 @@ class GatewayRunner:
tools_holder = [None] # Mutable container for the tool definitions
def run_sync():
# Pass session_key to process registry via env var so background
# processes can be mapped back to this gateway session
os.environ["HERMES_SESSION_KEY"] = session_key or ""
# Read from env var or use default (same as CLI)
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "60"))


@@ -270,11 +270,15 @@ class SessionStore:
- {session_id}.jsonl: Conversation transcripts
"""
def __init__(self, sessions_dir: Path, config: GatewayConfig):
def __init__(self, sessions_dir: Path, config: GatewayConfig,
has_active_processes_fn=None):
self.sessions_dir = sessions_dir
self.config = config
self._entries: Dict[str, SessionEntry] = {}
self._loaded = False
# Optional callback to check if a session has active background processes.
# When set, sessions with running processes are exempt from reset.
self._has_active_processes_fn = has_active_processes_fn
def _ensure_loaded(self) -> None:
"""Load sessions from disk if not already loaded."""
@@ -320,7 +324,14 @@ class SessionStore:
Check if a session should be reset based on policy.
Returns True if the session is stale and should start fresh.
Sessions with active background processes are never reset.
"""
# Don't reset sessions that have active background processes
if self._has_active_processes_fn:
session_key = self._generate_session_key(source)
if self._has_active_processes_fn(session_key):
return False
policy = self.config.get_reset_policy(
platform=source.platform,
session_type=source.chat_type


@@ -320,6 +320,20 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
"type": "integer",
"description": "Command timeout in seconds (optional)",
"minimum": 1
},
"workdir": {
"type": "string",
"description": "Working directory for this command (absolute path). Defaults to the session working directory."
},
"check_interval": {
"type": "integer",
"description": "Seconds between automatic status checks for background processes (gateway/messaging only, minimum 30). When set, I'll proactively report progress.",
"minimum": 30
},
"pty": {
"type": "boolean",
"description": "Run in pseudo-terminal (PTY) mode for interactive CLI tools like Codex, Claude Code, or Python REPL. Only works with local and SSH backends. Default: false.",
"default": False
}
},
"required": ["command"]
@@ -930,6 +944,64 @@ def get_send_message_tool_definitions():
]
def get_process_tool_definitions() -> List[Dict[str, Any]]:
"""
Get tool definitions for the process management tool.
The process tool manages background processes started with terminal(background=true).
Actions: list, poll, log, wait, kill, write, submit.
"""
return [
{
"type": "function",
"function": {
"name": "process",
"description": (
"Manage background processes started with terminal(background=true). "
"Actions: 'list' (show all), 'poll' (check status + new output), "
"'log' (full output with pagination), 'wait' (block until done or timeout), "
"'kill' (terminate), 'write' (send raw data to stdin), 'submit' (send data + Enter). "
"Use 'wait' when you have nothing else to do and want "
"to block until a background process finishes."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "poll", "log", "wait", "kill", "write", "submit"],
"description": "Action to perform on background processes"
},
"session_id": {
"type": "string",
"description": "Process session ID (from terminal background output). Required for poll/log/wait/kill."
},
"data": {
"type": "string",
"description": "Text to send to process stdin (for 'write' and 'submit' actions)"
},
"timeout": {
"type": "integer",
"description": "Max seconds to block for 'wait' action. Returns partial output on timeout.",
"minimum": 1
},
"offset": {
"type": "integer",
"description": "Line offset for 'log' action (default: last 200 lines)"
},
"limit": {
"type": "integer",
"description": "Max lines to return for 'log' action",
"minimum": 1
}
},
"required": ["action"]
}
}
}
]
def get_all_tool_names() -> List[str]:
"""
Get the names of all available tools across all toolsets.
@@ -945,7 +1017,7 @@ def get_all_tool_names() -> List[str]:
# Terminal tools (mini-swe-agent backend)
if check_terminal_requirements():
tool_names.extend(["terminal"])
tool_names.extend(["terminal", "process"])
# Vision tools
if check_vision_requirements():
@@ -1011,6 +1083,7 @@ TOOL_TO_TOOLSET_MAP = {
"web_search": "web_tools",
"web_extract": "web_tools",
"terminal": "terminal_tools",
"process": "terminal_tools",
"vision_analyze": "vision_tools",
"mixture_of_agents": "moa_tools",
"image_generate": "image_tools",
@@ -1042,6 +1115,7 @@ TOOL_TO_TOOLSET_MAP = {
"rl_stop_training": "rl_tools",
"rl_get_results": "rl_tools",
"rl_list_runs": "rl_tools",
"rl_test_inference": "rl_tools",
# Text-to-speech tools
"text_to_speech": "tts_tools",
# File manipulation tools
@@ -1113,6 +1187,9 @@ def get_tool_definitions(
if check_terminal_requirements():
for tool in get_terminal_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
# Process management tool (paired with terminal)
for tool in get_process_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
if check_vision_requirements():
for tool in get_vision_tool_definitions():
@@ -1339,15 +1416,70 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A
command = function_args.get("command")
background = function_args.get("background", False)
timeout = function_args.get("timeout")
# Note: force parameter exists internally but is NOT exposed to the model
# Dangerous command approval is handled via user prompts only
workdir = function_args.get("workdir")
check_interval = function_args.get("check_interval")
pty = function_args.get("pty", False)
return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id)
return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id, workdir=workdir, check_interval=check_interval, pty=pty)
else:
return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False)
def handle_process_function_call(function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None) -> str:
"""
Handle function calls for the process management tool.
Routes actions (list, poll, log, wait, kill) to the ProcessRegistry.
"""
from tools.process_registry import process_registry
action = function_args.get("action", "")
session_id = function_args.get("session_id", "")
if action == "list":
sessions = process_registry.list_sessions(task_id=task_id)
return json.dumps({"processes": sessions}, ensure_ascii=False)
elif action == "poll":
if not session_id:
return json.dumps({"error": "session_id is required for poll"}, ensure_ascii=False)
return json.dumps(process_registry.poll(session_id), ensure_ascii=False)
elif action == "log":
if not session_id:
return json.dumps({"error": "session_id is required for log"}, ensure_ascii=False)
offset = function_args.get("offset", 0)
limit = function_args.get("limit", 200)
return json.dumps(process_registry.read_log(session_id, offset=offset, limit=limit), ensure_ascii=False)
elif action == "wait":
if not session_id:
return json.dumps({"error": "session_id is required for wait"}, ensure_ascii=False)
timeout = function_args.get("timeout")
return json.dumps(process_registry.wait(session_id, timeout=timeout), ensure_ascii=False)
elif action == "kill":
if not session_id:
return json.dumps({"error": "session_id is required for kill"}, ensure_ascii=False)
return json.dumps(process_registry.kill_process(session_id), ensure_ascii=False)
elif action == "write":
if not session_id:
return json.dumps({"error": "session_id is required for write"}, ensure_ascii=False)
data = function_args.get("data", "")
return json.dumps(process_registry.write_stdin(session_id, data), ensure_ascii=False)
elif action == "submit":
if not session_id:
return json.dumps({"error": "session_id is required for submit"}, ensure_ascii=False)
data = function_args.get("data", "")
return json.dumps(process_registry.submit_stdin(session_id, data), ensure_ascii=False)
else:
return json.dumps({"error": f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit"}, ensure_ascii=False)
def handle_vision_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
"""
Handle function calls for vision tools.
@@ -1779,6 +1911,10 @@ def handle_function_call(
elif function_name in ["terminal"]:
return handle_terminal_function_call(function_name, function_args, task_id)
# Route process management tools
elif function_name in ["process"]:
return handle_process_function_call(function_name, function_args, task_id)
# Route vision tools
elif function_name in ["vision_analyze"]:
return handle_vision_function_call(function_name, function_args)


@@ -43,6 +43,7 @@ cron = ["croniter"]
slack = ["slack-bolt>=1.18.0", "slack-sdk>=3.27.0"]
cli = ["simple-term-menu"]
tts-premium = ["elevenlabs"]
pty = ["ptyprocess>=0.7.0"]
all = [
"hermes-agent[modal]",
"hermes-agent[messaging]",
@@ -51,6 +52,7 @@ all = [
"hermes-agent[dev]",
"hermes-agent[tts-premium]",
"hermes-agent[slack]",
"hermes-agent[pty]",
]
[project.scripts]

tools/process_registry.py (new file, 726 lines)

@@ -0,0 +1,726 @@
"""
Process Registry -- In-memory registry for managed background processes.
Tracks processes spawned via terminal(background=true), providing:
- Output buffering (rolling 200KB window)
- Status polling and log retrieval
- Blocking wait with interrupt support
- Process killing
- Crash recovery via JSON checkpoint file
- Session-scoped tracking for gateway reset protection
Background processes execute THROUGH the environment interface -- nothing
runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity,
Modal, and SSH backends, the command runs inside the sandbox.
Usage:
from tools.process_registry import process_registry
# Spawn a background process (called from terminal_tool)
session = process_registry.spawn(env, "pytest -v", task_id="task_123")
# Poll for status
result = process_registry.poll(session.id)
# Block until done
result = process_registry.wait(session.id, timeout=300)
# Kill it
process_registry.kill(session.id)
"""
import json
import os
import signal
import subprocess
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# Checkpoint file for crash recovery (gateway only)
CHECKPOINT_PATH = Path(os.path.expanduser("~/.hermes/processes.json"))
# Limits
MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer
FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes
MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning)
@dataclass
class ProcessSession:
"""A tracked background process with output buffering."""
id: str # Unique session ID ("proc_xxxxxxxxxxxx")
command: str # Original command string
task_id: str = "" # Task/sandbox isolation key
session_key: str = "" # Gateway session key (for reset protection)
pid: Optional[int] = None # OS process ID
process: Optional[subprocess.Popen] = None # Popen handle (local only)
env_ref: Any = None # Reference to the environment object
cwd: Optional[str] = None # Working directory
started_at: float = 0.0 # time.time() of spawn
exited: bool = False # Whether the process has finished
exit_code: Optional[int] = None # Exit code (None if still running)
output_buffer: str = "" # Rolling output (last MAX_OUTPUT_CHARS)
max_output_chars: int = MAX_OUTPUT_CHARS
detached: bool = False # True if recovered from crash (no pipe)
_lock: threading.Lock = field(default_factory=threading.Lock)
_reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
_pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True)
class ProcessRegistry:
"""
In-memory registry of running and finished background processes.
Thread-safe. Accessed from:
- Executor threads (terminal_tool, process tool handlers)
- Gateway asyncio loop (watcher tasks, session reset checks)
- Cleanup thread (sandbox reaping coordination)
"""
def __init__(self):
self._running: Dict[str, ProcessSession] = {}
self._finished: Dict[str, ProcessSession] = {}
self._lock = threading.Lock()
# Side-channel for check_interval watchers (gateway reads after agent run)
self.pending_watchers: List[Dict[str, Any]] = []
# ----- Spawn -----
def spawn_local(
self,
command: str,
cwd: str = None,
task_id: str = "",
session_key: str = "",
env_vars: dict = None,
use_pty: bool = False,
) -> ProcessSession:
"""
Spawn a background process locally.
Only for TERMINAL_ENV=local. Other backends use spawn_via_env().
Args:
use_pty: If True, use a pseudo-terminal via ptyprocess for interactive
CLI tools (Codex, Claude Code, Python REPL). Falls back to
subprocess.Popen if ptyprocess is not installed.
"""
session = ProcessSession(
id=f"proc_{uuid.uuid4().hex[:12]}",
command=command,
task_id=task_id,
session_key=session_key,
cwd=cwd or os.getcwd(),
started_at=time.time(),
)
if use_pty:
# Try PTY mode for interactive CLI tools
try:
import ptyprocess
pty_proc = ptyprocess.PtyProcess.spawn(
["bash", "-c", command],
cwd=session.cwd,
env=os.environ | (env_vars or {}),
dimensions=(30, 120),
)
session.pid = pty_proc.pid
# Store the pty handle on the session for read/write
session._pty = pty_proc
# PTY reader thread
reader = threading.Thread(
target=self._pty_reader_loop,
args=(session,),
daemon=True,
name=f"proc-pty-reader-{session.id}",
)
session._reader_thread = reader
reader.start()
with self._lock:
self._prune_if_needed()
self._running[session.id] = session
self._write_checkpoint()
return session
except ImportError:
# ptyprocess not installed -- fall back to Popen
print("[ProcessRegistry] ptyprocess not installed, falling back to pipe mode", flush=True)
except Exception as e:
print(f"[ProcessRegistry] PTY spawn failed ({e}), falling back to pipe mode", flush=True)
# Standard Popen path (non-PTY or PTY fallback)
proc = subprocess.Popen(
command,
shell=True,
text=True,
cwd=session.cwd,
env=os.environ | (env_vars or {}),
encoding="utf-8",
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
preexec_fn=os.setsid,
)
session.process = proc
session.pid = proc.pid
# Start output reader thread
reader = threading.Thread(
target=self._reader_loop,
args=(session,),
daemon=True,
name=f"proc-reader-{session.id}",
)
session._reader_thread = reader
reader.start()
with self._lock:
self._prune_if_needed()
self._running[session.id] = session
self._write_checkpoint()
return session
def spawn_via_env(
self,
env: Any,
command: str,
cwd: str = None,
task_id: str = "",
session_key: str = "",
timeout: int = 10,
) -> ProcessSession:
"""
Spawn a background process through a non-local environment backend.
For Docker/Singularity/Modal/SSH: runs the command inside the sandbox
using the environment's execute() interface. We wrap the command to
capture the in-sandbox PID and redirect output to a log file inside
the sandbox, then poll the log via subsequent execute() calls.
This is less capable than local spawn (no live stdout pipe, no stdin),
but it ensures the command runs in the correct sandbox context.
"""
session = ProcessSession(
id=f"proc_{uuid.uuid4().hex[:12]}",
command=command,
task_id=task_id,
session_key=session_key,
cwd=cwd,
started_at=time.time(),
env_ref=env,
)
# Run the command in the sandbox with output capture
log_path = f"/tmp/hermes_bg_{session.id}.log"
pid_path = f"/tmp/hermes_bg_{session.id}.pid"
bg_command = (
f"nohup bash -c '{command}' > {log_path} 2>&1 & "
f"echo $! > {pid_path} && cat {pid_path}"
)
try:
result = env.execute(bg_command, timeout=timeout)
output = result.get("output", "").strip()
# Try to extract the PID from the output
for line in output.splitlines():
line = line.strip()
if line.isdigit():
session.pid = int(line)
break
except Exception as e:
session.exited = True
session.exit_code = -1
session.output_buffer = f"Failed to start: {e}"
if not session.exited:
# Start a poller thread that periodically reads the log file
reader = threading.Thread(
target=self._env_poller_loop,
args=(session, env, log_path, pid_path),
daemon=True,
name=f"proc-poller-{session.id}",
)
session._reader_thread = reader
reader.start()
with self._lock:
self._prune_if_needed()
self._running[session.id] = session
self._write_checkpoint()
return session
# ----- Reader / Poller Threads -----
def _reader_loop(self, session: ProcessSession):
"""Background thread: read stdout from a local Popen process."""
try:
while True:
chunk = session.process.stdout.read(4096)
if not chunk:
break
with session._lock:
session.output_buffer += chunk
if len(session.output_buffer) > session.max_output_chars:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
except Exception:
pass
# Process exited
try:
session.process.wait(timeout=5)
except Exception:
pass
session.exited = True
session.exit_code = session.process.returncode
self._move_to_finished(session)
def _env_poller_loop(
self, session: ProcessSession, env: Any, log_path: str, pid_path: str
):
"""Background thread: poll a sandbox log file for non-local backends."""
while not session.exited:
time.sleep(2) # Poll every 2 seconds
try:
# Read new output from the log file
result = env.execute(f"cat {log_path} 2>/dev/null", timeout=10)
new_output = result.get("output", "")
if new_output:
with session._lock:
session.output_buffer = new_output
if len(session.output_buffer) > session.max_output_chars:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
# Check if process is still running
check = env.execute(
f"kill -0 $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?",
timeout=5,
)
check_output = check.get("output", "").strip()
if check_output and check_output.splitlines()[-1].strip() != "0":
# Process has exited -- get exit code
exit_result = env.execute(
f"wait $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?",
timeout=5,
)
exit_str = exit_result.get("output", "").strip()
try:
session.exit_code = int(exit_str.splitlines()[-1].strip())
except (ValueError, IndexError):
session.exit_code = -1
session.exited = True
self._move_to_finished(session)
return
except Exception:
# Environment might be gone (sandbox reaped, etc.)
session.exited = True
session.exit_code = -1
self._move_to_finished(session)
return
def _pty_reader_loop(self, session: ProcessSession):
"""Background thread: read output from a PTY process."""
pty = session._pty
try:
while pty.isalive():
try:
chunk = pty.read(4096)
if chunk:
# ptyprocess returns bytes
text = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
with session._lock:
session.output_buffer += text
if len(session.output_buffer) > session.max_output_chars:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
except EOFError:
break
except Exception:
break
except Exception:
pass
# Process exited
try:
pty.wait()
except Exception:
pass
session.exited = True
session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1
self._move_to_finished(session)
def _move_to_finished(self, session: ProcessSession):
"""Move a session from running to finished."""
with self._lock:
self._running.pop(session.id, None)
self._finished[session.id] = session
self._write_checkpoint()
# ----- Query Methods -----
def get(self, session_id: str) -> Optional[ProcessSession]:
"""Get a session by ID (running or finished)."""
with self._lock:
return self._running.get(session_id) or self._finished.get(session_id)
def poll(self, session_id: str) -> dict:
"""Check status and get new output for a background process."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
with session._lock:
output_preview = session.output_buffer[-1000:] if session.output_buffer else ""
result = {
"session_id": session.id,
"command": session.command,
"status": "exited" if session.exited else "running",
"pid": session.pid,
"uptime_seconds": int(time.time() - session.started_at),
"output_preview": output_preview,
}
if session.exited:
result["exit_code"] = session.exit_code
if session.detached:
result["detached"] = True
result["note"] = "Process recovered after restart -- output history unavailable"
return result
def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict:
"""Read the full output log with optional pagination by lines."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
with session._lock:
full_output = session.output_buffer
lines = full_output.splitlines()
total_lines = len(lines)
# Default: last N lines
if offset == 0 and limit > 0:
selected = lines[-limit:]
else:
selected = lines[offset:offset + limit]
return {
"session_id": session.id,
"status": "exited" if session.exited else "running",
"output": "\n".join(selected),
"total_lines": total_lines,
"showing": f"{len(selected)} lines",
}
def wait(self, session_id: str, timeout: Optional[int] = None) -> dict:
"""
Block until a process exits, timeout, or interrupt.
Args:
session_id: The process to wait for.
timeout: Max seconds to block. Falls back to TERMINAL_TIMEOUT config.
Returns:
dict with status ("exited", "timeout", "interrupted", "not_found")
and output snapshot.
"""
from tools.terminal_tool import _interrupt_event
default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180"))
max_timeout = default_timeout
requested_timeout = timeout
timeout_note = None
if requested_timeout and requested_timeout > max_timeout:
effective_timeout = max_timeout
timeout_note = (
f"Requested wait of {requested_timeout}s was clamped "
f"to configured limit of {max_timeout}s"
)
else:
effective_timeout = requested_timeout or max_timeout
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
deadline = time.monotonic() + effective_timeout
while time.monotonic() < deadline:
if session.exited:
result = {
"status": "exited",
"exit_code": session.exit_code,
"output": session.output_buffer[-2000:],
}
if timeout_note:
result["timeout_note"] = timeout_note
return result
if _interrupt_event.is_set():
result = {
"status": "interrupted",
"output": session.output_buffer[-1000:],
"note": "User sent a new message -- wait interrupted",
}
if timeout_note:
result["timeout_note"] = timeout_note
return result
time.sleep(1)
result = {
"status": "timeout",
"output": session.output_buffer[-1000:],
}
if timeout_note:
result["timeout_note"] = timeout_note
else:
result["timeout_note"] = f"Waited {effective_timeout}s, process still running"
return result
def kill_process(self, session_id: str) -> dict:
"""Kill a background process."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
if session.exited:
return {
"status": "already_exited",
"exit_code": session.exit_code,
}
# Kill via PTY, Popen (local), or env execute (non-local)
try:
if session._pty:
# PTY process -- terminate via ptyprocess
try:
session._pty.terminate(force=True)
except Exception:
if session.pid:
os.kill(session.pid, signal.SIGTERM)
elif session.process:
# Local process -- kill the process group
try:
os.killpg(os.getpgid(session.process.pid), signal.SIGTERM)
except (ProcessLookupError, PermissionError):
session.process.kill()
elif session.env_ref and session.pid:
# Non-local -- kill inside sandbox
session.env_ref.execute(f"kill {session.pid} 2>/dev/null", timeout=5)
session.exited = True
session.exit_code = -15  # SIGTERM
self._move_to_finished(session)  # also rewrites the checkpoint
return {"status": "killed", "session_id": session.id}
except Exception as e:
return {"status": "error", "error": str(e)}
def write_stdin(self, session_id: str, data: str) -> dict:
"""Send raw data to a running process's stdin (no newline appended)."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
if session.exited:
return {"status": "already_exited", "error": "Process has already finished"}
# PTY mode -- write through pty handle
if session._pty:
try:
session._pty.write(data)
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:
return {"status": "error", "error": str(e)}
# Popen mode -- write through stdin pipe
if not session.process or not session.process.stdin:
return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"}
try:
session.process.stdin.write(data)
session.process.stdin.flush()
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:
return {"status": "error", "error": str(e)}
def submit_stdin(self, session_id: str, data: str = "") -> dict:
"""Send data + newline to a running process's stdin (like pressing Enter)."""
return self.write_stdin(session_id, data + "\n")
def list_sessions(self, task_id: str = None) -> list:
"""List all running and recently-finished processes."""
with self._lock:
all_sessions = list(self._running.values()) + list(self._finished.values())
if task_id:
all_sessions = [s for s in all_sessions if s.task_id == task_id]
result = []
for s in all_sessions:
entry = {
"session_id": s.id,
"command": s.command[:200],
"cwd": s.cwd,
"pid": s.pid,
"started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)),
"uptime_seconds": int(time.time() - s.started_at),
"status": "exited" if s.exited else "running",
"output_preview": s.output_buffer[-200:] if s.output_buffer else "",
}
if s.exited:
entry["exit_code"] = s.exit_code
if s.detached:
entry["detached"] = True
result.append(entry)
return result
# ----- Session/Task Queries (for gateway integration) -----
def has_active_processes(self, task_id: str) -> bool:
"""Check if there are active (running) processes for a task_id."""
with self._lock:
return any(
s.task_id == task_id and not s.exited
for s in self._running.values()
)
def has_active_for_session(self, session_key: str) -> bool:
"""Check if there are active processes for a gateway session key."""
with self._lock:
return any(
s.session_key == session_key and not s.exited
for s in self._running.values()
)
def kill_all(self, task_id: Optional[str] = None) -> int:
    """Kill all running processes, optionally filtered by task_id. Returns count killed."""
    # Snapshot targets under the lock, but kill outside it: kill_process
    # re-acquires the lock, and sandbox kills (env_ref.execute) can block
    # for several seconds per process.
    with self._lock:
        targets = [
            s for s in self._running.values()
            if (task_id is None or s.task_id == task_id) and not s.exited
        ]
    killed = 0
    for session in targets:
        result = self.kill_process(session.id)
        if result.get("status") in ("killed", "already_exited"):
            killed += 1
    return killed
# ----- Cleanup / Pruning -----
def _prune_if_needed(self):
"""Remove oldest finished sessions if over MAX_PROCESSES. Must hold _lock."""
# First prune expired finished sessions
now = time.time()
expired = [
sid for sid, s in self._finished.items()
if (now - s.started_at) > FINISHED_TTL_SECONDS
]
for sid in expired:
del self._finished[sid]
# If still over the limit, evict the oldest finished sessions until back under it
while (len(self._running) + len(self._finished)) >= MAX_PROCESSES and self._finished:
    oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at)
    del self._finished[oldest_id]
def cleanup_expired(self):
"""Public method to prune expired finished sessions."""
with self._lock:
self._prune_if_needed()
# ----- Checkpoint (crash recovery) -----
def _write_checkpoint(self):
"""Write running process metadata to checkpoint file."""
try:
with self._lock:
entries = []
for s in self._running.values():
if not s.exited:
entries.append({
"session_id": s.id,
"command": s.command,
"pid": s.pid,
"cwd": s.cwd,
"started_at": s.started_at,
"task_id": s.task_id,
"session_key": s.session_key,
})
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
CHECKPOINT_PATH.write_text(
json.dumps(entries, indent=2), encoding="utf-8"
)
except Exception:
pass # Best-effort
def recover_from_checkpoint(self) -> int:
"""
On gateway startup, probe PIDs from checkpoint file.
Returns the number of processes recovered as detached.
"""
if not CHECKPOINT_PATH.exists():
return 0
try:
entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8"))
except Exception:
return 0
recovered = 0
for entry in entries:
pid = entry.get("pid")
if not pid:
continue
# Check if PID is still alive
alive = False
try:
os.kill(pid, 0)
alive = True
except (ProcessLookupError, PermissionError):
pass
if alive:
session = ProcessSession(
id=entry["session_id"],
command=entry.get("command", "unknown"),
task_id=entry.get("task_id", ""),
session_key=entry.get("session_key", ""),
pid=pid,
cwd=entry.get("cwd"),
started_at=entry.get("started_at", time.time()),
detached=True, # Can't read output, but can report status + kill
)
with self._lock:
self._running[session.id] = session
recovered += 1
print(f"[ProcessRegistry] Recovered detached process: {session.command[:60]} (pid={pid})", flush=True)
# Rewrite the checkpoint so it reflects only the recovered sessions
self._write_checkpoint()
return recovered
# Module-level singleton
process_registry = ProcessRegistry()
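As a self-contained illustration of the rolling 200KB buffer the reader threads maintain, the sketch below mirrors the append-and-trim logic from `_pty_reader_loop`. `RollingBuffer` is a hypothetical stand-in; the real code stores the buffer directly on `ProcessSession` under `session._lock`.

```python
import threading

class RollingBuffer:
    """Hypothetical stand-in for the per-session output buffer."""

    def __init__(self, max_chars: int = 200_000):
        self.max_chars = max_chars
        self._text = ""
        self._lock = threading.Lock()

    def append(self, chunk: str) -> None:
        with self._lock:
            self._text += chunk
            if len(self._text) > self.max_chars:
                # Keep only the most recent max_chars characters
                self._text = self._text[-self.max_chars:]

    def tail(self, n: int) -> str:
        with self._lock:
            return self._text[-n:]

buf = RollingBuffer(max_chars=10)
buf.append("hello ")
buf.append("world!!")
```

After the second append the buffer holds only the last 10 characters, `"lo world!!"`, which is why a recovered or long-running process can always report a recent preview without unbounded memory growth.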


@@ -1122,22 +1122,33 @@ TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment.
**Command Execution:**
- Simple commands: Just provide the 'command' parameter
- Background processes: Set 'background': true to get a session_id for monitoring via the 'process' tool
- Command timeout: Optional 'timeout' parameter in seconds
- Working directory: Optional 'workdir' parameter for per-command cwd
- PTY mode: Set 'pty': true for interactive CLI tools (Codex, Claude Code, etc.)
**Examples:**
- Run command: `{"command": "ls -la"}`
- Background task: `{"command": "pytest -v tests/", "background": true}` -- returns session_id, use process tool to poll/wait/kill
- With workdir: `{"command": "npm install", "workdir": "/home/user/project"}`
- With timeout: `{"command": "long_task.sh", "timeout": 300}`
- Interactive CLI: `{"command": "codex exec 'Add tests'", "background": true, "pty": true}`
**Background Process Workflow:**
1. Start: `terminal(command="...", background=true)` -- returns session_id
2. Monitor: `process(action="poll", session_id="...")` -- check status + new output
3. Wait: `process(action="wait", session_id="...", timeout=600)` -- block until done
4. Interact: `process(action="write/submit", session_id="...", data="y")` -- send stdin
5. Kill: `process(action="kill", session_id="...")` -- terminate
**Best Practices:**
- Monitor disk usage for large tasks
- Use background mode for long-running tasks, then process(wait) to block until completion
- Use workdir to run commands in specific project directories
- Install whatever tools you need with apt-get or pip
- Try to create or use a venv with uv or python -m venv to keep isolation from global system packages
**Things to avoid:**
- Do NOT use interactive tools (vim, nano, python repl) without pty=true -- they will hang without a pseudo-terminal.
- Even git sometimes becomes interactive if the output is large. If you're not sure, pipe to cat.
"""
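Step 4's write/submit distinction is just whether a trailing newline is appended -- submit is "press Enter". A minimal sketch, using hypothetical stand-ins (a plain list plays the role of the process's stdin; the real code writes through `Popen.stdin` or the PTY handle):

```python
# Hypothetical stand-ins for the stdin transport: a list collects
# writes instead of a real pipe or PTY.
def write_stdin(stdin: list, data: str) -> int:
    stdin.append(data)           # raw write, no newline appended
    return len(data)

def submit_stdin(stdin: list, data: str = "") -> int:
    return write_stdin(stdin, data + "\n")   # like pressing Enter

stdin: list = []
write_stdin(stdin, "y")      # types "y" without confirming
submit_stdin(stdin, "yes")   # types "yes" and presses Enter
```

This is why answering an interactive prompt usually wants `submit`, while `write` is for raw keystrokes (e.g. a single character a TUI reads unbuffered).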
@@ -1295,6 +1306,16 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300):
current_time = time.time()
# Check the process registry -- skip cleanup for sandboxes with active
# background processes (their _last_activity gets refreshed to keep them alive).
try:
from tools.process_registry import process_registry
for task_id in list(_last_activity.keys()):
if process_registry.has_active_processes(task_id):
_last_activity[task_id] = current_time # Keep sandbox alive
except ImportError:
pass
# Phase 1: collect stale entries and remove them from tracking dicts while
# holding the lock. Do NOT call env.cleanup() inside the lock -- Modal and
# Docker teardown can block for 10-15s, which would stall every concurrent
@@ -1501,7 +1522,10 @@ def terminal_tool(
background: bool = False,
timeout: Optional[int] = None,
task_id: Optional[str] = None,
force: bool = False,
workdir: Optional[str] = None,
check_interval: Optional[int] = None,
pty: bool = False,
) -> str:
"""
Execute a command using mini-swe-agent's execution environments.
@@ -1512,6 +1536,9 @@ def terminal_tool(
timeout: Command timeout in seconds (default: from config)
task_id: Unique identifier for environment isolation (optional)
force: If True, skip dangerous command check (use after user confirms)
workdir: Working directory for this command (optional, uses session cwd if not set)
check_interval: Seconds between auto-checks for background processes (gateway only, min 30)
pty: If True, use pseudo-terminal for interactive CLI tools (local backend only)
Returns:
str: JSON string with output, exit_code, and error fields
@@ -1662,20 +1689,69 @@ def terminal_tool(
# Prepare command for execution
if background:
# Spawn a tracked background process via the process registry.
# For local backends: uses subprocess.Popen with output buffering.
# For non-local backends: runs inside the sandbox via env.execute().
from tools.process_registry import process_registry
session_key = os.getenv("HERMES_SESSION_KEY", "")
effective_cwd = workdir or cwd
try:
if env_type == "local":
proc_session = process_registry.spawn_local(
command=command,
cwd=effective_cwd,
task_id=effective_task_id,
session_key=session_key,
env_vars=env.env if hasattr(env, 'env') else None,
use_pty=pty,
)
else:
proc_session = process_registry.spawn_via_env(
env=env,
command=command,
cwd=effective_cwd,
task_id=effective_task_id,
session_key=session_key,
)
result_data = {
"output": "Background process started",
"session_id": proc_session.id,
"pid": proc_session.pid,
"exit_code": 0,
"error": None,
}
# Transparent timeout clamping note
max_timeout = effective_timeout
if timeout and timeout > max_timeout:
result_data["timeout_note"] = (
f"Requested timeout {timeout}s was clamped to "
f"configured limit of {max_timeout}s"
)
# Register check_interval watcher (gateway picks this up after agent run)
if check_interval:
effective_interval = max(30, check_interval)
if check_interval < 30:
result_data["check_interval_note"] = (
f"Requested {check_interval}s raised to minimum 30s"
)
process_registry.pending_watchers.append({
"session_id": proc_session.id,
"check_interval": effective_interval,
"session_key": session_key,
"platform": os.getenv("HERMES_SESSION_PLATFORM", ""),
"chat_id": os.getenv("HERMES_SESSION_CHAT_ID", ""),
})
return json.dumps(result_data, ensure_ascii=False)
except Exception as e:
return json.dumps({
"output": "",
"exit_code": -1,
"error": f"Failed to start background process: {str(e)}"
}, ensure_ascii=False)
else:
# Run foreground command with retry logic
@@ -1685,7 +1761,10 @@ def terminal_tool(
while retry_count <= max_retries:
try:
execute_kwargs = {"timeout": effective_timeout}
if workdir:
execute_kwargs["cwd"] = workdir
result = env.execute(command, **execute_kwargs)
except Exception as e:
error_str = str(e).lower()
if "timeout" in error_str:
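The timeout clamping applied in both `wait()` and the background spawn path can be factored into a small helper. This is a sketch of the shared logic only, not the actual code layout, which inlines it at each site:

```python
import os
from typing import Optional, Tuple

def clamp_timeout(requested: Optional[int]) -> Tuple[int, Optional[str]]:
    """Clamp a requested timeout to the TERMINAL_TIMEOUT limit and
    return (effective_timeout, transparency_note)."""
    max_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180"))
    if requested and requested > max_timeout:
        note = (f"Requested wait of {requested}s was clamped "
                f"to configured limit of {max_timeout}s")
        return max_timeout, note
    return requested or max_timeout, None
```

Returning the note alongside the value is what makes the clamping "transparent": the model sees in the tool response that its 600s request became 180s instead of silently timing out early.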


@@ -56,8 +56,8 @@ TOOLSETS = {
},
"terminal": {
"description": "Terminal/command execution and process management tools",
"tools": ["terminal", "process"],
"includes": []
},
@@ -118,7 +118,7 @@ TOOLSETS = {
"debugging": {
"description": "Debugging and troubleshooting toolkit",
"tools": ["terminal", "process"],
"includes": ["web", "file"] # For searching error messages and solutions, and file operations
},
@@ -137,8 +137,8 @@ TOOLSETS = {
"tools": [
# Web tools
"web_search", "web_extract",
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search",
# Vision
@@ -169,8 +169,8 @@ TOOLSETS = {
"hermes-telegram": {
"description": "Telegram bot toolset - full access for personal use (terminal has safety checks)",
"tools": [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search",
# Web tools
@@ -194,8 +194,8 @@ TOOLSETS = {
"hermes-discord": {
"description": "Discord bot toolset - full access (terminal has safety checks via dangerous command approval)",
"tools": [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search",
# Web tools
@@ -221,8 +221,8 @@ TOOLSETS = {
"tools": [
# Web tools
"web_search", "web_extract",
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search",
# Vision
@@ -244,8 +244,8 @@ TOOLSETS = {
"hermes-slack": {
"description": "Slack bot toolset - full access for workspace use (terminal has safety checks)",
"tools": [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search",
# Web tools
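The toolset entries above pair `terminal` with `process` and pull shared groups in via `includes`. A resolver for that table can be sketched as a recursive walk -- this is hypothetical (the actual resolver in model_tools.py may differ), and `TOOLSETS` below is a trimmed stand-in for the real table:

```python
# Trimmed hypothetical subset of the TOOLSETS table.
TOOLSETS = {
    "terminal": {"tools": ["terminal", "process"], "includes": []},
    "file": {"tools": ["read_file", "write_file", "patch", "search"], "includes": []},
    "debugging": {"tools": ["terminal", "process"], "includes": ["file"]},
}

def resolve_tools(name: str, seen=None) -> list:
    """Resolve a toolset's full tool list, following 'includes'
    recursively and de-duplicating while preserving order."""
    seen = set() if seen is None else seen
    if name in seen:                 # guard against include cycles
        return []
    seen.add(name)
    spec = TOOLSETS[name]
    tools = list(spec["tools"])
    for inc in spec.get("includes", []):
        tools += resolve_tools(inc, seen)
    # De-duplicate, preserving first occurrence
    return list(dict.fromkeys(tools))

resolve_tools("debugging")
```

Under these assumptions, `debugging` expands to its own terminal/process pair plus the file tools it includes, with duplicates collapsed so a tool appears at most once.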