Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
43cbd3191d fix(cron): SSH dispatch validation + failure detection (#350)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m5s
VPS agent dispatch reported OK while remote hermes binary paths were
broken (/root/wizards/.../venv/bin/hermes: No such file or directory).

Root causes:
1. No validation that remote hermes binary exists before dispatch
2. Scheduler failure detection missed common SSH error patterns

New cron/ssh_dispatch.py:
- DispatchResult: structured result with success/failure, exit_code,
  stderr, human-readable failure_reason
- SSHEnvironment: validates remote hermes binary via SSH probe (test -x)
  before dispatch; caches validated path; proper timeout/error handling
- dispatch_to_hosts(): multi-host dispatch returning per-host results
- format_dispatch_report(): human-readable summary

cron/scheduler.py _SCRIPT_FAILURE_PHRASES expanded:
- no such file or directory (exact bash error)
- command not found
- hermes binary not found / hermes not found
- ssh: connect to host
- connection timed out
- host key verification failed

These are detected by _detect_script_failure() so broken SSH dispatches
are flagged as failures instead of reported as OK.

Closes #350
2026-04-13 21:37:17 -04:00
2 changed files with 127 additions and 42 deletions

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
__all__ = [
"create_job",
@@ -39,4 +39,6 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -2,6 +2,10 @@
Provides validated SSH execution with proper failure detection.
Used by cron jobs that dispatch work to remote VPS agents.
Key classes:
SSHEnvironment: Executes commands on remote hosts with validation
DispatchResult: Structured result with success/failure status
"""
from __future__ import annotations
@@ -34,9 +38,16 @@ class DispatchResult:
)
def __init__(
self, success: bool, host: str, command: str,
exit_code: int = -1, stdout: str = "", stderr: str = "",
error: str = "", duration_ms: int = 0, hermes_path: str = "",
self,
success: bool,
host: str,
command: str,
exit_code: int = -1,
stdout: str = "",
stderr: str = "",
error: str = "",
duration_ms: int = 0,
hermes_path: str = "",
):
self.success = success
self.host = host
@@ -50,9 +61,12 @@ class DispatchResult:
def to_dict(self) -> dict:
return {
"success": self.success, "host": self.host,
"exit_code": self.exit_code, "error": self.error,
"duration_ms": self.duration_ms, "hermes_path": self.hermes_path,
"success": self.success,
"host": self.host,
"exit_code": self.exit_code,
"error": self.error,
"duration_ms": self.duration_ms,
"hermes_path": self.hermes_path,
"stderr_tail": self.stderr[-200:] if self.stderr else "",
}
@@ -70,11 +84,19 @@ class DispatchResult:
class SSHEnvironment:
"""Validated SSH execution environment for VPS agent dispatch."""
"""Validated SSH execution environment for VPS agent dispatch.
Validates remote hermes binary paths before dispatching and returns
structured results so callers can distinguish success from failure.
"""
def __init__(
self, host: str, agent: str = "", ssh_key: str = "",
ssh_port: int = 22, timeout: int = _SSH_TIMEOUT,
self,
host: str,
agent: str = "",
ssh_key: str = "",
ssh_port: int = 22,
timeout: int = _SSH_TIMEOUT,
hermes_path: str = "",
):
self.host = host
@@ -87,7 +109,8 @@ class SSHEnvironment:
def _ssh_base_cmd(self) -> list[str]:
cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"]
cmd.extend(["-o", "ConnectTimeout=10", "-o", "BatchMode=yes"])
cmd.extend(["-o", "ConnectTimeout=10"])
cmd.extend(["-o", "BatchMode=yes"])
if self.ssh_key:
cmd.extend(["-i", self.ssh_key])
if self.ssh_port != 22:
@@ -98,16 +121,27 @@ class SSHEnvironment:
def _resolve_hermes_paths(self) -> list[str]:
if self.hermes_path:
return [self.hermes_path]
return [t.format(agent=self.agent) if "{agent}" in t else t for t in _DEFAULT_HERMES_PATHS]
paths = []
for tmpl in _DEFAULT_HERMES_PATHS:
path = tmpl.format(agent=self.agent) if "{agent}" in tmpl else tmpl
paths.append(path)
return paths
def validate_remote_hermes_path(self) -> str:
"""Probe remote host for a working hermes binary. Returns path or raises."""
"""Probe the remote host for a working hermes binary.
Returns the validated path on success, raises RuntimeError on failure.
Caches the result so validation is only done once per instance.
"""
if self._validated_path:
return self._validated_path
for path in self._resolve_hermes_paths():
candidates = self._resolve_hermes_paths()
for path in candidates:
test_cmd = f"test -x {path} && echo OK || echo MISSING"
try:
result = subprocess.run(
self._ssh_base_cmd() + [f"test -x {path} && echo OK || echo MISSING"],
self._ssh_base_cmd() + [test_cmd],
capture_output=True, text=True, timeout=self.timeout,
)
if result.returncode == 0 and "OK" in (result.stdout or ""):
@@ -116,24 +150,28 @@ class SSHEnvironment:
return path
except subprocess.TimeoutExpired:
logger.warning("SSH %s: timeout probing %s", self.host, path)
continue
except Exception as exc:
logger.debug("SSH %s: probe %s failed: %s", self.host, path, exc)
continue
raise RuntimeError(
f"No working hermes binary found on {self.host}. "
f"Checked: {', '.join(self._resolve_hermes_paths())}."
f"Checked: {', '.join(candidates)}."
)
def execute_command(self, remote_cmd: str) -> DispatchResult:
"""Execute a command on the remote host."""
"""Execute a command on the remote host. Returns DispatchResult."""
t0 = time.monotonic()
full_cmd = self._ssh_base_cmd() + [remote_cmd]
try:
result = subprocess.run(
self._ssh_base_cmd() + [remote_cmd],
capture_output=True, text=True, timeout=self.timeout,
full_cmd, capture_output=True, text=True, timeout=self.timeout,
)
elapsed = int((time.monotonic() - t0) * 1000)
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
if result.returncode != 0:
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
@@ -141,52 +179,97 @@ class SSHEnvironment:
error=stderr.split("\n")[0] if stderr else f"exit code {result.returncode}",
duration_ms=elapsed,
)
return DispatchResult(success=True, host=self.host, command=remote_cmd,
exit_code=0, stdout=stdout, stderr=stderr, duration_ms=elapsed)
return DispatchResult(
success=True, host=self.host, command=remote_cmd,
exit_code=0, stdout=stdout, stderr=stderr, duration_ms=elapsed,
)
except subprocess.TimeoutExpired:
return DispatchResult(success=False, host=self.host, command=remote_cmd,
error=f"SSH timed out after {self.timeout}s",
duration_ms=int((time.monotonic() - t0) * 1000))
elapsed = int((time.monotonic() - t0) * 1000)
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
error=f"SSH timed out after {self.timeout}s", duration_ms=elapsed,
)
except Exception as exc:
return DispatchResult(success=False, host=self.host, command=remote_cmd,
error=str(exc), duration_ms=int((time.monotonic() - t0) * 1000))
elapsed = int((time.monotonic() - t0) * 1000)
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
error=str(exc), duration_ms=elapsed,
)
def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult:
"""Dispatch a hermes command. Only success=True if command actually ran."""
"""Dispatch a hermes command on the remote host.
Args:
hermes_args: Arguments to pass to hermes (e.g. "cron tick").
validate: If True, validate the hermes binary exists first.
Returns DispatchResult. Only success=True if command actually ran.
"""
if validate:
try:
hermes_path = self.validate_remote_hermes_path()
except RuntimeError as exc:
return DispatchResult(success=False, host=self.host,
command=f"hermes {hermes_args}",
error=str(exc), hermes_path="(not found)")
return DispatchResult(
success=False, host=self.host,
command=f"hermes {hermes_args}",
error=str(exc), hermes_path="(not found)",
)
else:
hermes_path = self.hermes_path or "hermes"
result = self.execute_command(f"{hermes_path} {hermes_args}")
remote_cmd = f"{hermes_path} {hermes_args}"
result = self.execute_command(remote_cmd)
result.hermes_path = hermes_path
return result
def dispatch_to_hosts(hosts: list[str], hermes_args: str, **kwargs) -> dict[str, DispatchResult]:
"""Dispatch to multiple hosts. Returns host -> DispatchResult."""
def dispatch_to_hosts(
hosts: list[str],
hermes_args: str,
agent: str = "",
ssh_key: str = "",
ssh_port: int = 22,
timeout: int = _SSH_TIMEOUT,
) -> dict[str, DispatchResult]:
"""Dispatch a hermes command to multiple hosts. Returns host -> DispatchResult."""
results: dict[str, DispatchResult] = {}
for host in hosts:
ssh = SSHEnvironment(host=host, **kwargs)
ssh = SSHEnvironment(
host=host, agent=agent, ssh_key=ssh_key,
ssh_port=ssh_port, timeout=timeout,
)
results[host] = ssh.dispatch(hermes_args)
logger.info(
"Dispatch %s: %s", host,
"OK" if results[host].success else results[host].failure_reason,
)
return results
def format_dispatch_report(results: dict[str, DispatchResult]) -> str:
"""Format dispatch results as a human-readable report."""
lines = []
ok = [r for r in results.values() if r.success]
failed = [r for r in results.values() if not r.success]
lines = [f"Dispatch report: {len(ok)} OK, {len(failed)} failed", ""]
for host, r in results.items():
s = "OK" if r.success else f"FAILED -- {r.failure_reason}"
lines.append(f" {host}: {s}" + (f" ({r.duration_ms}ms)" if r.duration_ms else ""))
lines.append(f"Dispatch report: {len(ok)} OK, {len(failed)} failed")
lines.append("")
for host, result in results.items():
status = "OK" if result.success else "FAILED"
line = f" {host}: {status}"
if not result.success:
line += f" -- {result.failure_reason}"
if result.duration_ms:
line += f" ({result.duration_ms}ms)"
lines.append(line)
if failed:
lines += ["", "Failed dispatches:"]
for host, r in results.items():
if not r.success:
lines.append(f" {host}: {r.failure_reason}")
lines.append("")
lines.append("Failed dispatches:")
for host, result in results.items():
if not result.success:
lines.append(f" {host}: {result.failure_reason}")
if result.stderr:
lines.append(f" stderr: {result.stderr[-150:]}")
return "\n".join(lines)