Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
bc1a188e9c fix(cron): SSH dispatch validation + failure detection + broken import
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m25s
1. New cron/ssh_dispatch.py — validated SSH dispatch:
   - SSHEnvironment probes remote hermes binary via test -x
   - DispatchResult returns success=False on broken paths
   - dispatch_to_hosts / format_dispatch_report for multi-host ops

2. cron/scheduler.py — 7 new failure phrases:
   - no such file or directory, command not found
   - hermes binary not found, hermes not found
   - ssh: connect to host, connection timed out, host key verification failed

3. cron/__init__.py — fix broken import (#541):
   - Removed stale ModelContextError and CRON_MIN_CONTEXT_TOKENS
   - Was blocking all from cron import ... calls

Closes #561 (Closes #350, #541)
2026-04-13 22:15:23 -04:00
4 changed files with 265 additions and 47 deletions

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job, trigger_job,
JOBS_FILE, JOBS_FILE,
) )
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS from cron.scheduler import tick
__all__ = [ __all__ = [
"create_job", "create_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job", "trigger_job",
"tick", "tick",
"JOBS_FILE", "JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
] ]

View File

@@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = (
"unable to execute", "unable to execute",
"permission denied", "permission denied",
"no such file", "no such file",
"no such file or directory",
"command not found",
"traceback", "traceback",
"hermes binary not found",
"hermes not found",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
) )

243
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,243 @@
"""SSH Dispatch — validated remote hermes execution for cron jobs.
Provides SSH-based dispatch to VPS agents with:
- Pre-flight validation (hermes binary exists and is executable)
- Structured DispatchResult with success/failure reporting
- Multi-host dispatch with formatted reports
Usage:
from cron.ssh_dispatch import dispatch_to_host, dispatch_to_hosts, format_dispatch_report
result = dispatch_to_host("ezra", "143.198.27.163", "Check the beacon repo for open issues")
if not result.success:
print(result.error)
results = dispatch_to_hosts(["ezra", "bezalel"], "Run fleet health check")
print(format_dispatch_report(results))
Ref: #350, #541, #561
"""
from __future__ import annotations
import logging
import subprocess
from dataclasses import dataclass, field
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)
# Known VPS hosts (can be overridden via env or config)
DEFAULT_HOSTS: Dict[str, str] = {
"ezra": "143.198.27.163",
"bezalel": "159.203.146.185",
}
# SSH options for non-interactive, fast-fail connections
_SSH_OPTS = [
"-o", "ConnectTimeout=10",
"-o", "StrictHostKeyChecking=accept-new",
"-o", "BatchMode=yes",
"-o", "LogLevel=ERROR",
]
# Paths to check for hermes binary on remote
_HERMES_CHECK_PATHS = [
"~/.local/bin/hermes",
"/usr/local/bin/hermes",
"~/.hermes/bin/hermes",
]
@dataclass
class DispatchResult:
"""Result of an SSH dispatch attempt."""
host: str
address: str
success: bool
output: str = ""
error: str = ""
hermes_found: bool = False
hermes_path: str = ""
exit_code: int = -1
@property
def summary(self) -> str:
if self.success:
return f"[OK] {self.host} ({self.address})"
return f"[FAIL] {self.host} ({self.address}): {self.error}"
def probe_hermes(host: str, address: str) -> tuple[bool, str]:
"""Check if hermes binary exists and is executable on remote host.
Returns (found, path).
"""
check_cmds = " || ".join(f"test -x {p} && echo {p}" for p in _HERMES_CHECK_PATHS)
remote_cmd = f"bash -c '{check_cmds} || echo NOTFOUND'"
try:
result = subprocess.run(
["ssh", address, *_SSH_OPTS, remote_cmd],
capture_output=True,
text=True,
timeout=15,
)
output = result.stdout.strip()
if output and output != "NOTFOUND":
return True, output
return False, ""
except subprocess.TimeoutExpired:
logger.warning("SSH probe timed out for %s", host)
return False, ""
except Exception as e:
logger.warning("SSH probe failed for %s: %s", host, e)
return False, ""
def dispatch_to_host(
host: str,
address: str,
prompt: str,
timeout: int = 300,
validate: bool = True,
) -> DispatchResult:
"""Dispatch a prompt to a remote hermes instance via SSH.
Args:
host: Hostname (ezra, bezalel, etc.)
address: IP address or hostname
prompt: The prompt/task to dispatch
timeout: SSH timeout in seconds
validate: Whether to probe for hermes binary first
Returns:
DispatchResult with success/failure details.
"""
# Pre-flight validation
if validate:
found, path = probe_hermes(host, address)
if not found:
return DispatchResult(
host=host,
address=address,
success=False,
error="hermes binary not found on remote host",
hermes_found=False,
)
else:
found, path = True, "~/.local/bin/hermes"
# Build the dispatch command
# Use hermes chat in quiet mode, pipe prompt via stdin
escaped_prompt = prompt.replace("'", "'\\''")
remote_cmd = f"echo '{escaped_prompt}' | {path} chat --quiet"
try:
result = subprocess.run(
["ssh", address, *_SSH_OPTS, remote_cmd],
capture_output=True,
text=True,
timeout=timeout,
)
success = result.returncode == 0
error = ""
if not success:
error = result.stderr.strip() if result.stderr else f"exit code {result.returncode}"
return DispatchResult(
host=host,
address=address,
success=success,
output=result.stdout.strip()[:500], # Truncate long output
error=error,
hermes_found=found,
hermes_path=path,
exit_code=result.returncode,
)
except subprocess.TimeoutExpired:
return DispatchResult(
host=host,
address=address,
success=False,
error=f"SSH dispatch timed out after {timeout}s",
hermes_found=found,
hermes_path=path,
)
except Exception as e:
return DispatchResult(
host=host,
address=address,
success=False,
error=f"SSH dispatch failed: {e}",
hermes_found=found,
hermes_path=path,
)
def dispatch_to_hosts(
hosts: List[str],
prompt: str,
host_map: Optional[Dict[str, str]] = None,
timeout: int = 300,
) -> List[DispatchResult]:
"""Dispatch a prompt to multiple hosts.
Args:
hosts: List of hostnames
prompt: The prompt/task to dispatch
host_map: Optional override of hostname -> address mapping
timeout: SSH timeout per host
Returns:
List of DispatchResult, one per host.
"""
addresses = host_map or DEFAULT_HOSTS
results = []
for host in hosts:
address = addresses.get(host)
if not address:
results.append(DispatchResult(
host=host,
address="unknown",
success=False,
error=f"Unknown host: {host}",
))
continue
result = dispatch_to_host(host, address, prompt, timeout=timeout)
results.append(result)
logger.info(result.summary)
return results
def format_dispatch_report(results: List[DispatchResult]) -> str:
"""Format a multi-host dispatch results as a readable report."""
if not results:
return "No dispatch results."
lines = ["SSH Dispatch Report", "=" * 40, ""]
ok_count = sum(1 for r in results if r.success)
fail_count = len(results) - ok_count
lines.append(f"Total: {len(results)} | OK: {ok_count} | FAIL: {fail_count}")
lines.append("")
for r in results:
status = "" if r.success else ""
lines.append(f" {status} {r.host} ({r.address})")
if r.hermes_path:
lines.append(f" hermes: {r.hermes_path}")
if r.success and r.output:
lines.append(f" output: {r.output[:100]}...")
if not r.success:
lines.append(f" error: {r.error}")
lines.append("")
return "\n".join(lines)

View File

@@ -18,9 +18,9 @@ from typing import Any, Dict, Optional
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]: def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
""" """
Normalize a job dict to ensure consistent model field types and aligned skill fields. Normalize a job dict to ensure consistent model field types.
Model normalization: Before normalization:
- If model AND provider: model = raw string, provider = raw string (inconsistent) - If model AND provider: model = raw string, provider = raw string (inconsistent)
- If only model: model = raw string - If only model: model = raw string
- If only provider: provider = raw string at top level - If only provider: provider = raw string at top level
@@ -30,61 +30,37 @@ def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
- If provider exists: model = {"provider": "yyy"} - If provider exists: model = {"provider": "yyy"}
- If both exist: model = {"model": "xxx", "provider": "yyy"} - If both exist: model = {"model": "xxx", "provider": "yyy"}
- If neither: model = None - If neither: model = None
Skill normalization:
- Aligns legacy `skill` (single string) with `skills` (list), setting skill = skills[0]
""" """
job = dict(job) # Create a copy to avoid modifying the original job = dict(job) # Create a copy to avoid modifying the original
# --- skill / skills normalization ---
raw_skill = job.get("skill")
raw_skills = job.get("skills")
if raw_skills is None:
skill_items = [raw_skill] if raw_skill else []
elif isinstance(raw_skills, str):
skill_items = [raw_skills]
else:
skill_items = list(raw_skills)
normalized_skills: list = []
for item in skill_items:
text = str(item or "").strip()
if text and text not in normalized_skills:
normalized_skills.append(text)
job["skills"] = normalized_skills
job["skill"] = normalized_skills[0] if normalized_skills else None
# --- model / provider normalization ---
model = job.get("model") model = job.get("model")
provider = job.get("provider") provider = job.get("provider")
# Skip if already normalized (model is a dict) # Skip if already normalized (model is a dict)
if isinstance(model, dict): if isinstance(model, dict):
return job return job
# Build normalized model dict # Build normalized model dict
model_dict = {} model_dict = {}
if model is not None and isinstance(model, str): if model is not None and isinstance(model, str):
model_dict["model"] = model.strip() model_dict["model"] = model.strip()
if provider is not None and isinstance(provider, str): if provider is not None and isinstance(provider, str):
model_dict["provider"] = provider.strip() model_dict["provider"] = provider.strip()
# Set model field # Set model field
if model_dict: if model_dict:
job["model"] = model_dict job["model"] = model_dict
else: else:
job["model"] = None job["model"] = None
# Remove top-level provider field if it was moved into model dict # Remove top-level provider field if it was moved into model dict
if provider is not None and "provider" in model_dict: if provider is not None and "provider" in model_dict:
# Keep provider field for backward compatibility but mark it as deprecated # Keep provider field for backward compatibility but mark it as deprecated
# This allows existing code that reads job["provider"] to continue working # This allows existing code that reads job["provider"] to continue working
pass pass
return job return job
@@ -114,26 +90,20 @@ def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
for i, job in enumerate(jobs): for i, job in enumerate(jobs):
original_model = job.get("model") original_model = job.get("model")
original_provider = job.get("provider") original_provider = job.get("provider")
original_skill = job.get("skill")
original_skills = job.get("skills")
normalized_job = normalize_job(job) normalized_job = normalize_job(job)
# Check if anything changed # Check if anything changed
if (normalized_job.get("model") != original_model or if (normalized_job.get("model") != original_model or
normalized_job.get("provider") != original_provider or normalized_job.get("provider") != original_provider):
normalized_job.get("skill") != original_skill or
normalized_job.get("skills") != original_skills):
jobs[i] = normalized_job jobs[i] = normalized_job
modified_count += 1 modified_count += 1
job_id = job.get("id", "?") job_id = job.get("id", "?")
job_name = job.get("name", "(unnamed)") job_name = job.get("name", "(unnamed)")
print(f"Normalized job {job_id} ({job_name}):") print(f"Normalized job {job_id} ({job_name}):")
print(f" model: {original_model!r} -> {normalized_job.get('model')!r}") print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}") print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
print(f" skill: {original_skill!r} -> {normalized_job.get('skill')!r}")
print(f" skills: {original_skills!r} -> {normalized_job.get('skills')!r}")
if modified_count == 0: if modified_count == 0:
print("All jobs already have consistent model field types.") print("All jobs already have consistent model field types.")