diff --git a/cron/scheduler.py b/cron/scheduler.py
index 876000a89..409ecb847 100644
--- a/cron/scheduler.py
+++ b/cron/scheduler.py
@@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = (
     "unable to execute",
     "permission denied",
     "no such file",
+    "no such file or directory",
+    "command not found",
+    "hermes binary not found",
+    "hermes not found",
     "traceback",
+    "ssh: connect to host",
+    "connection timed out",
+    "host key verification failed",
 )
 
 
diff --git a/cron/ssh_dispatch.py b/cron/ssh_dispatch.py
new file mode 100644
index 000000000..7aa6b79ba
--- /dev/null
+++ b/cron/ssh_dispatch.py
@@ -0,0 +1,396 @@
+"""SSH dispatch utilities for VPS agent operations.
+
+Provides validated SSH execution with proper failure detection.
+Used by cron jobs that dispatch work to remote VPS agents.
+
+Key classes:
+    SSHEnvironment: Executes commands on remote hosts with validation
+    DispatchResult: Structured result with success/failure status
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import shlex
+import subprocess
+import time
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+# OpenSSH exits with 255 when the connection or authentication itself fails;
+# any other status is the exit status of the remote command.
+_SSH_TRANSPORT_EXIT_CODE = 255
+
+# Routine stderr chatter ssh prints when it records a new host key (which
+# StrictHostKeyChecking=accept-new allows). Error summaries skip it.
+_SSH_NOISE_PREFIXES = ("Warning: Permanently added",)
+
+
+def _env_int(name: str, default: int) -> int:
+    """Read a positive integer from the environment, else return *default*.
+
+    A malformed or non-positive value is logged and ignored so that a bad
+    environment variable cannot break importing this module.
+    """
+    raw = os.getenv(name, "").strip()
+    if not raw:
+        return default
+    try:
+        value = int(raw)
+    except ValueError:
+        value = 0
+    if value <= 0:
+        logger.warning("Ignoring invalid %s=%r; using %d", name, raw, default)
+        return default
+    return value
+
+
+_SSH_TIMEOUT = _env_int("HERMES_SSH_TIMEOUT", 30)
+
+_DEFAULT_HERMES_PATHS = [
+    "/root/wizards/{agent}/venv/bin/hermes",
+    "/root/.local/bin/hermes",
+    "/usr/local/bin/hermes",
+    "~/.local/bin/hermes",
+    "hermes",
+]
+
+
+def _quote_remote_path(path: str) -> str:
+    """Quote *path* for the remote shell, keeping a leading ``~/`` expandable."""
+    if path.startswith("~/"):
+        return "~/" + shlex.quote(path[2:])
+    return shlex.quote(path)
+
+
+def _first_error_line(stderr: str) -> str:
+    """Return the first stderr line that is not routine ssh chatter, or ``""``."""
+    for raw_line in stderr.splitlines():
+        line = raw_line.strip()
+        if line and not line.startswith(_SSH_NOISE_PREFIXES):
+            return line
+    return ""
+
+
+class DispatchResult:
+    """Structured result of a dispatch operation.
+
+    Attributes:
+        success: Whether the dispatch succeeded.
+        host: Host the command was dispatched to.
+        command: Remote command line associated with the result.
+        exit_code: Exit status of the ssh process; -1 if none was recorded.
+        stdout: Captured standard output.
+        stderr: Captured standard error.
+        error: Short failure description; empty on success.
+        duration_ms: Elapsed wall-clock time in milliseconds.
+        hermes_path: Hermes binary used for the dispatch, if any.
+    """
+
+    __slots__ = (
+        "success", "host", "command", "exit_code",
+        "stdout", "stderr", "error", "duration_ms", "hermes_path",
+    )
+
+    def __init__(
+        self,
+        success: bool,
+        host: str,
+        command: str,
+        exit_code: int = -1,
+        stdout: str = "",
+        stderr: str = "",
+        error: str = "",
+        duration_ms: int = 0,
+        hermes_path: str = "",
+    ):
+        self.success = success
+        self.host = host
+        self.command = command
+        self.exit_code = exit_code
+        self.stdout = stdout
+        self.stderr = stderr
+        self.error = error
+        self.duration_ms = duration_ms
+        self.hermes_path = hermes_path
+
+    def to_dict(self) -> dict:
+        """Return a compact summary; stdout and the command are left out."""
+        return {
+            "success": self.success,
+            "host": self.host,
+            "exit_code": self.exit_code,
+            "error": self.error,
+            "duration_ms": self.duration_ms,
+            "hermes_path": self.hermes_path,
+            "stderr_tail": self.stderr[-200:] if self.stderr else "",
+        }
+
+    @property
+    def failure_reason(self) -> str:
+        """Human-readable failure cause; empty string on success.
+
+        An explicit ``error`` takes precedence; otherwise the reason is
+        derived from stderr and then from the exit code.
+        """
+        if self.success:
+            return ""
+        if self.error:
+            return self.error
+        if "No such file" in self.stderr or "command not found" in self.stderr:
+            return f"Hermes binary not found on {self.host}"
+        if self.exit_code != 0:
+            return f"Remote command exited {self.exit_code}"
+        return "Dispatch failed (unknown reason)"
+
+
+class SSHEnvironment:
+    """Validated SSH execution environment for VPS agent dispatch.
+
+    Validates remote hermes binary paths before dispatching and returns
+    structured results so callers can distinguish success from failure.
+    """
+
+    def __init__(
+        self,
+        host: str,
+        agent: str = "",
+        ssh_key: str = "",
+        ssh_port: int = 22,
+        timeout: int = _SSH_TIMEOUT,
+        hermes_path: str = "",
+    ):
+        """Store connection settings; nothing is executed here.
+
+        Args:
+            host: SSH destination, passed to ``ssh`` unchanged.
+            agent: Agent name substituted into the ``{agent}`` path template.
+            ssh_key: Identity file passed with ``-i``; empty to omit it.
+            ssh_port: Remote port; ``-p`` is only passed when it is not 22.
+            timeout: Seconds allowed for each ssh invocation.
+            hermes_path: Explicit hermes binary path. When set it is the
+                only candidate probed by validation.
+        """
+        self.host = host
+        self.agent = agent
+        self.ssh_key = ssh_key
+        self.ssh_port = ssh_port
+        self.timeout = timeout
+        self.hermes_path = hermes_path
+        self._validated_path: str = ""
+
+    def _ssh_base_cmd(self) -> list[str]:
+        """Build the ssh argv up to and including the destination host."""
+        cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"]
+        cmd.extend(["-o", "ConnectTimeout=10"])
+        # BatchMode makes ssh fail instead of prompting for credentials.
+        cmd.extend(["-o", "BatchMode=yes"])
+        if self.ssh_key:
+            cmd.extend(["-i", self.ssh_key])
+        if self.ssh_port != 22:
+            cmd.extend(["-p", str(self.ssh_port)])
+        cmd.append(self.host)
+        return cmd
+
+    def _resolve_hermes_paths(self) -> list[str]:
+        """Return the candidate hermes paths to probe, in priority order."""
+        if self.hermes_path:
+            return [self.hermes_path]
+        paths = []
+        for tmpl in _DEFAULT_HERMES_PATHS:
+            if "{agent}" not in tmpl:
+                paths.append(tmpl)
+            elif self.agent:
+                paths.append(tmpl.replace("{agent}", self.agent))
+            # else: no agent name, so the per-agent template has no
+            # meaningful expansion and is skipped.
+        return paths
+
+    def validate_remote_hermes_path(self) -> str:
+        """Probe the remote host for a working hermes binary.
+
+        Candidates containing a ``/`` are checked with ``test -x``; bare
+        command names are looked up on the remote PATH with ``command -v``.
+        Probing stops at the first transport-level failure (ssh exit status
+        255, a timeout, or ssh failing to start) because the remaining
+        candidates would fail the same way.
+
+        Returns the validated path on success, raises RuntimeError on failure.
+        Caches the result so validation is only done once per instance.
+        """
+        if self._validated_path:
+            return self._validated_path
+
+        candidates = self._resolve_hermes_paths()
+        for path in candidates:
+            quoted = _quote_remote_path(path)
+            if "/" in path:
+                probe_cmd = f"test -x {quoted}"
+            else:
+                # A bare name is resolved through PATH when it is run, so
+                # it has to be probed through PATH as well.
+                probe_cmd = f"command -v {quoted} >/dev/null 2>&1"
+            try:
+                result = subprocess.run(
+                    self._ssh_base_cmd() + [probe_cmd],
+                    capture_output=True, text=True, timeout=self.timeout,
+                )
+            except subprocess.TimeoutExpired as exc:
+                raise RuntimeError(
+                    f"SSH timed out after {self.timeout}s probing {path} "
+                    f"on {self.host}"
+                ) from exc
+            except (OSError, ValueError) as exc:
+                raise RuntimeError(
+                    f"Could not run ssh for {self.host}: {exc}"
+                ) from exc
+
+            if result.returncode == 0:
+                logger.info("SSH %s: hermes validated at %s", self.host, path)
+                self._validated_path = path
+                return path
+            if result.returncode == _SSH_TRANSPORT_EXIT_CODE:
+                detail = _first_error_line(result.stderr or "")
+                raise RuntimeError(
+                    f"SSH connection to {self.host} failed: "
+                    f"{detail or 'ssh exited 255'}"
+                )
+            logger.debug("SSH %s: no hermes at %s", self.host, path)
+
+        raise RuntimeError(
+            f"No working hermes binary found on {self.host}. "
+            f"Checked: {', '.join(candidates)}."
+        )
+
+    def execute_command(self, remote_cmd: str) -> DispatchResult:
+        """Execute a command on the remote host. Returns DispatchResult.
+
+        Failures are reported rather than raised: a non-zero exit status, a
+        timeout and an error starting ssh all yield ``success=False``.
+        """
+        t0 = time.monotonic()
+        full_cmd = self._ssh_base_cmd() + [remote_cmd]
+        try:
+            result = subprocess.run(
+                full_cmd, capture_output=True, text=True, timeout=self.timeout,
+            )
+        except subprocess.TimeoutExpired:
+            elapsed = int((time.monotonic() - t0) * 1000)
+            return DispatchResult(
+                success=False, host=self.host, command=remote_cmd,
+                error=f"SSH timed out after {self.timeout}s", duration_ms=elapsed,
+            )
+        except Exception as exc:
+            elapsed = int((time.monotonic() - t0) * 1000)
+            return DispatchResult(
+                success=False, host=self.host, command=remote_cmd,
+                error=str(exc), duration_ms=elapsed,
+            )
+
+        elapsed = int((time.monotonic() - t0) * 1000)
+        stderr = (result.stderr or "").strip()
+        stdout = (result.stdout or "").strip()
+
+        if result.returncode != 0:
+            # Skip ssh's own chatter so the summary names the real problem.
+            summary = _first_error_line(stderr)
+            return DispatchResult(
+                success=False, host=self.host, command=remote_cmd,
+                exit_code=result.returncode, stdout=stdout, stderr=stderr,
+                error=summary or f"exit code {result.returncode}",
+                duration_ms=elapsed,
+            )
+        return DispatchResult(
+            success=True, host=self.host, command=remote_cmd,
+            exit_code=0, stdout=stdout, stderr=stderr, duration_ms=elapsed,
+        )
+
+    def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult:
+        """Dispatch a hermes command on the remote host.
+
+        Args:
+            hermes_args: Arguments to pass to hermes (e.g. "cron tick").
+                Passed to the remote shell verbatim, so the caller is
+                responsible for quoting individual arguments.
+            validate: If True, validate the hermes binary exists first.
+                If False, ``hermes_path`` (or plain ``hermes``) is used
+                as given, without probing or quoting.
+
+        Returns DispatchResult. Only success=True if command actually ran.
+        """
+        if validate:
+            try:
+                hermes_path = self.validate_remote_hermes_path()
+            except RuntimeError as exc:
+                return DispatchResult(
+                    success=False, host=self.host,
+                    command=f"hermes {hermes_args}",
+                    error=str(exc), hermes_path="(not found)",
+                )
+            # Quote exactly as the probe did, so the command that runs is
+            # the binary that was validated.
+            executable = _quote_remote_path(hermes_path)
+        else:
+            hermes_path = self.hermes_path or "hermes"
+            executable = hermes_path
+
+        result = self.execute_command(f"{executable} {hermes_args}")
+        result.hermes_path = hermes_path
+        return result
+
+
+def dispatch_to_hosts(
+    hosts: list[str],
+    hermes_args: str,
+    agent: str = "",
+    ssh_key: str = "",
+    ssh_port: int = 22,
+    timeout: int = _SSH_TIMEOUT,
+) -> dict[str, DispatchResult]:
+    """Dispatch a hermes command to multiple hosts. Returns host -> DispatchResult.
+
+    Hosts are contacted one after another, each through its own
+    SSHEnvironment built from the shared settings.
+    """
+    results: dict[str, DispatchResult] = {}
+    for host in hosts:
+        ssh = SSHEnvironment(
+            host=host, agent=agent, ssh_key=ssh_key,
+            ssh_port=ssh_port, timeout=timeout,
+        )
+        outcome = ssh.dispatch(hermes_args)
+        results[host] = outcome
+        logger.info(
+            "Dispatch %s: %s", host,
+            "OK" if outcome.success else outcome.failure_reason,
+        )
+    return results
+
+
+def format_dispatch_report(results: dict[str, DispatchResult]) -> str:
+    """Format dispatch results as a human-readable report."""
+    failed_count = sum(1 for r in results.values() if not r.success)
+    ok_count = len(results) - failed_count
+
+    lines = [f"Dispatch report: {ok_count} OK, {failed_count} failed", ""]
+    for host, result in results.items():
+        status = "OK" if result.success else "FAILED"
+        line = f" {host}: {status}"
+        if not result.success:
+            line += f" -- {result.failure_reason}"
+        if result.duration_ms:
+            line += f" ({result.duration_ms}ms)"
+        lines.append(line)
+
+    if failed_count:
+        lines.append("")
+        lines.append("Failed dispatches:")
+        for host, result in results.items():
+            if not result.success:
+                lines.append(f" {host}: {result.failure_reason}")
+                if result.stderr:
+                    lines.append(f" stderr: {result.stderr[-150:]}")
+
+    return "\n".join(lines)