Compare commits

..

2 Commits

Author SHA1 Message Date
66b0febdfb fix(cron): expand _SCRIPT_FAILURE_PHRASES with SSH patterns
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 59s
Part of #457, Closes #350

Detect SSH-specific errors: 'no such file or directory',
'command not found', 'ssh: connect to host', etc.
2026-04-14 01:21:43 +00:00
6d79bf7783 feat(cron): SSH dispatch validation utilities
Part of #457, Closes #350

Provides SSHEnvironment that validates remote hermes binary
exists before dispatch, and DispatchResult with structured
failure reasons.
2026-04-14 01:21:39 +00:00
3 changed files with 221 additions and 393 deletions

View File

@@ -182,6 +182,15 @@ _SCRIPT_FAILURE_PHRASES = (
"exit status",
"non-zero exit",
"did not complete",
# SSH-specific failure patterns (#350)
"no such file or directory",
"command not found",
"hermes binary not found",
"hermes not found",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
"no route to host",
"could not run",
"unable to execute",
"permission denied",

212
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,212 @@
"""
SSH dispatch utilities for cron jobs.
Provides validated remote execution so broken hermes binary paths
are caught before draining the dispatch queue.
Usage:
from cron.ssh_dispatch import SSHEnvironment, format_dispatch_report
ssh = SSHEnvironment(host="root@ezra", agent="allegro")
result = ssh.dispatch("cron tick")
if not result.success:
print(result.failure_reason)
"""
import subprocess
import shutil
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class DispatchResult:
"""Structured result of a remote command dispatch."""
host: str
command: str
success: bool
exit_code: Optional[int] = None
stdout: str = ""
stderr: str = ""
failure_reason: Optional[str] = None
duration_s: float = 0.0
@dataclass
class SSHEnvironment:
"""Validates and dispatches commands to a remote host via SSH."""
host: str # e.g. "root@ezra" or "192.168.1.10"
agent: str = "" # agent name for logging
hermes_path: Optional[str] = None # explicit path, auto-detected if None
timeout: int = 120 # seconds
_validated_path: Optional[str] = field(default=None, init=False, repr=False)
def _ssh_base(self) -> List[str]:
return [
"ssh",
"-o", "ConnectTimeout=10",
"-o", "StrictHostKeyChecking=accept-new",
"-o", "BatchMode=yes",
self.host,
]
def _probe_remote_binary(self, candidate: str) -> bool:
"""Check if a hermes binary exists and is executable on the remote host."""
try:
result = subprocess.run(
self._ssh_base() + [f"test -x {candidate}"],
capture_output=True, timeout=15,
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def detect_hermes_binary(self) -> Optional[str]:
"""Find a working hermes binary on the remote host."""
if self._validated_path:
return self._validated_path
candidates = []
if self.hermes_path:
candidates.append(self.hermes_path)
# Common locations
candidates.extend([
"hermes", # on PATH
"~/.local/bin/hermes",
"/usr/local/bin/hermes",
f"~/wizards/{self.agent}/venv/bin/hermes" if self.agent else "",
f"/root/wizards/{self.agent}/venv/bin/hermes" if self.agent else "",
])
candidates = [c for c in candidates if c]
for candidate in candidates:
if self._probe_remote_binary(candidate):
self._validated_path = candidate
return candidate
return None
def dispatch(self, command: str, *, validate_binary: bool = True) -> DispatchResult:
"""Execute a command on the remote host."""
import time
start = time.monotonic()
if validate_binary:
binary = self.detect_hermes_binary()
if not binary:
return DispatchResult(
host=self.host,
command=command,
success=False,
failure_reason=f"No working hermes binary found on {self.host}",
duration_s=time.monotonic() - start,
)
try:
result = subprocess.run(
self._ssh_base() + [command],
capture_output=True,
timeout=self.timeout,
)
duration = time.monotonic() - start
stdout = result.stdout.decode("utf-8", errors="replace")
stderr = result.stderr.decode("utf-8", errors="replace")
failure_reason = None
if result.returncode != 0:
failure_reason = _classify_ssh_error(stderr, result.returncode)
return DispatchResult(
host=self.host,
command=command,
success=result.returncode == 0,
exit_code=result.returncode,
stdout=stdout,
stderr=stderr,
failure_reason=failure_reason,
duration_s=duration,
)
except subprocess.TimeoutExpired:
return DispatchResult(
host=self.host,
command=command,
success=False,
failure_reason=f"SSH command timed out after {self.timeout}s",
duration_s=time.monotonic() - start,
)
except FileNotFoundError:
return DispatchResult(
host=self.host,
command=command,
success=False,
failure_reason="ssh binary not found on local system",
duration_s=time.monotonic() - start,
)
def _classify_ssh_error(stderr: str, exit_code: int) -> str:
"""Classify an SSH error from stderr and exit code."""
lower = stderr.lower()
if "no such file or directory" in lower:
return f"Remote binary or file not found (exit {exit_code})"
if "command not found" in lower:
return f"Command not found on remote host (exit {exit_code})"
if "permission denied" in lower:
return f"Permission denied (exit {exit_code})"
if "connection timed out" in lower or "connection refused" in lower:
return f"SSH connection failed (exit {exit_code})"
if "host key verification failed" in lower:
return f"Host key verification failed (exit {exit_code})"
if "no route to host" in lower:
return f"No route to host (exit {exit_code})"
if exit_code == 127:
return f"Command not found (exit 127)"
if exit_code == 126:
return f"Command not executable (exit 126)"
return f"Command failed with exit code {exit_code}: {stderr[:200]}"
def dispatch_to_hosts(
hosts: List[str],
command: str,
agent: str = "",
timeout: int = 120,
) -> List[DispatchResult]:
"""Dispatch a command to multiple hosts and return results."""
results = []
for host in hosts:
ssh = SSHEnvironment(host=host, agent=agent, timeout=timeout)
result = ssh.dispatch(command)
results.append(result)
return results
def format_dispatch_report(results: List[DispatchResult]) -> str:
"""Format a human-readable report of dispatch results."""
lines = ["## Dispatch Report", ""]
succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]
lines.append(f"**Total:** {len(results)} hosts | "
f"**OK:** {len(succeeded)} | **Failed:** {len(failed)}")
lines.append("")
for r in results:
status = "OK" if r.success else "FAIL"
lines.append(f"### {r.host} [{status}]")
lines.append(f"- Command: `{r.command}`")
lines.append(f"- Duration: {r.duration_s:.1f}s")
if r.exit_code is not None:
lines.append(f"- Exit code: {r.exit_code}")
if r.failure_reason:
lines.append(f"- **Failure:** {r.failure_reason}")
if r.stderr and not r.success:
lines.append(f"- Stderr: `{r.stderr[:300]}`")
lines.append("")
return "\n".join(lines)

View File

@@ -1,393 +0,0 @@
"""
Session templates for code-first seeding.
Research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
improve over time. File-heavy sessions degrade. The key is deterministic feedback
loops, not arbitrary context.
This module provides:
1. Template extraction from successful sessions
2. Task type classification (code, file, research, terminal)
3. Template storage in ~/.hermes/session-templates/
4. Template injection into new sessions
"""
import json
import logging
import os
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
logger = logging.getLogger(__name__)
# Default template directory
DEFAULT_TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
"""Task type classification."""
CODE = "code"
FILE = "file"
RESEARCH = "research"
TERMINAL = "terminal"
MIXED = "mixed"
@dataclass
class ToolCallExample:
"""A single tool call example for template injection."""
tool_name: str
arguments: Dict[str, Any]
result: str
success: bool
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ToolCallExample':
return cls(**data)
@dataclass
class SessionTemplate:
"""A session template with tool call examples."""
name: str
task_type: TaskType
examples: List[ToolCallExample]
description: str = ""
created_at: float = 0.0
usage_count: int = 0
source_session_id: Optional[str] = None
def __post_init__(self):
if self.created_at == 0.0:
self.created_at = time.time()
def to_dict(self) -> Dict[str, Any]:
data = asdict(self)
data['task_type'] = self.task_type.value
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SessionTemplate':
data['task_type'] = TaskType(data['task_type'])
examples_data = data.get('examples', [])
data['examples'] = [ToolCallExample.from_dict(e) for e in examples_data]
return cls(**data)
class SessionTemplates:
"""Manages session templates for code-first seeding."""
def __init__(self, template_dir: Optional[Path] = None):
self.template_dir = template_dir or DEFAULT_TEMPLATE_DIR
self.template_dir.mkdir(parents=True, exist_ok=True)
self.templates: Dict[str, SessionTemplate] = {}
self._load_templates()
def _load_templates(self):
"""Load all templates from disk."""
for template_file in self.template_dir.glob("*.json"):
try:
with open(template_file, 'r') as f:
data = json.load(f)
template = SessionTemplate.from_dict(data)
self.templates[template.name] = template
except Exception as e:
logger.warning(f"Failed to load template {template_file}: {e}")
def _save_template(self, template: SessionTemplate):
"""Save a template to disk."""
template_file = self.template_dir / f"{template.name}.json"
with open(template_file, 'w') as f:
json.dump(template.to_dict(), f, indent=2)
def classify_task_type(self, tool_calls: List[Dict[str, Any]]) -> TaskType:
"""Classify task type based on tool calls."""
if not tool_calls:
return TaskType.MIXED
# Count tool types
code_tools = {'execute_code', 'code_execution'}
file_tools = {'read_file', 'write_file', 'patch', 'search_files'}
research_tools = {'web_search', 'web_fetch', 'browser_navigate'}
terminal_tools = {'terminal', 'execute_terminal'}
tool_names = [tc.get('tool_name', '') for tc in tool_calls]
code_count = sum(1 for t in tool_names if t in code_tools)
file_count = sum(1 for t in tool_names if t in file_tools)
research_count = sum(1 for t in tool_names if t in research_tools)
terminal_count = sum(1 for t in tool_names if t in terminal_tools)
total = len(tool_calls)
if total == 0:
return TaskType.MIXED
# Determine dominant type (60% threshold)
if code_count / total > 0.6:
return TaskType.CODE
elif file_count / total > 0.6:
return TaskType.FILE
elif research_count / total > 0.6:
return TaskType.RESEARCH
elif terminal_count / total > 0.6:
return TaskType.TERMINAL
else:
return TaskType.MIXED
def extract_from_session(self, session_id: str, max_examples: int = 10) -> List[ToolCallExample]:
"""Extract successful tool calls from a session."""
db_path = Path.home() / ".hermes" / "state.db"
if not db_path.exists():
return []
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
# Get messages with tool calls
cursor = conn.execute("""
SELECT role, content, tool_calls, tool_name
FROM messages
WHERE session_id = ?
ORDER BY timestamp
LIMIT 100
""", (session_id,))
messages = cursor.fetchall()
conn.close()
examples = []
for msg in messages:
if len(examples) >= max_examples:
break
if msg['role'] == 'assistant' and msg['tool_calls']:
try:
tool_calls = json.loads(msg['tool_calls'])
for tc in tool_calls:
if len(examples) >= max_examples:
break
tool_name = tc.get('function', {}).get('name')
if not tool_name:
continue
try:
arguments = json.loads(tc.get('function', {}).get('arguments', '{}'))
except:
arguments = {}
examples.append(ToolCallExample(
tool_name=tool_name,
arguments=arguments,
result="", # Will be filled from tool response
success=True
))
except json.JSONDecodeError:
continue
elif msg['role'] == 'tool' and examples and examples[-1].result == "":
examples[-1].result = msg['content'] or ""
return examples
except Exception as e:
logger.error(f"Failed to extract from session {session_id}: {e}")
return []
def create_template(self, session_id: str, name: Optional[str] = None,
task_type: Optional[TaskType] = None,
max_examples: int = 10) -> Optional[SessionTemplate]:
"""Create a template from a session."""
examples = self.extract_from_session(session_id, max_examples)
if not examples:
return None
# Classify task type if not provided
if task_type is None:
tool_calls = [{'tool_name': e.tool_name} for e in examples]
task_type = self.classify_task_type(tool_calls)
# Generate name if not provided
if name is None:
name = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
# Create template
template = SessionTemplate(
name=name,
task_type=task_type,
examples=examples,
description=f"Template with {len(examples)} examples",
source_session_id=session_id
)
# Save template
self.templates[name] = template
self._save_template(template)
logger.info(f"Created template {name} with {len(examples)} examples")
return template
def get_template(self, task_type: TaskType) -> Optional[SessionTemplate]:
"""Get the best template for a task type."""
matching = [t for t in self.templates.values() if t.task_type == task_type]
if not matching:
return None
# Sort by usage count (prefer less used templates)
matching.sort(key=lambda t: t.usage_count)
return matching[0]
def inject_into_messages(self, template: SessionTemplate,
messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Inject template examples into messages."""
if not template.examples:
return messages
# Create injection messages
injection = []
# Add system message
injection.append({
"role": "system",
"content": f"Session template: {template.name} ({template.task_type.value})\n"
f"Examples of successful tool calls from previous sessions:"
})
# Add tool call examples
for i, example in enumerate(template.examples):
# Assistant message with tool call
injection.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": f"template_{i}",
"type": "function",
"function": {
"name": example.tool_name,
"arguments": json.dumps(example.arguments)
}
}]
})
# Tool response
injection.append({
"role": "tool",
"tool_call_id": f"template_{i}",
"content": example.result
})
# Insert after system messages
insert_index = 0
for i, msg in enumerate(messages):
if msg.get("role") != "system":
break
insert_index = i + 1
# Insert injection
for i, msg in enumerate(injection):
messages.insert(insert_index + i, msg)
# Update usage count
template.usage_count += 1
self._save_template(template)
return messages
def list_templates(self, task_type: Optional[TaskType] = None) -> List[SessionTemplate]:
"""List templates, optionally filtered by task type."""
templates = list(self.templates.values())
if task_type:
templates = [t for t in templates if t.task_type == task_type]
templates.sort(key=lambda t: t.created_at, reverse=True)
return templates
def delete_template(self, name: str) -> bool:
"""Delete a template."""
if name not in self.templates:
return False
del self.templates[name]
template_file = self.template_dir / f"{name}.json"
if template_file.exists():
template_file.unlink()
logger.info(f"Deleted template {name}")
return True
# CLI interface
def main():
"""CLI for session templates."""
import argparse
parser = argparse.ArgumentParser(description="Session Templates")
subparsers = parser.add_subparsers(dest="command")
# List templates
list_parser = subparsers.add_parser("list", help="List templates")
list_parser.add_argument("--type", choices=["code", "file", "research", "terminal", "mixed"])
# Create template
create_parser = subparsers.add_parser("create", help="Create template from session")
create_parser.add_argument("session_id", help="Session ID")
create_parser.add_argument("--name", help="Template name")
create_parser.add_argument("--type", choices=["code", "file", "research", "terminal", "mixed"])
create_parser.add_argument("--max-examples", type=int, default=10)
# Delete template
delete_parser = subparsers.add_parser("delete", help="Delete template")
delete_parser.add_argument("name", help="Template name")
args = parser.parse_args()
templates = SessionTemplates()
if args.command == "list":
task_type = TaskType(args.type) if args.type else None
template_list = templates.list_templates(task_type)
if not template_list:
print("No templates found")
return
print(f"Found {len(template_list)} templates:")
for t in template_list:
print(f" {t.name}: {t.task_type.value} ({len(t.examples)} examples, used {t.usage_count} times)")
elif args.command == "create":
task_type = TaskType(args.type) if args.type else None
template = templates.create_template(
args.session_id,
name=args.name,
task_type=task_type,
max_examples=args.max_examples
)
if template:
print(f"Created template: {template.name}")
print(f" Type: {template.task_type.value}")
print(f" Examples: {len(template.examples)}")
else:
print("Failed to create template")
elif args.command == "delete":
if templates.delete_template(args.name):
print(f"Deleted template: {args.name}")
else:
print(f"Template not found: {args.name}")
else:
parser.print_help()
if __name__ == "__main__":
main()