diff --git a/cron/__init__.py b/cron/__init__.py index d2ea5cd86..15e7b1501 100644 --- a/cron/__init__.py +++ b/cron/__init__.py @@ -26,7 +26,7 @@ from cron.jobs import ( trigger_job, JOBS_FILE, ) -from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS +from cron.scheduler import tick __all__ = [ "create_job", @@ -39,6 +39,4 @@ __all__ = [ "trigger_job", "tick", "JOBS_FILE", - "ModelContextError", - "CRON_MIN_CONTEXT_TOKENS", ] diff --git a/cron/scheduler.py b/cron/scheduler.py index 876000a89..409ecb847 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = ( "unable to execute", "permission denied", "no such file", + "no such file or directory", + "command not found", + "hermes binary not found", + "hermes not found", "traceback", + "ssh: connect to host", + "connection timed out", + "host key verification failed", ) diff --git a/cron/ssh_dispatch.py b/cron/ssh_dispatch.py new file mode 100644 index 000000000..57a0f6fee --- /dev/null +++ b/cron/ssh_dispatch.py @@ -0,0 +1,192 @@ +"""SSH dispatch utilities for VPS agent operations. + +Provides validated SSH execution with proper failure detection. +Used by cron jobs that dispatch work to remote VPS agents. +""" + +from __future__ import annotations + +import logging +import os +import subprocess +import time +from typing import Optional + +logger = logging.getLogger(__name__) + +_SSH_TIMEOUT = int(os.getenv("HERMES_SSH_TIMEOUT", "30")) + +_DEFAULT_HERMES_PATHS = [ + "/root/wizards/{agent}/venv/bin/hermes", + "/root/.local/bin/hermes", + "/usr/local/bin/hermes", + "~/.local/bin/hermes", + "hermes", +] + + +class DispatchResult: + """Structured result of a dispatch operation.""" + + __slots__ = ( + "success", "host", "command", "exit_code", + "stdout", "stderr", "error", "duration_ms", "hermes_path", + ) + + def __init__( + self, success: bool, host: str, command: str, + exit_code: int = -1, stdout: str = "", stderr: str = "", + error: str = "", duration_ms: int = 0, hermes_path: str = "", + ): + self.success = success + self.host = host + self.command = command + self.exit_code = exit_code + self.stdout = stdout + self.stderr = stderr + self.error = error + self.duration_ms = duration_ms + self.hermes_path = hermes_path + + def to_dict(self) -> dict: + return { + "success": self.success, "host": self.host, + "exit_code": self.exit_code, "error": self.error, + "duration_ms": self.duration_ms, "hermes_path": self.hermes_path, + "stderr_tail": self.stderr[-200:] if self.stderr else "", + } + + @property + def failure_reason(self) -> str: + if self.success: + return "" + if self.error: + return self.error + if "No such file" in self.stderr or "command not found" in self.stderr: + return f"Hermes binary not found on {self.host}" + if self.exit_code != 0: + return f"Remote command exited {self.exit_code}" + return "Dispatch failed (unknown reason)" + + +class SSHEnvironment: + """Validated SSH execution environment for VPS agent dispatch.""" + + def __init__( + self, host: str, agent: str = "", ssh_key: str = "", + ssh_port: int = 22, timeout: int = _SSH_TIMEOUT, + hermes_path: str = "", + ): + self.host = host + self.agent = agent + self.ssh_key = ssh_key + self.ssh_port = ssh_port + self.timeout = timeout + self.hermes_path = hermes_path + self._validated_path: str = "" + + def _ssh_base_cmd(self) -> list[str]: + cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"] + cmd.extend(["-o", "ConnectTimeout=10", "-o", "BatchMode=yes"]) + if self.ssh_key: + cmd.extend(["-i", self.ssh_key]) + if self.ssh_port != 22: + cmd.extend(["-p", str(self.ssh_port)]) + cmd.append(self.host) + return cmd + + def _resolve_hermes_paths(self) -> list[str]: + if self.hermes_path: + return [self.hermes_path] + return [t.format(agent=self.agent) if "{agent}" in t else t for t in _DEFAULT_HERMES_PATHS] + + def validate_remote_hermes_path(self) -> str: + """Probe remote host for a working hermes binary. Returns path or raises.""" + if self._validated_path: + return self._validated_path + for path in self._resolve_hermes_paths(): + try: + result = subprocess.run( + self._ssh_base_cmd() + [f"test -x {path} && echo OK || echo MISSING"], + capture_output=True, text=True, timeout=self.timeout, + ) + if result.returncode == 0 and "OK" in (result.stdout or ""): + logger.info("SSH %s: hermes validated at %s", self.host, path) + self._validated_path = path + return path + except subprocess.TimeoutExpired: + logger.warning("SSH %s: timeout probing %s", self.host, path) + except Exception as exc: + logger.debug("SSH %s: probe %s failed: %s", self.host, path, exc) + raise RuntimeError( + f"No working hermes binary found on {self.host}. " + f"Checked: {', '.join(self._resolve_hermes_paths())}." + ) + + def execute_command(self, remote_cmd: str) -> DispatchResult: + """Execute a command on the remote host.""" + t0 = time.monotonic() + try: + result = subprocess.run( + self._ssh_base_cmd() + [remote_cmd], + capture_output=True, text=True, timeout=self.timeout, + ) + elapsed = int((time.monotonic() - t0) * 1000) + stderr = (result.stderr or "").strip() + stdout = (result.stdout or "").strip() + if result.returncode != 0: + return DispatchResult( + success=False, host=self.host, command=remote_cmd, + exit_code=result.returncode, stdout=stdout, stderr=stderr, + error=stderr.split("\n")[0] if stderr else f"exit code {result.returncode}", + duration_ms=elapsed, + ) + return DispatchResult(success=True, host=self.host, command=remote_cmd, + exit_code=0, stdout=stdout, stderr=stderr, duration_ms=elapsed) + except subprocess.TimeoutExpired: + return DispatchResult(success=False, host=self.host, command=remote_cmd, + error=f"SSH timed out after {self.timeout}s", + duration_ms=int((time.monotonic() - t0) * 1000)) + except Exception as exc: + return DispatchResult(success=False, host=self.host, command=remote_cmd, + error=str(exc), duration_ms=int((time.monotonic() - t0) * 1000)) + + def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult: + """Dispatch a hermes command. Only success=True if command actually ran.""" + if validate: + try: + hermes_path = self.validate_remote_hermes_path() + except RuntimeError as exc: + return DispatchResult(success=False, host=self.host, + command=f"hermes {hermes_args}", + error=str(exc), hermes_path="(not found)") + else: + hermes_path = self.hermes_path or "hermes" + result = self.execute_command(f"{hermes_path} {hermes_args}") + result.hermes_path = hermes_path + return result + + +def dispatch_to_hosts(hosts: list[str], hermes_args: str, **kwargs) -> dict[str, DispatchResult]: + """Dispatch to multiple hosts. Returns host -> DispatchResult.""" + results: dict[str, DispatchResult] = {} + for host in hosts: + ssh = SSHEnvironment(host=host, **kwargs) + results[host] = ssh.dispatch(hermes_args) + return results + + +def format_dispatch_report(results: dict[str, DispatchResult]) -> str: + """Format dispatch results as a human-readable report.""" + ok = [r for r in results.values() if r.success] + failed = [r for r in results.values() if not r.success] + lines = [f"Dispatch report: {len(ok)} OK, {len(failed)} failed", ""] + for host, r in results.items(): + s = "OK" if r.success else f"FAILED -- {r.failure_reason}" + lines.append(f" {host}: {s}" + (f" ({r.duration_ms}ms)" if r.duration_ms else "")) + if failed: + lines += ["", "Failed dispatches:"] + for host, r in results.items(): + if not r.success: + lines.append(f" {host}: {r.failure_reason}") + return "\n".join(lines)