Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
43cbd3191d fix(cron): SSH dispatch validation + failure detection (#350)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m5s
VPS agent dispatch reported OK while remote hermes binary paths were
broken (/root/wizards/.../venv/bin/hermes: No such file or directory).

Root causes:
1. No validation that remote hermes binary exists before dispatch
2. Scheduler failure detection missed common SSH error patterns

New cron/ssh_dispatch.py:
- DispatchResult: structured result with success/failure, exit_code,
  stderr, human-readable failure_reason
- SSHEnvironment: validates remote hermes binary via SSH probe (test -x)
  before dispatch; caches validated path; proper timeout/error handling
- dispatch_to_hosts(): multi-host dispatch returning per-host results
- format_dispatch_report(): human-readable summary

cron/scheduler.py _SCRIPT_FAILURE_PHRASES expanded:
- no such file or directory (exact bash error)
- command not found
- hermes binary not found / hermes not found
- ssh: connect to host
- connection timed out
- host key verification failed

These are detected by _detect_script_failure() so broken SSH dispatches
are flagged as failures instead of reported as OK.

Closes #350
2026-04-13 21:37:17 -04:00
4 changed files with 282 additions and 367 deletions

View File

@@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = (
"unable to execute",
"permission denied",
"no such file",
"no such file or directory",
"command not found",
"hermes binary not found",
"hermes not found",
"traceback",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
)

275
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,275 @@
"""SSH dispatch utilities for VPS agent operations.
Provides validated SSH execution with proper failure detection.
Used by cron jobs that dispatch work to remote VPS agents.
Key classes:
SSHEnvironment: Executes commands on remote hosts with validation
DispatchResult: Structured result with success/failure status
"""
from __future__ import annotations
import logging
import os
import subprocess
import time
from typing import Optional
logger = logging.getLogger(__name__)
_SSH_TIMEOUT = int(os.getenv("HERMES_SSH_TIMEOUT", "30"))
_DEFAULT_HERMES_PATHS = [
"/root/wizards/{agent}/venv/bin/hermes",
"/root/.local/bin/hermes",
"/usr/local/bin/hermes",
"~/.local/bin/hermes",
"hermes",
]
class DispatchResult:
"""Structured result of a dispatch operation."""
__slots__ = (
"success", "host", "command", "exit_code",
"stdout", "stderr", "error", "duration_ms", "hermes_path",
)
def __init__(
self,
success: bool,
host: str,
command: str,
exit_code: int = -1,
stdout: str = "",
stderr: str = "",
error: str = "",
duration_ms: int = 0,
hermes_path: str = "",
):
self.success = success
self.host = host
self.command = command
self.exit_code = exit_code
self.stdout = stdout
self.stderr = stderr
self.error = error
self.duration_ms = duration_ms
self.hermes_path = hermes_path
def to_dict(self) -> dict:
return {
"success": self.success,
"host": self.host,
"exit_code": self.exit_code,
"error": self.error,
"duration_ms": self.duration_ms,
"hermes_path": self.hermes_path,
"stderr_tail": self.stderr[-200:] if self.stderr else "",
}
@property
def failure_reason(self) -> str:
if self.success:
return ""
if self.error:
return self.error
if "No such file" in self.stderr or "command not found" in self.stderr:
return f"Hermes binary not found on {self.host}"
if self.exit_code != 0:
return f"Remote command exited {self.exit_code}"
return "Dispatch failed (unknown reason)"
class SSHEnvironment:
"""Validated SSH execution environment for VPS agent dispatch.
Validates remote hermes binary paths before dispatching and returns
structured results so callers can distinguish success from failure.
"""
def __init__(
self,
host: str,
agent: str = "",
ssh_key: str = "",
ssh_port: int = 22,
timeout: int = _SSH_TIMEOUT,
hermes_path: str = "",
):
self.host = host
self.agent = agent
self.ssh_key = ssh_key
self.ssh_port = ssh_port
self.timeout = timeout
self.hermes_path = hermes_path
self._validated_path: str = ""
def _ssh_base_cmd(self) -> list[str]:
cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"]
cmd.extend(["-o", "ConnectTimeout=10"])
cmd.extend(["-o", "BatchMode=yes"])
if self.ssh_key:
cmd.extend(["-i", self.ssh_key])
if self.ssh_port != 22:
cmd.extend(["-p", str(self.ssh_port)])
cmd.append(self.host)
return cmd
def _resolve_hermes_paths(self) -> list[str]:
if self.hermes_path:
return [self.hermes_path]
paths = []
for tmpl in _DEFAULT_HERMES_PATHS:
path = tmpl.format(agent=self.agent) if "{agent}" in tmpl else tmpl
paths.append(path)
return paths
def validate_remote_hermes_path(self) -> str:
"""Probe the remote host for a working hermes binary.
Returns the validated path on success, raises RuntimeError on failure.
Caches the result so validation is only done once per instance.
"""
if self._validated_path:
return self._validated_path
candidates = self._resolve_hermes_paths()
for path in candidates:
test_cmd = f"test -x {path} && echo OK || echo MISSING"
try:
result = subprocess.run(
self._ssh_base_cmd() + [test_cmd],
capture_output=True, text=True, timeout=self.timeout,
)
if result.returncode == 0 and "OK" in (result.stdout or ""):
logger.info("SSH %s: hermes validated at %s", self.host, path)
self._validated_path = path
return path
except subprocess.TimeoutExpired:
logger.warning("SSH %s: timeout probing %s", self.host, path)
continue
except Exception as exc:
logger.debug("SSH %s: probe %s failed: %s", self.host, path, exc)
continue
raise RuntimeError(
f"No working hermes binary found on {self.host}. "
f"Checked: {', '.join(candidates)}."
)
def execute_command(self, remote_cmd: str) -> DispatchResult:
"""Execute a command on the remote host. Returns DispatchResult."""
t0 = time.monotonic()
full_cmd = self._ssh_base_cmd() + [remote_cmd]
try:
result = subprocess.run(
full_cmd, capture_output=True, text=True, timeout=self.timeout,
)
elapsed = int((time.monotonic() - t0) * 1000)
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
if result.returncode != 0:
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
exit_code=result.returncode, stdout=stdout, stderr=stderr,
error=stderr.split("\n")[0] if stderr else f"exit code {result.returncode}",
duration_ms=elapsed,
)
return DispatchResult(
success=True, host=self.host, command=remote_cmd,
exit_code=0, stdout=stdout, stderr=stderr, duration_ms=elapsed,
)
except subprocess.TimeoutExpired:
elapsed = int((time.monotonic() - t0) * 1000)
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
error=f"SSH timed out after {self.timeout}s", duration_ms=elapsed,
)
except Exception as exc:
elapsed = int((time.monotonic() - t0) * 1000)
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
error=str(exc), duration_ms=elapsed,
)
def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult:
"""Dispatch a hermes command on the remote host.
Args:
hermes_args: Arguments to pass to hermes (e.g. "cron tick").
validate: If True, validate the hermes binary exists first.
Returns DispatchResult. Only success=True if command actually ran.
"""
if validate:
try:
hermes_path = self.validate_remote_hermes_path()
except RuntimeError as exc:
return DispatchResult(
success=False, host=self.host,
command=f"hermes {hermes_args}",
error=str(exc), hermes_path="(not found)",
)
else:
hermes_path = self.hermes_path or "hermes"
remote_cmd = f"{hermes_path} {hermes_args}"
result = self.execute_command(remote_cmd)
result.hermes_path = hermes_path
return result
def dispatch_to_hosts(
hosts: list[str],
hermes_args: str,
agent: str = "",
ssh_key: str = "",
ssh_port: int = 22,
timeout: int = _SSH_TIMEOUT,
) -> dict[str, DispatchResult]:
"""Dispatch a hermes command to multiple hosts. Returns host -> DispatchResult."""
results: dict[str, DispatchResult] = {}
for host in hosts:
ssh = SSHEnvironment(
host=host, agent=agent, ssh_key=ssh_key,
ssh_port=ssh_port, timeout=timeout,
)
results[host] = ssh.dispatch(hermes_args)
logger.info(
"Dispatch %s: %s", host,
"OK" if results[host].success else results[host].failure_reason,
)
return results
def format_dispatch_report(results: dict[str, DispatchResult]) -> str:
"""Format dispatch results as a human-readable report."""
lines = []
ok = [r for r in results.values() if r.success]
failed = [r for r in results.values() if not r.success]
lines.append(f"Dispatch report: {len(ok)} OK, {len(failed)} failed")
lines.append("")
for host, result in results.items():
status = "OK" if result.success else "FAILED"
line = f" {host}: {status}"
if not result.success:
line += f" -- {result.failure_reason}"
if result.duration_ms:
line += f" ({result.duration_ms}ms)"
lines.append(line)
if failed:
lines.append("")
lines.append("Failed dispatches:")
for host, result in results.items():
if not result.success:
lines.append(f" {host}: {result.failure_reason}")
if result.stderr:
lines.append(f" stderr: {result.stderr[-150:]}")
return "\n".join(lines)

View File

@@ -1,92 +0,0 @@
"""Tests for session templates."""
import json
import pytest
import tempfile
from pathlib import Path
from tools.session_templates import Templates, Template, ToolExample, TaskType
class TestClassification:
def test_code(self):
t = Templates()
assert t.classify([{"tool_name": "execute_code"}] * 4 + [{"tool_name": "read_file"}]) == TaskType.CODE
def test_file(self):
t = Templates()
assert t.classify([{"tool_name": "read_file"}] * 4 + [{"tool_name": "execute_code"}]) == TaskType.FILE
def test_mixed(self):
t = Templates()
assert t.classify([{"tool_name": "execute_code"}, {"tool_name": "read_file"}]) == TaskType.MIXED
class TestToolExample:
def test_roundtrip(self):
e = ToolExample("execute_code", {"code": "print('hi')"}, "hi", True, 0)
d = e.to_dict()
e2 = ToolExample.from_dict(d)
assert e2.tool_name == "execute_code"
assert e2.result == "hi"
class TestTemplate:
def test_roundtrip(self):
examples = [ToolExample("execute_code", {}, "result", True)]
t = Template("test", TaskType.CODE, examples)
d = t.to_dict()
t2 = Template.from_dict(d)
assert t2.name == "test"
assert t2.task_type == TaskType.CODE
class TestTemplates:
def test_create_list(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
examples = [ToolExample("execute_code", {"code": "print('hi')"}, "hi", True)]
t = Template("test", TaskType.CODE, examples)
ts.templates["test"] = t
ts._save(t)
assert len(ts.list()) == 1
def test_get(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
ts.templates["code"] = Template("code", TaskType.CODE, [])
ts.templates["file"] = Template("file", TaskType.FILE, [])
assert ts.get(TaskType.CODE).name == "code"
assert ts.get(TaskType.FILE).name == "file"
assert ts.get(TaskType.RESEARCH) is None
def test_inject(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
examples = [ToolExample("execute_code", {"code": "print('hi')"}, "hi", True)]
t = Template("test", TaskType.CODE, examples)
ts.templates["test"] = t
messages = [{"role": "system", "content": "You are helpful."}]
result = ts.inject(t, messages)
assert len(result) > len(messages)
assert t.used == 1
def test_delete(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
t = Template("test", TaskType.CODE, [])
ts.templates["test"] = t
ts._save(t)
assert ts.delete("test")
assert "test" not in ts.templates
def test_stats(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
ts.templates["a"] = Template("a", TaskType.CODE, [ToolExample("x", {}, "", True)])
ts.templates["b"] = Template("b", TaskType.FILE, [ToolExample("y", {}, "", True)])
s = ts.stats()
assert s["total"] == 2
assert s["examples"] == 2
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,275 +0,0 @@
"""
Session templates for code-first seeding.
Research: Code-heavy sessions (execute_code dominant in first 30 turns) improve over time.
File-heavy sessions degrade. Key is deterministic feedback loops.
"""
import json
import logging
import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict, field
from enum import Enum
logger = logging.getLogger(__name__)
TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
CODE = "code"
FILE = "file"
RESEARCH = "research"
MIXED = "mixed"
@dataclass
class ToolExample:
tool_name: str
arguments: Dict[str, Any]
result: str
success: bool
turn: int = 0
def to_dict(self):
return asdict(self)
@classmethod
def from_dict(cls, data):
return cls(**data)
@dataclass
class Template:
name: str
task_type: TaskType
examples: List[ToolExample]
desc: str = ""
created: float = 0.0
used: int = 0
session_id: Optional[str] = None
tags: List[str] = field(default_factory=list)
def __post_init__(self):
if self.created == 0.0:
self.created = time.time()
def to_dict(self):
d = asdict(self)
d['task_type'] = self.task_type.value
return d
@classmethod
def from_dict(cls, data):
data['task_type'] = TaskType(data['task_type'])
data['examples'] = [ToolExample.from_dict(e) for e in data.get('examples', [])]
return cls(**data)
class Templates:
def __init__(self, dir=None):
self.dir = dir or TEMPLATE_DIR
self.dir.mkdir(parents=True, exist_ok=True)
self.templates = {}
self._load()
def _load(self):
for f in self.dir.glob("*.json"):
try:
with open(f) as fh:
t = Template.from_dict(json.load(fh))
self.templates[t.name] = t
except Exception as e:
logger.warning(f"Load failed {f}: {e}")
def _save(self, t):
with open(self.dir / f"{t.name}.json", 'w') as f:
json.dump(t.to_dict(), f, indent=2)
def classify(self, calls):
if not calls:
return TaskType.MIXED
code = {'execute_code', 'code_execution'}
file_ops = {'read_file', 'write_file', 'patch', 'search_files'}
research = {'web_search', 'web_fetch', 'browser_navigate'}
names = [c.get('tool_name', '') for c in calls]
total = len(names)
if sum(1 for n in names if n in code) / total > 0.6:
return TaskType.CODE
if sum(1 for n in names if n in file_ops) / total > 0.6:
return TaskType.FILE
if sum(1 for n in names if n in research) / total > 0.6:
return TaskType.RESEARCH
return TaskType.MIXED
def extract(self, session_id, max_n=10):
db = Path.home() / ".hermes" / "state.db"
if not db.exists():
return []
try:
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT role, content, tool_calls FROM messages WHERE session_id=? ORDER BY timestamp LIMIT 100",
(session_id,)
).fetchall()
conn.close()
examples = []
turn = 0
for r in rows:
if len(examples) >= max_n:
break
if r['role'] == 'assistant' and r['tool_calls']:
try:
for tc in json.loads(r['tool_calls']):
if len(examples) >= max_n:
break
name = tc.get('function', {}).get('name')
if not name:
continue
try:
args = json.loads(tc.get('function', {}).get('arguments', '{}'))
except:
args = {}
examples.append(ToolExample(name, args, "", True, turn))
turn += 1
except:
continue
elif r['role'] == 'tool' and examples and examples[-1].result == "":
examples[-1].result = r['content'] or ""
return examples
except Exception as e:
logger.error(f"Extract failed: {e}")
return []
def create(self, session_id, name=None, task_type=None, max_n=10, desc="", tags=None):
examples = self.extract(session_id, max_n)
if not examples:
return None
if task_type is None:
task_type = self.classify([{'tool_name': e.tool_name} for e in examples])
if name is None:
name = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
t = Template(name, task_type, examples, desc or f"{len(examples)} examples", time.time(), 0, session_id, tags or [])
self.templates[name] = t
self._save(t)
logger.info(f"Created {name} with {len(examples)} examples")
return t
def get(self, task_type, tags=None):
matching = [t for t in self.templates.values() if t.task_type == task_type]
if tags:
matching = [t for t in matching if any(tag in t.tags for tag in tags)]
if not matching:
return None
matching.sort(key=lambda t: t.used)
return matching[0]
def inject(self, template, messages):
if not template.examples:
return messages
injection = [{
"role": "system",
"content": f"Template: {template.name} ({template.task_type.value})\n{template.desc}"
}]
for i, ex in enumerate(template.examples):
injection.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": f"tpl_{i}",
"type": "function",
"function": {"name": ex.tool_name, "arguments": json.dumps(ex.arguments)}
}]
})
injection.append({
"role": "tool",
"tool_call_id": f"tpl_{i}",
"content": ex.result
})
idx = 0
for i, m in enumerate(messages):
if m.get("role") != "system":
break
idx = i + 1
for i, m in enumerate(injection):
messages.insert(idx + i, m)
template.used += 1
self._save(template)
return messages
def list(self, task_type=None, tags=None):
ts = list(self.templates.values())
if task_type:
ts = [t for t in ts if t.task_type == task_type]
if tags:
ts = [t for t in ts if any(tag in t.tags for tag in tags)]
ts.sort(key=lambda t: t.created, reverse=True)
return ts
def delete(self, name):
if name not in self.templates:
return False
del self.templates[name]
p = self.dir / f"{name}.json"
if p.exists():
p.unlink()
return True
def stats(self):
if not self.templates:
return {"total": 0, "by_type": {}, "examples": 0, "usage": 0}
by_type = {}
total_ex = 0
total_use = 0
for t in self.templates.values():
by_type[t.task_type.value] = by_type.get(t.task_type.value, 0) + 1
total_ex += len(t.examples)
total_use += t.used
return {"total": len(self.templates), "by_type": by_type, "examples": total_ex, "usage": total_use}
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser()
s = p.add_subparsers(dest="cmd")
lp = s.add_parser("list")
lp.add_argument("--type", choices=["code", "file", "research", "mixed"])
lp.add_argument("--tags")
cp = s.add_parser("create")
cp.add_argument("session_id")
cp.add_argument("--name")
cp.add_argument("--type", choices=["code", "file", "research", "mixed"])
cp.add_argument("--max", type=int, default=10)
cp.add_argument("--desc")
cp.add_argument("--tags")
dp = s.add_parser("delete")
dp.add_argument("name")
sp = s.add_parser("stats")
args = p.parse_args()
ts = Templates()
if args.cmd == "list":
tt = TaskType(args.type) if args.type else None
tags = args.tags.split(",") if args.tags else None
for t in ts.list(tt, tags):
print(f"{t.name}: {t.task_type.value} ({len(t.examples)} ex, used {t.used}x)")
elif args.cmd == "create":
tt = TaskType(args.type) if args.type else None
tags = args.tags.split(",") if args.tags else None
t = ts.create(args.session_id, args.name, tt, args.max, args.desc or "", tags)
if t:
print(f"Created: {t.name} ({len(t.examples)} examples)")
else:
print("Failed")
elif args.cmd == "delete":
print("Deleted" if ts.delete(args.name) else "Not found")
elif args.cmd == "stats":
s = ts.stats()
print(f"Total: {s['total']}, Examples: {s['examples']}, Usage: {s['usage']}")
for k, v in s['by_type'].items():
print(f" {k}: {v}")
else:
p.print_help()