Compare commits

...

8 Commits

Author SHA1 Message Date
Alexander Whitestone
c4d757268e fix(cron): SSH dispatch validation + failure phrase detection (#350)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m0s
VPS agent dispatch jobs reported 'OK' while remote hermes binary paths
were broken. The dispatch worker drained the queue without validating
that commands actually ran, and the scheduler's failure detection
missed common SSH error patterns.

Changes:

1. New cron/ssh_dispatch.py — SSH dispatch utilities
   - DispatchResult: structured result with success/failure status,
     exit code, stderr, human-readable failure_reason
   - SSHEnvironment: validates remote hermes binary exists before
     dispatch via SSH probe (test -x), caches validated path,
     executes commands with proper timeout/error handling
   - dispatch_to_hosts(): multi-host dispatch with per-host results
   - format_dispatch_report(): human-readable report of all results

2. cron/scheduler.py — expanded _SCRIPT_FAILURE_PHRASES
   Added SSH-specific patterns that were missing:
   - 'no such file or directory' (exact bash error)
   - 'command not found' (bash fallback)
   - 'hermes binary not found' / 'hermes not found'
   - 'ssh: connect to host' (SSH connection failure)
   - 'connection timed out' (SSH timeout)
   - 'host key verification failed' (key mismatch)
   These are detected by _detect_script_failure() so broken SSH
   dispatches are properly flagged instead of reported as OK.

Closes #350
2026-04-13 21:05:49 -04:00
8d0cad13c4 Merge pull request 'fix: watchdog config drift check uses YAML parse, not grep (#377)' (#398) from burn/377-1776117775 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 28s
2026-04-14 00:34:14 +00:00
b9aca0a3b4 Merge pull request 'feat: time-aware model routing for cron jobs (#317)' (#432) from burn/317-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:34:06 +00:00
99d36533d5 Merge pull request 'feat: add /debug slash command with paste service upload (#320)' (#416) from burn/320-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:33:59 +00:00
Alexander Whitestone
5989600d80 feat: time-aware model routing for cron jobs (#317)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
Empirical audit: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
During configured high-error windows, automatically route cron jobs to
more capable models when the user is not present to correct errors.

- agent/smart_model_routing.py: resolve_cron_model() + _hour_in_window()
- cron/scheduler.py: wired into run_job() after base model resolution
- tests/test_cron_model_routing.py: 16 tests

Config:
  cron_model_routing:
    enabled: true
    fallback_model: "anthropic/claude-sonnet-4"
    fallback_provider: "openrouter"
    windows:
      - {start_hour: 17, end_hour: 22, reason: evening_error_peak}
      - {start_hour: 2, end_hour: 5, reason: overnight_api_instability}

Features: midnight-wrap, per-window overrides, first-match-wins,
graceful degradation on malformed config.

Closes #317
2026-04-13 20:19:37 -04:00
f1626a932c feat: add /debug command handler with paste service upload (#320)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
2026-04-13 22:48:33 +00:00
d68ab4cff4 feat: add /debug slash command to command registry (#320) 2026-04-13 22:47:51 +00:00
87867f3d10 fix: config drift check uses YAML parse not grep (#377)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 59s
2026-04-13 22:12:56 +00:00
7 changed files with 1151 additions and 2 deletions

View File

@@ -1,10 +1,11 @@
"""Helpers for optional cheap-vs-strong model routing."""
"""Helpers for optional cheap-vs-strong and time-aware model routing."""
from __future__ import annotations
import os
import re
from typing import Any, Dict, Optional
from datetime import datetime
from typing import Any, Dict, List, Optional
from utils import is_truthy_value
@@ -192,3 +193,104 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
tuple(runtime.get("args") or ()),
),
}
# =========================================================================
# Time-aware cron model routing
# =========================================================================
#
# Empirical finding: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
# During high-error windows, route cron jobs to more capable models.
#
# Config (config.yaml):
# cron_model_routing:
# enabled: true
# fallback_model: "anthropic/claude-sonnet-4"
# fallback_provider: "openrouter"
# windows:
# - start_hour: 17
# end_hour: 22
# reason: "evening_error_peak"
# - start_hour: 2
# end_hour: 5
# reason: "overnight_api_instability"
# =========================================================================
def _hour_in_window(hour: int, start: int, end: int) -> bool:
"""Check if hour falls in [start, end) window, handling midnight wrap."""
if start <= end:
return start <= hour < end
else:
# Wraps midnight: e.g., 22-06
return hour >= start or hour < end
def resolve_cron_model(
    base_model: str,
    routing_config: Optional[Dict[str, Any]],
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Pick the model a cron job should use at the current hour.

    When routing is enabled and the current hour falls inside one of the
    configured windows, the window's own ``model``/``provider`` (or the
    global ``fallback_model``/``fallback_provider``) replaces the base model.
    The first matching window wins. In every other case the base model is
    returned untouched.

    Args:
        base_model: The model string already resolved (from job/config/env).
        routing_config: The cron_model_routing dict from config.yaml.
        now: Override current time (for testing). Defaults to datetime.now().

    Returns:
        Dict with keys: model, provider, overridden, reason.
        - model: the effective model string to use
        - provider: provider override (empty string = use default)
        - overridden: True if time-based override was applied
        - reason: why override was applied (empty string if not)
    """

    def _unchanged() -> Dict[str, Any]:
        # Shared "no override" result used by every early exit below.
        return {"model": base_model, "provider": "", "overridden": False, "reason": ""}

    settings = routing_config or {}
    # _coerce_bool / _coerce_int are helpers defined elsewhere in this module.
    if not _coerce_bool(settings.get("enabled"), False):
        return _unchanged()

    window_list = settings.get("windows") or []
    if not (isinstance(window_list, list) and window_list):
        return _unchanged()

    moment = now or datetime.now()
    hour_now = moment.hour

    hit = None
    for candidate in window_list:
        if not isinstance(candidate, dict):
            continue
        lo = _coerce_int(candidate.get("start_hour"), -1)
        hi = _coerce_int(candidate.get("end_hour"), -1)
        # -1 is the sentinel for a missing bound; negative hours are skipped too.
        if lo < 0 or hi < 0:
            continue
        if _hour_in_window(hour_now, lo, hi):
            hit = candidate
            break

    if not hit:
        return _unchanged()

    # Per-window values take precedence; the global fallback fills the gaps.
    chosen_model = (
        str(hit.get("model") or "").strip()
        or str(settings.get("fallback_model") or "").strip()
    )
    chosen_provider = (
        str(hit.get("provider") or "").strip()
        or str(settings.get("fallback_provider") or "").strip()
    )
    if not chosen_model:
        # A window matched but nothing names a replacement model.
        return _unchanged()

    label = str(hit.get("reason") or "time_window").strip()
    return {
        "model": chosen_model,
        "provider": chosen_provider,
        "overridden": True,
        "reason": f"cron_routing:{label}(hour={hour_now})",
    }

192
cli.py
View File

@@ -3134,6 +3134,196 @@ class HermesCLI:
print(f" Home: {display}")
print()
def _handle_debug_command(self, command: str):
    """Generate a debug report with system info and logs, upload to paste service.

    The report contains system details, Hermes session details, the
    redacted config, the tail of every ``*.log`` file in the Hermes logs
    directory and the enabled toolsets. It is uploaded to the first paste
    service that accepts it; if none does, it is written to
    ``debug-report.txt`` in the Hermes home, and printed as a last resort.

    Args:
        command: The raw slash command. An optional integer after the
            command name sets how many trailing log lines to include
            (default 50, clamped to 1..500).
    """
    import platform
    import sys
    import time as _time

    # Parse optional lines argument
    parts = command.split(maxsplit=1)
    log_lines = 50
    if len(parts) > 1:
        try:
            # Clamp to [1, 500]. Zero or a negative count would turn the
            # tail slice below into "almost the whole file".
            log_lines = max(1, min(int(parts[1]), 500))
        except ValueError:
            pass

    _cprint(" Collecting debug info...")

    # Collect system info
    lines = []
    lines.append("=== HERMES DEBUG REPORT ===")
    lines.append(f"Generated: {_time.strftime('%Y-%m-%d %H:%M:%S %z')}")
    lines.append("")
    lines.append("--- System ---")
    lines.append(f"Python: {sys.version}")
    lines.append(f"Platform: {platform.platform()}")
    lines.append(f"Architecture: {platform.machine()}")
    lines.append(f"Hostname: {platform.node()}")
    lines.append("")

    # Hermes info
    lines.append("--- Hermes ---")
    try:
        from hermes_constants import display_hermes_home
        lines.append(f"Home: {display_hermes_home()}")
    except Exception:
        lines.append("Home: unknown")
    try:
        from hermes_constants import __version__
        lines.append(f"Version: {__version__}")
    except Exception:
        lines.append("Version: unknown")
    lines.append(f"Profile: {getattr(self, '_profile_name', 'default')}")
    lines.append(f"Session: {self.session_id}")
    lines.append(f"Model: {self.model}")
    lines.append(f"Provider: {getattr(self, '_provider_name', 'unknown')}")
    try:
        lines.append(f"Working dir: {os.getcwd()}")
    except Exception:
        pass

    # Config (redacted)
    secret_suffixes = ("api_key", "token", "secret", "password")

    def _redact(node):
        """Return a copy of *node* with secret-looking values masked at any depth."""
        if isinstance(node, dict):
            return {
                key: (
                    "***REDACTED***"
                    if isinstance(key, str) and key.lower().endswith(secret_suffixes)
                    else _redact(value)
                )
                for key, value in node.items()
            }
        if isinstance(node, list):
            return [_redact(item) for item in node]
        return node

    lines.append("")
    lines.append("--- Config (redacted) ---")
    try:
        from hermes_constants import get_hermes_home
        config_path = get_hermes_home() / "config.yaml"
        if config_path.exists():
            import yaml
            with open(config_path, encoding="utf-8") as f:
                cfg = yaml.safe_load(f) or {}
            # Redact secrets at every nesting level, because this report
            # may be uploaded to a public paste service.
            lines.append(yaml.dump(_redact(cfg), default_flow_style=False)[:2000])
        else:
            lines.append("(no config file found)")
    except Exception as e:
        lines.append(f"(error reading config: {e})")

    # Recent logs
    # NOTE(review): log contents are included verbatim and are not scanned
    # for secrets before upload.
    lines.append("")
    lines.append(f"--- Recent Logs (last {log_lines} lines) ---")
    try:
        from hermes_constants import get_hermes_home
        log_dir = get_hermes_home() / "logs"
        if log_dir.exists():
            for log_file in sorted(log_dir.glob("*.log")):
                try:
                    content = log_file.read_text(encoding="utf-8", errors="replace")
                    tail = content.strip().split("\n")[-log_lines:]
                    if tail:
                        lines.append(f"\n[{log_file.name}]")
                        lines.extend(tail)
                except Exception:
                    # Best effort: an unreadable log file must not abort the report.
                    pass
        else:
            lines.append("(no logs directory)")
    except Exception:
        lines.append("(error reading logs)")

    # Tool info
    lines.append("")
    lines.append("--- Enabled Toolsets ---")
    try:
        lines.append(", ".join(self.enabled_toolsets) if self.enabled_toolsets else "(none)")
    except Exception:
        lines.append("(unknown)")

    report = "\n".join(lines)
    report_size = len(report)

    # Try to upload to paste services, first success wins
    paste_url = None
    services = [
        ("dpaste", _upload_dpaste),
        ("0x0.st", _upload_0x0st),
    ]
    for name, uploader in services:
        try:
            url = uploader(report)
            if url:
                paste_url = url
                break
        except Exception:
            continue

    print()
    if paste_url:
        _cprint(f" Debug report uploaded: {paste_url}")
        _cprint(f" Size: {report_size} bytes, {len(lines)} lines")
    else:
        # Fallback: save locally
        try:
            from hermes_constants import get_hermes_home
            debug_path = get_hermes_home() / "debug-report.txt"
            debug_path.write_text(report, encoding="utf-8")
            _cprint(f" Paste services unavailable. Report saved to: {debug_path}")
            _cprint(f" Size: {report_size} bytes, {len(lines)} lines")
        except Exception as e:
            _cprint(f" Failed to save report: {e}")
            _cprint(f" Report ({report_size} bytes):")
            print(report)
    print()
def _upload_dpaste(content: str) -> str | None:
    """Upload content to dpaste.org. Returns URL or None.

    Network and HTTP errors raised by ``urlopen`` propagate to the caller.
    """
    import urllib.request
    import urllib.parse

    data = urllib.parse.urlencode({
        "content": content,
        "syntax": "text",
        "expiry_days": 7,
    }).encode()
    req = urllib.request.Request(
        "https://dpaste.org/api/",
        data=data,
        headers={"User-Agent": "hermes-agent/debug"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        # NOTE(review): the dpaste API's default response format is believed
        # to wrap the URL in double quotes; strip them so the "http" check
        # below does not reject a valid answer. Confirm against the API docs.
        url = resp.read().decode().strip().strip('"').strip()
    if url.startswith("http"):
        return url
    return None
def _upload_0x0st(content: str) -> str | None:
    """Upload content to 0x0.st. Returns URL or None.

    Network and HTTP errors raised by ``urlopen`` propagate to the caller.
    """
    import urllib.request

    # 0x0.st expects multipart form with a file field
    boundary = "----HermesDebugBoundary"
    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="debug.txt"\r\n'
        f"Content-Type: text/plain\r\n\r\n"
        f"{content}\r\n"
        f"--{boundary}--\r\n"
    ).encode()
    req = urllib.request.Request(
        "https://0x0.st",
        data=body,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "User-Agent": "hermes-agent/debug",
        },
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        url = resp.read().decode().strip()
    # Anything that is not a URL (e.g. an error page) counts as a failed upload.
    if url.startswith("http"):
        return url
    return None
def show_config(self):
"""Display current configuration with kawaii ASCII art."""
# Get terminal config from environment (which was set from cli-config.yaml)
@@ -4321,6 +4511,8 @@ class HermesCLI:
self.show_help()
elif canonical == "profile":
self._handle_profile_command()
elif canonical == "debug":
self._handle_debug_command(cmd_original)
elif canonical == "tools":
self._handle_tools_command(cmd_original)
elif canonical == "toolsets":

View File

@@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = (
"unable to execute",
"permission denied",
"no such file",
"no such file or directory",
"command not found",
"hermes binary not found",
"hermes not found",
"traceback",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
)
@@ -718,6 +725,22 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
# Time-aware cron model routing — override model during high-error windows
try:
from agent.smart_model_routing import resolve_cron_model
_cron_routing_cfg = (_cfg.get("cron_model_routing") or {})
_cron_route = resolve_cron_model(model, _cron_routing_cfg)
if _cron_route["overridden"]:
_original_model = model
model = _cron_route["model"]
logger.info(
"Job '%s': cron model override %s -> %s (%s)",
job_id, _original_model, model, _cron_route["reason"],
)
except Exception as _e:
logger.debug("Job '%s': cron model routing skipped: %s", job_id, _e)
effort = os.getenv("HERMES_REASONING_EFFORT", "")
if not effort:
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()

319
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,319 @@
"""SSH dispatch utilities for VPS agent operations.
Provides validated SSH execution with proper failure detection.
Used by cron jobs that dispatch work to remote VPS agents.
Key classes:
SSHEnvironment: Executes commands on remote hosts with validation
DispatchResult: Structured result with success/failure status
"""
from __future__ import annotations
import json
import logging
import os
import subprocess
import time
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default timeout for SSH commands (seconds)
# Read once at import time from HERMES_SSH_TIMEOUT; a non-integer value
# makes int() raise ValueError while the module is being imported.
_SSH_TIMEOUT = int(os.getenv("HERMES_SSH_TIMEOUT", "30"))
# Default hermes binary paths to probe on remote hosts
# Candidates are tried in the order listed. Entries containing "{agent}" are
# templates that SSHEnvironment._resolve_hermes_paths fills with the agent name.
_DEFAULT_HERMES_PATHS = [
"/root/wizards/{agent}/venv/bin/hermes",
"/root/.local/bin/hermes",
"/usr/local/bin/hermes",
"~/.local/bin/hermes",
"hermes", # fallback to PATH
]
class DispatchResult:
    """Outcome record for one remote dispatch attempt.

    Holds the success flag, the target host and command, the process exit
    code and captured output, an optional error message, the elapsed time in
    milliseconds and the hermes binary path that was used.
    """

    __slots__ = ("success", "host", "command", "exit_code", "stdout", "stderr", "error", "duration_ms", "hermes_path")

    def __init__(
        self,
        success: bool,
        host: str,
        command: str,
        exit_code: int = -1,
        stdout: str = "",
        stderr: str = "",
        error: str = "",
        duration_ms: int = 0,
        hermes_path: str = "",
    ):
        # Identity of the attempt.
        self.success, self.host, self.command = success, host, command
        # Raw process outcome.
        self.exit_code, self.stdout, self.stderr = exit_code, stdout, stderr
        # Diagnostics.
        self.error, self.duration_ms, self.hermes_path = error, duration_ms, hermes_path

    def to_dict(self) -> dict:
        """Return a compact summary dict (stdout and the command are omitted)."""
        summary = {
            field: getattr(self, field)
            for field in ("success", "host", "exit_code", "error", "duration_ms", "hermes_path")
        }
        # Only the last 200 characters of stderr are kept.
        summary["stderr_tail"] = self.stderr[-200:] if self.stderr else ""
        return summary

    @property
    def failure_reason(self) -> str:
        """Human-readable failure reason; empty string for a successful result."""
        if self.success:
            return ""
        if self.error:
            return self.error
        missing_markers = ("No such file", "command not found")
        if any(marker in self.stderr for marker in missing_markers):
            return f"Hermes binary not found on {self.host}"
        if self.exit_code != 0:
            return f"Remote command exited {self.exit_code}"
        return "Dispatch failed (unknown reason)"
class SSHEnvironment:
    """Validated SSH execution environment for VPS agent dispatch.

    Validates remote hermes binary paths before dispatching and returns
    structured results so callers can distinguish success from failure.

    Usage:
        ssh = SSHEnvironment(host="root@ezra", agent="allegro")
        result = ssh.dispatch("--help")
        if not result.success:
            logger.error("Dispatch failed: %s", result.failure_reason)
    """

    def __init__(
        self,
        host: str,
        agent: str = "",
        ssh_key: str = "",
        ssh_port: int = 22,
        timeout: int = _SSH_TIMEOUT,
        hermes_path: str = "",
    ):
        """Store connection settings; no network activity happens here.

        Args:
            host: SSH destination, e.g. "root@ezra".
            agent: Agent name substituted into "{agent}" path templates.
            ssh_key: Identity file passed to ``ssh -i`` when non-empty.
            ssh_port: Remote port; only passed to ssh when it is not 22.
            timeout: Per-command subprocess timeout in seconds.
            hermes_path: Explicit remote hermes path; when set it is the
                only candidate that gets probed.
        """
        self.host = host
        self.agent = agent
        self.ssh_key = ssh_key
        self.ssh_port = ssh_port
        self.timeout = timeout
        self.hermes_path = hermes_path
        # Filled in by validate_remote_hermes_path() on first success.
        self._validated_path: str = ""

    def _ssh_base_cmd(self) -> list[str]:
        """Build the base SSH command (everything except the remote command)."""
        cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"]
        cmd.extend(["-o", "ConnectTimeout=10"])
        cmd.extend(["-o", "BatchMode=yes"])  # fail fast on auth issues
        if self.ssh_key:
            cmd.extend(["-i", self.ssh_key])
        if self.ssh_port != 22:
            cmd.extend(["-p", str(self.ssh_port)])
        cmd.append(self.host)
        return cmd

    def _resolve_hermes_paths(self) -> list[str]:
        """Return candidate hermes binary paths for the remote host."""
        if self.hermes_path:
            return [self.hermes_path]
        paths = []
        for tmpl in _DEFAULT_HERMES_PATHS:
            if "{agent}" in tmpl:
                # Without an agent name the template cannot yield a real
                # path; skip it instead of spending an SSH round trip.
                if not self.agent:
                    continue
                paths.append(tmpl.format(agent=self.agent))
            else:
                paths.append(tmpl)
        return paths

    def validate_remote_hermes_path(self) -> str:
        """Probe the remote host for a working hermes binary.

        Returns the validated path on success, raises RuntimeError on failure.
        Caches the result so validation is only done once per instance.

        Raises:
            RuntimeError: if ssh itself fails (exit status 255) or if none of
                the candidate paths is an executable on the remote host.
        """
        if self._validated_path:
            return self._validated_path

        candidates = self._resolve_hermes_paths()
        for path in candidates:
            # A bare command name has to be resolved through the remote
            # PATH; `test -x hermes` would only look in the login directory.
            if "/" in path:
                probe = f"test -x {path}"
            else:
                probe = f"command -v {path} >/dev/null 2>&1"
            test_cmd = f"{probe} && echo OK || echo MISSING"
            try:
                result = subprocess.run(
                    self._ssh_base_cmd() + [test_cmd],
                    capture_output=True, text=True, timeout=self.timeout,
                )
            except subprocess.TimeoutExpired:
                logger.warning("SSH %s: timeout probing %s", self.host, path)
                continue
            except Exception as exc:
                logger.debug("SSH %s: probe %s failed: %s", self.host, path, exc)
                continue

            # The probe command always ends in an `echo`, so a 255 here comes
            # from ssh itself (connect/auth/host-key failure). Report that
            # directly rather than retrying every path and blaming the binary.
            if result.returncode == 255:
                detail = (result.stderr or "").strip().split("\n")[0] or "ssh exited 255"
                raise RuntimeError(f"SSH connection to {self.host} failed: {detail}")

            if result.returncode == 0 and "OK" in (result.stdout or ""):
                logger.info(
                    "SSH %s: hermes binary validated at %s", self.host, path,
                )
                self._validated_path = path
                return path

        raise RuntimeError(
            f"No working hermes binary found on {self.host}. "
            f"Checked: {', '.join(candidates)}. "
            f"Install hermes or set HERMES_PATH on the remote host."
        )

    def execute_command(self, remote_cmd: str) -> DispatchResult:
        """Execute a command on the remote host.

        Returns a DispatchResult with success/failure status. Never raises:
        a non-zero exit, a timeout and any other exception all become a
        result with success=False.
        """
        t0 = time.monotonic()
        full_cmd = self._ssh_base_cmd() + [remote_cmd]
        try:
            result = subprocess.run(
                full_cmd,
                capture_output=True, text=True, timeout=self.timeout,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            stderr = (result.stderr or "").strip()
            stdout = (result.stdout or "").strip()
            if result.returncode != 0:
                return DispatchResult(
                    success=False,
                    host=self.host,
                    command=remote_cmd,
                    exit_code=result.returncode,
                    stdout=stdout,
                    stderr=stderr,
                    # First stderr line is the headline; fall back to the code.
                    error=stderr.split("\n")[0] if stderr else f"exit code {result.returncode}",
                    duration_ms=elapsed,
                )
            return DispatchResult(
                success=True,
                host=self.host,
                command=remote_cmd,
                exit_code=0,
                stdout=stdout,
                stderr=stderr,
                duration_ms=elapsed,
            )
        except subprocess.TimeoutExpired:
            elapsed = int((time.monotonic() - t0) * 1000)
            return DispatchResult(
                success=False,
                host=self.host,
                command=remote_cmd,
                error=f"SSH timed out after {self.timeout}s",
                duration_ms=elapsed,
            )
        except Exception as exc:
            elapsed = int((time.monotonic() - t0) * 1000)
            return DispatchResult(
                success=False,
                host=self.host,
                command=remote_cmd,
                error=str(exc),
                duration_ms=elapsed,
            )

    def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult:
        """Dispatch a hermes command on the remote host.

        Args:
            hermes_args: Arguments to pass to hermes (e.g. "cron tick").
            validate: If True, validate the hermes binary exists first.

        Returns a DispatchResult. Only success=True if the command actually ran.
        """
        if validate:
            try:
                hermes_path = self.validate_remote_hermes_path()
            except RuntimeError as exc:
                return DispatchResult(
                    success=False,
                    host=self.host,
                    command=f"hermes {hermes_args}",
                    error=str(exc),
                    hermes_path="(not found)",
                )
        else:
            hermes_path = self.hermes_path or "hermes"

        remote_cmd = f"{hermes_path} {hermes_args}"
        result = self.execute_command(remote_cmd)
        result.hermes_path = hermes_path
        return result
def dispatch_to_hosts(
    hosts: list[str],
    hermes_args: str,
    agent: str = "",
    ssh_key: str = "",
    ssh_port: int = 22,
    timeout: int = _SSH_TIMEOUT,
) -> dict[str, DispatchResult]:
    """Run the same hermes command on each host, one after another.

    Every host gets its own SSHEnvironment built from the shared settings.
    The returned dict maps host -> DispatchResult; a failed dispatch is
    reported through ``success=False`` on its result.
    """
    outcome: dict[str, DispatchResult] = {}
    for target in hosts:
        env = SSHEnvironment(
            host=target,
            agent=agent,
            ssh_key=ssh_key,
            ssh_port=ssh_port,
            timeout=timeout,
        )
        res = env.dispatch(hermes_args)
        outcome[target] = res
        summary = "OK" if res.success else res.failure_reason
        logger.info("Dispatch %s: %s", target, summary)
    return outcome
def format_dispatch_report(results: dict[str, DispatchResult]) -> str:
    """Format dispatch results as a human-readable report.

    The report has a summary line, one status line per host (with the
    failure reason and duration where available) and, when anything failed,
    a trailing section repeating each failure with the tail of its stderr.
    """
    ok_count = sum(1 for r in results.values() if r.success)
    failed_count = len(results) - ok_count

    lines = [f"Dispatch report: {ok_count} OK, {failed_count} failed", ""]
    for host, result in results.items():
        status = "OK" if result.success else "FAILED"
        line = f" {host}: {status}"
        if not result.success:
            # Separator keeps the status word and the reason from fusing
            # into "FAILEDreason".
            line += f" - {result.failure_reason}"
        if result.duration_ms:
            line += f" ({result.duration_ms}ms)"
        lines.append(line)

    if failed_count:
        lines.append("")
        lines.append("Failed dispatches:")
        for host, result in results.items():
            if not result.success:
                lines.append(f" {host}: {result.failure_reason}")
                if result.stderr:
                    # Only the last 150 characters of stderr are shown.
                    lines.append(f" stderr: {result.stderr[-150:]}")
    return "\n".join(lines)

286
model-watchdog.py Normal file
View File

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
Model Watchdog — monitors tmux panes for model drift.
Checks all hermes TUI sessions in dev and timmy tmux sessions.
If any pane is running a non-mimo model, kills and restarts it.
Usage: python3 ~/.hermes/bin/model-watchdog.py [--fix]
--fix Actually restart drifted panes (default: dry-run)
"""
import subprocess
import sys
import re
import time
import os
# Model substring expected when a pane's profile is unknown or not listed below.
ALLOWED_MODEL = "mimo-v2-pro"
# Profile -> expected model. If a pane is running this profile with this model, it's healthy.
# Profiles not in this map are checked against ALLOWED_MODEL.
# is_drifted() does a substring match, so a value only has to appear inside
# the detected model name.
PROFILE_MODELS = {
"default": "mimo-v2-pro",
"timmy-sprint": "mimo-v2-pro",
"fenrir": "mimo-v2-pro",
"bezalel": "gpt-5.4",
"burn": "mimo-v2-pro",
"creative": "claude-sonnet",
"research": "claude-sonnet",
"review": "claude-sonnet",
}
# tmux sessions whose panes are inspected by main().
TMUX_SESSIONS = ["dev", "timmy"]
# Every log() call appends to this file (and prints the same line).
LOG_FILE = os.path.expanduser("~/.hermes/logs/model-watchdog.log")
def log(msg):
    """Print a timestamped message and append the same line to LOG_FILE.

    The log directory is created on demand.
    """
    log_dir = os.path.dirname(LOG_FILE)
    os.makedirs(log_dir, exist_ok=True)

    ts = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"

    print(line)
    with open(LOG_FILE, "a") as handle:
        handle.write(line + "\n")
def run(cmd):
    """Run *cmd* through the shell and return ``(stripped stdout, return code)``.

    Commands are limited to 10 seconds. A command that exceeds the limit is
    reported as ``("", 124)`` (the exit status the ``timeout`` utility uses)
    instead of raising, so one hung tmux/ps call cannot abort a whole
    watchdog run.
    """
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10)
    except subprocess.TimeoutExpired:
        return "", 124
    return r.stdout.strip(), r.returncode
def get_panes(session):
    """Get all pane info from ALL windows in a tmux session.

    Returns a list of dicts with keys ``session``, ``window`` (window name),
    ``index``, ``pid`` and ``tty``. An unknown session, a window that cannot
    be listed, or an unparsable pane line yields no entry rather than an
    exception.
    """
    import shlex

    # First get all windows
    win_out, win_rc = run(
        f"tmux list-windows -t {shlex.quote(session)} -F '#{{window_name}}' 2>/dev/null"
    )
    if win_rc != 0:
        return []

    panes = []
    for window_name in win_out.split("\n"):
        if not window_name.strip():
            continue
        # Window names may contain spaces or shell metacharacters, so the
        # target has to be quoted before it reaches the shell.
        target = f"{session}:{window_name}"
        out, rc = run(
            f"tmux list-panes -t {shlex.quote(target)} "
            f"-F '#{{pane_index}}|#{{pane_pid}}|#{{pane_tty}}' 2>/dev/null"
        )
        if rc != 0:
            continue
        for line in out.split("\n"):
            fields = line.split("|")
            if len(fields) != 3:
                # Blank or malformed line: skip it instead of failing to unpack.
                continue
            idx, pid, tty = fields
            try:
                pane_index, pane_pid = int(idx), int(pid)
            except ValueError:
                continue
            panes.append({
                "session": session,
                "window": window_name,
                "index": pane_index,
                "pid": pane_pid,
                "tty": tty,
            })
    return panes
def get_hermes_pid_for_tty(tty):
    """Find hermes process running on a specific TTY.

    Args:
        tty: TTY as reported by tmux, e.g. "/dev/ttys031" or "/dev/pts/3".

    Returns:
        The PID (int) of the first matching hermes process, or None when the
        TTY is empty or nothing matches.
    """
    # ps prints the short TTY name ("s031", "pts/3"), never the /dev path,
    # so grepping for the full device path can never match.
    short_tty = tty.strip()
    if short_tty.startswith("/dev/"):
        short_tty = short_tty[len("/dev/"):]
    if short_tty.startswith("ttys"):
        short_tty = short_tty[len("tty"):]
    if not short_tty:
        # An empty pattern would make grep match every line and hand back an
        # unrelated hermes PID for the caller to kill.
        return None

    out, _ = run(f"ps aux | grep '{short_tty}' | grep '[h]ermes' | grep -v 'gateway' | grep -v 'node' | awk '{{print $2}}'")
    if out:
        try:
            return int(out.split("\n")[0])
        except ValueError:
            return None
    return None
def get_model_from_pane(session, pane_idx, window=None):
    """Capture the pane and extract the model from the status bar.

    Only the last 30 lines of the pane are examined. Returns the first token
    that is followed by whitespace and a "│" separator, or None when there
    is no such token.
    """
    import shlex

    target = f"{session}:{window}.{pane_idx}" if window else f"{session}.{pane_idx}"
    # Quote the target: window names can contain spaces or metacharacters.
    out, _ = run(f"tmux capture-pane -t {shlex.quote(target)} -p 2>/dev/null | tail -30")
    # Look for model in status bar: ⚕ model-name │
    # NOTE(review): the pattern is not anchored on the "⚕" marker, so any
    # "word │" sequence in the captured text can match first. Confirm intent.
    matches = re.findall(r'\s+(\S+)\s+│', out)
    if matches:
        return matches[0]
    return None
def check_session_meta(session_id):
"""Check what model a hermes session was last using from its session file."""
# Returns a (model, provider) tuple; (None, None) when neither file yields one.
import json
# First choice: the JSON session file ~/.hermes/sessions/session_<id>.json.
session_file = os.path.expanduser(f"~/.hermes/sessions/session_{session_id}.json")
if os.path.exists(session_file):
try:
with open(session_file) as f:
data = json.load(f)
return data.get("model"), data.get("provider")
# NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit;
# consider narrowing it to (OSError, ValueError, AttributeError).
except:
pass
# Try jsonl
jsonl_file = os.path.expanduser(f"~/.hermes/sessions/{session_id}.jsonl")
if os.path.exists(jsonl_file):
try:
with open(jsonl_file) as f:
for line in f:
d = json.loads(line.strip())
# The record with role "session_meta" carries the model and provider.
if d.get("role") == "session_meta":
return d.get("model"), d.get("provider")
# NOTE(review): this view has lost its indentation, so it is unclear whether
# the break belongs to the if (unreachable after the return) or to the loop
# (only the first record is inspected) - confirm against the real file.
break
except:
pass
return None, None
def is_drifted(model_name, profile=None):
    """Decide whether a detected model differs from what the profile expects.

    Returns a ``(drifted, label)`` tuple. When no model could be detected the
    pane is treated as not drifted and the label is "no-model-detected";
    otherwise the label is the model name itself. The expected model comes
    from PROFILE_MODELS for a known profile and from ALLOWED_MODEL otherwise,
    and is compared by substring match.
    """
    if model_name is None:
        return False, "no-model-detected"

    if profile and profile in PROFILE_MODELS:
        # Known profile: compare against its own expected model.
        wanted = PROFILE_MODELS[profile]
    else:
        # No profile known — fall back to ALLOWED_MODEL
        wanted = ALLOWED_MODEL

    return (wanted not in model_name), model_name
def get_profile_from_pane(tty):
    """Detect which hermes profile a pane is running by inspecting its process args.

    Args:
        tty: TTY as reported by tmux, e.g. "/dev/ttys031" or "/dev/pts/3".

    Returns:
        The value following ``-p`` on the matching hermes command line, or
        None when the TTY is empty, no hermes process matches, or the process
        was started without ``-p``.
    """
    # ps shows short TTY (s031, pts/3) not full path (/dev/ttys031, /dev/pts/3)
    short_tty = tty.strip()
    if short_tty.startswith("/dev/"):
        short_tty = short_tty[len("/dev/"):]
    if short_tty.startswith("ttys"):
        short_tty = short_tty[len("tty"):]
    if not short_tty:
        # An empty pattern would match every process and report the profile
        # of an unrelated hermes instance.
        return None

    out, _ = run(f"ps aux | grep '{short_tty}' | grep '[h]ermes' | grep -v 'gateway' | grep -v 'node' | grep -v cron")
    if not out:
        return None
    # Look for -p <profile> in the command line
    match = re.search(r'-p\s+(\S+)', out)
    if match:
        return match.group(1)
    return None
def kill_and_restart(session, pane_idx, window=None):
    """Kill the hermes process in a pane and restart it with the same profile.

    Returns True when, after the restart, the pane shows the model expected
    for its profile (PROFILE_MODELS, falling back to ALLOWED_MODEL), and
    False otherwise.
    """
    import shlex

    target = f"{session}:{window}.{pane_idx}" if window else f"{session}.{pane_idx}"
    quoted_target = shlex.quote(target)

    # Get the pane's TTY. display-message resolves exactly one pane, whereas
    # list-panes would print the TTY of every pane in the window.
    out, _ = run(f"tmux display-message -p -t {quoted_target} '#{{pane_tty}}'")
    tty = out.strip()

    # Detect which profile was running
    profile = get_profile_from_pane(tty)

    # Find and kill hermes on that TTY
    hermes_pid = get_hermes_pid_for_tty(tty)
    if hermes_pid:
        log(f"Killing hermes PID {hermes_pid} on {target} (tty={tty}, profile={profile})")
        run(f"kill {hermes_pid}")
        time.sleep(2)

    # Send Ctrl+C to clear any state
    run(f"tmux send-keys -t {quoted_target} C-c")
    time.sleep(1)

    # Restart hermes with the same profile
    if profile:
        cmd = f"hermes -p {profile} chat"
    else:
        cmd = "hermes chat"
    run(f"tmux send-keys -t {quoted_target} {shlex.quote(cmd)} Enter")
    log(f"Restarted hermes in {target} with: {cmd}")

    # Wait and verify
    time.sleep(8)
    new_model = get_model_from_pane(session, pane_idx, window)
    # Verify against the model this profile is supposed to run; checking
    # ALLOWED_MODEL alone reports every non-default profile as failed.
    expected = PROFILE_MODELS.get(profile, ALLOWED_MODEL)
    if new_model and expected in new_model:
        log(f"{target} now on {new_model}")
        return True
    log(f"{target} model after restart: {new_model}")
    return False
def verify_expected_model(provider_yaml, expected):
    """Return True when the two provider strings match, ignoring surrounding whitespace."""
    actual_clean = provider_yaml.strip()
    expected_clean = expected.strip()
    return actual_clean == expected_clean
def check_config_drift():
    """Scan all relevant config.yaml files for provider drift. Does NOT modify anything.

    For each known config file that exists, ``model.provider`` is read and
    compared with the expected provider. Files without a ``model`` mapping or
    without a provider are not reported.

    Returns list of drift issues found."""
    issues = []
    CONFIGS = {
        "main_config": (os.path.expanduser("~/.hermes/config.yaml"), "nous"),
        "fenrir": (os.path.expanduser("~/.hermes/profiles/fenrir/config.yaml"), "nous"),
        "timmy_sprint": (os.path.expanduser("~/.hermes/profiles/timmy-sprint/config.yaml"), "nous"),
        "default_profile": (os.path.expanduser("~/.hermes/profiles/default/config.yaml"), "nous"),
    }
    for name, (path, expected_provider) in CONFIGS.items():
        if not os.path.exists(path):
            continue
        try:
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()

            # Parse YAML to correctly read model.provider (not the first provider: line)
            try:
                import yaml
                cfg = yaml.safe_load(content) or {}
            except ImportError:
                # Fallback: find provider under model: block via indentation-aware scan
                cfg = {}
                in_model = False
                for line in content.split("\n"):
                    stripped = line.strip()
                    indent = len(line) - len(line.lstrip())
                    if stripped.startswith("model:") and indent == 0:
                        in_model = True
                        continue
                    if in_model and indent == 0 and stripped:
                        in_model = False
                    if in_model and stripped.startswith("provider:"):
                        raw = stripped.split(":", 1)[1]
                        # Drop a trailing "# comment" and surrounding quotes so
                        # the value matches what a YAML parser would return.
                        raw = raw.split(" #", 1)[0]
                        cfg = {"model": {"provider": raw.strip().strip("'\"")}}
                        break

            # `model:` may be a plain string (or the document may not be a
            # mapping at all); that is "no provider configured", not an error.
            model_section = cfg.get("model") if isinstance(cfg, dict) else None
            actual = model_section.get("provider", "") if isinstance(model_section, dict) else ""
            if actual and expected_provider and actual != expected_provider:
                issues.append(f"CONFIG DRIFT [{name}]: provider is '{actual}' (expected '{expected_provider}')")
        except Exception as e:
            # Unreadable or unparsable file: report it and keep checking the rest.
            issues.append(f"CONFIG CHECK ERROR [{name}]: {e}")
    return issues
def main():
    """Check config files and tmux panes for model drift; optionally fix panes.

    With ``--fix`` on the command line, drifted panes are killed and
    restarted; otherwise the run is a dry run. Returns 1 when any pane or
    config issue was found and 0 otherwise, for use as the process exit code.
    """
    fix_mode = "--fix" in sys.argv
    drift_found = False
    issues = []
    total_panes = 0

    # Always check config files for provider drift (read-only, never writes)
    config_drift_issues = check_config_drift()
    for issue in config_drift_issues:
        # Each issue already starts with "CONFIG DRIFT [...]" or
        # "CONFIG CHECK ERROR [...]", so it is logged without another prefix.
        log(issue)

    for session in TMUX_SESSIONS:
        panes = get_panes(session)
        # Count while iterating instead of re-running tmux just for the total.
        total_panes += len(panes)
        for pane in panes:
            window = pane.get("window")
            target = f"{session}:{window}.{pane['index']}" if window else f"{session}.{pane['index']}"

            # Detect profile from running process. get_panes() already
            # recorded this pane's TTY; asking `tmux list-panes` again would
            # return the TTY of every pane in the window.
            tty = (pane.get("tty") or "").strip()
            profile = get_profile_from_pane(tty)

            model = get_model_from_pane(session, pane["index"], window)
            drifted, model_name = is_drifted(model, profile)
            if drifted:
                drift_found = True
                issues.append(f"{target}: {model_name} (profile={profile})")
                log(f"DRIFT DETECTED: {target} is on '{model_name}' (profile={profile}, expected='{PROFILE_MODELS.get(profile, ALLOWED_MODEL)}')")
                if fix_mode:
                    log(f"Auto-fixing {target}...")
                    success = kill_and_restart(session, pane["index"], window)
                    if not success:
                        issues.append(f" ↳ RESTART FAILED for {target}")

    if not drift_found:
        log(f"All {total_panes} panes healthy (on {ALLOWED_MODEL})")

    # Print summary for cron output
    if issues or config_drift_issues:
        print("\n=== MODEL DRIFT REPORT ===")
        for issue in issues:
            print(f" [PANE] {issue}")
        for issue in config_drift_issues:
            print(f" [CONFIG] {issue}")
        if not fix_mode:
            print("\nRun with --fix to auto-restart drifted panes.")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,128 @@
"""Tests for time-aware cron model routing — Issue #317."""
import pytest
from datetime import datetime
from agent.smart_model_routing import resolve_cron_model, _hour_in_window
class TestHourInWindow:
    """Checks for _hour_in_window: plain windows, midnight wrap and boundaries."""

    def test_normal_window(self):
        # Window 17-22: start is inclusive, end is exclusive.
        for hour, expected in ((18, True), (16, False), (22, False)):
            assert _hour_in_window(hour, 17, 22) is expected

    def test_midnight_wrap(self):
        # Window 22-06 spans midnight.
        for hour, expected in ((23, True), (3, True), (10, False)):
            assert _hour_in_window(hour, 22, 6) is expected

    def test_edge_cases(self):
        # Full-day window 0-24 contains the first and the last hour.
        for hour in (0, 23):
            assert _hour_in_window(hour, 0, 24) is True
        # Wrapped window: midnight and the last covered hour are inside,
        # the end hour itself is not.
        for hour, expected in ((0, True), (5, True), (6, False)):
            assert _hour_in_window(hour, 22, 6) is expected
class TestResolveCronModel:
    """Exercise ``resolve_cron_model`` with routing configs and fixed ``now`` values."""

    def _config(self, **overrides):
        """Return an enabled config with one 17-22 window; *overrides* replace keys."""
        return {
            "enabled": True,
            "fallback_model": "anthropic/claude-sonnet-4",
            "fallback_provider": "openrouter",
            "windows": [
                {"start_hour": 17, "end_hour": 22, "reason": "evening_error_peak"},
            ],
            **overrides,
        }

    def test_disabled_returns_base(self):
        """A config with ``enabled`` False leaves the base model untouched."""
        moment = datetime(2026, 4, 12, 18, 0)
        outcome = resolve_cron_model("mimo", {"enabled": False}, now=moment)
        assert outcome["model"] == "mimo"
        assert outcome["overridden"] is False

    def test_no_config_returns_base(self):
        """Passing ``None`` as the config leaves the base model untouched."""
        outcome = resolve_cron_model("mimo", None)
        assert outcome["model"] == "mimo"
        assert outcome["overridden"] is False

    def test_no_windows_returns_base(self):
        """An enabled config with an empty window list does not override."""
        empty_windows = {"enabled": True, "windows": []}
        moment = datetime(2026, 4, 12, 18, 0)
        outcome = resolve_cron_model("mimo", empty_windows, now=moment)
        assert outcome["overridden"] is False

    def test_evening_window_overrides(self):
        """Inside the 17-22 window the fallback model and provider are selected."""
        moment = datetime(2026, 4, 12, 18, 0)
        outcome = resolve_cron_model("mimo", self._config(), now=moment)
        assert outcome["model"] == "anthropic/claude-sonnet-4"
        assert outcome["provider"] == "openrouter"
        assert outcome["overridden"] is True
        explanation = outcome["reason"]
        assert "evening_error_peak" in explanation
        assert "hour=18" in explanation

    def test_outside_window_keeps_base(self):
        """At 09:00, outside the 17-22 window, the base model is kept."""
        moment = datetime(2026, 4, 12, 9, 0)
        outcome = resolve_cron_model("mimo", self._config(), now=moment)
        assert outcome["model"] == "mimo"
        assert outcome["overridden"] is False

    def test_window_boundary_start_inclusive(self):
        """Exactly at the start hour (17:00) the override applies."""
        moment = datetime(2026, 4, 12, 17, 0)
        outcome = resolve_cron_model("mimo", self._config(), now=moment)
        assert outcome["overridden"] is True

    def test_window_boundary_end_exclusive(self):
        """Exactly at the end hour (22:00) the override no longer applies."""
        moment = datetime(2026, 4, 12, 22, 0)
        outcome = resolve_cron_model("mimo", self._config(), now=moment)
        assert outcome["overridden"] is False

    def test_midnight_window(self):
        """A 22-6 window matches 23:00 and 03:00 but not 10:00."""
        overnight = self._config(
            windows=[{"start_hour": 22, "end_hour": 6, "reason": "overnight"}],
        )
        schedule = (
            (datetime(2026, 4, 12, 23, 0), True),
            (datetime(2026, 4, 13, 3, 0), True),
            (datetime(2026, 4, 12, 10, 0), False),
        )
        for moment, expected in schedule:
            assert resolve_cron_model("mimo", overnight, now=moment)["overridden"] is expected

    def test_per_window_model_override(self):
        """A window's own ``model``/``provider`` take precedence over the fallback."""
        peak_window = {
            "start_hour": 17, "end_hour": 22,
            "model": "anthropic/claude-opus-4-6", "provider": "anthropic", "reason": "peak",
        }
        config = self._config(windows=[peak_window])
        outcome = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
        assert outcome["model"] == "anthropic/claude-opus-4-6"
        assert outcome["provider"] == "anthropic"

    def test_first_matching_window_wins(self):
        """With overlapping windows at 19:00, the earlier list entry is used."""
        first = {"start_hour": 17, "end_hour": 20, "model": "strong-1", "provider": "p1", "reason": "w1"}
        second = {"start_hour": 19, "end_hour": 22, "model": "strong-2", "provider": "p2", "reason": "w2"}
        config = self._config(windows=[first, second])
        outcome = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 19, 0))
        assert outcome["model"] == "strong-1"

    def test_no_fallback_model_keeps_base(self):
        """A matching window without any model to switch to keeps the base model."""
        config = {
            "enabled": True,
            "windows": [{"start_hour": 17, "end_hour": 22, "reason": "test"}],
        }
        outcome = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
        assert outcome["overridden"] is False
        assert outcome["model"] == "mimo"

    def test_malformed_windows_skipped(self):
        """Invalid window entries are passed over; the trailing valid one matches."""
        mixed_windows = [
            "not-a-dict",
            {"start_hour": 17},
            {"end_hour": 22},
            {"start_hour": "bad", "end_hour": "bad"},
            {"start_hour": 17, "end_hour": 22, "reason": "valid"},
        ]
        config = self._config(windows=mixed_windows)
        outcome = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
        assert outcome["overridden"] is True
        assert "valid" in outcome["reason"]

    def test_multiple_windows_coverage(self):
        """Two disjoint windows (17-22 and 2-5) each trigger; 10:00 matches neither."""
        config = self._config(windows=[
            {"start_hour": 17, "end_hour": 22, "reason": "evening"},
            {"start_hour": 2, "end_hour": 5, "reason": "overnight"},
        ])
        schedule = (
            (datetime(2026, 4, 12, 20, 0), True),
            (datetime(2026, 4, 13, 3, 0), True),
            (datetime(2026, 4, 12, 10, 0), False),
        )
        for moment, expected in schedule:
            assert resolve_cron_model("mimo", config, now=moment)["overridden"] is expected

View File

@@ -0,0 +1,99 @@
"""Tests for cron SSH dispatch utilities."""
import sys
from pathlib import Path
# Put the parent of this file's directory at the front of sys.path so imports
# resolve against it first (presumably the repository root — confirm).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_dispatch():
    """Load ``cron/ssh_dispatch.py`` straight from its file path.

    The file is located relative to this test module (``../cron/ssh_dispatch.py``)
    and executed under the module name ``cron.ssh_dispatch``.

    Returns:
        The executed module object.

    Raises:
        ImportError: if no import spec or loader can be built for the path.
        Exception: whatever executing the module raises (for example
            ``FileNotFoundError`` when the file is missing); in that case the
            ``sys.modules`` entry is rolled back to its previous state.
    """
    import importlib.util

    module_name = "cron.ssh_dispatch"
    module_path = Path(__file__).resolve().parent.parent / "cron" / "ssh_dispatch.py"

    spec = importlib.util.spec_from_file_location(module_name, str(module_path))
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot build an import spec for {module_path}",
            name=module_name,
            path=str(module_path),
        )

    mod = importlib.util.module_from_spec(spec)

    # Register the module before executing it, as the importlib recipe for
    # importing a source file directly does. Code that runs at import time and
    # looks the module up via sys.modules[cls.__module__] (e.g. dataclass
    # processing of string annotations) otherwise finds nothing.
    _absent = object()
    previous = sys.modules.get(module_name, _absent)
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module behind for later imports.
        if previous is _absent:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return mod
# Load the dispatch module once and re-export the names the tests below use.
_mod = _import_dispatch()
DispatchResult, SSHEnvironment, format_dispatch_report = (
    _mod.DispatchResult,
    _mod.SSHEnvironment,
    _mod.format_dispatch_report,
)
class TestDispatchResult:
    """Checks on ``DispatchResult`` construction, ``failure_reason`` and ``to_dict``."""

    def test_success_result(self):
        """A successful result reports an empty failure reason."""
        result = DispatchResult(success=True, host="root@ezra", command="hermes cron tick")
        assert result.success
        assert result.failure_reason == ""

    def test_failure_with_no_such_file(self):
        """A failed result carrying a bash 'No such file or directory' stderr."""
        bash_error = "bash: line 1: /root/wizards/allegro/venv/bin/hermes: No such file or directory"
        result = DispatchResult(
            success=False,
            host="root@ezra",
            command="hermes cron tick",
            stderr=bash_error,
        )
        assert not result.success
        # NOTE(review): the second alternative only re-checks the stderr text
        # passed in above, so it looks trivially true — confirm whether the
        # intent was to assert on failure_reason alone.
        assert "not found" in result.failure_reason.lower() or "No such file" in result.stderr

    def test_failure_with_timeout(self):
        """An ``error`` message about a timeout shows up in ``failure_reason``."""
        result = DispatchResult(
            success=False,
            host="root@ezra",
            command="hermes cron tick",
            error="SSH timed out after 30s",
        )
        assert not result.success
        assert "timed out" in result.failure_reason

    def test_to_dict(self):
        """``to_dict`` exposes ``success`` and ``exit_code`` unchanged."""
        result = DispatchResult(
            success=False,
            host="root@ezra",
            command="hermes cron tick",
            exit_code=127,
            stderr="command not found",
        )
        payload = result.to_dict()
        assert payload["success"] is False
        assert payload["exit_code"] == 127
class TestFormatDispatchReport:
    """Checks on the text produced by ``format_dispatch_report``."""

    def test_all_ok(self):
        """Two successful hosts yield a '2 OK, 0 failed' summary and per-host OK lines."""
        durations = {"root@ezra": 150, "root@allegro": 200}
        results = {
            host: DispatchResult(
                success=True, host=host, command="hermes cron tick", duration_ms=elapsed,
            )
            for host, elapsed in durations.items()
        }
        report = format_dispatch_report(results)
        assert "2 OK, 0 failed" in report
        assert "root@ezra: OK" in report

    def test_mixed_results(self):
        """One success and one failure: counts, FAILED marker and error text appear."""
        succeeded = DispatchResult(success=True, host="root@ezra", command="hermes cron tick")
        failed = DispatchResult(
            success=False,
            host="root@allegro",
            command="hermes cron tick",
            error="Hermes binary not found on root@allegro",
        )
        results = {"root@ezra": succeeded, "root@allegro": failed}
        report = format_dispatch_report(results)
        for fragment in ("1 OK, 1 failed", "root@allegro: FAILED", "Hermes binary not found"):
            assert fragment in report
class TestSSHEnvironment:
    """Checks on ``SSHEnvironment`` path resolution and ssh command construction."""

    def test_resolve_hermes_paths_custom(self):
        """An explicit ``hermes_path`` is the only candidate returned."""
        env = SSHEnvironment(host="root@ezra", agent="allegro", hermes_path="/opt/hermes/bin/hermes")
        candidates = env._resolve_hermes_paths()
        assert candidates == ["/opt/hermes/bin/hermes"]

    def test_resolve_hermes_paths_default(self):
        """Without ``hermes_path`` several candidates are returned, one naming the agent."""
        env = SSHEnvironment(host="root@ezra", agent="allegro")
        candidates = env._resolve_hermes_paths()
        assert len(candidates) > 1
        # agent name interpolated
        assert any("allegro" in candidate for candidate in candidates)

    def test_ssh_base_cmd(self):
        """The base command starts with ssh and carries host, key and port arguments."""
        env = SSHEnvironment(host="root@ezra", ssh_key="/root/.ssh/id_rsa", ssh_port=2222)
        argv = env._ssh_base_cmd()
        assert "ssh" in argv[0]
        for token in ("root@ezra", "-i", "/root/.ssh/id_rsa", "-p", "2222"):
            assert token in argv