Files
hermes-agent/tools/process_registry.py
0xIbra 437ec17125 fix(cli): respect HERMES_HOME in all remaining hardcoded ~/.hermes paths
Several files resolved paths via Path.home() / ".hermes" or
os.path.expanduser("~/.hermes/..."), bypassing the HERMES_HOME
environment variable. This broke isolation when running multiple
Hermes instances with distinct HERMES_HOME directories.

Replace all hardcoded paths with calls to get_hermes_home() from
hermes_cli.config, consistent with the rest of the codebase.

Files fixed:
- tools/process_registry.py (processes.json)
- gateway/pairing.py (pairing/)
- gateway/sticker_cache.py (sticker_cache.json)
- gateway/channel_directory.py (channel_directory.json, sessions.json)
- gateway/config.py (gateway.json, config.yaml, sessions_dir)
- gateway/mirror.py (sessions/)
- gateway/hooks.py (hooks/)
- gateway/platforms/base.py (image_cache/, audio_cache/, document_cache/)
- gateway/platforms/whatsapp.py (whatsapp/session)
- gateway/delivery.py (cron/output)
- agent/auxiliary_client.py (auth.json)
- agent/prompt_builder.py (SOUL.md)
- cli.py (config.yaml, images/, pastes/, history)
- run_agent.py (logs/)
- tools/environments/base.py (sandboxes/)
- tools/environments/modal.py (modal_snapshots.json)
- tools/environments/singularity.py (singularity_snapshots.json)
- tools/tts_tool.py (audio_cache)
- hermes_cli/status.py (cron/jobs.json, sessions.json)
- hermes_cli/gateway.py (logs/, whatsapp session)
- hermes_cli/main.py (whatsapp/session)

Tests updated to use HERMES_HOME env var instead of patching Path.home().

Closes #892

(cherry picked from commit 78ac1bba43b8b74a934c6172f2c29bb4d03164b9)
2026-03-13 21:32:53 -07:00

866 lines
33 KiB
Python

"""
Process Registry -- In-memory registry for managed background processes.
Tracks processes spawned via terminal(background=true), providing:
- Output buffering (rolling 200KB window)
- Status polling and log retrieval
- Blocking wait with interrupt support
- Process killing
- Crash recovery via JSON checkpoint file
- Session-scoped tracking for gateway reset protection
Background processes execute THROUGH the environment interface -- nothing
runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity,
Modal, Daytona, and SSH backends, the command runs inside the sandbox.
Usage:
from tools.process_registry import process_registry
# Spawn a background process (called from terminal_tool)
session = process_registry.spawn(env, "pytest -v", task_id="task_123")
# Poll for status
result = process_registry.poll(session.id)
# Block until done
result = process_registry.wait(session.id, timeout=300)
# Kill it
process_registry.kill(session.id)
"""
import json
import logging
import os
import platform
import shlex
import shutil
import signal
import subprocess
import threading
import time
import uuid
_IS_WINDOWS = platform.system() == "Windows"
from tools.environments.local import _find_shell, _HERMES_PROVIDER_ENV_BLOCKLIST
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_cli.config import get_hermes_home
logger = logging.getLogger(__name__)
# Checkpoint file for crash recovery (gateway only).
# Derived from get_hermes_home() so the HERMES_HOME env var is respected
# (multiple Hermes instances with distinct homes stay isolated).
CHECKPOINT_PATH = get_hermes_home() / "processes.json"
# Limits
MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer
FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes
MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning)
@dataclass
class ProcessSession:
    """A tracked background process with output buffering.

    Instances are created by ProcessRegistry.spawn_local()/spawn_via_env().
    Mutable fields (output_buffer, exited, exit_code) are written by the
    registry's reader/poller threads; `_lock` guards the output buffer.
    """
    id: str  # Unique session ID ("proc_xxxxxxxxxxxx")
    command: str  # Original command string
    task_id: str = ""  # Task/sandbox isolation key
    session_key: str = ""  # Gateway session key (for reset protection)
    pid: Optional[int] = None  # OS process ID
    process: Optional[subprocess.Popen] = None  # Popen handle (local only)
    env_ref: Any = None  # Reference to the environment object
    cwd: Optional[str] = None  # Working directory
    started_at: float = 0.0  # time.time() of spawn
    exited: bool = False  # Whether the process has finished
    exit_code: Optional[int] = None  # Exit code (None if still running)
    output_buffer: str = ""  # Rolling output (last MAX_OUTPUT_CHARS)
    max_output_chars: int = MAX_OUTPUT_CHARS
    detached: bool = False  # True if recovered from crash (no pipe)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
    _pty: Any = field(default=None, repr=False)  # ptyprocess handle (when use_pty=True)
class ProcessRegistry:
    """
    In-memory registry of running and finished background processes.

    Thread-safe. Accessed from:
    - Executor threads (terminal_tool, process tool handlers)
    - Gateway asyncio loop (watcher tasks, session reset checks)
    - Cleanup thread (sandbox reaping coordination)
    """

    # Shell startup warnings stripped from the head of captured output.
    # These are emitted by "bash -lic" when no controlling terminal exists.
    _SHELL_NOISE_SUBSTRINGS = (
        "bash: cannot set terminal process group",
        "bash: no job control in this shell",
        "no job control in this shell",
        "cannot set terminal process group",
        "tcsetattr: Inappropriate ioctl for device",
    )

    def __init__(self):
        # session_id -> ProcessSession, partitioned by lifecycle state.
        self._running: Dict[str, ProcessSession] = {}
        self._finished: Dict[str, ProcessSession] = {}
        # Guards _running/_finished; per-session output has its own lock.
        self._lock = threading.Lock()
        # Side-channel for check_interval watchers (gateway reads after agent run)
        self.pending_watchers: List[Dict[str, Any]] = []
@staticmethod
def _clean_shell_noise(text: str) -> str:
"""Strip shell startup warnings from the beginning of output."""
lines = text.split("\n")
while lines and any(noise in lines[0] for noise in ProcessRegistry._SHELL_NOISE_SUBSTRINGS):
lines.pop(0)
return "\n".join(lines)
    # ----- Spawn -----
    def spawn_local(
        self,
        command: str,
        cwd: Optional[str] = None,
        task_id: str = "",
        session_key: str = "",
        env_vars: Optional[dict] = None,
        use_pty: bool = False,
    ) -> ProcessSession:
        """
        Spawn a background process locally.

        Only for TERMINAL_ENV=local. Other backends use spawn_via_env().

        Args:
            command: Shell command line, run via the user's login shell.
            cwd: Working directory (defaults to the current directory).
            task_id: Task/sandbox isolation key recorded on the session.
            session_key: Gateway session key (for reset protection).
            env_vars: Extra environment variables merged over os.environ.
            use_pty: If True, use a pseudo-terminal via ptyprocess for interactive
                     CLI tools (Codex, Claude Code, Python REPL). Falls back to
                     subprocess.Popen if ptyprocess is not installed.

        Returns:
            The registered ProcessSession (already tracked in the registry).
        """
        session = ProcessSession(
            id=f"proc_{uuid.uuid4().hex[:12]}",
            command=command,
            task_id=task_id,
            session_key=session_key,
            cwd=cwd or os.getcwd(),
            started_at=time.time(),
        )
        if use_pty:
            # Try PTY mode for interactive CLI tools
            try:
                # winpty supplies the PTY implementation on Windows.
                if _IS_WINDOWS:
                    from winpty import PtyProcess as _PtyProcessCls
                else:
                    from ptyprocess import PtyProcess as _PtyProcessCls
                user_shell = _find_shell()
                # Drop Hermes provider secrets from the child's environment.
                pty_env = {k: v for k, v in os.environ.items()
                           if k not in _HERMES_PROVIDER_ENV_BLOCKLIST}
                pty_env.update(env_vars or {})
                pty_env["PYTHONUNBUFFERED"] = "1"
                # "-lic": login + interactive shell so rc files are sourced.
                pty_proc = _PtyProcessCls.spawn(
                    [user_shell, "-lic", command],
                    cwd=session.cwd,
                    env=pty_env,
                    dimensions=(30, 120),
                )
                session.pid = pty_proc.pid
                # Store the pty handle on the session for read/write
                session._pty = pty_proc
                # PTY reader thread
                reader = threading.Thread(
                    target=self._pty_reader_loop,
                    args=(session,),
                    daemon=True,
                    name=f"proc-pty-reader-{session.id}",
                )
                session._reader_thread = reader
                reader.start()
                with self._lock:
                    self._prune_if_needed()
                    self._running[session.id] = session
                # Checkpoint outside the registry lock -- it re-acquires _lock.
                self._write_checkpoint()
                return session
            except ImportError:
                logger.warning("ptyprocess not installed, falling back to pipe mode")
            except Exception as e:
                logger.warning("PTY spawn failed (%s), falling back to pipe mode", e)
        # Standard Popen path (non-PTY or PTY fallback)
        # Use the user's login shell for consistency with LocalEnvironment --
        # ensures rc files are sourced and user tools are available.
        user_shell = _find_shell()
        # Force unbuffered output for Python scripts so progress is visible
        # during background execution (libraries like tqdm/datasets buffer when
        # stdout is a pipe, hiding output from process(action="poll")).
        bg_env = {k: v for k, v in os.environ.items()
                  if k not in _HERMES_PROVIDER_ENV_BLOCKLIST}
        bg_env.update(env_vars or {})
        bg_env["PYTHONUNBUFFERED"] = "1"
        proc = subprocess.Popen(
            [user_shell, "-lic", command],
            text=True,
            cwd=session.cwd,
            env=bg_env,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the single output stream
            stdin=subprocess.PIPE,
            # New process group (POSIX) so kill_process can killpg the whole tree.
            preexec_fn=None if _IS_WINDOWS else os.setsid,
        )
        session.process = proc
        session.pid = proc.pid
        # Start output reader thread
        reader = threading.Thread(
            target=self._reader_loop,
            args=(session,),
            daemon=True,
            name=f"proc-reader-{session.id}",
        )
        session._reader_thread = reader
        reader.start()
        with self._lock:
            self._prune_if_needed()
            self._running[session.id] = session
        # Checkpoint outside the registry lock -- it re-acquires _lock.
        self._write_checkpoint()
        return session
def spawn_via_env(
self,
env: Any,
command: str,
cwd: str = None,
task_id: str = "",
session_key: str = "",
timeout: int = 10,
) -> ProcessSession:
"""
Spawn a background process through a non-local environment backend.
For Docker/Singularity/Modal/Daytona/SSH: runs the command inside the sandbox
using the environment's execute() interface. We wrap the command to
capture the in-sandbox PID and redirect output to a log file inside
the sandbox, then poll the log via subsequent execute() calls.
This is less capable than local spawn (no live stdout pipe, no stdin),
but it ensures the command runs in the correct sandbox context.
"""
session = ProcessSession(
id=f"proc_{uuid.uuid4().hex[:12]}",
command=command,
task_id=task_id,
session_key=session_key,
cwd=cwd,
started_at=time.time(),
env_ref=env,
)
# Run the command in the sandbox with output capture
log_path = f"/tmp/hermes_bg_{session.id}.log"
pid_path = f"/tmp/hermes_bg_{session.id}.pid"
quoted_command = shlex.quote(command)
bg_command = (
f"nohup bash -c {quoted_command} > {log_path} 2>&1 & "
f"echo $! > {pid_path} && cat {pid_path}"
)
try:
result = env.execute(bg_command, timeout=timeout)
output = result.get("output", "").strip()
# Try to extract the PID from the output
for line in output.splitlines():
line = line.strip()
if line.isdigit():
session.pid = int(line)
break
except Exception as e:
session.exited = True
session.exit_code = -1
session.output_buffer = f"Failed to start: {e}"
if not session.exited:
# Start a poller thread that periodically reads the log file
reader = threading.Thread(
target=self._env_poller_loop,
args=(session, env, log_path, pid_path),
daemon=True,
name=f"proc-poller-{session.id}",
)
session._reader_thread = reader
reader.start()
with self._lock:
self._prune_if_needed()
self._running[session.id] = session
self._write_checkpoint()
return session
    # ----- Reader / Poller Threads -----
    def _reader_loop(self, session: ProcessSession):
        """Background thread: read stdout from a local Popen process.

        Streams 4KB chunks into the session's rolling output buffer (capped
        at session.max_output_chars), strips shell-startup noise from the
        first chunk, then records the exit code and moves the session to
        the finished table.
        """
        first_chunk = True
        try:
            while True:
                chunk = session.process.stdout.read(4096)
                if not chunk:
                    # EOF: the pipe closed, so the process is done writing.
                    break
                if first_chunk:
                    chunk = self._clean_shell_noise(chunk)
                    first_chunk = False
                with session._lock:
                    session.output_buffer += chunk
                    # Keep only the tail of the output (rolling window).
                    if len(session.output_buffer) > session.max_output_chars:
                        session.output_buffer = session.output_buffer[-session.max_output_chars:]
        except Exception as e:
            logger.debug("Process stdout reader ended: %s", e)
        # Process exited
        try:
            session.process.wait(timeout=5)
        except Exception as e:
            logger.debug("Process wait timed out or failed: %s", e)
        session.exited = True
        session.exit_code = session.process.returncode
        self._move_to_finished(session)
    def _env_poller_loop(
        self, session: ProcessSession, env: Any, log_path: str, pid_path: str
    ):
        """Background thread: poll a sandbox log file for non-local backends.

        Every 2 seconds re-reads the full log via env.execute() (there is no
        incremental pipe for sandboxed processes) and probes the recorded PID
        with `kill -0`; once the PID is gone, collects the exit code and moves
        the session to finished.
        """
        while not session.exited:
            time.sleep(2)  # Poll every 2 seconds
            try:
                # Read new output from the log file
                result = env.execute(f"cat {log_path} 2>/dev/null", timeout=10)
                new_output = result.get("output", "")
                if new_output:
                    with session._lock:
                        # Whole-file read each poll, so replace (not append).
                        session.output_buffer = new_output
                        if len(session.output_buffer) > session.max_output_chars:
                            session.output_buffer = session.output_buffer[-session.max_output_chars:]
                # Check if process is still running
                check = env.execute(
                    f"kill -0 $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?",
                    timeout=5,
                )
                check_output = check.get("output", "").strip()
                if check_output and check_output.splitlines()[-1].strip() != "0":
                    # Process has exited -- get exit code
                    # NOTE(review): `wait` only reports a code for children of
                    # the shell that runs it; presumably best-effort here, with
                    # the except below mapping failures to -1 -- confirm.
                    exit_result = env.execute(
                        f"wait $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?",
                        timeout=5,
                    )
                    exit_str = exit_result.get("output", "").strip()
                    try:
                        session.exit_code = int(exit_str.splitlines()[-1].strip())
                    except (ValueError, IndexError):
                        session.exit_code = -1
                    session.exited = True
                    self._move_to_finished(session)
                    return
            except Exception:
                # Environment might be gone (sandbox reaped, etc.)
                session.exited = True
                session.exit_code = -1
                self._move_to_finished(session)
                return
    def _pty_reader_loop(self, session: ProcessSession):
        """Background thread: read output from a PTY process.

        Appends decoded PTY output to the rolling buffer until the PTY dies,
        then records the exit status and moves the session to finished.
        """
        pty = session._pty
        try:
            while pty.isalive():
                try:
                    chunk = pty.read(4096)
                    if chunk:
                        # ptyprocess returns bytes
                        text = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
                        with session._lock:
                            session.output_buffer += text
                            # Keep only the tail of the output (rolling window).
                            if len(session.output_buffer) > session.max_output_chars:
                                session.output_buffer = session.output_buffer[-session.max_output_chars:]
                except EOFError:
                    # PTY closed -- normal termination path.
                    break
                except Exception:
                    break
        except Exception as e:
            logger.debug("PTY stdout reader ended: %s", e)
        # Process exited
        try:
            pty.wait()
        except Exception as e:
            logger.debug("PTY wait timed out or failed: %s", e)
        session.exited = True
        # Some PTY implementations may lack `exitstatus`; fall back to -1.
        session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1
        self._move_to_finished(session)
    def _move_to_finished(self, session: ProcessSession):
        """Move a session from running to finished."""
        with self._lock:
            self._running.pop(session.id, None)
            self._finished[session.id] = session
        # Checkpoint after releasing the lock -- _write_checkpoint acquires it.
        self._write_checkpoint()
# ----- Query Methods -----
def get(self, session_id: str) -> Optional[ProcessSession]:
"""Get a session by ID (running or finished)."""
with self._lock:
return self._running.get(session_id) or self._finished.get(session_id)
def poll(self, session_id: str) -> dict:
"""Check status and get new output for a background process."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
with session._lock:
output_preview = session.output_buffer[-1000:] if session.output_buffer else ""
result = {
"session_id": session.id,
"command": session.command,
"status": "exited" if session.exited else "running",
"pid": session.pid,
"uptime_seconds": int(time.time() - session.started_at),
"output_preview": output_preview,
}
if session.exited:
result["exit_code"] = session.exit_code
if session.detached:
result["detached"] = True
result["note"] = "Process recovered after restart -- output history unavailable"
return result
def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict:
"""Read the full output log with optional pagination by lines."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
with session._lock:
full_output = session.output_buffer
lines = full_output.splitlines()
total_lines = len(lines)
# Default: last N lines
if offset == 0 and limit > 0:
selected = lines[-limit:]
else:
selected = lines[offset:offset + limit]
return {
"session_id": session.id,
"status": "exited" if session.exited else "running",
"output": "\n".join(selected),
"total_lines": total_lines,
"showing": f"{len(selected)} lines",
}
def wait(self, session_id: str, timeout: int = None) -> dict:
"""
Block until a process exits, timeout, or interrupt.
Args:
session_id: The process to wait for.
timeout: Max seconds to block. Falls back to TERMINAL_TIMEOUT config.
Returns:
dict with status ("exited", "timeout", "interrupted", "not_found")
and output snapshot.
"""
from tools.terminal_tool import _interrupt_event
default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180"))
max_timeout = default_timeout
requested_timeout = timeout
timeout_note = None
if requested_timeout and requested_timeout > max_timeout:
effective_timeout = max_timeout
timeout_note = (
f"Requested wait of {requested_timeout}s was clamped "
f"to configured limit of {max_timeout}s"
)
else:
effective_timeout = requested_timeout or max_timeout
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
deadline = time.monotonic() + effective_timeout
while time.monotonic() < deadline:
if session.exited:
result = {
"status": "exited",
"exit_code": session.exit_code,
"output": session.output_buffer[-2000:],
}
if timeout_note:
result["timeout_note"] = timeout_note
return result
if _interrupt_event.is_set():
result = {
"status": "interrupted",
"output": session.output_buffer[-1000:],
"note": "User sent a new message -- wait interrupted",
}
if timeout_note:
result["timeout_note"] = timeout_note
return result
time.sleep(1)
result = {
"status": "timeout",
"output": session.output_buffer[-1000:],
}
if timeout_note:
result["timeout_note"] = timeout_note
else:
result["timeout_note"] = f"Waited {effective_timeout}s, process still running"
return result
def kill_process(self, session_id: str) -> dict:
"""Kill a background process."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
if session.exited:
return {
"status": "already_exited",
"exit_code": session.exit_code,
}
# Kill via PTY, Popen (local), or env execute (non-local)
try:
if session._pty:
# PTY process -- terminate via ptyprocess
try:
session._pty.terminate(force=True)
except Exception:
if session.pid:
os.kill(session.pid, signal.SIGTERM)
elif session.process:
# Local process -- kill the process group
try:
if _IS_WINDOWS:
session.process.terminate()
else:
os.killpg(os.getpgid(session.process.pid), signal.SIGTERM)
except (ProcessLookupError, PermissionError):
session.process.kill()
elif session.env_ref and session.pid:
# Non-local -- kill inside sandbox
session.env_ref.execute(f"kill {session.pid} 2>/dev/null", timeout=5)
session.exited = True
session.exit_code = -15 # SIGTERM
self._move_to_finished(session)
self._write_checkpoint()
return {"status": "killed", "session_id": session.id}
except Exception as e:
return {"status": "error", "error": str(e)}
def write_stdin(self, session_id: str, data: str) -> dict:
"""Send raw data to a running process's stdin (no newline appended)."""
session = self.get(session_id)
if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"}
if session.exited:
return {"status": "already_exited", "error": "Process has already finished"}
# PTY mode -- write through pty handle (expects bytes)
if hasattr(session, '_pty') and session._pty:
try:
pty_data = data.encode("utf-8") if isinstance(data, str) else data
session._pty.write(pty_data)
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:
return {"status": "error", "error": str(e)}
# Popen mode -- write through stdin pipe
if not session.process or not session.process.stdin:
return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"}
try:
session.process.stdin.write(data)
session.process.stdin.flush()
return {"status": "ok", "bytes_written": len(data)}
except Exception as e:
return {"status": "error", "error": str(e)}
def submit_stdin(self, session_id: str, data: str = "") -> dict:
"""Send data + newline to a running process's stdin (like pressing Enter)."""
return self.write_stdin(session_id, data + "\n")
def list_sessions(self, task_id: str = None) -> list:
"""List all running and recently-finished processes."""
with self._lock:
all_sessions = list(self._running.values()) + list(self._finished.values())
if task_id:
all_sessions = [s for s in all_sessions if s.task_id == task_id]
result = []
for s in all_sessions:
entry = {
"session_id": s.id,
"command": s.command[:200],
"cwd": s.cwd,
"pid": s.pid,
"started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)),
"uptime_seconds": int(time.time() - s.started_at),
"status": "exited" if s.exited else "running",
"output_preview": s.output_buffer[-200:] if s.output_buffer else "",
}
if s.exited:
entry["exit_code"] = s.exit_code
if s.detached:
entry["detached"] = True
result.append(entry)
return result
# ----- Session/Task Queries (for gateway integration) -----
def has_active_processes(self, task_id: str) -> bool:
"""Check if there are active (running) processes for a task_id."""
with self._lock:
return any(
s.task_id == task_id and not s.exited
for s in self._running.values()
)
def has_active_for_session(self, session_key: str) -> bool:
"""Check if there are active processes for a gateway session key."""
with self._lock:
return any(
s.session_key == session_key and not s.exited
for s in self._running.values()
)
def kill_all(self, task_id: str = None) -> int:
"""Kill all running processes, optionally filtered by task_id. Returns count killed."""
with self._lock:
targets = [
s for s in self._running.values()
if (task_id is None or s.task_id == task_id) and not s.exited
]
killed = 0
for session in targets:
result = self.kill_process(session.id)
if result.get("status") in ("killed", "already_exited"):
killed += 1
return killed
# ----- Cleanup / Pruning -----
def _prune_if_needed(self):
"""Remove oldest finished sessions if over MAX_PROCESSES. Must hold _lock."""
# First prune expired finished sessions
now = time.time()
expired = [
sid for sid, s in self._finished.items()
if (now - s.started_at) > FINISHED_TTL_SECONDS
]
for sid in expired:
del self._finished[sid]
# If still over limit, remove oldest finished
total = len(self._running) + len(self._finished)
if total >= MAX_PROCESSES and self._finished:
oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at)
del self._finished[oldest_id]
def cleanup_expired(self):
"""Public method to prune expired finished sessions."""
with self._lock:
self._prune_if_needed()
    # ----- Checkpoint (crash recovery) -----
    def _write_checkpoint(self):
        """Write running process metadata to checkpoint file atomically.

        Called after registry mutations. Acquires the registry lock itself,
        so callers must NOT hold _lock when invoking it. Failures are
        swallowed and logged at debug level -- checkpointing is best-effort.
        """
        try:
            with self._lock:
                entries = []
                # Only live processes are worth recovering after a crash.
                for s in self._running.values():
                    if not s.exited:
                        entries.append({
                            "session_id": s.id,
                            "command": s.command,
                            "pid": s.pid,
                            "cwd": s.cwd,
                            "started_at": s.started_at,
                            "task_id": s.task_id,
                            "session_key": s.session_key,
                        })
                # Atomic write to avoid corruption on crash
                from utils import atomic_json_write
                atomic_json_write(CHECKPOINT_PATH, entries)
        except Exception as e:
            logger.debug("Failed to write checkpoint file: %s", e, exc_info=True)
    def recover_from_checkpoint(self) -> int:
        """
        On gateway startup, probe PIDs from checkpoint file.

        Entries whose PID is still alive are re-registered as "detached"
        sessions: status and kill still work, but output history is gone
        (there is no pipe to the process anymore).

        Returns the number of processes recovered as detached.
        """
        if not CHECKPOINT_PATH.exists():
            return 0
        try:
            entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8"))
        except Exception:
            # Corrupt or unreadable checkpoint -- nothing to recover.
            return 0
        recovered = 0
        for entry in entries:
            pid = entry.get("pid")
            if not pid:
                continue
            # Check if PID is still alive (signal 0 probes without killing).
            alive = False
            try:
                os.kill(pid, 0)
                alive = True
            except (ProcessLookupError, PermissionError):
                # NOTE(review): on POSIX, PermissionError (EPERM) means the PID
                # exists but belongs to another user; treating it as dead looks
                # deliberate (we could not manage it anyway) -- confirm.
                pass
            if alive:
                session = ProcessSession(
                    id=entry["session_id"],
                    command=entry.get("command", "unknown"),
                    task_id=entry.get("task_id", ""),
                    session_key=entry.get("session_key", ""),
                    pid=pid,
                    cwd=entry.get("cwd"),
                    started_at=entry.get("started_at", time.time()),
                    detached=True,  # Can't read output, but can report status + kill
                )
                with self._lock:
                    self._running[session.id] = session
                recovered += 1
                logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
        # Clear the checkpoint (will be rewritten as processes finish)
        try:
            from utils import atomic_json_write
            atomic_json_write(CHECKPOINT_PATH, [])
        except Exception as e:
            logger.debug("Could not clear checkpoint file: %s", e, exc_info=True)
        return recovered
# Module-level singleton
# (shared by the terminal/process tool handlers and the gateway's
# startup recovery / session-reset checks)
process_registry = ProcessRegistry()
# ---------------------------------------------------------------------------
# Registry -- the "process" tool schema + handler
# ---------------------------------------------------------------------------
from tools.registry import registry
# JSON schema for the "process" tool surfaced to the model. The action set
# mirrors what _handle_process below implements.
PROCESS_SCHEMA = {
    "name": "process",
    "description": (
        "Manage background processes started with terminal(background=true). "
        "Actions: 'list' (show all), 'poll' (check status + new output), "
        "'log' (full output with pagination), 'wait' (block until done or timeout), "
        "'kill' (terminate), 'write' (send raw stdin data without newline), "
        "'submit' (send data + Enter, for answering prompts)."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["list", "poll", "log", "wait", "kill", "write", "submit"],
                "description": "Action to perform on background processes"
            },
            "session_id": {
                "type": "string",
                "description": "Process session ID (from terminal background output). Required for all actions except 'list'."
            },
            "data": {
                "type": "string",
                "description": "Text to send to process stdin (for 'write' and 'submit' actions)"
            },
            "timeout": {
                "type": "integer",
                "description": "Max seconds to block for 'wait' action. Returns partial output on timeout.",
                "minimum": 1
            },
            "offset": {
                "type": "integer",
                "description": "Line offset for 'log' action (default: last 200 lines)"
            },
            "limit": {
                "type": "integer",
                "description": "Max lines to return for 'log' action",
                "minimum": 1
            }
        },
        "required": ["action"]
    }
}
def _handle_process(args, **kw):
    """Tool handler: route a 'process' action to the process_registry and
    return the result serialized as JSON."""
    import json as _json

    def _reply(payload):
        return _json.dumps(payload, ensure_ascii=False)

    task_id = kw.get("task_id")
    action = args.get("action", "")
    # Coerce to string — some models send session_id as an integer
    raw_sid = args.get("session_id")
    session_id = str(raw_sid) if raw_sid is not None else ""
    if action == "list":
        return _reply({"processes": process_registry.list_sessions(task_id=task_id)})
    if action in ("poll", "log", "wait", "kill", "write", "submit"):
        if not session_id:
            return _reply({"error": f"session_id is required for {action}"})
        if action == "poll":
            return _reply(process_registry.poll(session_id))
        if action == "log":
            return _reply(process_registry.read_log(
                session_id, offset=args.get("offset", 0), limit=args.get("limit", 200)))
        if action == "wait":
            return _reply(process_registry.wait(session_id, timeout=args.get("timeout")))
        if action == "kill":
            return _reply(process_registry.kill_process(session_id))
        if action == "write":
            return _reply(process_registry.write_stdin(session_id, str(args.get("data", ""))))
        return _reply(process_registry.submit_stdin(session_id, str(args.get("data", ""))))
    return _reply({"error": f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit"})
# Register the "process" tool in the global tool registry under the
# "terminal" toolset so it is exposed alongside the terminal tool.
registry.register(
    name="process",
    toolset="terminal",
    schema=PROCESS_SCHEMA,
    handler=_handle_process,
)