Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c4d757268e fix(cron): SSH dispatch validation + failure phrase detection (#350)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m0s
VPS agent dispatch jobs reported 'OK' while remote hermes binary paths
were broken. The dispatch worker drained the queue without validating
that commands actually ran, and the scheduler's failure detection
missed common SSH error patterns.

Changes:

1. New cron/ssh_dispatch.py — SSH dispatch utilities
   - DispatchResult: structured result with success/failure status,
     exit code, stderr, human-readable failure_reason
   - SSHEnvironment: validates remote hermes binary exists before
     dispatch via SSH probe (test -x), caches validated path,
     executes commands with proper timeout/error handling
   - dispatch_to_hosts(): multi-host dispatch with per-host results
   - format_dispatch_report(): human-readable report of all results

2. cron/scheduler.py — expanded _SCRIPT_FAILURE_PHRASES
   Added SSH-specific patterns that were missing:
   - 'no such file or directory' (exact bash error)
   - 'command not found' (bash fallback)
   - 'hermes binary not found' / 'hermes not found'
   - 'ssh: connect to host' (SSH connection failure)
   - 'connection timed out' (SSH timeout)
   - 'host key verification failed' (key mismatch)
   These are detected by _detect_script_failure() so broken SSH
   dispatches are properly flagged instead of reported as OK.

Closes #350
2026-04-13 21:05:49 -04:00
3 changed files with 425 additions and 0 deletions

View File

@@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = (
"unable to execute",
"permission denied",
"no such file",
"no such file or directory",
"command not found",
"hermes binary not found",
"hermes not found",
"traceback",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
)

319
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,319 @@
"""SSH dispatch utilities for VPS agent operations.
Provides validated SSH execution with proper failure detection.
Used by cron jobs that dispatch work to remote VPS agents.
Key classes:
SSHEnvironment: Executes commands on remote hosts with validation
DispatchResult: Structured result with success/failure status
"""
from __future__ import annotations
import json
import logging
import os
import subprocess
import time
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Timeout (seconds) applied to each SSH subprocess call; the
# HERMES_SSH_TIMEOUT environment variable overrides the default of 30.
_SSH_TIMEOUT = int(os.environ.get("HERMES_SSH_TIMEOUT", "30"))

# Candidate locations of the hermes binary on a remote host, listed in the
# order they are probed. "{agent}" is a placeholder filled in with the agent
# name by SSHEnvironment._resolve_hermes_paths().
_DEFAULT_HERMES_PATHS = [
    "/root/wizards/{agent}/venv/bin/hermes",
    "/root/.local/bin/hermes",
    "/usr/local/bin/hermes",
    "~/.local/bin/hermes",
    "hermes",  # fallback to PATH
]
class DispatchResult:
    """Outcome of a single dispatch attempt against one host.

    Instances are plain records: every constructor argument is stored
    unchanged on the attribute of the same name. ``exit_code`` defaults to
    ``-1`` and the text fields default to empty strings.
    """

    __slots__ = (
        "success",
        "host",
        "command",
        "exit_code",
        "stdout",
        "stderr",
        "error",
        "duration_ms",
        "hermes_path",
    )

    def __init__(
        self,
        success: bool,
        host: str,
        command: str,
        exit_code: int = -1,
        stdout: str = "",
        stderr: str = "",
        error: str = "",
        duration_ms: int = 0,
        hermes_path: str = "",
    ):
        # What was attempted, and where.
        self.host = host
        self.command = command
        self.hermes_path = hermes_path
        # How it went.
        self.success = success
        self.exit_code = exit_code
        self.duration_ms = duration_ms
        # Captured output and any explicit error description.
        self.stdout = stdout
        self.stderr = stderr
        self.error = error

    def to_dict(self) -> dict:
        """Return a summary dict of the result.

        ``command`` and ``stdout`` are not included, and stderr is reduced
        to its last 200 characters under the ``stderr_tail`` key.
        """
        summary = dict(
            success=self.success,
            host=self.host,
            exit_code=self.exit_code,
            error=self.error,
            duration_ms=self.duration_ms,
            hermes_path=self.hermes_path,
        )
        summary["stderr_tail"] = (self.stderr or "")[-200:]
        return summary

    @property
    def failure_reason(self) -> str:
        """Human-readable failure reason; empty string for a success.

        Precedence: an explicit ``error`` wins, then a missing-binary hint
        found in stderr, then a non-zero exit code, then a generic message.
        """
        if self.success:
            return ""
        if self.error:
            return self.error
        missing_markers = ("No such file", "command not found")
        if any(marker in self.stderr for marker in missing_markers):
            return f"Hermes binary not found on {self.host}"
        if self.exit_code == 0:
            return "Dispatch failed (unknown reason)"
        return f"Remote command exited {self.exit_code}"
class SSHEnvironment:
    """Validated SSH execution environment for VPS agent dispatch.

    Validates remote hermes binary paths before dispatching and returns
    structured results so callers can distinguish success from failure.

    NOTE: the hermes path and the hermes arguments are interpolated into
    the remote shell command without quoting, so callers must pass
    shell-safe values.

    Usage:
        ssh = SSHEnvironment(host="root@ezra", agent="allegro")
        result = ssh.dispatch("--help")
        if not result.success:
            logger.error("Dispatch failed: %s", result.failure_reason)
    """

    # Exit status ssh itself returns when it hits an error (connection,
    # authentication, host key), as opposed to the remote command's status.
    _SSH_ERROR_EXIT = 255

    def __init__(
        self,
        host: str,
        agent: str = "",
        ssh_key: str = "",
        ssh_port: int = 22,
        timeout: int = _SSH_TIMEOUT,
        hermes_path: str = "",
    ):
        """Store connection settings; no SSH traffic happens here.

        Args:
            host: SSH destination, e.g. "root@ezra".
            agent: Agent name substituted into "{agent}" path templates.
            ssh_key: Identity file passed to ``ssh -i``; omitted when empty.
            ssh_port: Remote port; ``-p`` is only added when it is not 22.
            timeout: Seconds allowed for each ssh subprocess call.
            hermes_path: Explicit remote hermes path; when set it is the
                only candidate that is probed.
        """
        self.host = host
        self.agent = agent
        self.ssh_key = ssh_key
        self.ssh_port = ssh_port
        self.timeout = timeout
        self.hermes_path = hermes_path
        # Cache for validate_remote_hermes_path(); empty until a probe succeeds.
        self._validated_path: str = ""

    def _ssh_base_cmd(self) -> list[str]:
        """Build the ssh argv up to and including the destination host."""
        cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"]
        cmd.extend(["-o", "ConnectTimeout=10"])
        cmd.extend(["-o", "BatchMode=yes"])  # fail fast on auth issues
        if self.ssh_key:
            cmd.extend(["-i", self.ssh_key])
        if self.ssh_port != 22:
            cmd.extend(["-p", str(self.ssh_port)])
        cmd.append(self.host)
        return cmd

    def _resolve_hermes_paths(self) -> list[str]:
        """Return candidate hermes binary paths for the remote host.

        An explicit ``hermes_path`` is the sole candidate. Otherwise the
        defaults are used; templates containing "{agent}" are skipped when
        no agent name is configured, since formatting them with an empty
        name would yield a path like "/root/wizards//venv/bin/hermes".
        """
        if self.hermes_path:
            return [self.hermes_path]
        paths = []
        for tmpl in _DEFAULT_HERMES_PATHS:
            if "{agent}" in tmpl:
                if not self.agent:
                    continue
                paths.append(tmpl.format(agent=self.agent))
            else:
                paths.append(tmpl)
        return paths

    def validate_remote_hermes_path(self) -> str:
        """Probe the remote host for a working hermes binary.

        Returns the validated path on success and caches it so validation
        is only done once per instance.

        Raises:
            RuntimeError: if ssh itself fails (exit status 255), or if no
                candidate path is an executable on the remote host.
        """
        if self._validated_path:
            return self._validated_path
        candidates = self._resolve_hermes_paths()
        for path in candidates:
            if "/" in path:
                # A real path: does the file exist and is it executable?
                check = f"test -x {path}"
            else:
                # A bare command name must be looked up on the remote PATH;
                # `test -x hermes` would only look in the working directory.
                check = f"command -v {path} >/dev/null 2>&1"
            # The trailing echo makes the remote command exit 0 either way,
            # so a non-zero status here comes from ssh, not from the check.
            test_cmd = f"{check} && echo OK || echo MISSING"
            try:
                result = subprocess.run(
                    self._ssh_base_cmd() + [test_cmd],
                    capture_output=True, text=True, timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                logger.warning("SSH %s: timeout probing %s", self.host, path)
                continue
            except Exception as exc:
                logger.debug("SSH %s: probe %s failed: %s", self.host, path, exc)
                continue
            if result.returncode == self._SSH_ERROR_EXIT:
                # The connection itself failed; probing further candidates
                # would fail the same way and the "binary not found" error
                # below would hide the real cause.
                stderr_lines = (result.stderr or "").strip().splitlines()
                detail = (
                    stderr_lines[-1] if stderr_lines
                    else f"ssh exited {result.returncode}"
                )
                raise RuntimeError(
                    f"SSH connection to {self.host} failed: {detail}"
                )
            if result.returncode == 0 and "OK" in (result.stdout or ""):
                logger.info(
                    "SSH %s: hermes binary validated at %s", self.host, path,
                )
                self._validated_path = path
                return path
        raise RuntimeError(
            f"No working hermes binary found on {self.host}. "
            f"Checked: {', '.join(candidates)}. "
            f"Install hermes or set HERMES_PATH on the remote host."
        )

    def execute_command(self, remote_cmd: str) -> DispatchResult:
        """Execute a command on the remote host.

        Never raises: a non-zero exit, a timeout and any other exception
        are all reported as a DispatchResult with success=False. For a
        non-zero exit, ``error`` is the first stderr line (or
        "exit code N" when stderr is empty).
        """
        t0 = time.monotonic()
        full_cmd = self._ssh_base_cmd() + [remote_cmd]
        try:
            result = subprocess.run(
                full_cmd,
                capture_output=True, text=True, timeout=self.timeout,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            stderr = (result.stderr or "").strip()
            stdout = (result.stdout or "").strip()
            if result.returncode != 0:
                return DispatchResult(
                    success=False,
                    host=self.host,
                    command=remote_cmd,
                    exit_code=result.returncode,
                    stdout=stdout,
                    stderr=stderr,
                    error=stderr.split("\n")[0] if stderr else f"exit code {result.returncode}",
                    duration_ms=elapsed,
                )
            return DispatchResult(
                success=True,
                host=self.host,
                command=remote_cmd,
                exit_code=0,
                stdout=stdout,
                stderr=stderr,
                duration_ms=elapsed,
            )
        except subprocess.TimeoutExpired:
            elapsed = int((time.monotonic() - t0) * 1000)
            return DispatchResult(
                success=False,
                host=self.host,
                command=remote_cmd,
                error=f"SSH timed out after {self.timeout}s",
                duration_ms=elapsed,
            )
        except Exception as exc:
            elapsed = int((time.monotonic() - t0) * 1000)
            return DispatchResult(
                success=False,
                host=self.host,
                command=remote_cmd,
                error=str(exc),
                duration_ms=elapsed,
            )

    def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult:
        """Dispatch a hermes command on the remote host.

        Args:
            hermes_args: Arguments to pass to hermes (e.g. "cron tick").
            validate: If True, validate the hermes binary exists first.
                If False, ``hermes_path`` (or plain "hermes") is used
                without probing.

        Returns:
            A DispatchResult. When validation fails the command is not run
            and the result carries the validation error with
            hermes_path="(not found)"; otherwise it is the result of
            execute_command() with ``hermes_path`` filled in.
        """
        if validate:
            try:
                hermes_path = self.validate_remote_hermes_path()
            except RuntimeError as exc:
                return DispatchResult(
                    success=False,
                    host=self.host,
                    command=f"hermes {hermes_args}",
                    error=str(exc),
                    hermes_path="(not found)",
                )
        else:
            hermes_path = self.hermes_path or "hermes"
        remote_cmd = f"{hermes_path} {hermes_args}"
        result = self.execute_command(remote_cmd)
        result.hermes_path = hermes_path
        return result
def dispatch_to_hosts(
    hosts: list[str],
    hermes_args: str,
    agent: str = "",
    ssh_key: str = "",
    ssh_port: int = 22,
    timeout: int = _SSH_TIMEOUT,
) -> dict[str, DispatchResult]:
    """Run one hermes command on each host in turn.

    A fresh SSHEnvironment is built per host with the shared connection
    settings, and the outcome of each dispatch is logged at INFO level.

    Returns:
        A dict mapping host -> DispatchResult, in the order the hosts were
        given. Failed dispatches are included with success=False.
    """
    outcomes: dict[str, DispatchResult] = {}
    for target in hosts:
        environment = SSHEnvironment(
            host=target,
            agent=agent,
            ssh_key=ssh_key,
            ssh_port=ssh_port,
            timeout=timeout,
        )
        outcome = environment.dispatch(hermes_args)
        outcomes[target] = outcome
        if outcome.success:
            summary = "OK"
        else:
            summary = outcome.failure_reason
        logger.info("Dispatch %s: %s", target, summary)
    return outcomes
def format_dispatch_report(results: dict[str, DispatchResult]) -> str:
    """Format dispatch results as a human-readable report.

    The report has a summary line with OK/failed counts, a blank line,
    then one line per host. A failed host's line carries its failure
    reason after a " - " separator, and any host with a non-zero
    duration gets "(<n>ms)" appended. When at least one dispatch failed,
    a "Failed dispatches:" section repeats each failure together with
    the last 150 characters of its stderr, if any.

    Args:
        results: Mapping of host -> DispatchResult; hosts are reported in
            the mapping's iteration order.

    Returns:
        The report as a single newline-joined string.
    """
    failed = {host: res for host, res in results.items() if not res.success}
    ok_count = len(results) - len(failed)
    lines = [f"Dispatch report: {ok_count} OK, {len(failed)} failed", ""]
    for host, result in results.items():
        if result.success:
            line = f" {host}: OK"
        else:
            # Separate the status from the reason; without the separator the
            # line reads e.g. "FAILEDHermes binary not found on ...".
            line = f" {host}: FAILED - {result.failure_reason}"
        if result.duration_ms:
            line += f" ({result.duration_ms}ms)"
        lines.append(line)
    if failed:
        lines.append("")
        lines.append("Failed dispatches:")
        for host, result in failed.items():
            lines.append(f" {host}: {result.failure_reason}")
            if result.stderr:
                lines.append(f" stderr: {result.stderr[-150:]}")
    return "\n".join(lines)

View File

@@ -0,0 +1,99 @@
"""Tests for cron SSH dispatch utilities."""
import sys
from pathlib import Path
# Put the directory two levels above this test file on the import path.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))


def _import_dispatch():
    """Load cron/ssh_dispatch.py by file path and return the module object.

    The file is located relative to this test file and executed under the
    module name "cron.ssh_dispatch" using importlib's spec machinery.
    """
    from importlib.util import module_from_spec, spec_from_file_location

    source_file = Path(__file__).resolve().parent.parent / "cron" / "ssh_dispatch.py"
    spec = spec_from_file_location("cron.ssh_dispatch", str(source_file))
    module = module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Names under test, pulled from the freshly loaded module.
_mod = _import_dispatch()
DispatchResult, SSHEnvironment, format_dispatch_report = (
    _mod.DispatchResult,
    _mod.SSHEnvironment,
    _mod.format_dispatch_report,
)
class TestDispatchResult:
    """Tests for DispatchResult.failure_reason and DispatchResult.to_dict."""

    def test_success_result(self):
        """A successful result reports no failure reason."""
        r = DispatchResult(success=True, host="root@ezra", command="hermes cron tick")
        assert r.success
        assert r.failure_reason == ""

    def test_failure_with_no_such_file(self):
        """A 'No such file' stderr is reported as a missing hermes binary."""
        r = DispatchResult(
            success=False, host="root@ezra", command="hermes cron tick",
            stderr="bash: line 1: /root/wizards/allegro/venv/bin/hermes: No such file or directory",
        )
        assert not r.success
        # Assert on the derived reason itself; checking r.stderr would only
        # re-test the input and could never fail.
        assert r.failure_reason == "Hermes binary not found on root@ezra"

    def test_failure_with_timeout(self):
        """An explicit error string is returned as the failure reason."""
        r = DispatchResult(
            success=False, host="root@ezra", command="hermes cron tick",
            error="SSH timed out after 30s",
        )
        assert not r.success
        assert "timed out" in r.failure_reason

    def test_to_dict(self):
        """to_dict exposes the status, exit code and stderr tail."""
        r = DispatchResult(
            success=False, host="root@ezra", command="hermes cron tick",
            exit_code=127, stderr="command not found",
        )
        d = r.to_dict()
        assert d["success"] is False
        assert d["exit_code"] == 127
        assert d["stderr_tail"] == "command not found"
class TestFormatDispatchReport:
    """Tests for the text report produced by format_dispatch_report."""

    def test_all_ok(self):
        """Two successful hosts give a '2 OK, 0 failed' summary."""
        ezra = DispatchResult(
            success=True, host="root@ezra", command="hermes cron tick", duration_ms=150,
        )
        allegro = DispatchResult(
            success=True, host="root@allegro", command="hermes cron tick", duration_ms=200,
        )
        report = format_dispatch_report({"root@ezra": ezra, "root@allegro": allegro})
        assert "2 OK, 0 failed" in report
        assert "root@ezra: OK" in report

    def test_mixed_results(self):
        """One success plus one failure shows the failed host and its reason."""
        passed = DispatchResult(success=True, host="root@ezra", command="hermes cron tick")
        broken = DispatchResult(
            success=False,
            host="root@allegro",
            command="hermes cron tick",
            error="Hermes binary not found on root@allegro",
        )
        report = format_dispatch_report({"root@ezra": passed, "root@allegro": broken})
        assert "1 OK, 1 failed" in report
        assert "root@allegro: FAILED" in report
        assert "Hermes binary not found" in report
class TestSSHEnvironment:
    """Tests for SSHEnvironment helpers that need no network access."""

    def test_resolve_hermes_paths_custom(self):
        """An explicit hermes_path is the only candidate returned."""
        env = SSHEnvironment(
            host="root@ezra", agent="allegro", hermes_path="/opt/hermes/bin/hermes",
        )
        assert env._resolve_hermes_paths() == ["/opt/hermes/bin/hermes"]

    def test_resolve_hermes_paths_default(self):
        """Default candidates include a path with the agent name filled in."""
        env = SSHEnvironment(host="root@ezra", agent="allegro")
        candidates = env._resolve_hermes_paths()
        assert len(candidates) > 1
        assert any("allegro" in candidate for candidate in candidates)  # agent name interpolated

    def test_ssh_base_cmd(self):
        """The base command carries the host, identity file and custom port."""
        env = SSHEnvironment(host="root@ezra", ssh_key="/root/.ssh/id_rsa", ssh_port=2222)
        argv = env._ssh_base_cmd()
        assert "ssh" in argv[0]
        for expected in ("root@ezra", "-i", "/root/.ssh/id_rsa", "-p", "2222"):
            assert expected in argv