diff --git a/cli.py b/cli.py index 09cf2094a..6f02dc936 100644 --- a/cli.py +++ b/cli.py @@ -8134,6 +8134,25 @@ class HermesCLI: # Periodic config watcher — auto-reload MCP on mcp_servers change if not self._agent_running: self._check_config_mcp_changes() + # Check for background process completion notifications + # while the agent is idle (user hasn't typed anything yet). + try: + from tools.process_registry import process_registry + if not process_registry.completion_queue.empty(): + completion = process_registry.completion_queue.get_nowait() + _exit = completion.get("exit_code", "?") + _cmd = completion.get("command", "unknown") + _sid = completion.get("session_id", "unknown") + _out = completion.get("output", "") + _synth = ( + f"[SYSTEM: Background process {_sid} completed " + f"(exit code {_exit}).\n" + f"Command: {_cmd}\n" + f"Output:\n{_out}]" + ) + self._pending_input.put(_synth) + except Exception: + pass continue if not user_input: @@ -8247,7 +8266,29 @@ class HermesCLI: except Exception as e: _cprint(f"{_DIM}Voice auto-restart failed: {e}{_RST}") threading.Thread(target=_restart_recording, daemon=True).start() - + + # Drain process completion notifications — any background + # process that finished with notify_on_complete while the + # agent was running (or before) gets auto-injected as a + # new user message so the agent can react to it. + try: + from tools.process_registry import process_registry + while not process_registry.completion_queue.empty(): + completion = process_registry.completion_queue.get_nowait() + _exit = completion.get("exit_code", "?") + _cmd = completion.get("command", "unknown") + _sid = completion.get("session_id", "unknown") + _out = completion.get("output", "") + _synth = ( + f"[SYSTEM: Background process {_sid} completed " + f"(exit code {_exit}).\n" + f"Command: {_cmd}\n" + f"Output:\n{_out}]" + ) + self._pending_input.put(_synth) + except Exception: + pass # Non-fatal — don't break the main loop + except Exception as e: print(f"Error: {e}") diff --git a/gateway/run.py b/gateway/run.py index e4a5324ac..7a45be62d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -6048,12 +6048,13 @@ class GatewayRunner: platform_name = watcher.get("platform", "") chat_id = watcher.get("chat_id", "") thread_id = watcher.get("thread_id", "") + agent_notify = watcher.get("notify_on_complete", False) notify_mode = self._load_background_notifications_mode() - logger.debug("Process watcher started: %s (every %ss, notify=%s)", - session_id, interval, notify_mode) + logger.debug("Process watcher started: %s (every %ss, notify=%s, agent_notify=%s)", + session_id, interval, notify_mode, agent_notify) - if notify_mode == "off": + if notify_mode == "off" and not agent_notify: # Still wait for the process to exit so we can log it, but don't # push any messages to the user. while True: @@ -6077,6 +6078,47 @@ class GatewayRunner: last_output_len = current_output_len if session.exited: + # --- Agent-triggered completion: inject synthetic message --- + if agent_notify: + from tools.ansi_strip import strip_ansi + _out = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else "" + synth_text = ( + f"[SYSTEM: Background process {session_id} completed " + f"(exit code {session.exit_code}).\n" + f"Command: {session.command}\n" + f"Output:\n{_out}]" + ) + adapter = None + for p, a in self.adapters.items(): + if p.value == platform_name: + adapter = a + break + if adapter and chat_id: + try: + from gateway.platforms.base import MessageEvent, MessageType + from gateway.session import SessionSource + from gateway.config import Platform + _platform_enum = Platform(platform_name) + _source = SessionSource( + platform=_platform_enum, + chat_id=chat_id, + thread_id=thread_id or None, + ) + synth_event = MessageEvent( + text=synth_text, + message_type=MessageType.TEXT, + source=_source, + ) + logger.info( + "Process %s finished — injecting agent notification for session %s", + session_id, session_key, + ) + await adapter.handle_message(synth_event) + except Exception as e: + logger.error("Agent notify injection error: %s", e) + break + + # --- Normal text-only notification --- # Decide whether to notify based on mode should_notify = ( notify_mode in ("all", "result") @@ -6101,8 +6143,9 @@ class GatewayRunner: logger.error("Watcher delivery error: %s", e) break - elif has_new_output and notify_mode == "all": + elif has_new_output and notify_mode == "all" and not agent_notify: # New output available -- deliver status update (only in "all" mode) + # Skip periodic updates for agent_notify watchers (they only care about completion) new_output = session.output_buffer[-500:] if session.output_buffer else "" message_text = ( f"[Background process {session_id} is still running~ " diff --git a/tests/tools/test_notify_on_complete.py b/tests/tools/test_notify_on_complete.py new file mode 100644 index 000000000..888721906 --- /dev/null +++ b/tests/tools/test_notify_on_complete.py @@ -0,0 +1,247 @@ +"""Tests for notify_on_complete background process feature. + +Covers: + - ProcessSession.notify_on_complete field + - ProcessRegistry.completion_queue population on _move_to_finished() + - Checkpoint persistence of notify_on_complete + - Terminal tool schema includes notify_on_complete + - Terminal tool handler passes notify_on_complete through +""" + +import json +import os +import queue +import time +import pytest +from pathlib import Path +from unittest.mock import MagicMock, patch + +from tools.process_registry import ( + ProcessRegistry, + ProcessSession, +) + + +@pytest.fixture() +def registry(): + """Create a fresh ProcessRegistry.""" + return ProcessRegistry() + + +def _make_session( + sid="proc_test_notify", + command="echo hello", + task_id="t1", + exited=False, + exit_code=None, + output="", + notify_on_complete=False, +) -> ProcessSession: + s = ProcessSession( + id=sid, + command=command, + task_id=task_id, + started_at=time.time(), + exited=exited, + exit_code=exit_code, + output_buffer=output, + notify_on_complete=notify_on_complete, + ) + return s + + +# ========================================================================= +# ProcessSession field +# ========================================================================= + +class TestProcessSessionField: + def test_default_false(self): + s = ProcessSession(id="proc_1", command="echo hi") + assert s.notify_on_complete is False + + def test_set_true(self): + s = ProcessSession(id="proc_1", command="echo hi", notify_on_complete=True) + assert s.notify_on_complete is True + + +# ========================================================================= +# Completion queue +# ========================================================================= + +class TestCompletionQueue: + def test_queue_exists(self, registry): + assert hasattr(registry, "completion_queue") + assert registry.completion_queue.empty() + + def test_move_to_finished_no_notify(self, registry): + """Processes without notify_on_complete don't enqueue.""" + s = _make_session(notify_on_complete=False, output="done") + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + assert registry.completion_queue.empty() + + def test_move_to_finished_with_notify(self, registry): + """Processes with notify_on_complete push to queue.""" + s = _make_session( + notify_on_complete=True, + output="build succeeded", + exit_code=0, + ) + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + + assert not registry.completion_queue.empty() + completion = registry.completion_queue.get_nowait() + assert completion["session_id"] == s.id + assert completion["command"] == "echo hello" + assert completion["exit_code"] == 0 + assert "build succeeded" in completion["output"] + + def test_move_to_finished_nonzero_exit(self, registry): + """Nonzero exit codes are captured correctly.""" + s = _make_session( + notify_on_complete=True, + output="FAILED", + exit_code=1, + ) + s.exited = True + s.exit_code = 1 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + + completion = registry.completion_queue.get_nowait() + assert completion["exit_code"] == 1 + assert "FAILED" in completion["output"] + + def test_output_truncated_to_2000(self, registry): + """Long output is truncated to last 2000 chars.""" + long_output = "x" * 5000 + s = _make_session( + notify_on_complete=True, + output=long_output, + ) + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + + completion = registry.completion_queue.get_nowait() + assert len(completion["output"]) == 2000 + + def test_multiple_completions_queued(self, registry): + """Multiple notify processes all push to the same queue.""" + for i in range(3): + s = _make_session( + sid=f"proc_{i}", + notify_on_complete=True, + output=f"output_{i}", + ) + s.exited = True + s.exit_code = 0 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) + + completions = [] + while not registry.completion_queue.empty(): + completions.append(registry.completion_queue.get_nowait()) + assert len(completions) == 3 + ids = {c["session_id"] for c in completions} + assert ids == {"proc_0", "proc_1", "proc_2"} + + +# ========================================================================= +# Checkpoint persistence +# ========================================================================= + +class TestCheckpointNotify: + def test_checkpoint_includes_notify(self, registry, tmp_path): + with patch("tools.process_registry.CHECKPOINT_PATH", tmp_path / "procs.json"): + s = _make_session(notify_on_complete=True) + registry._running[s.id] = s + registry._write_checkpoint() + + data = json.loads((tmp_path / "procs.json").read_text()) + assert len(data) == 1 + assert data[0]["notify_on_complete"] is True + + def test_checkpoint_without_notify(self, registry, tmp_path): + with patch("tools.process_registry.CHECKPOINT_PATH", tmp_path / "procs.json"): + s = _make_session(notify_on_complete=False) + registry._running[s.id] = s + registry._write_checkpoint() + + data = json.loads((tmp_path / "procs.json").read_text()) + assert data[0]["notify_on_complete"] is False + + def test_recover_preserves_notify(self, registry, tmp_path): + checkpoint = tmp_path / "procs.json" + checkpoint.write_text(json.dumps([{ + "session_id": "proc_live", + "command": "sleep 999", + "pid": os.getpid(), + "task_id": "t1", + "notify_on_complete": True, + }])) + with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint): + recovered = registry.recover_from_checkpoint() + assert recovered == 1 + s = registry.get("proc_live") + assert s.notify_on_complete is True + + def test_recover_defaults_false(self, registry, tmp_path): + """Old checkpoint entries without the field default to False.""" + checkpoint = tmp_path / "procs.json" + checkpoint.write_text(json.dumps([{ + "session_id": "proc_live", + "command": "sleep 999", + "pid": os.getpid(), + "task_id": "t1", + }])) + with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint): + recovered = registry.recover_from_checkpoint() + assert recovered == 1 + s = registry.get("proc_live") + assert s.notify_on_complete is False + + +# ========================================================================= +# Terminal tool schema +# ========================================================================= + +class TestTerminalSchema: + def test_schema_has_notify_on_complete(self): + from tools.terminal_tool import TERMINAL_SCHEMA + props = TERMINAL_SCHEMA["parameters"]["properties"] + assert "notify_on_complete" in props + assert props["notify_on_complete"]["type"] == "boolean" + assert props["notify_on_complete"]["default"] is False + + def test_handler_passes_notify(self): + """_handle_terminal passes notify_on_complete to terminal_tool.""" + from tools.terminal_tool import _handle_terminal + with patch("tools.terminal_tool.terminal_tool", return_value='{"ok":true}') as mock_tt: + _handle_terminal( + {"command": "echo hi", "background": True, "notify_on_complete": True}, + task_id="t1", + ) + _, kwargs = mock_tt.call_args + assert kwargs["notify_on_complete"] is True + + +# ========================================================================= +# Code execution blocked params +# ========================================================================= + +class TestCodeExecutionBlocked: + def test_notify_on_complete_blocked_in_sandbox(self): + from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS + assert "notify_on_complete" in _TERMINAL_BLOCKED_PARAMS diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index ff5c7f7fe..5c4658b6f 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -300,7 +300,7 @@ def _call(tool_name, args): # --------------------------------------------------------------------------- # Terminal parameters that must not be used from ephemeral sandbox scripts -_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty"} +_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete"} def _rpc_server_loop( diff --git a/tools/process_registry.py b/tools/process_registry.py index a3796c8ae..f5ac9543f 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -81,6 +81,7 @@ class ProcessSession: watcher_chat_id: str = "" watcher_thread_id: str = "" watcher_interval: int = 0 # 0 = no watcher configured + notify_on_complete: bool = False # Queue agent notification on exit _lock: threading.Lock = field(default_factory=threading.Lock) _reader_thread: Optional[threading.Thread] = field(default=None, repr=False) _pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True) @@ -112,6 +113,12 @@ class ProcessRegistry: # Side-channel for check_interval watchers (gateway reads after agent run) self.pending_watchers: List[Dict[str, Any]] = [] + # Completion notifications — processes with notify_on_complete push here + # on exit. CLI process_loop and gateway drain this after each agent turn + # to auto-trigger a new agent turn with the process results. + import queue as _queue_mod + self.completion_queue: _queue_mod.Queue = _queue_mod.Queue() + @staticmethod def _clean_shell_noise(text: str) -> str: """Strip shell startup warnings from the beginning of output.""" @@ -415,6 +422,18 @@ class ProcessRegistry: self._finished[session.id] = session self._write_checkpoint() + # If the caller requested agent notification, enqueue the completion + # so the CLI/gateway can auto-trigger a new agent turn. + if session.notify_on_complete: + from tools.ansi_strip import strip_ansi + output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else "" + self.completion_queue.put({ + "session_id": session.id, + "command": session.command, + "exit_code": session.exit_code, + "output": output_tail, + }) + # ----- Query Methods ----- def get(self, session_id: str) -> Optional[ProcessSession]: @@ -721,6 +740,7 @@ class ProcessRegistry: "watcher_chat_id": s.watcher_chat_id, "watcher_thread_id": s.watcher_thread_id, "watcher_interval": s.watcher_interval, + "notify_on_complete": s.notify_on_complete, }) # Atomic write to avoid corruption on crash @@ -771,6 +791,7 @@ class ProcessRegistry: watcher_chat_id=entry.get("watcher_chat_id", ""), watcher_thread_id=entry.get("watcher_thread_id", ""), watcher_interval=entry.get("watcher_interval", 0), + notify_on_complete=entry.get("notify_on_complete", False), ) with self._lock: self._running[session.id] = session diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index be565f196..305d08011 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -421,9 +421,11 @@ Do NOT use sed/awk to edit files — use patch instead. Do NOT use echo/cat heredoc to create files — use write_file instead. Reserve terminal for: builds, installs, git, processes, scripts, network, package managers, and anything that needs a shell. -Foreground (default): Commands return INSTANTLY when done, even if the timeout is high. Set timeout=300 for long builds/scripts — you'll still get the result in seconds if it's fast. Prefer foreground for everything that finishes. -Background: ONLY for long-running servers, watchers, or processes that never exit. Set background=true to get a session_id, then use process(action="wait") to block until done — it returns instantly on completion, same as foreground. Use process(action="poll") only when you need a progress check without blocking. -Do NOT use background for scripts, builds, or installs — foreground with a generous timeout is always better (fewer tool calls, instant results). +Foreground (default): Commands return INSTANTLY when done, even if the timeout is high. Set timeout=300 for long builds/scripts — you'll still get the result in seconds if it's fast. Prefer foreground for short commands. +Background: Set background=true to get a session_id. Two patterns: + (1) Long-lived processes that never exit (servers, watchers). + (2) Long-running tasks with notify_on_complete=true — you can keep working on other things and the system auto-notifies you when the task finishes. Great for test suites, builds, deployments, or anything that takes more than a minute. +Use process(action="poll") for progress checks, process(action="wait") to block until done. Working directory: Use 'workdir' for per-command cwd. PTY mode: Set pty=true for interactive CLI tools (Codex, Claude Code, Python REPL). @@ -1009,6 +1011,7 @@ def terminal_tool( workdir: Optional[str] = None, check_interval: Optional[int] = None, pty: bool = False, + notify_on_complete: bool = False, ) -> str: """ Execute a command in the configured terminal environment. @@ -1022,6 +1025,7 @@ def terminal_tool( workdir: Working directory for this command (optional, uses session cwd if not set) check_interval: Seconds between auto-checks for background processes (gateway only, min 30) pty: If True, use pseudo-terminal for interactive CLI tools (local backend only) + notify_on_complete: If True and background=True, auto-notify the agent when the process exits Returns: str: JSON string with output, exit_code, and error fields @@ -1254,6 +1258,32 @@ def terminal_tool( f"configured limit of {max_timeout}s" ) + # Mark for agent notification on completion + if notify_on_complete and background: + proc_session.notify_on_complete = True + result_data["notify_on_complete"] = True + + # In gateway mode, auto-register a fast watcher so the + # gateway can detect completion and trigger a new agent + # turn. CLI mode uses the completion_queue directly. + _gw_platform = os.getenv("HERMES_SESSION_PLATFORM", "") + if _gw_platform and not check_interval: + _gw_chat_id = os.getenv("HERMES_SESSION_CHAT_ID", "") + _gw_thread_id = os.getenv("HERMES_SESSION_THREAD_ID", "") + proc_session.watcher_platform = _gw_platform + proc_session.watcher_chat_id = _gw_chat_id + proc_session.watcher_thread_id = _gw_thread_id + proc_session.watcher_interval = 5 + process_registry.pending_watchers.append({ + "session_id": proc_session.id, + "check_interval": 5, + "session_key": session_key, + "platform": _gw_platform, + "chat_id": _gw_chat_id, + "thread_id": _gw_thread_id, + "notify_on_complete": True, + }) + # Register check_interval watcher (gateway picks this up after agent run) if check_interval and background: effective_interval = max(30, check_interval) @@ -1550,7 +1580,7 @@ TERMINAL_SCHEMA = { }, "background": { "type": "boolean", - "description": "ONLY for servers/watchers that never exit. For scripts, builds, installs — use foreground with timeout instead (it returns instantly when done).", + "description": "Run the command in the background. Two patterns: (1) Long-lived processes that never exit (servers, watchers). (2) Long-running tasks paired with notify_on_complete=true — you can keep working and get notified when the task finishes. For short commands, prefer foreground with a generous timeout instead.", "default": False }, "timeout": { @@ -1571,6 +1601,11 @@ TERMINAL_SCHEMA = { "type": "boolean", "description": "Run in pseudo-terminal (PTY) mode for interactive CLI tools like Codex, Claude Code, or Python REPL. Only works with local and SSH backends. Default: false.", "default": False + }, + "notify_on_complete": { + "type": "boolean", + "description": "When true (and background=true), you'll be automatically notified when the process finishes — no polling needed. Use this for tasks that take a while (tests, builds, deployments) so you can keep working on other things in the meantime.", + "default": False } }, "required": ["command"] @@ -1587,6 +1622,7 @@ def _handle_terminal(args, **kw): workdir=args.get("workdir"), check_interval=args.get("check_interval"), pty=args.get("pty", False), + notify_on_complete=args.get("notify_on_complete", False), )