diff --git a/AGENTS.md b/AGENTS.md index 316fdb67b..ea0c32edb 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -25,6 +25,7 @@ hermes-agent/ │ ├── uninstall.py # Uninstaller │ └── cron.py # Cron job management ├── tools/ # Tool implementations +│ ├── process_registry.py # Background process management (spawn, poll, wait, kill) │ ├── transcription_tools.py # Speech-to-text (Whisper API) ├── gateway/ # Messaging platform adapters │ ├── pairing.py # DM pairing code system @@ -412,6 +413,37 @@ The terminal tool includes safety checks for potentially destructive commands (e --- +## Background Process Management + +The `process` tool works alongside `terminal` for managing long-running background processes: + +**Starting a background process:** +```python +terminal(command="pytest -v tests/", background=true) +# Returns: {"session_id": "proc_abc123", "pid": 12345, ...} +``` + +**Managing it with the process tool:** +- `process(action="list")` -- show all running/recent processes +- `process(action="poll", session_id="proc_abc123")` -- check status + new output +- `process(action="log", session_id="proc_abc123")` -- full output with pagination +- `process(action="wait", session_id="proc_abc123", timeout=600)` -- block until done +- `process(action="kill", session_id="proc_abc123")` -- terminate +- `process(action="write", session_id="proc_abc123", data="y")` -- send stdin +- `process(action="submit", session_id="proc_abc123", data="yes")` -- send + Enter + +**Key behaviors:** +- Background processes execute through the configured terminal backend (local/Docker/Modal/SSH/Singularity) -- never directly on the host unless `TERMINAL_ENV=local` +- The `wait` action blocks the tool call until the process finishes, times out, or is interrupted by a new user message +- PTY mode (`pty=true` on terminal) enables interactive CLI tools (Codex, Claude Code) +- In RL training, background processes are auto-killed when the episode ends (`tool_context.cleanup()`) +- In the gateway, sessions with 
active background processes are exempt from idle reset +- The process registry checkpoints to `~/.hermes/processes.json` for crash recovery + +Files: `tools/process_registry.py` (registry), `model_tools.py` (tool definition + handler), `tools/terminal_tool.py` (spawn integration) + +--- + ## Adding New Tools Follow this strict order to maintain consistency: diff --git a/README.md b/README.md index fdd4d72f5..40cd6ae2f 100644 --- a/README.md +++ b/README.md @@ -219,9 +219,13 @@ When the agent tries to run a potentially dangerous command (rm -rf, chmod 777, Reply "yes"/"y" to approve or "no"/"n" to deny. In CLI mode, the existing interactive approval prompt (once/session/always/deny) is preserved. -### 🖥️ Terminal Backend +### 🖥️ Terminal & Process Management -The terminal tool can execute commands in different environments: +The terminal tool can execute commands in different environments, with full background process management via the `process` tool: + +**Background processes:** Start with `terminal(command="...", background=true)`, then use `process(action="poll/wait/log/kill/write")` to monitor, wait for completion, read output, terminate, or send input. The `wait` action blocks until the process finishes -- no polling loops needed. PTY mode (`pty=true`) enables interactive CLI tools like Codex and Claude Code. + +**Execution environments:** | Backend | Description | Use Case | |---------|-------------|----------| diff --git a/environments/hermes_base_env.py b/environments/hermes_base_env.py index 98a40dd5d..8fbfd50a5 100644 --- a/environments/hermes_base_env.py +++ b/environments/hermes_base_env.py @@ -258,6 +258,11 @@ class HermesAgentBaseEnv(BaseEnv): logger.info("Sampled toolsets from '%s': %s", config.distribution, group_toolsets) else: group_toolsets = config.enabled_toolsets # None means "all available" + if group_toolsets is None: + logger.warning( + "enabled_toolsets is None -- loading ALL tools including messaging. 
" + "Set explicit enabled_toolsets for RL training." + ) tools = get_tool_definitions( enabled_toolsets=group_toolsets, diff --git a/environments/tool_context.py b/environments/tool_context.py index dc207937e..a9c783c8f 100644 --- a/environments/tool_context.py +++ b/environments/tool_context.py @@ -438,11 +438,21 @@ class ToolContext: def cleanup(self): """ - Release all resources (terminal VMs, browser sessions) for this rollout. + Release all resources (terminal VMs, browser sessions, background processes) + for this rollout. Called automatically by the base environment via try/finally after compute_reward() completes. You generally don't need to call this yourself. """ + # Kill any background processes from this rollout (safety net) + try: + from tools.process_registry import process_registry + killed = process_registry.kill_all(task_id=self.task_id) + if killed: + logger.debug("Process cleanup for task %s: killed %d process(es)", self.task_id, killed) + except Exception as e: + logger.debug("Process cleanup for task %s: %s", self.task_id, e) + try: cleanup_vm(self.task_id) except Exception as e: diff --git a/gateway/config.py b/gateway/config.py index 5ca063a1f..8526c4369 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -65,7 +65,7 @@ class SessionResetPolicy: """ mode: str = "both" # "daily", "idle", or "both" at_hour: int = 4 # Hour for daily reset (0-23, local time) - idle_minutes: int = 120 # Minutes of inactivity before reset + idle_minutes: int = 1440 # Minutes of inactivity before reset (24 hours) def to_dict(self) -> Dict[str, Any]: return { @@ -79,7 +79,7 @@ class SessionResetPolicy: return cls( mode=data.get("mode", "both"), at_hour=data.get("at_hour", 4), - idle_minutes=data.get("idle_minutes", 120), + idle_minutes=data.get("idle_minutes", 1440), ) diff --git a/gateway/run.py b/gateway/run.py index 15daca264..9b03de5ec 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -72,7 +72,13 @@ class GatewayRunner: def __init__(self, config: 
Optional[GatewayConfig] = None): self.config = config or load_gateway_config() self.adapters: Dict[Platform, BasePlatformAdapter] = {} - self.session_store = SessionStore(self.config.sessions_dir, self.config) + + # Wire process registry into session store for reset protection + from tools.process_registry import process_registry + self.session_store = SessionStore( + self.config.sessions_dir, self.config, + has_active_processes_fn=lambda key: process_registry.has_active_for_session(key), + ) self.delivery_router = DeliveryRouter(self.config) self._running = False self._shutdown_event = asyncio.Event() @@ -106,6 +112,15 @@ class GatewayRunner: # Discover and load event hooks self.hooks.discover_and_load() + # Recover background processes from checkpoint (crash recovery) + try: + from tools.process_registry import process_registry + recovered = process_registry.recover_from_checkpoint() + if recovered: + print(f"[gateway] Recovered {recovered} background process(es) from previous run") + except Exception as e: + print(f"[gateway] Process checkpoint recovery: {e}") + connected_count = 0 # Initialize and connect each configured platform @@ -429,6 +444,15 @@ class GatewayRunner: "response": (response or "")[:500], }) + # Check for pending process watchers (check_interval on background processes) + try: + from tools.process_registry import process_registry + while process_registry.pending_watchers: + watcher = process_registry.pending_watchers.pop(0) + asyncio.create_task(self._run_process_watcher(watcher)) + except Exception as e: + print(f"[gateway] Process watcher setup error: {e}", flush=True) + # Check if the agent encountered a dangerous command needing approval # The terminal tool stores the last pending approval globally try: @@ -701,6 +725,75 @@ class GatewayRunner: return prefix return user_text + async def _run_process_watcher(self, watcher: dict) -> None: + """ + Periodically check a background process and push updates to the user. 
+ + Runs as an asyncio task. Stays silent when nothing changed. + Auto-removes when the process exits or is killed. + """ + from tools.process_registry import process_registry + + session_id = watcher["session_id"] + interval = watcher["check_interval"] + session_key = watcher.get("session_key", "") + platform_name = watcher.get("platform", "") + chat_id = watcher.get("chat_id", "") + + print(f"[gateway] Process watcher started: {session_id} (every {interval}s)", flush=True) + + last_output_len = 0 + while True: + await asyncio.sleep(interval) + + session = process_registry.get(session_id) + if session is None: + break + + current_output_len = len(session.output_buffer) + has_new_output = current_output_len > last_output_len + last_output_len = current_output_len + + if session.exited: + # Process finished -- deliver final update + new_output = session.output_buffer[-1000:] if session.output_buffer else "" + message_text = ( + f"[Background process {session_id} finished with exit code {session.exit_code} -- " + f"Here's the final output:\n{new_output}]" + ) + # Try to deliver to the originating platform + adapter = None + for p, a in self.adapters.items(): + if p.value == platform_name: + adapter = a + break + if adapter and chat_id: + try: + await adapter.send(chat_id, message_text) + except Exception as e: + print(f"[gateway] Watcher delivery error: {e}", flush=True) + break + + elif has_new_output: + # New output available -- deliver status update + new_output = session.output_buffer[-500:] if session.output_buffer else "" + message_text = ( + f"[Background process {session_id} is still running -- " + f"New output:\n{new_output}]" + ) + adapter = None + for p, a in self.adapters.items(): + if p.value == platform_name: + adapter = a + break + if adapter and chat_id: + try: + await adapter.send(chat_id, message_text) + except Exception as e: + print(f"[gateway] Watcher delivery error: {e}", flush=True) + + print(f"[gateway] Process watcher ended: {session_id}",
flush=True) + async def _run_agent( self, message: str, @@ -824,6 +917,10 @@ class GatewayRunner: tools_holder = [None] # Mutable container for the tool definitions def run_sync(): + # Pass session_key to process registry via env var so background + # processes can be mapped back to this gateway session + os.environ["HERMES_SESSION_KEY"] = session_key or "" + # Read from env var or use default (same as CLI) max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "60")) diff --git a/gateway/session.py b/gateway/session.py index d6ab462bb..ec392cfb7 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -270,11 +270,15 @@ class SessionStore: - {session_id}.jsonl: Conversation transcripts """ - def __init__(self, sessions_dir: Path, config: GatewayConfig): + def __init__(self, sessions_dir: Path, config: GatewayConfig, + has_active_processes_fn=None): self.sessions_dir = sessions_dir self.config = config self._entries: Dict[str, SessionEntry] = {} self._loaded = False + # Optional callback to check if a session has active background processes. + # When set, sessions with running processes are exempt from reset. + self._has_active_processes_fn = has_active_processes_fn def _ensure_loaded(self) -> None: """Load sessions from disk if not already loaded.""" @@ -320,7 +324,14 @@ class SessionStore: Check if a session should be reset based on policy. Returns True if the session is stale and should start fresh. + Sessions with active background processes are never reset. 
""" + # Don't reset sessions that have active background processes + if self._has_active_processes_fn: + session_key = self._generate_session_key(source) + if self._has_active_processes_fn(session_key): + return False + policy = self.config.get_reset_policy( platform=source.platform, session_type=source.chat_type diff --git a/model_tools.py b/model_tools.py index 87942e8e1..9ad6689ca 100644 --- a/model_tools.py +++ b/model_tools.py @@ -320,6 +320,20 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]: "type": "integer", "description": "Command timeout in seconds (optional)", "minimum": 1 + }, + "workdir": { + "type": "string", + "description": "Working directory for this command (absolute path). Defaults to the session working directory." + }, + "check_interval": { + "type": "integer", + "description": "Seconds between automatic status checks for background processes (gateway/messaging only, minimum 30). When set, I'll proactively report progress.", + "minimum": 30 + }, + "pty": { + "type": "boolean", + "description": "Run in pseudo-terminal (PTY) mode for interactive CLI tools like Codex, Claude Code, or Python REPL. Only works with local and SSH backends. Default: false.", + "default": False } }, "required": ["command"] @@ -930,6 +944,64 @@ def get_send_message_tool_definitions(): ] +def get_process_tool_definitions() -> List[Dict[str, Any]]: + """ + Get tool definitions for the process management tool. + + The process tool manages background processes started with terminal(background=true). + Actions: list, poll, log, wait, kill. Phase 2 adds: write, submit. + """ + return [ + { + "type": "function", + "function": { + "name": "process", + "description": ( + "Manage background processes started with terminal(background=true). 
" + "Actions: 'list' (show all), 'poll' (check status + new output), " + "'log' (full output with pagination), 'wait' (block until done or timeout), " + "'kill' (terminate), 'write' (send raw data to stdin), 'submit' (send data + Enter). " + "Use 'wait' when you have nothing else to do and want " + "to block until a background process finishes." + ), + "parameters": { + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["list", "poll", "log", "wait", "kill", "write", "submit"], + "description": "Action to perform on background processes" + }, + "session_id": { + "type": "string", + "description": "Process session ID (from terminal background output). Required for poll/log/wait/kill." + }, + "data": { + "type": "string", + "description": "Text to send to process stdin (for 'write' and 'submit' actions)" + }, + "timeout": { + "type": "integer", + "description": "Max seconds to block for 'wait' action. Returns partial output on timeout.", + "minimum": 1 + }, + "offset": { + "type": "integer", + "description": "Line offset for 'log' action (default: last 200 lines)" + }, + "limit": { + "type": "integer", + "description": "Max lines to return for 'log' action", + "minimum": 1 + } + }, + "required": ["action"] + } + } + } + ] + + def get_all_tool_names() -> List[str]: """ Get the names of all available tools across all toolsets. 
@@ -945,7 +1017,7 @@ def get_all_tool_names() -> List[str]: # Terminal tools (mini-swe-agent backend) if check_terminal_requirements(): - tool_names.extend(["terminal"]) + tool_names.extend(["terminal", "process"]) # Vision tools if check_vision_requirements(): @@ -1011,6 +1083,7 @@ TOOL_TO_TOOLSET_MAP = { "web_search": "web_tools", "web_extract": "web_tools", "terminal": "terminal_tools", + "process": "terminal_tools", "vision_analyze": "vision_tools", "mixture_of_agents": "moa_tools", "image_generate": "image_tools", @@ -1042,6 +1115,7 @@ TOOL_TO_TOOLSET_MAP = { "rl_stop_training": "rl_tools", "rl_get_results": "rl_tools", "rl_list_runs": "rl_tools", + "rl_test_inference": "rl_tools", # Text-to-speech tools "text_to_speech": "tts_tools", # File manipulation tools @@ -1113,6 +1187,9 @@ def get_tool_definitions( if check_terminal_requirements(): for tool in get_terminal_tool_definitions(): all_available_tools_map[tool["function"]["name"]] = tool + # Process management tool (paired with terminal) + for tool in get_process_tool_definitions(): + all_available_tools_map[tool["function"]["name"]] = tool if check_vision_requirements(): for tool in get_vision_tool_definitions(): @@ -1339,15 +1416,70 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A command = function_args.get("command") background = function_args.get("background", False) timeout = function_args.get("timeout") - # Note: force parameter exists internally but is NOT exposed to the model - # Dangerous command approval is handled via user prompts only + workdir = function_args.get("workdir") + check_interval = function_args.get("check_interval") + pty = function_args.get("pty", False) - return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id) + return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id, workdir=workdir, check_interval=check_interval, pty=pty) else: return json.dumps({"error": f"Unknown 
terminal function: {function_name}"}, ensure_ascii=False) +def handle_process_function_call(function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None) -> str: + """ + Handle function calls for the process management tool. + + Routes actions (list, poll, log, wait, kill) to the ProcessRegistry. + """ + from tools.process_registry import process_registry + + action = function_args.get("action", "") + session_id = function_args.get("session_id", "") + + if action == "list": + sessions = process_registry.list_sessions(task_id=task_id) + return json.dumps({"processes": sessions}, ensure_ascii=False) + + elif action == "poll": + if not session_id: + return json.dumps({"error": "session_id is required for poll"}, ensure_ascii=False) + return json.dumps(process_registry.poll(session_id), ensure_ascii=False) + + elif action == "log": + if not session_id: + return json.dumps({"error": "session_id is required for log"}, ensure_ascii=False) + offset = function_args.get("offset", 0) + limit = function_args.get("limit", 200) + return json.dumps(process_registry.read_log(session_id, offset=offset, limit=limit), ensure_ascii=False) + + elif action == "wait": + if not session_id: + return json.dumps({"error": "session_id is required for wait"}, ensure_ascii=False) + timeout = function_args.get("timeout") + return json.dumps(process_registry.wait(session_id, timeout=timeout), ensure_ascii=False) + + elif action == "kill": + if not session_id: + return json.dumps({"error": "session_id is required for kill"}, ensure_ascii=False) + return json.dumps(process_registry.kill_process(session_id), ensure_ascii=False) + + elif action == "write": + if not session_id: + return json.dumps({"error": "session_id is required for write"}, ensure_ascii=False) + data = function_args.get("data", "") + return json.dumps(process_registry.write_stdin(session_id, data), ensure_ascii=False) + + elif action == "submit": + if not session_id: + return json.dumps({"error": "session_id is 
required for submit"}, ensure_ascii=False) + data = function_args.get("data", "") + return json.dumps(process_registry.submit_stdin(session_id, data), ensure_ascii=False) + + else: + return json.dumps({"error": f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit"}, ensure_ascii=False) + + def handle_vision_function_call(function_name: str, function_args: Dict[str, Any]) -> str: """ Handle function calls for vision tools. @@ -1779,6 +1911,10 @@ def handle_function_call( elif function_name in ["terminal"]: return handle_terminal_function_call(function_name, function_args, task_id) + # Route process management tools + elif function_name in ["process"]: + return handle_process_function_call(function_name, function_args, task_id) + # Route vision tools elif function_name in ["vision_analyze"]: return handle_vision_function_call(function_name, function_args) diff --git a/pyproject.toml b/pyproject.toml index e0ee1ae63..9b07c8655 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ cron = ["croniter"] slack = ["slack-bolt>=1.18.0", "slack-sdk>=3.27.0"] cli = ["simple-term-menu"] tts-premium = ["elevenlabs"] +pty = ["ptyprocess>=0.7.0"] all = [ "hermes-agent[modal]", "hermes-agent[messaging]", @@ -51,6 +52,7 @@ all = [ "hermes-agent[dev]", "hermes-agent[tts-premium]", "hermes-agent[slack]", + "hermes-agent[pty]", ] [project.scripts] diff --git a/tools/process_registry.py b/tools/process_registry.py new file mode 100644 index 000000000..5f135ffa9 --- /dev/null +++ b/tools/process_registry.py @@ -0,0 +1,726 @@ +""" +Process Registry -- In-memory registry for managed background processes. 
+ +Tracks processes spawned via terminal(background=true), providing: + - Output buffering (rolling 200KB window) + - Status polling and log retrieval + - Blocking wait with interrupt support + - Process killing + - Crash recovery via JSON checkpoint file + - Session-scoped tracking for gateway reset protection + +Background processes execute THROUGH the environment interface -- nothing +runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity, +Modal, and SSH backends, the command runs inside the sandbox. + +Usage: + from tools.process_registry import process_registry + + # Spawn a background process (called from terminal_tool) + session = process_registry.spawn_via_env(env, "pytest -v", task_id="task_123") + + # Poll for status + result = process_registry.poll(session.id) + + # Block until done + result = process_registry.wait(session.id, timeout=300) + + # Kill it + process_registry.kill_process(session.id) +""" + +import json +import os +import signal +import subprocess +import threading +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + + +# Checkpoint file for crash recovery (gateway only) +CHECKPOINT_PATH = Path(os.path.expanduser("~/.hermes/processes.json")) + +# Limits +MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer +FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes +MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning) + + +@dataclass +class ProcessSession: + """A tracked background process with output buffering.""" + id: str # Unique session ID ("proc_xxxxxxxxxxxx") + command: str # Original command string + task_id: str = "" # Task/sandbox isolation key + session_key: str = "" # Gateway session key (for reset protection) + pid: Optional[int] = None # OS process ID + process: Optional[subprocess.Popen] = None # Popen handle (local only) + env_ref: Any = None # Reference to the environment object + cwd: Optional[str] =
None # Working directory + started_at: float = 0.0 # time.time() of spawn + exited: bool = False # Whether the process has finished + exit_code: Optional[int] = None # Exit code (None if still running) + output_buffer: str = "" # Rolling output (last MAX_OUTPUT_CHARS) + max_output_chars: int = MAX_OUTPUT_CHARS + detached: bool = False # True if recovered from crash (no pipe) + _lock: threading.Lock = field(default_factory=threading.Lock) + _reader_thread: Optional[threading.Thread] = field(default=None, repr=False) + _pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True) + + +class ProcessRegistry: + """ + In-memory registry of running and finished background processes. + + Thread-safe. Accessed from: + - Executor threads (terminal_tool, process tool handlers) + - Gateway asyncio loop (watcher tasks, session reset checks) + - Cleanup thread (sandbox reaping coordination) + """ + + def __init__(self): + self._running: Dict[str, ProcessSession] = {} + self._finished: Dict[str, ProcessSession] = {} + self._lock = threading.Lock() + + # Side-channel for check_interval watchers (gateway reads after agent run) + self.pending_watchers: List[Dict[str, Any]] = [] + + # ----- Spawn ----- + + def spawn_local( + self, + command: str, + cwd: str = None, + task_id: str = "", + session_key: str = "", + env_vars: dict = None, + use_pty: bool = False, + ) -> ProcessSession: + """ + Spawn a background process locally. + + Only for TERMINAL_ENV=local. Other backends use spawn_via_env(). + + Args: + use_pty: If True, use a pseudo-terminal via ptyprocess for interactive + CLI tools (Codex, Claude Code, Python REPL). Falls back to + subprocess.Popen if ptyprocess is not installed. 
+ """ + session = ProcessSession( + id=f"proc_{uuid.uuid4().hex[:12]}", + command=command, + task_id=task_id, + session_key=session_key, + cwd=cwd or os.getcwd(), + started_at=time.time(), + ) + + if use_pty: + # Try PTY mode for interactive CLI tools + try: + import ptyprocess + pty_proc = ptyprocess.PtyProcess.spawn( + ["bash", "-c", command], + cwd=session.cwd, + env=os.environ | (env_vars or {}), + dimensions=(30, 120), + ) + session.pid = pty_proc.pid + # Store the pty handle on the session for read/write + session._pty = pty_proc + + # PTY reader thread + reader = threading.Thread( + target=self._pty_reader_loop, + args=(session,), + daemon=True, + name=f"proc-pty-reader-{session.id}", + ) + session._reader_thread = reader + reader.start() + + with self._lock: + self._prune_if_needed() + self._running[session.id] = session + + self._write_checkpoint() + return session + + except ImportError: + # ptyprocess not installed -- fall back to Popen + print(f"[ProcessRegistry] ptyprocess not installed, falling back to pipe mode", flush=True) + except Exception as e: + print(f"[ProcessRegistry] PTY spawn failed ({e}), falling back to pipe mode", flush=True) + + # Standard Popen path (non-PTY or PTY fallback) + proc = subprocess.Popen( + command, + shell=True, + text=True, + cwd=session.cwd, + env=os.environ | (env_vars or {}), + encoding="utf-8", + errors="replace", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + stdin=subprocess.PIPE, + preexec_fn=os.setsid, + ) + + session.process = proc + session.pid = proc.pid + + # Start output reader thread + reader = threading.Thread( + target=self._reader_loop, + args=(session,), + daemon=True, + name=f"proc-reader-{session.id}", + ) + session._reader_thread = reader + reader.start() + + with self._lock: + self._prune_if_needed() + self._running[session.id] = session + + self._write_checkpoint() + return session + + def spawn_via_env( + self, + env: Any, + command: str, + cwd: str = None, + task_id: str = "", + 
session_key: str = "", + timeout: int = 10, + ) -> ProcessSession: + """ + Spawn a background process through a non-local environment backend. + + For Docker/Singularity/Modal/SSH: runs the command inside the sandbox + using the environment's execute() interface. We wrap the command to + capture the in-sandbox PID and redirect output to a log file inside + the sandbox, then poll the log via subsequent execute() calls. + + This is less capable than local spawn (no live stdout pipe, no stdin), + but it ensures the command runs in the correct sandbox context. + """ + session = ProcessSession( + id=f"proc_{uuid.uuid4().hex[:12]}", + command=command, + task_id=task_id, + session_key=session_key, + cwd=cwd, + started_at=time.time(), + env_ref=env, + ) + + # Run the command in the sandbox with output capture + log_path = f"/tmp/hermes_bg_{session.id}.log" + pid_path = f"/tmp/hermes_bg_{session.id}.pid" + bg_command = ( + f"nohup bash -c '{command}' > {log_path} 2>&1 & " + f"echo $! > {pid_path} && cat {pid_path}" + ) + + try: + result = env.execute(bg_command, timeout=timeout) + output = result.get("output", "").strip() + # Try to extract the PID from the output + for line in output.splitlines(): + line = line.strip() + if line.isdigit(): + session.pid = int(line) + break + except Exception as e: + session.exited = True + session.exit_code = -1 + session.output_buffer = f"Failed to start: {e}" + + if not session.exited: + # Start a poller thread that periodically reads the log file + reader = threading.Thread( + target=self._env_poller_loop, + args=(session, env, log_path, pid_path), + daemon=True, + name=f"proc-poller-{session.id}", + ) + session._reader_thread = reader + reader.start() + + with self._lock: + self._prune_if_needed() + self._running[session.id] = session + + self._write_checkpoint() + return session + + # ----- Reader / Poller Threads ----- + + def _reader_loop(self, session: ProcessSession): + """Background thread: read stdout from a local Popen 
process.""" + try: + while True: + chunk = session.process.stdout.read(4096) + if not chunk: + break + with session._lock: + session.output_buffer += chunk + if len(session.output_buffer) > session.max_output_chars: + session.output_buffer = session.output_buffer[-session.max_output_chars:] + except Exception: + pass + + # Process exited + try: + session.process.wait(timeout=5) + except Exception: + pass + session.exited = True + session.exit_code = session.process.returncode + self._move_to_finished(session) + + def _env_poller_loop( + self, session: ProcessSession, env: Any, log_path: str, pid_path: str + ): + """Background thread: poll a sandbox log file for non-local backends.""" + while not session.exited: + time.sleep(2) # Poll every 2 seconds + try: + # Read new output from the log file + result = env.execute(f"cat {log_path} 2>/dev/null", timeout=10) + new_output = result.get("output", "") + if new_output: + with session._lock: + session.output_buffer = new_output + if len(session.output_buffer) > session.max_output_chars: + session.output_buffer = session.output_buffer[-session.max_output_chars:] + + # Check if process is still running + check = env.execute( + f"kill -0 $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?", + timeout=5, + ) + check_output = check.get("output", "").strip() + if check_output and check_output.splitlines()[-1].strip() != "0": + # Process has exited -- get exit code + exit_result = env.execute( + f"wait $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?", + timeout=5, + ) + exit_str = exit_result.get("output", "").strip() + try: + session.exit_code = int(exit_str.splitlines()[-1].strip()) + except (ValueError, IndexError): + session.exit_code = -1 + session.exited = True + self._move_to_finished(session) + return + + except Exception: + # Environment might be gone (sandbox reaped, etc.) 
+ session.exited = True + session.exit_code = -1 + self._move_to_finished(session) + return + + def _pty_reader_loop(self, session: ProcessSession): + """Background thread: read output from a PTY process.""" + pty = session._pty + try: + while pty.isalive(): + try: + chunk = pty.read(4096) + if chunk: + # ptyprocess returns bytes + text = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace") + with session._lock: + session.output_buffer += text + if len(session.output_buffer) > session.max_output_chars: + session.output_buffer = session.output_buffer[-session.max_output_chars:] + except EOFError: + break + except Exception: + break + except Exception: + pass + + # Process exited + try: + pty.wait() + except Exception: + pass + session.exited = True + session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1 + self._move_to_finished(session) + + def _move_to_finished(self, session: ProcessSession): + """Move a session from running to finished.""" + with self._lock: + self._running.pop(session.id, None) + self._finished[session.id] = session + self._write_checkpoint() + + # ----- Query Methods ----- + + def get(self, session_id: str) -> Optional[ProcessSession]: + """Get a session by ID (running or finished).""" + with self._lock: + return self._running.get(session_id) or self._finished.get(session_id) + + def poll(self, session_id: str) -> dict: + """Check status and get new output for a background process.""" + session = self.get(session_id) + if session is None: + return {"status": "not_found", "error": f"No process with ID {session_id}"} + + with session._lock: + output_preview = session.output_buffer[-1000:] if session.output_buffer else "" + + result = { + "session_id": session.id, + "command": session.command, + "status": "exited" if session.exited else "running", + "pid": session.pid, + "uptime_seconds": int(time.time() - session.started_at), + "output_preview": output_preview, + } + if session.exited: + 
result["exit_code"] = session.exit_code + if session.detached: + result["detached"] = True + result["note"] = "Process recovered after restart -- output history unavailable" + return result + + def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict: + """Read the full output log with optional pagination by lines.""" + session = self.get(session_id) + if session is None: + return {"status": "not_found", "error": f"No process with ID {session_id}"} + + with session._lock: + full_output = session.output_buffer + + lines = full_output.splitlines() + total_lines = len(lines) + + # Default: last N lines + if offset == 0 and limit > 0: + selected = lines[-limit:] + else: + selected = lines[offset:offset + limit] + + return { + "session_id": session.id, + "status": "exited" if session.exited else "running", + "output": "\n".join(selected), + "total_lines": total_lines, + "showing": f"{len(selected)} lines", + } + + def wait(self, session_id: str, timeout: int = None) -> dict: + """ + Block until a process exits, timeout, or interrupt. + + Args: + session_id: The process to wait for. + timeout: Max seconds to block. Falls back to TERMINAL_TIMEOUT config. + + Returns: + dict with status ("exited", "timeout", "interrupted", "not_found") + and output snapshot. 
+ """ + from tools.terminal_tool import _interrupt_event + + default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180")) + max_timeout = default_timeout + requested_timeout = timeout + timeout_note = None + + if requested_timeout and requested_timeout > max_timeout: + effective_timeout = max_timeout + timeout_note = ( + f"Requested wait of {requested_timeout}s was clamped " + f"to configured limit of {max_timeout}s" + ) + else: + effective_timeout = requested_timeout or max_timeout + + session = self.get(session_id) + if session is None: + return {"status": "not_found", "error": f"No process with ID {session_id}"} + + deadline = time.monotonic() + effective_timeout + + while time.monotonic() < deadline: + if session.exited: + result = { + "status": "exited", + "exit_code": session.exit_code, + "output": session.output_buffer[-2000:], + } + if timeout_note: + result["timeout_note"] = timeout_note + return result + + if _interrupt_event.is_set(): + result = { + "status": "interrupted", + "output": session.output_buffer[-1000:], + "note": "User sent a new message -- wait interrupted", + } + if timeout_note: + result["timeout_note"] = timeout_note + return result + + time.sleep(1) + + result = { + "status": "timeout", + "output": session.output_buffer[-1000:], + } + if timeout_note: + result["timeout_note"] = timeout_note + else: + result["timeout_note"] = f"Waited {effective_timeout}s, process still running" + return result + + def kill_process(self, session_id: str) -> dict: + """Kill a background process.""" + session = self.get(session_id) + if session is None: + return {"status": "not_found", "error": f"No process with ID {session_id}"} + + if session.exited: + return { + "status": "already_exited", + "exit_code": session.exit_code, + } + + # Kill via PTY, Popen (local), or env execute (non-local) + try: + if session._pty: + # PTY process -- terminate via ptyprocess + try: + session._pty.terminate(force=True) + except Exception: + if session.pid: + 
os.kill(session.pid, signal.SIGTERM) + elif session.process: + # Local process -- kill the process group + try: + os.killpg(os.getpgid(session.process.pid), signal.SIGTERM) + except (ProcessLookupError, PermissionError): + session.process.kill() + elif session.env_ref and session.pid: + # Non-local -- kill inside sandbox + session.env_ref.execute(f"kill {session.pid} 2>/dev/null", timeout=5) + session.exited = True + session.exit_code = -15 # SIGTERM + self._move_to_finished(session) + self._write_checkpoint() + return {"status": "killed", "session_id": session.id} + except Exception as e: + return {"status": "error", "error": str(e)} + + def write_stdin(self, session_id: str, data: str) -> dict: + """Send raw data to a running process's stdin (no newline appended).""" + session = self.get(session_id) + if session is None: + return {"status": "not_found", "error": f"No process with ID {session_id}"} + if session.exited: + return {"status": "already_exited", "error": "Process has already finished"} + + # PTY mode -- write through pty handle + if hasattr(session, '_pty') and session._pty: + try: + session._pty.write(data) + return {"status": "ok", "bytes_written": len(data)} + except Exception as e: + return {"status": "error", "error": str(e)} + + # Popen mode -- write through stdin pipe + if not session.process or not session.process.stdin: + return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"} + try: + session.process.stdin.write(data) + session.process.stdin.flush() + return {"status": "ok", "bytes_written": len(data)} + except Exception as e: + return {"status": "error", "error": str(e)} + + def submit_stdin(self, session_id: str, data: str = "") -> dict: + """Send data + newline to a running process's stdin (like pressing Enter).""" + return self.write_stdin(session_id, data + "\n") + + def list_sessions(self, task_id: str = None) -> list: + """List all running and recently-finished processes.""" + with 
self._lock: + all_sessions = list(self._running.values()) + list(self._finished.values()) + + if task_id: + all_sessions = [s for s in all_sessions if s.task_id == task_id] + + result = [] + for s in all_sessions: + entry = { + "session_id": s.id, + "command": s.command[:200], + "cwd": s.cwd, + "pid": s.pid, + "started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)), + "uptime_seconds": int(time.time() - s.started_at), + "status": "exited" if s.exited else "running", + "output_preview": s.output_buffer[-200:] if s.output_buffer else "", + } + if s.exited: + entry["exit_code"] = s.exit_code + if s.detached: + entry["detached"] = True + result.append(entry) + return result + + # ----- Session/Task Queries (for gateway integration) ----- + + def has_active_processes(self, task_id: str) -> bool: + """Check if there are active (running) processes for a task_id.""" + with self._lock: + return any( + s.task_id == task_id and not s.exited + for s in self._running.values() + ) + + def has_active_for_session(self, session_key: str) -> bool: + """Check if there are active processes for a gateway session key.""" + with self._lock: + return any( + s.session_key == session_key and not s.exited + for s in self._running.values() + ) + + def kill_all(self, task_id: str = None) -> int: + """Kill all running processes, optionally filtered by task_id. Returns count killed.""" + with self._lock: + targets = [ + s for s in self._running.values() + if (task_id is None or s.task_id == task_id) and not s.exited + ] + + killed = 0 + for session in targets: + result = self.kill_process(session.id) + if result.get("status") in ("killed", "already_exited"): + killed += 1 + return killed + + # ----- Cleanup / Pruning ----- + + def _prune_if_needed(self): + """Remove oldest finished sessions if over MAX_PROCESSES. 
Must hold _lock.""" + # First prune expired finished sessions + now = time.time() + expired = [ + sid for sid, s in self._finished.items() + if (now - s.started_at) > FINISHED_TTL_SECONDS + ] + for sid in expired: + del self._finished[sid] + + # If still over limit, remove oldest finished + total = len(self._running) + len(self._finished) + if total >= MAX_PROCESSES and self._finished: + oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at) + del self._finished[oldest_id] + + def cleanup_expired(self): + """Public method to prune expired finished sessions.""" + with self._lock: + self._prune_if_needed() + + # ----- Checkpoint (crash recovery) ----- + + def _write_checkpoint(self): + """Write running process metadata to checkpoint file.""" + try: + with self._lock: + entries = [] + for s in self._running.values(): + if not s.exited: + entries.append({ + "session_id": s.id, + "command": s.command, + "pid": s.pid, + "cwd": s.cwd, + "started_at": s.started_at, + "task_id": s.task_id, + "session_key": s.session_key, + }) + CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True) + CHECKPOINT_PATH.write_text( + json.dumps(entries, indent=2), encoding="utf-8" + ) + except Exception: + pass # Best-effort + + def recover_from_checkpoint(self) -> int: + """ + On gateway startup, probe PIDs from checkpoint file. + + Returns the number of processes recovered as detached. 
+ """ + if not CHECKPOINT_PATH.exists(): + return 0 + + try: + entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8")) + except Exception: + return 0 + + recovered = 0 + for entry in entries: + pid = entry.get("pid") + if not pid: + continue + + # Check if PID is still alive + alive = False + try: + os.kill(pid, 0) + alive = True + except (ProcessLookupError, PermissionError): + pass + + if alive: + session = ProcessSession( + id=entry["session_id"], + command=entry.get("command", "unknown"), + task_id=entry.get("task_id", ""), + session_key=entry.get("session_key", ""), + pid=pid, + cwd=entry.get("cwd"), + started_at=entry.get("started_at", time.time()), + detached=True, # Can't read output, but can report status + kill + ) + with self._lock: + self._running[session.id] = session + recovered += 1 + print(f"[ProcessRegistry] Recovered detached process: {session.command[:60]} (pid={pid})", flush=True) + + # Clear the checkpoint (will be rewritten as processes finish) + try: + CHECKPOINT_PATH.write_text("[]", encoding="utf-8") + except Exception: + pass + + return recovered + + +# Module-level singleton +process_registry = ProcessRegistry() diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 776a34a9c..da2d483da 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1122,22 +1122,33 @@ TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux environment. **Command Execution:** - Simple commands: Just provide the 'command' parameter -- Background processes: Set 'background': True for servers/long-running tasks +- Background processes: Set 'background': true to get a session_id for monitoring via the 'process' tool - Command timeout: Optional 'timeout' parameter in seconds +- Working directory: Optional 'workdir' parameter for per-command cwd +- PTY mode: Set 'pty': true for interactive CLI tools (Codex, Claude Code, etc.) 
 **Examples:**
 - Run command: `{"command": "ls -la"}`
-- Background task: `{"command": "source venv/bin/activate && python server.py", "background": True}`
+- Background task: `{"command": "pytest -v tests/", "background": true}` -- returns session_id, use process tool to poll/wait/kill
+- With workdir: `{"command": "npm install", "workdir": "/home/user/project"}`
 - With timeout: `{"command": "long_task.sh", "timeout": 300}`
+- Interactive CLI: `{"command": "codex exec 'Add tests'", "background": true, "pty": true}`
+
+**Background Process Workflow:**
+1. Start: `terminal(command="...", background=true)` -- returns session_id
+2. Monitor: `process(action="poll", session_id="...")` -- check status + new output
+3. Wait: `process(action="wait", session_id="...", timeout=600)` -- block until done
+4. Interact: `process(action="write/submit", session_id="...", data="y")` -- send stdin
+5. Kill: `process(action="kill", session_id="...")` -- terminate
 
 **Best Practices:**
-- Run servers/long processes in background
-- Monitor disk usage for large tasks
+- Use background mode for long-running tasks, then process(wait) to block until completion
+- Use workdir to run commands in specific project directories
 - Install whatever tools you need with apt-get or pip
-- Try to create or use a venv with uv or python -m venv to keep isolation from global system packages.
+- Try to create or use a venv with uv or python -m venv to keep isolation from global system packages
 
 **Things to avoid:**
-- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck.
+- Do NOT use interactive tools (vim, nano, Python REPL) without pty=true -- they will hang without a pseudo-terminal.
 - Even git sometimes becomes interactive if the output is large. If you're not sure, pipe to cat.
""" @@ -1295,6 +1306,16 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300): current_time = time.time() + # Check the process registry -- skip cleanup for sandboxes with active + # background processes (their _last_activity gets refreshed to keep them alive). + try: + from tools.process_registry import process_registry + for task_id in list(_last_activity.keys()): + if process_registry.has_active_processes(task_id): + _last_activity[task_id] = current_time # Keep sandbox alive + except ImportError: + pass + # Phase 1: collect stale entries and remove them from tracking dicts while # holding the lock. Do NOT call env.cleanup() inside the lock -- Modal and # Docker teardown can block for 10-15s, which would stall every concurrent @@ -1501,7 +1522,10 @@ def terminal_tool( background: bool = False, timeout: Optional[int] = None, task_id: Optional[str] = None, - force: bool = False + force: bool = False, + workdir: Optional[str] = None, + check_interval: Optional[int] = None, + pty: bool = False, ) -> str: """ Execute a command using mini-swe-agent's execution environments. @@ -1512,6 +1536,9 @@ def terminal_tool( timeout: Command timeout in seconds (default: from config) task_id: Unique identifier for environment isolation (optional) force: If True, skip dangerous command check (use after user confirms) + workdir: Working directory for this command (optional, uses session cwd if not set) + check_interval: Seconds between auto-checks for background processes (gateway only, min 30) + pty: If True, use pseudo-terminal for interactive CLI tools (local backend only) Returns: str: JSON string with output, exit_code, and error fields @@ -1662,20 +1689,69 @@ def terminal_tool( # Prepare command for execution if background: - # Run in background with nohup and redirect output - exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &" + # Spawn a tracked background process via the process registry. + # For local backends: uses subprocess.Popen with output buffering. 
+ # For non-local backends: runs inside the sandbox via env.execute(). + from tools.process_registry import process_registry + + session_key = os.getenv("HERMES_SESSION_KEY", "") + effective_cwd = workdir or cwd try: - result = env.execute(exec_command, timeout=10) - return json.dumps({ - "output": "Background task started successfully", + if env_type == "local": + proc_session = process_registry.spawn_local( + command=command, + cwd=effective_cwd, + task_id=effective_task_id, + session_key=session_key, + env_vars=env.env if hasattr(env, 'env') else None, + use_pty=pty, + ) + else: + proc_session = process_registry.spawn_via_env( + env=env, + command=command, + cwd=effective_cwd, + task_id=effective_task_id, + session_key=session_key, + ) + + result_data = { + "output": "Background process started", + "session_id": proc_session.id, + "pid": proc_session.pid, "exit_code": 0, - "error": None - }, ensure_ascii=False) + "error": None, + } + + # Transparent timeout clamping note + max_timeout = effective_timeout + if timeout and timeout > max_timeout: + result_data["timeout_note"] = ( + f"Requested timeout {timeout}s was clamped to " + f"configured limit of {max_timeout}s" + ) + + # Register check_interval watcher (gateway picks this up after agent run) + if check_interval and background: + effective_interval = max(30, check_interval) + if check_interval < 30: + result_data["check_interval_note"] = ( + f"Requested {check_interval}s raised to minimum 30s" + ) + process_registry.pending_watchers.append({ + "session_id": proc_session.id, + "check_interval": effective_interval, + "session_key": session_key, + "platform": os.getenv("HERMES_SESSION_PLATFORM", ""), + "chat_id": os.getenv("HERMES_SESSION_CHAT_ID", ""), + }) + + return json.dumps(result_data, ensure_ascii=False) except Exception as e: return json.dumps({ "output": "", "exit_code": -1, - "error": f"Failed to start background task: {str(e)}" + "error": f"Failed to start background process: {str(e)}" }, 
ensure_ascii=False) else: # Run foreground command with retry logic @@ -1685,7 +1761,10 @@ def terminal_tool( while retry_count <= max_retries: try: - result = env.execute(command, timeout=effective_timeout) + execute_kwargs = {"timeout": effective_timeout} + if workdir: + execute_kwargs["cwd"] = workdir + result = env.execute(command, **execute_kwargs) except Exception as e: error_str = str(e).lower() if "timeout" in error_str: diff --git a/toolsets.py b/toolsets.py index 19eb6f272..c39417196 100644 --- a/toolsets.py +++ b/toolsets.py @@ -56,8 +56,8 @@ TOOLSETS = { }, "terminal": { - "description": "Terminal/command execution tools", - "tools": ["terminal"], + "description": "Terminal/command execution and process management tools", + "tools": ["terminal", "process"], "includes": [] }, @@ -118,7 +118,7 @@ TOOLSETS = { "debugging": { "description": "Debugging and troubleshooting toolkit", - "tools": ["terminal"], + "tools": ["terminal", "process"], "includes": ["web", "file"] # For searching error messages and solutions, and file operations }, @@ -137,8 +137,8 @@ TOOLSETS = { "tools": [ # Web tools "web_search", "web_extract", - # Terminal - "terminal", + # Terminal + process management + "terminal", "process", # File manipulation "read_file", "write_file", "patch", "search", # Vision @@ -169,8 +169,8 @@ TOOLSETS = { "hermes-telegram": { "description": "Telegram bot toolset - full access for personal use (terminal has safety checks)", "tools": [ - # Terminal - enabled with dangerous command approval system - "terminal", + # Terminal + process management + "terminal", "process", # File manipulation "read_file", "write_file", "patch", "search", # Web tools @@ -194,8 +194,8 @@ TOOLSETS = { "hermes-discord": { "description": "Discord bot toolset - full access (terminal has safety checks via dangerous command approval)", "tools": [ - # Terminal - enabled with dangerous command approval system - "terminal", + # Terminal + process management + "terminal", "process", # 
File manipulation "read_file", "write_file", "patch", "search", # Web tools @@ -221,8 +221,8 @@ TOOLSETS = { "tools": [ # Web tools "web_search", "web_extract", - # Terminal - only for trusted personal accounts - "terminal", + # Terminal + process management + "terminal", "process", # File manipulation "read_file", "write_file", "patch", "search", # Vision @@ -244,8 +244,8 @@ TOOLSETS = { "hermes-slack": { "description": "Slack bot toolset - full access for workspace use (terminal has safety checks)", "tools": [ - # Terminal - enabled with dangerous command approval system - "terminal", + # Terminal + process management + "terminal", "process", # File manipulation "read_file", "write_file", "patch", "search", # Web tools