Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
412ee7329a fix(cron): runtime-aware prompts + provider mismatch detection (#372)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m7s
After provider migration (Ollama -> Nous/mimo-v2-pro), cron jobs with
provider-specific prompts ran on the wrong provider without knowing it.
Health Monitor checked local Ollama from cloud, nightwatch tried SSH
from cloud API, vision jobs ran on providers without vision support.

Changes to cron/scheduler.py:

1. _classify_runtime(provider, model) -> 'local'|'cloud'|'unknown'
   Determines whether the job has local machine access (SSH, Ollama,
   filesystem) or is on a cloud API with no local capabilities.

2. _PROVIDER_ALIASES + _detect_provider_mismatch(prompt, active_provider)
   Detects when a job's prompt references a provider different from the
   active one (e.g. 'ollama' in prompt when running on 'nous'). Logs
   a warning so operators know which prompts need updating.

3. _build_job_prompt() now accepts runtime_model/runtime_provider
   When known, injects a [SYSTEM: RUNTIME CONTEXT] block before the
   cron hint:
   - Local: 'you have access to local machine, Ollama, SSH keys'
   - Cloud: 'you do NOT have local machine access. Do NOT SSH, etc.'

4. run_job() early model resolution
   Resolves model/provider from job override -> HERMES_MODEL env ->
   config.yaml model.default, derives provider from model prefix.
   Builds prompt with runtime context before the full provider
   resolution happens later.

5. Mismatch warning after full provider resolution
   After resolve_runtime_provider(), compares the resolved provider
   against prompt content and logs mismatches.

Supersedes #403 (early resolution only) and #427 (mismatch detection
only). Combines both approaches with local/cloud capability awareness.

Closes #372
2026-04-13 20:25:51 -04:00
21 changed files with 333 additions and 2601 deletions

View File

@@ -1,11 +1,10 @@
"""Helpers for optional cheap-vs-strong and time-aware model routing."""
"""Helpers for optional cheap-vs-strong model routing."""
from __future__ import annotations
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from utils import is_truthy_value
@@ -193,104 +192,3 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
tuple(runtime.get("args") or ()),
),
}
# =========================================================================
# Time-aware cron model routing
# =========================================================================
#
# Empirical finding: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
# During high-error windows, route cron jobs to more capable models.
#
# Config (config.yaml):
# cron_model_routing:
# enabled: true
# fallback_model: "anthropic/claude-sonnet-4"
# fallback_provider: "openrouter"
# windows:
# - start_hour: 17
# end_hour: 22
# reason: "evening_error_peak"
# - start_hour: 2
# end_hour: 5
# reason: "overnight_api_instability"
# =========================================================================
def _hour_in_window(hour: int, start: int, end: int) -> bool:
"""Check if hour falls in [start, end) window, handling midnight wrap."""
if start <= end:
return start <= hour < end
else:
# Wraps midnight: e.g., 22-06
return hour >= start or hour < end
def resolve_cron_model(
    base_model: str,
    routing_config: Optional[Dict[str, Any]],
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Apply time-aware model override for cron jobs.

    During configured high-error windows, returns a stronger model config.
    Outside windows, returns the base model unchanged.

    Args:
        base_model: The model string already resolved (from job/config/env).
        routing_config: The cron_model_routing dict from config.yaml.
        now: Override current time (for testing). Defaults to datetime.now().

    Returns:
        Dict with keys: model, provider, overridden, reason.
        - model: the effective model string to use
        - provider: provider override (empty string = use default)
        - overridden: True if a time-based override was applied
        - reason: why the override was applied (empty string if not)
    """
    def _passthrough() -> Dict[str, Any]:
        # Fresh dict each call so callers may mutate the result safely.
        return {"model": base_model, "provider": "", "overridden": False, "reason": ""}

    cfg = routing_config or {}
    if not _coerce_bool(cfg.get("enabled"), False):
        return _passthrough()
    windows = cfg.get("windows") or []
    if not isinstance(windows, list) or not windows:
        return _passthrough()
    current_hour = (now or datetime.now()).hour
    matched_window = None
    for candidate in windows:
        if not isinstance(candidate, dict):
            continue  # Malformed entry — ignore.
        lo = _coerce_int(candidate.get("start_hour"), -1)
        hi = _coerce_int(candidate.get("end_hour"), -1)
        if lo < 0 or hi < 0:
            continue  # Missing/invalid hour bounds — ignore.
        if _hour_in_window(current_hour, lo, hi):
            matched_window = candidate
            break
    if matched_window is None:
        return _passthrough()
    # Per-window override wins; otherwise fall back to the global fallback_* keys.
    override_model = (
        str(matched_window.get("model") or "").strip()
        or str(cfg.get("fallback_model") or "").strip()
    )
    override_provider = (
        str(matched_window.get("provider") or "").strip()
        or str(cfg.get("fallback_provider") or "").strip()
    )
    if not override_model:
        return _passthrough()
    reason = str(matched_window.get("reason") or "time_window").strip()
    return {
        "model": override_model,
        "provider": override_provider,
        "overridden": True,
        "reason": f"cron_routing:{reason}(hour={current_hour})",
    }

192
cli.py
View File

@@ -3134,196 +3134,6 @@ class HermesCLI:
print(f" Home: {display}")
print()
def _handle_debug_command(self, command: str):
    """Generate a debug report with system info and logs, upload to paste service.

    Args:
        command: Raw command text. An optional trailing integer argument
            sets how many log lines to include per log file
            (default 50, capped at 500).

    Builds a plain-text report (system/platform info, Hermes version and
    session details, redacted config, log tails, enabled toolsets), then
    tries dpaste.org and 0x0.st in order. If both uploads fail the report
    is written to <hermes home>/debug-report.txt; if even that fails it
    is printed to stdout.
    """
    import platform
    import sys
    import time as _time
    # Parse optional lines argument
    parts = command.split(maxsplit=1)
    log_lines = 50
    if len(parts) > 1:
        try:
            log_lines = min(int(parts[1]), 500)
        except ValueError:
            # Non-numeric argument: silently keep the default.
            pass
    _cprint(" Collecting debug info...")
    # Collect system info
    lines = []
    lines.append("=== HERMES DEBUG REPORT ===")
    lines.append(f"Generated: {_time.strftime('%Y-%m-%d %H:%M:%S %z')}")
    lines.append("")
    lines.append("--- System ---")
    lines.append(f"Python: {sys.version}")
    lines.append(f"Platform: {platform.platform()}")
    lines.append(f"Architecture: {platform.machine()}")
    lines.append(f"Hostname: {platform.node()}")
    lines.append("")
    # Hermes info — each lookup is individually wrapped so a partially
    # broken install still produces a (degraded) report instead of crashing.
    lines.append("--- Hermes ---")
    try:
        from hermes_constants import get_hermes_home, display_hermes_home
        lines.append(f"Home: {display_hermes_home()}")
    except Exception:
        lines.append("Home: unknown")
    try:
        from hermes_constants import __version__
        lines.append(f"Version: {__version__}")
    except Exception:
        lines.append("Version: unknown")
    lines.append(f"Profile: {getattr(self, '_profile_name', 'default')}")
    lines.append(f"Session: {self.session_id}")
    lines.append(f"Model: {self.model}")
    lines.append(f"Provider: {getattr(self, '_provider_name', 'unknown')}")
    try:
        lines.append(f"Working dir: {os.getcwd()}")
    except Exception:
        # os.getcwd() can raise if the cwd has been deleted underneath us.
        pass
    # Config (redacted)
    lines.append("")
    lines.append("--- Config (redacted) ---")
    try:
        from hermes_constants import get_hermes_home
        config_path = get_hermes_home() / "config.yaml"
        if config_path.exists():
            import yaml
            with open(config_path) as f:
                cfg = yaml.safe_load(f) or {}
            # Redact secrets.
            # NOTE(review): only top-level keys are redacted — nested
            # secrets (e.g. per-provider api_key entries) would pass
            # through; confirm whether recursive redaction is needed.
            for key in ("api_key", "token", "secret", "password"):
                if key in cfg:
                    cfg[key] = "***REDACTED***"
            # Truncated to keep the report (and paste payload) small.
            lines.append(yaml.dump(cfg, default_flow_style=False)[:2000])
        else:
            lines.append("(no config file found)")
    except Exception as e:
        lines.append(f"(error reading config: {e})")
    # Recent logs
    lines.append("")
    lines.append(f"--- Recent Logs (last {log_lines} lines) ---")
    try:
        from hermes_constants import get_hermes_home
        log_dir = get_hermes_home() / "logs"
        if log_dir.exists():
            for log_file in sorted(log_dir.glob("*.log")):
                try:
                    # errors="replace" so one corrupt log file cannot
                    # abort the whole report.
                    content = log_file.read_text(encoding="utf-8", errors="replace")
                    tail = content.strip().split("\n")[-log_lines:]
                    if tail:
                        lines.append(f"\n[{log_file.name}]")
                        lines.extend(tail)
                except Exception:
                    pass
        else:
            lines.append("(no logs directory)")
    except Exception:
        lines.append("(error reading logs)")
    # Tool info
    lines.append("")
    lines.append("--- Enabled Toolsets ---")
    try:
        lines.append(", ".join(self.enabled_toolsets) if self.enabled_toolsets else "(none)")
    except Exception:
        lines.append("(unknown)")
    report = "\n".join(lines)
    report_size = len(report)
    # Try to upload to paste services — first successful upload wins.
    paste_url = None
    services = [
        ("dpaste", _upload_dpaste),
        ("0x0.st", _upload_0x0st),
    ]
    for name, uploader in services:
        try:
            url = uploader(report)
            if url:
                paste_url = url
                break
        except Exception:
            continue
    print()
    if paste_url:
        _cprint(f" Debug report uploaded: {paste_url}")
        _cprint(f" Size: {report_size} bytes, {len(lines)} lines")
    else:
        # Fallback: save locally
        try:
            from hermes_constants import get_hermes_home
            debug_path = get_hermes_home() / "debug-report.txt"
            debug_path.write_text(report, encoding="utf-8")
            _cprint(f" Paste services unavailable. Report saved to: {debug_path}")
            _cprint(f" Size: {report_size} bytes, {len(lines)} lines")
        except Exception as e:
            # Last resort: dump the report to stdout.
            _cprint(f" Failed to save report: {e}")
            _cprint(f" Report ({report_size} bytes):")
            print(report)
    print()
def _upload_dpaste(content: str) -> str | None:
    """Upload *content* to dpaste.org and return the paste URL, or None.

    Sends a form-encoded POST to the dpaste API with a 7-day expiry.
    Network errors propagate to the caller.
    """
    import urllib.parse
    import urllib.request

    payload = urllib.parse.urlencode(
        {
            "content": content,
            "syntax": "text",
            "expiry_days": 7,
        }
    ).encode()
    request = urllib.request.Request(
        "https://dpaste.org/api/",
        data=payload,
        headers={"User-Agent": "hermes-agent/debug"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        result = response.read().decode().strip()
    # The API answers with the paste URL on success.
    return result if result.startswith("http") else None
def _upload_0x0st(content: str) -> str | None:
    """Upload *content* to 0x0.st and return the hosted URL, or None.

    0x0.st only accepts multipart/form-data with a "file" field, so the
    request body is assembled by hand to avoid extra dependencies.
    (Fix: dropped the unused ``import io`` the original carried.)

    Args:
        content: Text to upload; sent UTF-8 encoded as "debug.txt".

    Returns:
        The URL string, or None when the response is not an http(s) URL.
        Network errors (URLError/OSError) propagate to the caller.
    """
    import urllib.request
    # A fixed boundary is acceptable here: the payload is our own
    # generated report, not arbitrary data crafted to collide with it.
    boundary = "----HermesDebugBoundary"
    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="debug.txt"\r\n'
        f"Content-Type: text/plain\r\n\r\n"
        f"{content}\r\n"
        f"--{boundary}--\r\n"
    ).encode()
    req = urllib.request.Request(
        "https://0x0.st",
        data=body,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "User-Agent": "hermes-agent/debug",
        },
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        url = resp.read().decode().strip()
    if url.startswith("http"):
        return url
    return None
def show_config(self):
"""Display current configuration with kawaii ASCII art."""
# Get terminal config from environment (which was set from cli-config.yaml)
@@ -4511,8 +4321,6 @@ def _upload_0x0st(content: str) -> str | None:
self.show_help()
elif canonical == "profile":
self._handle_profile_command()
elif canonical == "debug":
self._handle_debug_command(cmd_original)
elif canonical == "tools":
self._handle_tools_command(cmd_original)
elif canonical == "toolsets":

View File

@@ -547,30 +547,20 @@ def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
"""Schedule a job to run on the next scheduler tick.
Clears stale error state when re-triggering a previously-failed job
so the stale failure doesn't persist until the next tick completes.
"""
"""Schedule a job to run on the next scheduler tick."""
job = get_job(job_id)
if not job:
return None
updates = {
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
}
# Clear stale error state when re-triggering
if job.get("last_status") == "error":
updates["last_status"] = "retrying"
updates["last_error"] = None
updates["error_cleared_at"] = _hermes_now().isoformat()
return update_job(job_id, updates)
return update_job(
job_id,
{
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
},
)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
@@ -628,7 +618,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
Updates last_run_at, last_status, increments completed count,
computes next_run_at, and auto-deletes if repeat limit reached.
Tracks health timestamps for error/success history.
"""
jobs = load_jobs()
for i, job in enumerate(jobs):
@@ -638,18 +627,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
job["last_status"] = "ok" if success else "error"
job["last_error"] = error if not success else None
# Track health timestamps
if success:
job["last_success_at"] = now
# Clear stale error tracking on success
if job.get("last_error_at"):
job["error_resolved_at"] = now
else:
job["last_error_at"] = now
# Clear resolved tracking on new error
if job.get("error_resolved_at"):
del job["error_resolved_at"]
# Increment completed count
if job.get("repeat"):
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
@@ -679,32 +656,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
save_jobs(jobs)
def clear_job_error(job_id: str) -> Optional[Dict[str, Any]]:
    """
    Clear stale error state for a job.

    Resets last_status to 'ok', last_error to None, and records when the
    error was cleared (error_cleared_at). Useful after auth recovery when
    the job itself is healthy but stale error state persists.

    Args:
        job_id: ID of the job to update.

    Returns:
        Updated job dict, or None if not found.
    """
    jobs = load_jobs()
    for job in jobs:
        if job["id"] == job_id:
            job["last_status"] = "ok"
            job["last_error"] = None
            job["error_cleared_at"] = _hermes_now().isoformat()
            save_jobs(jobs)
            return job
    # Fix: the original called save_jobs(jobs) here too, rewriting the
    # jobs file even though nothing changed — a pointless disk write and
    # a needless lost-update window against concurrent writers.
    return None
def advance_next_run(job_id: str) -> bool:
"""Preemptively advance next_run_at for a recurring job before execution.

View File

@@ -37,7 +37,6 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from hermes_constants import get_hermes_home
from hermes_cli.config import load_config
from hermes_time import now as _hermes_now
from agent.model_metadata import is_local_endpoint
logger = logging.getLogger(__name__)
@@ -545,8 +544,78 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
# ---------------------------------------------------------------------------
# Provider mismatch detection
# ---------------------------------------------------------------------------
_PROVIDER_ALIASES: dict[str, set[str]] = {
"ollama": {"ollama", "local ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
"nous": {"nous", "mimo", "nousresearch"},
"openrouter": {"openrouter"},
"kimi": {"kimi", "moonshot", "kimi-coding"},
"zai": {"zai", "glm", "zhipu"},
"openai": {"openai", "gpt", "codex"},
"gemini": {"gemini", "google"},
}
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown' for a provider/model pair."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
# Explicit cloud providers or prefixed model names → cloud
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in ("nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"):
return "cloud"
# Ollama / local / empty provider with non-prefixed model → local
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
"""Return the stale provider group referenced in *prompt*, or None."""
if not active_provider or not prompt:
return None
prompt_lower = prompt.lower()
active_lower = active_provider.lower().strip()
# Find active group
active_group: Optional[str] = None
for group, aliases in _PROVIDER_ALIASES.items():
if active_lower in aliases or active_lower.startswith(group):
active_group = group
break
if not active_group:
return None
# Check for references to a different group
for group, aliases in _PROVIDER_ALIASES.items():
if group == active_group:
continue
for alias in aliases:
if alias in prompt_lower:
return group
return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -578,6 +647,36 @@ def _build_job_prompt(job: dict) -> str:
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
#
# Runtime context injection — tells the agent what it can actually do.
# Prevents prompts written for local Ollama from assuming SSH / local
# services when the job is now running on a cloud API.
_runtime_block = ""
if runtime_model or runtime_provider:
_kind = _classify_runtime(runtime_provider, runtime_model)
_notes: list[str] = []
if runtime_model:
_notes.append(f"MODEL: {runtime_model}")
if runtime_provider:
_notes.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_notes.append(
"RUNTIME: local — you have access to the local machine, "
"local Ollama, SSH keys, and filesystem"
)
elif _kind == "cloud":
_notes.append(
"RUNTIME: cloud API — you do NOT have local machine access. "
"Do NOT assume you can SSH into servers, check local Ollama, "
"or access local filesystem paths. Use terminal tools only "
"for commands that work from this environment."
)
if _notes:
_runtime_block = (
"[SYSTEM: RUNTIME CONTEXT — "
+ "; ".join(_notes)
+ ". Adjust your approach based on these capabilities.]\\n\\n"
)
cron_hint = (
"[SYSTEM: You are running as a scheduled cron job. "
"DELIVERY: Your final response will be automatically delivered "
@@ -597,7 +696,7 @@ def _build_job_prompt(job: dict) -> str:
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = cron_hint + prompt
prompt = _runtime_block + cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -667,7 +766,36 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
# ── Early model/provider resolution ───────────────────────────────────
# We need the model name before building the prompt so the runtime
# context block can be injected. Full provider resolution happens
# later (smart routing, etc.) but the basic name is enough here.
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
try:
import yaml
_cfg_path = str(_hermes_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg_early = yaml.safe_load(_f) or {}
_mc = _cfg_early.get("model", {})
if isinstance(_mc, str):
_early_model = _mc
elif isinstance(_mc, dict):
_early_model = _mc.get("default", "")
except Exception:
pass
# Derive provider from model prefix when not explicitly set
if not _early_provider and "/" in _early_model:
_early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
job,
runtime_model=_early_model,
runtime_provider=_early_provider,
)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -718,22 +846,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
# Time-aware cron model routing — override model during high-error windows
try:
from agent.smart_model_routing import resolve_cron_model
_cron_routing_cfg = (_cfg.get("cron_model_routing") or {})
_cron_route = resolve_cron_model(model, _cron_routing_cfg)
if _cron_route["overridden"]:
_original_model = model
model = _cron_route["model"]
logger.info(
"Job '%s': cron model override %s -> %s (%s)",
job_id, _original_model, model, _cron_route["reason"],
)
except Exception as _e:
logger.debug("Job '%s': cron model routing skipped: %s", job_id, _e)
effort = os.getenv("HERMES_REASONING_EFFORT", "")
if not effort:
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
@@ -779,6 +891,20 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# ── Provider mismatch warning ─────────────────────────────────
# If the job prompt references a provider different from the one
# we actually resolved, warn so operators know which prompts are stale.
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
logger.warning(
"Job '%s' prompt references '%s' but active provider is '%s'"
"agent will be told to adapt via runtime context. "
"Consider updating this job's prompt.",
job_name, _mismatch, _resolved_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,
@@ -794,29 +920,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Build disabled toolsets — always exclude cronjob/messaging/clarify
# for cron sessions. When the runtime endpoint is cloud (not local),
# also disable terminal so the agent does not attempt SSH or shell
# commands that require local infrastructure (keys, filesystem).
# Jobs that declare requires_local_infra=true also get terminal
# disabled on cloud endpoints regardless of this check. #379
_cron_disabled = ["cronjob", "messaging", "clarify"]
_runtime_base_url = turn_route["runtime"].get("base_url", "")
_is_cloud = not is_local_endpoint(_runtime_base_url)
if _is_cloud:
_cron_disabled.append("terminal")
logger.info(
"Job '%s': cloud provider detected (%s), disabling terminal toolset",
job_name,
turn_route["runtime"].get("provider", "unknown"),
)
if job.get("requires_local_infra") and _is_cloud:
logger.warning(
"Job '%s': requires_local_infra=true but running on cloud provider — "
"terminal-dependent steps will fail gracefully",
job_name,
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
@@ -824,7 +927,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": list(turn_route["runtime"].get("args") or []),
"acp_args": turn_route["runtime"].get("args"),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
@@ -832,7 +935,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": _cron_disabled,
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations

View File

@@ -1,154 +0,0 @@
#!/usr/bin/env python3
"""
deploy-crons — normalize cron job schemas for consistent model field types.
This script ensures that the model field in jobs.json is always a dict when
either model or provider is specified, preventing schema inconsistency.
Usage:
python deploy-crons.py [--dry-run] [--jobs-file PATH]
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, Optional
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize a job dict to ensure consistent model field types.

    Before normalization:
    - If model AND provider: model = raw string, provider = raw string (inconsistent)
    - If only model: model = raw string
    - If only provider: provider = raw string at top level

    After normalization:
    - If model exists: model = {"model": "xxx"}
    - If provider exists: model = {"provider": "yyy"}
    - If both exist: model = {"model": "xxx", "provider": "yyy"}
    - If neither: model = None

    The top-level "provider" key is deliberately left in place for
    backward compatibility with code that reads job["provider"] directly.

    Args:
        job: Raw job dict (not mutated).

    Returns:
        A shallow copy of *job* with the "model" field normalized.
    """
    job = dict(job)  # Shallow copy so the caller's dict is untouched.
    model = job.get("model")
    provider = job.get("provider")
    # Already normalized (model is a dict) — nothing to do.
    if isinstance(model, dict):
        return job
    # Build the normalized model dict from whatever string fields exist.
    # (Fix: dropped the original's redundant `is not None` checks — a
    # non-str value is never None-and-str — and the dead `if ...: pass`
    # block that followed.)
    model_dict = {}
    if isinstance(model, str):
        model_dict["model"] = model.strip()
    if isinstance(provider, str):
        model_dict["provider"] = provider.strip()
    job["model"] = model_dict if model_dict else None
    return job
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
    """
    Normalize all jobs in a jobs.json file.

    Prints a per-job summary for every entry that changes.

    Args:
        jobs_file: Path to the jobs.json file.
        dry_run: When True, report what would change without writing.

    Returns:
        A process exit code: 0 on success (including "nothing to do"
        and dry runs), 1 on missing file, invalid JSON, or write failure.
        Note this is an exit code, NOT the count of modified jobs.
    """
    if not jobs_file.exists():
        print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
        return 1
    try:
        with open(jobs_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
        return 1
    jobs = data.get("jobs", [])
    if not jobs:
        print("No jobs found in file.")
        return 0
    modified_count = 0
    for i, job in enumerate(jobs):
        original_model = job.get("model")
        original_provider = job.get("provider")
        normalized_job = normalize_job(job)
        # Check if anything changed (normalize_job returns a copy, so
        # compare against the original fields).
        if (normalized_job.get("model") != original_model or
            normalized_job.get("provider") != original_provider):
            jobs[i] = normalized_job
            modified_count += 1
            job_id = job.get("id", "?")
            job_name = job.get("name", "(unnamed)")
            print(f"Normalized job {job_id} ({job_name}):")
            print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
            print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
    if modified_count == 0:
        print("All jobs already have consistent model field types.")
        return 0
    if dry_run:
        print(f"DRY RUN: Would normalize {modified_count} jobs.")
        return 0
    # Write back to file
    data["jobs"] = jobs
    try:
        with open(jobs_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"Normalized {modified_count} jobs in {jobs_file}")
        return 0
    except Exception as e:
        print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
        return 1
def main():
    """CLI entry point: parse flags, then normalize the jobs file.

    Returns a process exit code suitable for sys.exit().
    """
    arg_parser = argparse.ArgumentParser(
        description="Normalize cron job schemas for consistent model field types."
    )
    arg_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be changed without modifying the file.",
    )
    arg_parser.add_argument(
        "--jobs-file",
        type=Path,
        default=Path.home() / ".hermes" / "cron" / "jobs.json",
        help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)",
    )
    opts = arg_parser.parse_args()
    if opts.dry_run:
        # Announce up front that nothing will be written.
        print("DRY RUN MODE — no changes will be made.")
        print()
    return normalize_jobs_file(opts.jobs_file, opts.dry_run)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,170 +0,0 @@
# Honcho Memory Integration Evaluation (#322)
## Executive Summary
**Status:** Integration already implemented and production-ready.
**Recommendation:** KEEP — well-gated, zero overhead when disabled, supports self-hosted.
## Decision: Cloud vs Local
### The Question
"Do we want a cloud-dependent memory layer, or keep everything local?"
### Answer: BOTH — User's Choice
Honcho supports both deployment modes:
| Mode | Configuration | Data Location | Use Case |
|------|--------------|---------------|----------|
| Cloud | `HONCHO_API_KEY` | Honcho servers | Quick start, no infrastructure |
| Self-hosted | `HONCHO_BASE_URL=http://localhost:8000` | Your servers | Full sovereignty |
| Disabled | No config | N/A | Pure local (holographic fact_store only) |
### Why Keep It
1. **Opt-in Architecture**
- No Honcho config → zero overhead (cron guard, lazy init)
- Memory provider system allows switching between providers
- `hermes memory off` disables completely
2. **Zero Runtime Cost When Disabled**
```python
if not cfg.enabled or not (cfg.api_key or cfg.base_url):
return "" # No HTTP calls, no overhead
```
3. **Cross-Session User Modeling**
- Holographic fact_store lacks persistent user modeling
- Honcho provides: peer cards, dialectic Q&A, semantic search
- Complements (not replaces) local memory
4. **Self-Hosted Option**
- Set `HONCHO_BASE_URL=http://localhost:8000`
- Run Honcho server locally via Docker
- Full data sovereignty
5. **Production-Grade Implementation**
- 3 components, ~700 lines of code
- 7 tests passing
- Async prefetch (zero-latency context injection)
- Configurable recall modes (hybrid/context/tools)
- Write frequency control (async/turn/session/N-turns)
## Architecture
### Components (Already Implemented)
```
plugins/memory/honcho/
├── client.py # Config resolution (API key, base_url, profiles)
├── session.py # Session management, async prefetch, dialectic queries
├── __init__.py # MemoryProvider interface, 4 tool schemas
├── cli.py # CLI commands (setup, status, sessions, map, peer, mode)
├── plugin.yaml # Plugin metadata
└── README.md # Documentation
```
### Integration Points
1. **System Prompt**: Context injected on first turn (cached for prompt caching)
2. **Tool Registry**: 4 tools available when `recall_mode != "context"`
3. **Session End**: Messages flushed to Honcho
4. **Cron Guard**: Fully inactive in cron context
### Tools Available
| Tool | Cost | Speed | Purpose |
|------|------|-------|---------|
| `honcho_profile` | Free | Fast | Quick factual snapshot (peer card) |
| `honcho_search` | Free | Fast | Semantic search (raw excerpts) |
| `honcho_context` | Paid | Slow | Dialectic Q&A (synthesized answers) |
| `honcho_conclude` | Free | Fast | Save persistent facts about user |
## Configuration Guide
### Option 1: Cloud (Quick Start)
```bash
# Get API key from https://app.honcho.dev
export HONCHO_API_KEY="your-api-key"
hermes chat
```
### Option 2: Self-Hosted (Full Sovereignty)
```bash
# Run Honcho server locally
docker run -p 8000:8000 honcho/server
# Configure Hermes
export HONCHO_BASE_URL="http://localhost:8000"
hermes chat
```
### Option 3: CLI Setup
```bash
hermes honcho setup
```
### Option 4: Disabled (Pure Local)
```bash
# Don't set any Honcho config
hermes memory off # If previously enabled
hermes chat
```
## Memory Modes
| Mode | Context Injection | Tools | Cost | Use Case |
|------|------------------|-------|------|----------|
| hybrid | Yes | Yes | Medium | Default — auto-inject + on-demand |
| context | Yes | No | Low | Budget mode — auto-inject only |
| tools | No | Yes | Variable | Full control — agent decides |
## Risk Assessment
| Risk | Mitigation | Status |
|------|------------|--------|
| Cloud dependency | Self-hosted option available | ✅ |
| Cost from LLM calls | Recall mode "context" or "tools" reduces calls | ✅ |
| Data privacy | Self-hosted keeps data on your servers | ✅ |
| Performance overhead | Cron guard + lazy init + async prefetch | ✅ |
| Vendor lock-in | MemoryProvider interface allows swapping | ✅ |
## Comparison with Alternatives
| Feature | Honcho | Holographic | Mem0 | Hindsight |
|---------|--------|-------------|------|-----------|
| Cross-session modeling | ✅ | ❌ | ✅ | ✅ |
| Dialectic Q&A | ✅ | ❌ | ❌ | ❌ |
| Self-hosted | ✅ | N/A | ❌ | ❌ |
| Local-only option | ✅ | ✅ | ❌ | ✅ |
| Cost | Free/Paid | Free | Paid | Free |
## Conclusion
**Keep Honcho integration.** It provides unique cross-session user modeling capabilities that complement the local holographic fact_store. The integration is:
- Well-gated (opt-in, zero overhead when disabled)
- Flexible (cloud or self-hosted)
- Production-ready (7 tests passing, async prefetch, configurable)
- Non-exclusive (works alongside other memory providers)
### To Enable
```bash
# Cloud
hermes honcho setup
# Self-hosted
export HONCHO_BASE_URL="http://localhost:8000"
hermes chat
```
### To Disable
```bash
hermes memory off
```
---
*Evaluated by SANDALPHON — Cron/Ops lane*

View File

@@ -412,52 +412,6 @@ class GatewayConfig:
return self.unauthorized_dm_behavior
def _validate_fallback_providers() -> None:
"""Validate fallback_providers from config.yaml at gateway startup.
Checks that each entry has 'provider' and 'model' fields and logs
warnings for malformed entries. This catches broken fallback chains
before they silently degrade into no-fallback mode.
"""
try:
_home = get_hermes_home()
_config_path = _home / "config.yaml"
if not _config_path.exists():
return
import yaml
with open(_config_path, encoding="utf-8") as _f:
_cfg = yaml.safe_load(_f) or {}
fbp = _cfg.get("fallback_providers")
if not fbp:
return
if not isinstance(fbp, list):
logger.warning(
"fallback_providers should be a YAML list, got %s. "
"Fallback chain will be disabled.",
type(fbp).__name__,
)
return
for i, entry in enumerate(fbp):
if not isinstance(entry, dict):
logger.warning(
"fallback_providers[%d] is not a dict (got %s). Skipping entry.",
i, type(entry).__name__,
)
continue
if not entry.get("provider"):
logger.warning(
"fallback_providers[%d] missing 'provider' field. Skipping entry.",
i,
)
if not entry.get("model"):
logger.warning(
"fallback_providers[%d] missing 'model' field. Skipping entry.",
i,
)
except Exception:
pass # Non-fatal; validation is advisory
def load_gateway_config() -> GatewayConfig:
"""
Load gateway configuration from multiple sources.
@@ -691,19 +645,6 @@ def load_gateway_config() -> GatewayConfig:
platform.value, env_name,
)
# Warn about API Server enabled without a key (unauthenticated endpoint)
if Platform.API_SERVER in config.platforms:
api_cfg = config.platforms[Platform.API_SERVER]
if api_cfg.enabled and not api_cfg.extra.get("key"):
logger.warning(
"api_server is enabled but API_SERVER_KEY is not set. "
"The API endpoint will run unauthenticated. "
"Set API_SERVER_KEY in ~/.hermes/.env to secure it.",
)
# Validate fallback_providers structure from config.yaml
_validate_fallback_providers()
return config

View File

@@ -1026,16 +1026,6 @@ class GatewayRunner:
cfg = _y.safe_load(_f) or {}
fb = cfg.get("fallback_providers") or cfg.get("fallback_model") or None
if fb:
# Treat empty dict / disabled fallback as "not configured"
if isinstance(fb, dict):
_enabled = fb.get("enabled")
if _enabled is False or (
isinstance(_enabled, str)
and _enabled.strip().lower() in ("false", "0", "no", "off")
):
return None
if not fb.get("provider") and not fb.get("model"):
return None
return fb
except Exception:
pass

View File

@@ -1338,11 +1338,6 @@ _KNOWN_ROOT_KEYS = {
"fallback_providers", "credential_pool_strategies", "toolsets",
"agent", "terminal", "display", "compression", "delegation",
"auxiliary", "custom_providers", "memory", "gateway",
"session_reset", "browser", "checkpoints", "smart_model_routing",
"voice", "stt", "tts", "human_delay", "security", "privacy",
"cron", "logging", "approvals", "command_allowlist", "quick_commands",
"personalities", "skills", "honcho", "timezone", "discord",
"whatsapp", "prefill_messages_file", "file_read_max_chars",
}
# Valid fields inside a custom_providers list entry
@@ -1426,7 +1421,6 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
))
# ── fallback_model must be a top-level dict with provider + model ────
# Blank or explicitly disabled fallback is intentional — skip validation.
fb = config.get("fallback_model")
if fb is not None:
if not isinstance(fb, dict):
@@ -1436,40 +1430,21 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
"Change to:\n"
" fallback_model:\n"
" provider: openrouter\n"
" model: anthropic/claude-sonnet-4\n"
"Or disable with:\n"
" fallback_model:\n"
" enabled: false",
" model: anthropic/claude-sonnet-4",
))
elif fb:
# Skip warnings when fallback is explicitly disabled (enabled: false)
_enabled = fb.get("enabled")
if _enabled is False or (isinstance(_enabled, str) and _enabled.strip().lower() in ("false", "0", "no", "off")):
pass # intentionally disabled — no warnings
else:
# Check if both fields are blank (intentional disable)
provider = fb.get("provider")
model = fb.get("model")
provider_blank = not provider or (isinstance(provider, str) and not provider.strip())
model_blank = not model or (isinstance(model, str) and not model.strip())
# Only warn if at least one field is set (user might be trying to configure)
# If both are blank, treat as intentionally disabled
if not provider_blank or not model_blank:
if provider_blank:
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'provider' field — fallback will be disabled",
"Add: provider: openrouter (or another provider)\n"
"Or disable with: enabled: false",
))
if model_blank:
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'model' field — fallback will be disabled",
"Add: model: anthropic/claude-sonnet-4 (or another model)\n"
"Or disable with: enabled: false",
))
if not fb.get("provider"):
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'provider' field — fallback will be disabled",
"Add: provider: openrouter (or another provider)",
))
if not fb.get("model"):
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'model' field — fallback will be disabled",
"Add: model: anthropic/claude-sonnet-4 (or another model)",
))
# ── Check for fallback_model accidentally nested inside custom_providers ──
if isinstance(cp, dict) and "fallback_model" not in config and "fallback_model" in (cp or {}):
@@ -1503,72 +1478,6 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
f"Move '{key}' under the appropriate section",
))
# ── fallback_providers must be a list of dicts with provider + model ─
fbp = config.get("fallback_providers")
if fbp is not None:
if not isinstance(fbp, list):
issues.append(ConfigIssue(
"error",
f"fallback_providers should be a YAML list, got {type(fbp).__name__}",
"Change to:\n"
" fallback_providers:\n"
" - provider: openrouter\n"
" model: google/gemini-3-flash-preview",
))
elif fbp:
for i, entry in enumerate(fbp):
if not isinstance(entry, dict):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is not a dict (got {type(entry).__name__})",
"Each entry needs at minimum: provider, model",
))
continue
if not entry.get("provider"):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is missing 'provider' field — this fallback will be skipped",
"Add: provider: openrouter (or another provider name)",
))
if not entry.get("model"):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is missing 'model' field — this fallback will be skipped",
"Add: model: google/gemini-3-flash-preview (or another model slug)",
))
# ── session_reset validation ─────────────────────────────────────────
session_reset = config.get("session_reset", {})
if isinstance(session_reset, dict):
idle_minutes = session_reset.get("idle_minutes")
if idle_minutes is not None:
if not isinstance(idle_minutes, (int, float)) or idle_minutes <= 0:
issues.append(ConfigIssue(
"warning",
f"session_reset.idle_minutes={idle_minutes} is invalid (must be a positive number)",
"Set to a positive integer, e.g. 1440 (24 hours). Using 0 causes immediate resets.",
))
at_hour = session_reset.get("at_hour")
if at_hour is not None:
if not isinstance(at_hour, (int, float)) or not (0 <= at_hour <= 23):
issues.append(ConfigIssue(
"warning",
f"session_reset.at_hour={at_hour} is invalid (must be 0-23)",
"Set to an hour between 0 and 23, e.g. 4 for 4am",
))
# ── API Server key check ─────────────────────────────────────────────
# If api_server is enabled via env, but no key is set, warn.
# This catches the "API_SERVER_KEY not configured" error from gateway logs.
api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
api_server_key = os.getenv("API_SERVER_KEY", "").strip()
if api_server_enabled and not api_server_key:
issues.append(ConfigIssue(
"warning",
"API_SERVER is enabled but API_SERVER_KEY is not set — the API server will run unauthenticated",
"Set API_SERVER_KEY in ~/.hermes/.env to secure the API endpoint",
))
return issues

View File

@@ -93,39 +93,6 @@ def cron_list(show_all: bool = False):
script = job.get("script")
if script:
print(f" Script: {script}")
# Show health status
last_status = job.get("last_status")
last_error = job.get("last_error")
last_error_at = job.get("last_error_at")
last_success_at = job.get("last_success_at")
error_cleared_at = job.get("error_cleared_at")
error_resolved_at = job.get("error_resolved_at")
if last_status == "error" and last_error:
if error_cleared_at or error_resolved_at:
# Error was cleared/resolved
cleared_time = error_cleared_at or error_resolved_at
print(color(f" Status: ok (error cleared)", Colors.GREEN))
print(color(f" Last error: {last_error[:80]}...", Colors.DIM))
print(color(f" Resolved: {cleared_time}", Colors.DIM))
else:
# Current error
print(color(f" Status: ERROR", Colors.RED))
print(color(f" Error: {last_error[:80]}...", Colors.RED))
if last_error_at:
print(color(f" Since: {last_error_at}", Colors.RED))
elif last_status == "retrying":
print(color(f" Status: retrying (error cleared)", Colors.YELLOW))
elif last_status == "ok":
if last_success_at:
print(color(f" Status: ok (last success: {last_success_at})", Colors.GREEN))
elif last_status:
print(f" Status: {last_status}")
# Show success history if available
if last_success_at and last_status != "error":
print(f" Last ok: {last_success_at}")
print()
from hermes_cli.gateway import find_gateway_pids
@@ -255,18 +222,7 @@ def cron_edit(args):
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "clear_error":
result = _cron_api(action="clear_error", job_id=job_id)
if not result.get("success"):
print(color(f"Failed to clear error: {result.get('error', 'unknown error')}", Colors.RED))
return 1
job = result.get("job", {})
name = job.get("name", job_id)
print(color(f"Cleared stale error state for job '{name}'", Colors.GREEN))
if job.get("error_cleared_at"):
print(f" Cleared at: {job['error_cleared_at']}")
return 0
if action == "run" and now:
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
@@ -336,13 +292,9 @@ def cron_command(args):
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
if subcmd == "clear-error":
return _job_action("clear_error", args.job_id, "Cleared")
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")
print(f"Unknown cron command: {subcmd}")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|clear-error|status|tick]")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|status|tick]")
sys.exit(1)

View File

@@ -4576,9 +4576,6 @@ For more help on a command:
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_clear_error = cron_subparsers.add_parser("clear-error", help="Clear stale error state for a job")
cron_clear_error.add_argument("job_id", help="Job ID to clear error for")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")
@@ -5008,7 +5005,7 @@ For more help on a command:
# =========================================================================
sessions_parser = subparsers.add_parser(
"sessions",
help="Manage session history (list, rename, export, prune, gc, delete)",
help="Manage session history (list, rename, export, prune, delete)",
description="View and manage the SQLite session store"
)
sessions_subparsers = sessions_parser.add_subparsers(dest="sessions_action")
@@ -5031,14 +5028,6 @@ For more help on a command:
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_gc = sessions_subparsers.add_parser("gc", help="Garbage-collect empty/trivial sessions")
sessions_gc.add_argument("--empty-hours", type=int, default=24, help="Delete empty (0-msg) sessions older than N hours (default: 24)")
sessions_gc.add_argument("--trivial-days", type=int, default=7, help="Delete trivial (1-5 msg) sessions older than N days (default: 7)")
sessions_gc.add_argument("--trivial-max", type=int, default=5, help="Max messages to consider trivial (default: 5)")
sessions_gc.add_argument("--source", help="Only GC sessions from this source")
sessions_gc.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting")
sessions_gc.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_stats = sessions_subparsers.add_parser("stats", help="Show session store statistics")
sessions_rename = sessions_subparsers.add_parser("rename", help="Set or change a session's title")
@@ -5208,49 +5197,6 @@ For more help on a command:
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.1f} MB")
elif action == "gc":
dry_run = getattr(args, "dry_run", False)
if dry_run:
counts = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=True,
)
print(f"[dry-run] Would delete {counts['total']} session(s):")
print(f" Empty (0 msgs, >{args.empty_hours}h old): {counts['empty']}")
print(f" Trivial (<={args.trivial_max} msgs, >{args.trivial_days}d old): {counts['trivial']}")
else:
# Preview first
preview = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=True,
)
if preview["total"] == 0:
print("Nothing to collect.")
else:
if not args.yes:
if not _confirm_prompt(
f"Delete {preview['total']} session(s) "
f"({preview['empty']} empty, {preview['trivial']} trivial)? [y/N] "
):
print("Cancelled.")
return
counts = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=False,
)
print(f"Collected {counts['total']} session(s):")
print(f" Empty: {counts['empty']}")
print(f" Trivial: {counts['trivial']}")
else:
sessions_parser.print_help()

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 7
SCHEMA_VERSION = 6
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,7 +66,6 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -87,7 +86,6 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -332,19 +330,6 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -377,19 +362,13 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
"""Create a new session record. Returns the session_id."""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -398,7 +377,6 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -527,23 +505,19 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
)
self._execute_write(_do)
@@ -814,7 +788,6 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -826,10 +799,6 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -844,9 +813,6 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1192,52 +1158,34 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
"""List sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(query, params)
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:
@@ -1355,78 +1303,3 @@ class SessionDB:
return len(session_ids)
return self._execute_write(_do)
def garbage_collect(
self,
empty_older_than_hours: int = 24,
trivial_max_messages: int = 5,
trivial_older_than_days: int = 7,
source: str = None,
dry_run: bool = False,
) -> Dict[str, int]:
"""Delete empty and trivial sessions based on age.
Policy (matches #315):
- Empty sessions (0 messages) older than ``empty_older_than_hours``
- Trivial sessions (1..``trivial_max_messages`` msgs) older than
``trivial_older_than_days``
- Sessions with more than ``trivial_max_messages`` are kept indefinitely
- Active (not ended) sessions are never deleted
Returns a dict with counts: ``empty``, ``trivial``, ``total``.
"""
now = time.time()
empty_cutoff = now - (empty_older_than_hours * 3600)
trivial_cutoff = now - (trivial_older_than_days * 86400)
def _do(conn):
# --- Find empty sessions ---
empty_q = (
"SELECT id FROM sessions "
"WHERE message_count = 0 AND started_at < ? AND ended_at IS NOT NULL"
)
params = [empty_cutoff]
if source:
empty_q += " AND source = ?"
params.append(source)
empty_ids = [r[0] for r in conn.execute(empty_q, params).fetchall()]
# --- Find trivial sessions ---
trivial_q = (
"SELECT id FROM sessions "
"WHERE message_count BETWEEN 1 AND ? AND started_at < ? AND ended_at IS NOT NULL"
)
t_params = [trivial_max_messages, trivial_cutoff]
if source:
trivial_q += " AND source = ?"
t_params.append(source)
trivial_ids = [r[0] for r in conn.execute(trivial_q, t_params).fetchall()]
all_ids = set(empty_ids) | set(trivial_ids)
if dry_run:
return {"empty": len(empty_ids), "trivial": len(trivial_ids),
"total": len(all_ids)}
# --- Collect child sessions to delete first (FK constraint) ---
child_ids = set()
for sid in all_ids:
for r in conn.execute(
"SELECT id FROM sessions WHERE parent_session_id = ?", (sid,)
).fetchall():
child_ids.add(r[0])
# Delete children
for cid in child_ids:
conn.execute("DELETE FROM messages WHERE session_id = ?", (cid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (cid,))
# Delete targets
for sid in all_ids:
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
return {"empty": len(empty_ids), "trivial": len(trivial_ids),
"total": len(all_ids)}
return self._execute_write(_do)

View File

@@ -1,286 +0,0 @@
#!/usr/bin/env python3
"""
Model Watchdog — monitors tmux panes for model drift.
Checks all hermes TUI sessions in dev and timmy tmux sessions.
If any pane is running a non-mimo model, kills and restarts it.
Usage: python3 ~/.hermes/bin/model-watchdog.py [--fix]
--fix Actually restart drifted panes (default: dry-run)
"""
import subprocess
import sys
import re
import time
import os
# Fleet-wide default model: any pane whose profile is not listed in
# PROFILE_MODELS is expected to be running this.
ALLOWED_MODEL = "mimo-v2-pro"
# Profile -> expected model. If a pane is running this profile with this model, it's healthy.
# Profiles not in this map are checked against ALLOWED_MODEL.
PROFILE_MODELS = {
    "default": "mimo-v2-pro",
    "timmy-sprint": "mimo-v2-pro",
    "fenrir": "mimo-v2-pro",
    "bezalel": "gpt-5.4",
    "burn": "mimo-v2-pro",
    "creative": "claude-sonnet",
    "research": "claude-sonnet",
    "review": "claude-sonnet",
}
# tmux sessions whose windows/panes are scanned for model drift.
TMUX_SESSIONS = ["dev", "timmy"]
# Append-only log file shared by interactive and cron runs of this script.
LOG_FILE = os.path.expanduser("~/.hermes/logs/model-watchdog.log")
def log(msg):
    """Write a timestamped message to stdout and append it to LOG_FILE."""
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
    stamped = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}"
    print(stamped)
    with open(LOG_FILE, "a") as handle:
        handle.write(stamped + "\n")
def run(cmd):
    """Run *cmd* through the shell and return (stripped_stdout, returncode).

    Fix: the original let subprocess.TimeoutExpired propagate, so one hung
    tmux/ps invocation crashed the whole watchdog. A timeout is now reported
    as a failed command instead.

    Args:
        cmd: Shell command string (executed with shell=True).

    Returns:
        Tuple of (stdout with surrounding whitespace stripped, return code).
        On timeout, ("", 124) — 124 is the conventional timeout exit code.
    """
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10)
    except subprocess.TimeoutExpired:
        return "", 124
    return r.stdout.strip(), r.returncode
def get_panes(session):
    """Collect pane descriptors from every window of one tmux session.

    Returns a list of dicts with keys: session, window, index, pid, tty.
    An unreachable session (tmux not running, no such session) yields [].
    """
    windows_raw, windows_rc = run(f"tmux list-windows -t {session} -F '#{{window_name}}' 2>/dev/null")
    if windows_rc != 0:
        return []
    collected = []
    for window_name in windows_raw.split("\n"):
        if not window_name.strip():
            continue
        target = f"{session}:{window_name}"
        panes_raw, panes_rc = run(f"tmux list-panes -t {target} -F '#{{pane_index}}|#{{pane_pid}}|#{{pane_tty}}' 2>/dev/null")
        if panes_rc != 0:
            continue
        for row in panes_raw.split("\n"):
            if "|" not in row:
                continue
            idx, pid, tty = row.split("|")
            collected.append({
                "session": session,
                "window": window_name,
                "index": int(idx),
                "pid": int(pid),
                "tty": tty,
            })
    return collected
def get_hermes_pid_for_tty(tty):
    """Return the PID of the hermes process attached to *tty*, or None."""
    stdout, _ = run(f"ps aux | grep '{tty}' | grep '[h]ermes' | grep -v 'gateway' | grep -v 'node' | awk '{{print $2}}'")
    if not stdout:
        return None
    # Several matches are possible; the first line wins, as before.
    return int(stdout.split("\n")[0])
def get_model_from_pane(session, pane_idx, window=None):
    """Scrape the model name from a pane's status bar, or None if not visible."""
    if window:
        target = f"{session}:{window}.{pane_idx}"
    else:
        target = f"{session}.{pane_idx}"
    screen, _ = run(f"tmux capture-pane -t {target} -p 2>/dev/null | tail -30")
    # Status bar renders the model as: ⚕ model-name │
    found = re.findall(r'\s+(\S+)\s+│', screen)
    return found[0] if found else None
def check_session_meta(session_id):
    """Check what model a hermes session was last using from its session file.

    Fix: the two bare ``except:`` clauses also swallowed SystemExit and
    KeyboardInterrupt; they are narrowed to ``except Exception``.

    Args:
        session_id: Hermes session identifier used to locate the file on disk.

    Returns:
        (model, provider) tuple, or (None, None) when no session file exists
        or it cannot be parsed.
    """
    import json
    # Newer single-JSON session file.
    session_file = os.path.expanduser(f"~/.hermes/sessions/session_{session_id}.json")
    if os.path.exists(session_file):
        try:
            with open(session_file) as f:
                data = json.load(f)
            return data.get("model"), data.get("provider")
        except Exception:
            pass  # corrupt/unreadable — fall through to the jsonl format
    # Try jsonl — only the first line is inspected for a session_meta record.
    jsonl_file = os.path.expanduser(f"~/.hermes/sessions/{session_id}.jsonl")
    if os.path.exists(jsonl_file):
        try:
            with open(jsonl_file) as f:
                for line in f:
                    d = json.loads(line.strip())
                    if d.get("role") == "session_meta":
                        return d.get("model"), d.get("provider")
                    break  # first line was not session_meta; stop scanning
        except Exception:
            pass  # malformed jsonl — report nothing rather than crash
    return None, None
def is_drifted(model_name, profile=None):
    """Return (drifted, label) for a detected model vs. the profile's expectation."""
    if model_name is None:
        # Nothing detected — cannot prove drift, so treat as healthy.
        return False, "no-model-detected"
    if profile and profile in PROFILE_MODELS:
        # Known profile: healthy iff its expected model appears in the name.
        return PROFILE_MODELS[profile] not in model_name, model_name
    # Unknown profile — fall back to the global allowed model.
    return ALLOWED_MODEL not in model_name, model_name
def get_profile_from_pane(tty):
    """Detect which hermes profile a pane is running by inspecting its process args.

    Fix: the original chained a second ``.replace("/dev/ttys", "")`` after the
    first replace — dead code, since the first call already consumed every
    occurrence of the pattern. Removed with no behavior change.

    Args:
        tty: Full TTY path as reported by tmux (e.g. /dev/ttys031).

    Returns:
        The profile name passed via ``-p <profile>``, or None if no matching
        hermes process (or no -p flag) is found.
    """
    # ps shows short TTY (s031) not full path (/dev/ttys031)
    short_tty = tty.replace("/dev/ttys", "s")
    out, _ = run(f"ps aux | grep '{short_tty}' | grep '[h]ermes' | grep -v 'gateway' | grep -v 'node' | grep -v cron")
    if not out:
        return None
    # Look for -p <profile> in the command line
    match = re.search(r'-p\s+(\S+)', out)
    if match:
        return match.group(1)
    return None
def kill_and_restart(session, pane_idx, window=None):
    """Terminate the hermes process in a pane and relaunch it with its old profile.

    Returns True when the restarted pane shows the allowed model, else False.
    """
    if window:
        target = f"{session}:{window}.{pane_idx}"
    else:
        target = f"{session}.{pane_idx}"
    # Resolve the pane's TTY so the owning process can be located.
    tty_raw, _ = run(f"tmux list-panes -t {target} -F '#{{pane_tty}}'")
    tty = tty_raw.strip()
    # Remember the profile so the restart mirrors the original invocation.
    profile = get_profile_from_pane(tty)
    hermes_pid = get_hermes_pid_for_tty(tty)
    if hermes_pid:
        log(f"Killing hermes PID {hermes_pid} on {target} (tty={tty}, profile={profile})")
        run(f"kill {hermes_pid}")
        time.sleep(2)
    # Ctrl+C clears any leftover input/state in the pane.
    run(f"tmux send-keys -t {target} C-c")
    time.sleep(1)
    # Restart hermes with the same profile
    cmd = f"hermes -p {profile} chat" if profile else "hermes chat"
    run(f"tmux send-keys -t {target} '{cmd}' Enter")
    log(f"Restarted hermes in {target} with: {cmd}")
    # Give the TUI time to boot before re-reading its status bar.
    time.sleep(8)
    new_model = get_model_from_pane(session, pane_idx, window)
    if new_model and ALLOWED_MODEL in new_model:
        log(f"{target} now on {new_model}")
        return True
    log(f"{target} model after restart: {new_model}")
    return False
def verify_expected_model(provider_yaml, expected):
    """Compare actual provider in a YAML config against expected value."""
    actual_clean = provider_yaml.strip()
    expected_clean = expected.strip()
    return actual_clean == expected_clean
def check_config_drift():
    """Scan all relevant config.yaml files for provider drift. Does NOT modify anything.
    Returns list of drift issues found.

    Each issue is a human-readable string ("CONFIG DRIFT [...]" or
    "CONFIG CHECK ERROR [...]"); missing config files are silently skipped.
    """
    issues = []
    # name -> (config path, expected model.provider value)
    CONFIGS = {
        "main_config": (os.path.expanduser("~/.hermes/config.yaml"), "nous"),
        "fenrir": (os.path.expanduser("~/.hermes/profiles/fenrir/config.yaml"), "nous"),
        "timmy_sprint": (os.path.expanduser("~/.hermes/profiles/timmy-sprint/config.yaml"), "nous"),
        "default_profile": (os.path.expanduser("~/.hermes/profiles/default/config.yaml"), "nous"),
    }
    for name, (path, expected_provider) in CONFIGS.items():
        if not os.path.exists(path):
            continue
        try:
            with open(path, "r") as f:
                content = f.read()
            # Parse YAML to correctly read model.provider (not the first provider: line)
            try:
                import yaml
                cfg = yaml.safe_load(content) or {}
            except ImportError:
                # Fallback: find provider under model: block via indentation-aware scan
                # NOTE: only reached when PyYAML is absent; yaml *parse* errors
                # propagate to the outer except below.
                cfg = {}
                in_model = False
                for line in content.split("\n"):
                    stripped = line.strip()
                    indent = len(line) - len(line.lstrip())
                    if stripped.startswith("model:") and indent == 0:
                        # Entered the top-level model: block.
                        in_model = True
                        continue
                    if in_model and indent == 0 and stripped:
                        # Next top-level key ends the model: block.
                        in_model = False
                    if in_model and stripped.startswith("provider:"):
                        cfg = {"model": {"provider": stripped.split(":", 1)[1].strip()}}
                        break
            actual = (cfg.get("model") or {}).get("provider", "")
            # Empty actual/expected means "cannot tell" — never flagged as drift.
            if actual and expected_provider and actual != expected_provider:
                issues.append(f"CONFIG DRIFT [{name}]: provider is '{actual}' (expected '{expected_provider}')")
        except Exception as e:
            # Unreadable/unparseable file is reported, not fatal.
            issues.append(f"CONFIG CHECK ERROR [{name}]: {e}")
    return issues
def main():
    """Scan tmux panes and config files for model drift; return 1 when found."""
    fix_mode = "--fix" in sys.argv
    drift_found = False
    pane_issues = []
    # Always check config files for provider drift (read-only, never writes)
    config_drift_issues = check_config_drift()
    for cfg_issue in config_drift_issues:
        log(f"CONFIG DRIFT: {cfg_issue}")
    for session in TMUX_SESSIONS:
        for pane in get_panes(session):
            window = pane.get("window")
            if window:
                target = f"{session}:{window}.{pane['index']}"
            else:
                target = f"{session}.{pane['index']}"
            # Detect profile from the process attached to the pane's TTY.
            tty_raw, _ = run(f"tmux list-panes -t {target} -F '#{{pane_tty}}'")
            profile = get_profile_from_pane(tty_raw.strip())
            model = get_model_from_pane(session, pane["index"], window)
            drifted, model_name = is_drifted(model, profile)
            if not drifted:
                continue
            drift_found = True
            pane_issues.append(f"{target}: {model_name} (profile={profile})")
            log(f"DRIFT DETECTED: {target} is on '{model_name}' (profile={profile}, expected='{PROFILE_MODELS.get(profile, ALLOWED_MODEL)}')")
            if fix_mode:
                log(f"Auto-fixing {target}...")
                if not kill_and_restart(session, pane["index"], window):
                    pane_issues.append(f" ↳ RESTART FAILED for {target}")
    if not drift_found:
        total = sum(len(get_panes(s)) for s in TMUX_SESSIONS)
        log(f"All {total} panes healthy (on {ALLOWED_MODEL})")
    # Print summary for cron output
    if pane_issues or config_drift_issues:
        print("\n=== MODEL DRIFT REPORT ===")
        for issue in pane_issues:
            print(f" [PANE] {issue}")
        for issue in config_drift_issues:
            print(f" [CONFIG] {issue}")
        if not fix_mode:
            print("\nRun with --fix to auto-restart drifted panes.")
        return 1
    return 0
# Script entry point — exit code 1 signals drift/config issues to cron.
if __name__ == "__main__":
    sys.exit(main())

View File

@@ -136,83 +136,6 @@ class TestFallbackModelValidation:
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_blank_fallback_fields_no_issues(self):
    """Both fallback_model fields empty -> treated as disabled, no warnings."""
    result = validate_config_structure({
        "fallback_model": {"provider": "", "model": ""},
    })
    assert [i for i in result if "fallback" in i.message.lower()] == []
def test_blank_fallback_fields_with_whitespace_no_issues(self):
    """Whitespace-only fallback_model fields count as blank -> no warnings."""
    result = validate_config_structure({
        "fallback_model": {"provider": " ", "model": " "},
    })
    assert [i for i in result if "fallback" in i.message.lower()] == []
def test_none_fallback_fields_no_issues(self):
    """None fallback_model fields -> no warnings."""
    result = validate_config_structure({
        "fallback_model": {"provider": None, "model": None},
    })
    assert [i for i in result if "fallback" in i.message.lower()] == []
def test_enabled_false_no_issues(self):
    """enabled: false suppresses fallback warnings entirely."""
    result = validate_config_structure({
        "fallback_model": {"enabled": False},
    })
    assert [i for i in result if "fallback" in i.message.lower()] == []
def test_enabled_false_string_no_issues(self):
    """enabled: 'false' (string form) also suppresses fallback warnings."""
    result = validate_config_structure({
        "fallback_model": {"enabled": "false"},
    })
    assert [i for i in result if "fallback" in i.message.lower()] == []
def test_partial_blank_fallback_warns(self):
    """Exactly one blank field -> exactly one warning naming that field."""
    result = validate_config_structure({
        "fallback_model": {"provider": "", "model": "anthropic/claude-sonnet-4"},
    })
    fallback = [i for i in result if "fallback" in i.message.lower()]
    assert len(fallback) == 1
    assert "provider" in fallback[0].message
def test_valid_fallback_with_enabled_true(self):
    """A fully specified, enabled fallback -> no warnings."""
    result = validate_config_structure({
        "fallback_model": {
            "enabled": True,
            "provider": "openrouter",
            "model": "anthropic/claude-sonnet-4",
        },
    })
    assert [i for i in result if "fallback" in i.message.lower()] == []
class TestMissingModelSection:
"""Warn when custom_providers exists but model section is missing."""
@@ -249,111 +172,3 @@ class TestConfigIssueDataclass:
a = ConfigIssue("error", "msg", "hint")
b = ConfigIssue("error", "msg", "hint")
assert a == b
class TestFallbackProvidersValidation:
    """fallback_providers must be a list of dicts with provider + model."""

    def test_non_list(self):
        """A bare string should produce an error mentioning 'list'."""
        found = validate_config_structure({
            "fallback_providers": "openrouter:google/gemini-3-flash-preview",
        })
        assert any(
            i.severity == "error" and "fallback_providers" in i.message and "list" in i.message
            for i in found
        )

    def test_dict_instead_of_list(self):
        """A dict should produce an error mentioning 'dict'."""
        found = validate_config_structure({
            "fallback_providers": {"provider": "openrouter", "model": "test"},
        })
        assert any(
            i.severity == "error" and "fallback_providers" in i.message and "dict" in i.message
            for i in found
        )

    def test_entry_missing_provider(self):
        """An entry lacking 'provider' produces a warning."""
        found = validate_config_structure({
            "fallback_providers": [{"model": "google/gemini-3-flash-preview"}],
        })
        assert any("missing 'provider'" in i.message for i in found)

    def test_entry_missing_model(self):
        """An entry lacking 'model' produces a warning."""
        found = validate_config_structure({
            "fallback_providers": [{"provider": "openrouter"}],
        })
        assert any("missing 'model'" in i.message for i in found)

    def test_entry_not_dict(self):
        """Non-dict entries produce a warning."""
        found = validate_config_structure({
            "fallback_providers": ["not-a-dict"],
        })
        assert any("not a dict" in i.message for i in found)

    def test_valid_entries(self):
        """Well-formed entries produce no fallback_providers issues."""
        found = validate_config_structure({
            "fallback_providers": [
                {"provider": "openrouter", "model": "google/gemini-3-flash-preview"},
                {"provider": "gemini", "model": "gemini-2.5-flash"},
            ],
        })
        assert [i for i in found if "fallback_providers" in i.message] == []

    def test_empty_list_no_issues(self):
        """An empty list is valid (fallback disabled)."""
        found = validate_config_structure({"fallback_providers": []})
        assert [i for i in found if "fallback_providers" in i.message] == []
class TestSessionResetValidation:
    """session_reset.idle_minutes must be positive; at_hour must be a valid hour."""

    def test_zero_idle_minutes(self):
        """idle_minutes=0 warns."""
        found = validate_config_structure({"session_reset": {"idle_minutes": 0}})
        assert any("idle_minutes=0" in i.message for i in found)

    def test_negative_idle_minutes(self):
        """Negative idle_minutes warns."""
        found = validate_config_structure({"session_reset": {"idle_minutes": -5}})
        assert any("idle_minutes=-5" in i.message for i in found)

    def test_string_idle_minutes(self):
        """Non-numeric idle_minutes warns."""
        found = validate_config_structure({"session_reset": {"idle_minutes": "abc"}})
        assert any("idle_minutes=" in i.message for i in found)

    def test_valid_idle_minutes(self):
        """A sane idle_minutes value raises no idle_minutes issues."""
        found = validate_config_structure({"session_reset": {"idle_minutes": 1440}})
        assert [i for i in found if "idle_minutes" in i.message] == []

    def test_invalid_at_hour(self):
        """at_hour=25 (out of range) warns."""
        found = validate_config_structure({"session_reset": {"at_hour": 25}})
        assert any("at_hour=25" in i.message for i in found)

    def test_valid_at_hour(self):
        """An in-range at_hour raises no at_hour issues."""
        found = validate_config_structure({"session_reset": {"at_hour": 4}})
        assert [i for i in found if "at_hour" in i.message] == []

View File

@@ -1,73 +0,0 @@
"""Tests for cron scheduler cloud-provider terminal disabling (#379).
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, etc.),
the terminal toolset must be disabled because SSH keys don't exist on cloud
servers. Only local endpoints (localhost, 127.0.0.1, RFC-1918) retain
terminal access.
"""
import pytest
from agent.model_metadata import is_local_endpoint
class TestIsLocalEndpoint:
    """is_local_endpoint: loopback + RFC-1918 are local; everything else is not."""

    def test_localhost(self):
        """localhost hostname is local."""
        verdict = is_local_endpoint("http://localhost:11434/v1")
        assert verdict is True

    def test_127_loopback(self):
        """127.x loopback is local."""
        verdict = is_local_endpoint("http://127.0.0.1:8080/v1")
        assert verdict is True

    def test_0_0_0_0(self):
        """Wildcard bind address counts as local."""
        verdict = is_local_endpoint("http://0.0.0.0:11434/v1")
        assert verdict is True

    def test_rfc1918_10(self):
        """10.0.0.0/8 is local."""
        verdict = is_local_endpoint("http://10.0.0.5:8080/v1")
        assert verdict is True

    def test_rfc1918_192(self):
        """192.168.0.0/16 is local."""
        verdict = is_local_endpoint("http://192.168.1.100:11434/v1")
        assert verdict is True

    def test_rfc1918_172(self):
        """172.16.0.0/12 is local."""
        verdict = is_local_endpoint("http://172.16.0.1:8080/v1")
        assert verdict is True

    def test_cloud_openrouter(self):
        """Public cloud hostname is not local."""
        verdict = is_local_endpoint("https://openrouter.ai/api/v1")
        assert verdict is False

    def test_cloud_nous(self):
        """Nous inference endpoint is not local."""
        verdict = is_local_endpoint("https://inference-api.nousresearch.com/v1")
        assert verdict is False

    def test_cloud_anthropic(self):
        """Anthropic API endpoint is not local."""
        verdict = is_local_endpoint("https://api.anthropic.com")
        assert verdict is False

    def test_empty_url(self):
        """Empty URL is treated as not local."""
        verdict = is_local_endpoint("")
        assert verdict is False

    def test_none_url(self):
        """None URL is treated as not local."""
        verdict = is_local_endpoint(None)
        assert verdict is False
class TestCronDisabledToolsetsLogic:
    """Verify the disabled_toolsets logic matches scheduler expectations."""

    def _build_disabled(self, base_url, job=None):
        """Mirror the scheduler's disabled_toolsets logic."""
        from agent.model_metadata import is_local_endpoint
        disabled = ["cronjob", "messaging", "clarify"]
        if not is_local_endpoint(base_url):
            disabled.append("terminal")
        return disabled

    def test_local_keeps_terminal(self):
        """Local endpoints retain terminal access."""
        disabled = self._build_disabled("http://localhost:11434/v1")
        assert "terminal" not in disabled
        assert "cronjob" in disabled

    def test_cloud_disables_terminal(self):
        """Cloud endpoints have terminal disabled on top of the cron set."""
        disabled = self._build_disabled("https://openrouter.ai/api/v1")
        assert "cronjob" in disabled
        assert "terminal" in disabled

    def test_empty_url_disables_terminal(self):
        """A missing base URL is treated like a cloud endpoint."""
        assert "terminal" in self._build_disabled("")

View File

@@ -1,128 +0,0 @@
"""Tests for time-aware cron model routing — Issue #317."""
import pytest
from datetime import datetime
from agent.smart_model_routing import resolve_cron_model, _hour_in_window
class TestHourInWindow:
    """Hour-in-window detection including midnight wrap."""

    def test_normal_window(self):
        """Same-day window: start inclusive, end exclusive."""
        for hour, expected in [(18, True), (16, False), (22, False)]:
            assert _hour_in_window(hour, 17, 22) is expected

    def test_midnight_wrap(self):
        """Windows crossing midnight cover both late evening and early morning."""
        for hour, expected in [(23, True), (3, True), (10, False)]:
            assert _hour_in_window(hour, 22, 6) is expected

    def test_edge_cases(self):
        """Full-day window plus wrap boundaries."""
        for hour, start, end, expected in [
            (0, 0, 24, True),
            (23, 0, 24, True),
            (0, 22, 6, True),
            (5, 22, 6, True),
            (6, 22, 6, False),
        ]:
            assert _hour_in_window(hour, start, end) is expected
class TestResolveCronModel:
    """Time-aware model resolution for cron jobs."""

    def _config(self, **overrides):
        """Baseline routing config; keyword args replace top-level keys."""
        return {
            "enabled": True,
            "fallback_model": "anthropic/claude-sonnet-4",
            "fallback_provider": "openrouter",
            "windows": [
                {"start_hour": 17, "end_hour": 22, "reason": "evening_error_peak"},
            ],
            **overrides,
        }

    def test_disabled_returns_base(self):
        """enabled=False leaves the base model untouched."""
        res = resolve_cron_model("mimo", {"enabled": False}, now=datetime(2026, 4, 12, 18, 0))
        assert res["overridden"] is False
        assert res["model"] == "mimo"

    def test_no_config_returns_base(self):
        """Missing config leaves the base model untouched."""
        res = resolve_cron_model("mimo", None)
        assert res["overridden"] is False
        assert res["model"] == "mimo"

    def test_no_windows_returns_base(self):
        """An empty window list never overrides."""
        res = resolve_cron_model(
            "mimo", {"enabled": True, "windows": []}, now=datetime(2026, 4, 12, 18, 0)
        )
        assert res["overridden"] is False

    def test_evening_window_overrides(self):
        """Inside the evening window the fallback model/provider win."""
        res = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 18, 0))
        assert res["overridden"] is True
        assert res["model"] == "anthropic/claude-sonnet-4"
        assert res["provider"] == "openrouter"
        assert "evening_error_peak" in res["reason"]
        assert "hour=18" in res["reason"]

    def test_outside_window_keeps_base(self):
        """Outside every window the base model is kept."""
        res = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 9, 0))
        assert res["overridden"] is False
        assert res["model"] == "mimo"

    def test_window_boundary_start_inclusive(self):
        """The start hour itself is inside the window."""
        res = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 17, 0))
        assert res["overridden"] is True

    def test_window_boundary_end_exclusive(self):
        """The end hour itself is outside the window."""
        res = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 22, 0))
        assert res["overridden"] is False

    def test_midnight_window(self):
        """A 22-6 window wraps through midnight."""
        cfg = self._config(windows=[{"start_hour": 22, "end_hour": 6, "reason": "overnight"}])
        for when, expected in [
            (datetime(2026, 4, 12, 23, 0), True),
            (datetime(2026, 4, 13, 3, 0), True),
            (datetime(2026, 4, 12, 10, 0), False),
        ]:
            assert resolve_cron_model("mimo", cfg, now=when)["overridden"] is expected

    def test_per_window_model_override(self):
        """A window may carry its own model/provider override."""
        cfg = self._config(windows=[{
            "start_hour": 17, "end_hour": 22,
            "model": "anthropic/claude-opus-4-6", "provider": "anthropic", "reason": "peak",
        }])
        res = resolve_cron_model("mimo", cfg, now=datetime(2026, 4, 12, 18, 0))
        assert res["model"] == "anthropic/claude-opus-4-6"
        assert res["provider"] == "anthropic"

    def test_first_matching_window_wins(self):
        """When windows overlap, the first match is used."""
        cfg = self._config(windows=[
            {"start_hour": 17, "end_hour": 20, "model": "strong-1", "provider": "p1", "reason": "w1"},
            {"start_hour": 19, "end_hour": 22, "model": "strong-2", "provider": "p2", "reason": "w2"},
        ])
        res = resolve_cron_model("mimo", cfg, now=datetime(2026, 4, 12, 19, 0))
        assert res["model"] == "strong-1"

    def test_no_fallback_model_keeps_base(self):
        """A matching window without any fallback model cannot override."""
        cfg = {"enabled": True, "windows": [{"start_hour": 17, "end_hour": 22, "reason": "test"}]}
        res = resolve_cron_model("mimo", cfg, now=datetime(2026, 4, 12, 18, 0))
        assert res["model"] == "mimo"
        assert res["overridden"] is False

    def test_malformed_windows_skipped(self):
        """Malformed entries are skipped; the valid window still matches."""
        cfg = self._config(windows=[
            "not-a-dict",
            {"start_hour": 17},
            {"end_hour": 22},
            {"start_hour": "bad", "end_hour": "bad"},
            {"start_hour": 17, "end_hour": 22, "reason": "valid"},
        ])
        res = resolve_cron_model("mimo", cfg, now=datetime(2026, 4, 12, 18, 0))
        assert res["overridden"] is True
        assert "valid" in res["reason"]

    def test_multiple_windows_coverage(self):
        """Several disjoint windows each trigger independently."""
        cfg = self._config(windows=[
            {"start_hour": 17, "end_hour": 22, "reason": "evening"},
            {"start_hour": 2, "end_hour": 5, "reason": "overnight"},
        ])
        for when, expected in [
            (datetime(2026, 4, 12, 20, 0), True),
            (datetime(2026, 4, 13, 3, 0), True),
            (datetime(2026, 4, 12, 10, 0), False),
        ]:
            assert resolve_cron_model("mimo", cfg, now=when)["overridden"] is expected

View File

@@ -0,0 +1,129 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification,
and capability-aware prompt building."""
import sys
from pathlib import Path
# Make the repo root importable so cron/scheduler.py can be loaded no matter
# where pytest is invoked from.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
    """Load cron/scheduler.py directly from its file path, bypassing the
    package's __init__.py re-exports that may reference symbols not yet
    merged upstream."""
    import importlib.util

    scheduler_path = Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"
    spec = importlib.util.spec_from_file_location("cron.scheduler", str(scheduler_path))
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except Exception:
        # Some top-level imports may fail in CI; the defs we need still exist.
        pass
    return module
# Load the scheduler once at collection time and bind the private helpers
# under test to module-level names.
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
# ── _classify_runtime ─────────────────────────────────────────────────────
class TestClassifyRuntime:
    """Local/cloud/unknown classification from (provider, model)."""

    def test_ollama_is_local(self):
        """An ollama provider is a local runtime."""
        verdict = _classify_runtime("ollama", "qwen2.5:7b")
        assert verdict == "local"

    def test_empty_provider_is_local(self):
        """No provider + unprefixed model is classified local."""
        verdict = _classify_runtime("", "my-local-model")
        assert verdict == "local"

    def test_prefixed_model_is_cloud(self):
        """No provider but a vendor-prefixed model is classified cloud."""
        verdict = _classify_runtime("", "nous/mimo-v2-pro")
        assert verdict == "cloud"

    def test_nous_provider_is_cloud(self):
        """The nous provider is a cloud runtime."""
        verdict = _classify_runtime("nous", "mimo-v2-pro")
        assert verdict == "cloud"

    def test_openrouter_is_cloud(self):
        """The openrouter provider is a cloud runtime."""
        verdict = _classify_runtime("openrouter", "anthropic/claude-sonnet-4")
        assert verdict == "cloud"

    def test_empty_both_is_unknown(self):
        """No provider and no model -> unknown."""
        verdict = _classify_runtime("", "")
        assert verdict == "unknown"
# ── _detect_provider_mismatch ─────────────────────────────────────────────
class TestDetectProviderMismatch:
    """Detection of prompts that reference a provider other than the active one."""

    def test_no_mismatch_when_not_mentioned(self):
        """Prompts naming no provider never mismatch."""
        assert _detect_provider_mismatch("Check system health", "nous") is None

    def test_detects_ollama_when_nous_active(self):
        """'Ollama' in the prompt mismatches an active nous provider."""
        mismatch = _detect_provider_mismatch("Check Ollama is responding", "nous")
        assert mismatch == "ollama"

    def test_detects_anthropic_when_nous_active(self):
        """'Claude' maps to anthropic and mismatches nous."""
        mismatch = _detect_provider_mismatch("Use Claude to analyze", "nous")
        assert mismatch == "anthropic"

    def test_no_mismatch_same_provider(self):
        """Mentioning the active provider is not a mismatch."""
        assert _detect_provider_mismatch("Check Ollama models", "ollama") is None

    def test_empty_prompt(self):
        """An empty prompt cannot mismatch."""
        assert _detect_provider_mismatch("", "nous") is None

    def test_empty_provider(self):
        """No active provider -> no mismatch reported."""
        assert _detect_provider_mismatch("Check Ollama", "") is None

    def test_detects_kimi_when_openrouter(self):
        """'Kimi' mismatches an active openrouter provider."""
        mismatch = _detect_provider_mismatch("Use Kimi for coding", "openrouter")
        assert mismatch == "kimi"

    def test_detects_glm_when_nous(self):
        """'GLM' maps to the zai provider and mismatches nous."""
        mismatch = _detect_provider_mismatch("Use GLM for analysis", "nous")
        assert mismatch == "zai"
# ── _build_job_prompt ─────────────────────────────────────────────────────
class TestBuildJobPrompt:
    """Runtime-context injection when building cron job prompts."""

    @staticmethod
    def _job(prompt="Do something"):
        """Minimal job dict with the given prompt and no skills."""
        return {"prompt": prompt, "skills": []}

    def test_no_runtime_no_block(self):
        """Without runtime info, no RUNTIME CONTEXT block is injected."""
        built = _build_job_prompt(self._job())
        assert "Do something" in built
        assert "RUNTIME CONTEXT" not in built

    def test_cloud_runtime_injected(self):
        """Cloud provider -> explicit no-local-access warning block."""
        built = _build_job_prompt(
            self._job(), runtime_model="xiaomi/mimo-v2-pro", runtime_provider="nous"
        )
        assert "MODEL: xiaomi/mimo-v2-pro" in built
        assert "PROVIDER: nous" in built
        assert "cloud API" in built
        assert "Do NOT assume you can SSH" in built

    def test_local_runtime_injected(self):
        """Local provider -> local capability note (SSH keys etc.)."""
        built = _build_job_prompt(
            self._job(), runtime_model="qwen2.5:7b", runtime_provider="ollama"
        )
        assert "RUNTIME: local" in built
        assert "SSH keys" in built

    def test_empty_runtime_no_block(self):
        """Empty runtime fields behave like no runtime info at all."""
        built = _build_job_prompt(self._job(), runtime_model="", runtime_provider="")
        assert "RUNTIME CONTEXT" not in built

    def test_cron_hint_always_present(self):
        """The cron-hint system block is always appended."""
        built = _build_job_prompt(self._job())
        assert "[SYSTEM:" in built
        assert "scheduled cron job" in built

    def test_runtime_block_before_cron_hint(self):
        """The runtime context block precedes the cron hint."""
        built = _build_job_prompt(
            self._job("Check Ollama"), runtime_model="mimo-v2-pro", runtime_provider="nous"
        )
        assert built.index("RUNTIME CONTEXT") < built.index("scheduled cron job")

View File

@@ -665,127 +665,6 @@ class TestPruneSessions:
# =========================================================================
# =========================================================================
# Garbage Collect
# =========================================================================
class TestGarbageCollect:
    """Session garbage collection: empty/trivial old sessions are pruned,
    active and substantial ones are kept."""

    @staticmethod
    def _backdate(db, session_id, seconds):
        """Rewind a session's started_at by `seconds` (direct DB poke)."""
        db._conn.execute(
            "UPDATE sessions SET started_at = ? WHERE id = ?",
            (time.time() - seconds, session_id),
        )
        db._conn.commit()

    def test_gc_deletes_empty_old_sessions(self, db):
        """Empty sessions (0 messages) older than 24h should be deleted."""
        db.create_session(session_id="empty_old", source="cli")
        db.end_session("empty_old", end_reason="done")
        self._backdate(db, "empty_old", 48 * 3600)  # 48 hours ago
        # A recent empty session must survive.
        db.create_session(session_id="empty_new", source="cli")
        db.end_session("empty_new", end_reason="done")
        report = db.garbage_collect()
        assert report["empty"] == 1
        assert report["trivial"] == 0
        assert report["total"] == 1
        assert db.get_session("empty_old") is None
        assert db.get_session("empty_new") is not None

    def test_gc_deletes_trivial_old_sessions(self, db):
        """Sessions with 1-5 messages older than 7 days should be deleted."""
        db.create_session(session_id="trivial_old", source="cli")
        for n in range(3):
            db.append_message("trivial_old", role="user", content=f"msg {n}")
        db.end_session("trivial_old", end_reason="done")
        self._backdate(db, "trivial_old", 10 * 86400)  # 10 days ago
        report = db.garbage_collect()
        assert report["trivial"] == 1
        assert report["total"] == 1
        assert db.get_session("trivial_old") is None

    def test_gc_keeps_active_sessions(self, db):
        """Active (not ended) sessions are never deleted, however old."""
        db.create_session(session_id="active_old", source="cli")
        # Backdate but do not end.
        self._backdate(db, "active_old", 48 * 3600)
        report = db.garbage_collect()
        assert report["total"] == 0
        assert db.get_session("active_old") is not None

    def test_gc_keeps_substantial_sessions(self, db):
        """Sessions with >5 messages are kept regardless of age."""
        db.create_session(session_id="big_old", source="cli")
        for n in range(10):
            db.append_message("big_old", role="user", content=f"msg {n}")
        db.end_session("big_old", end_reason="done")
        self._backdate(db, "big_old", 365 * 86400)  # 1 year ago
        report = db.garbage_collect()
        assert report["total"] == 0
        assert db.get_session("big_old") is not None

    def test_gc_dry_run_does_not_delete(self, db):
        """dry_run=True reports counts without deleting anything."""
        db.create_session(session_id="empty_old", source="cli")
        db.end_session("empty_old", end_reason="done")
        self._backdate(db, "empty_old", 48 * 3600)
        report = db.garbage_collect(dry_run=True)
        assert report["total"] == 1
        assert db.get_session("empty_old") is not None  # still exists

    def test_gc_with_source_filter(self, db):
        """--source restricts GC to sessions from that source."""
        for sid, src in [("old_cli", "cli"), ("old_tg", "telegram")]:
            db.create_session(session_id=sid, source=src)
            db.end_session(sid, end_reason="done")
            self._backdate(db, sid, 48 * 3600)
        report = db.garbage_collect(source="cli")
        assert report["total"] == 1
        assert db.get_session("old_cli") is None
        assert db.get_session("old_tg") is not None

    def test_gc_handles_child_sessions(self, db):
        """Child sessions are removed along with a GC'd parent."""
        db.create_session(session_id="parent_old", source="cli")
        db.end_session("parent_old", end_reason="done")
        self._backdate(db, "parent_old", 48 * 3600)
        db.create_session(session_id="child", source="cli", parent_session_id="parent_old")
        db.end_session("child", end_reason="done")
        report = db.garbage_collect()
        assert report["total"] == 1
        assert db.get_session("parent_old") is None
        assert db.get_session("child") is None
# Schema and WAL mode
# =========================================================================

View File

@@ -1,210 +0,0 @@
"""Tests for warm session provisioning (#327)."""
import json
import os
import pytest
from pathlib import Path
from tempfile import mkdtemp
from unittest.mock import patch
def _make_messages(tool_names=None, error_rate=0.0, count=50):
"""Generate fake session messages for testing."""
msgs = [{"role": "system", "content": "You are helpful."}]
tools = tool_names or ["terminal", "read_file", "write_file"]
for i in range(count):
msgs.append({"role": "user", "content": f"Do task {i}"})
tool_name = tools[i % len(tools)]
tc_id = f"call_{i}"
msgs.append({
"role": "assistant",
"content": f"Working on task {i}",
"tool_calls": [{"id": tc_id, "function": {"name": tool_name, "arguments": "{}"}}],
})
if i / count < error_rate:
msgs.append({"role": "tool", "tool_call_id": tc_id, "content": "Error: something went wrong"})
else:
msgs.append({"role": "tool", "tool_call_id": tc_id, "content": f"Result for task {i}: " + "x" * 100})
return msgs
@pytest.fixture(autouse=True)
def warm_dir(tmp_path, monkeypatch):
    """Point HERMES_HOME at a temp dir so warm templates are isolated per test."""
    templates = tmp_path / "warm_sessions" / "templates"
    templates.mkdir(parents=True)
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    return templates
class TestClassifySession:
    """Category classification from dominant tool families."""

    def test_coding(self):
        """Mostly code/file tools -> 'coding'."""
        from tools.warm_session import classify_session
        messages = _make_messages(["execute_code", "write_file", "read_file", "patch", "terminal"])
        assert classify_session(messages) == "coding"

    def test_research(self):
        """Mostly web/search tools -> 'research'."""
        from tools.warm_session import classify_session
        messages = _make_messages(["web_search", "web_extract", "session_search", "read_file"])
        assert classify_session(messages) == "research"

    def test_ops(self):
        """Mostly infra tools -> 'ops'."""
        from tools.warm_session import classify_session
        messages = _make_messages(["terminal", "terminal", "terminal", "cronjob", "process"])
        assert classify_session(messages) == "ops"

    def test_general(self):
        """A balanced mix with no dominant family -> 'general'."""
        from tools.warm_session import classify_session
        messages = _make_messages(["read_file", "web_search", "execute_code", "cronjob", "send_message"])
        assert classify_session(messages) == "general"

    def test_no_tools(self):
        """Sessions without tool calls default to 'general'."""
        from tools.warm_session import classify_session
        assert classify_session([{"role": "user", "content": "hi"}]) == "general"
class TestScoreSession:
    """Session quality scoring (proficiency, success, error rate)."""

    def test_good_session(self):
        """A long, error-free session scores proficient and successful."""
        from tools.warm_session import score_session
        scored = score_session(_make_messages(error_rate=0.0, count=50))
        assert scored["is_proficient"] is True
        assert scored["is_successful"] is True
        assert scored["error_rate"] == 0.0
        assert scored["total_messages"] == 151

    def test_error_session(self):
        """Half of the tool results erroring -> not successful."""
        from tools.warm_session import score_session
        scored = score_session(_make_messages(error_rate=0.5, count=50))
        assert scored["is_successful"] is False
        assert scored["error_rate"] > 0.3

    def test_short_session(self):
        """Too few messages -> not proficient."""
        from tools.warm_session import score_session
        scored = score_session(_make_messages(count=5))
        assert scored["is_proficient"] is False

    def test_diverse_tools(self):
        """unique_tools reflects the number of distinct tools used."""
        from tools.warm_session import score_session
        scored = score_session(_make_messages(["a", "b", "c", "d", "e", "f", "g", "h"]))
        assert scored["unique_tools"] == 8
class TestExtractWarmSeed:
    """Warm-seed extraction from full session transcripts."""

    def test_extracts_messages(self):
        """The seed is non-empty and capped at max_messages."""
        from tools.warm_session import extract_warm_seed
        seed = extract_warm_seed(_make_messages(count=30), max_messages=20)
        assert 0 < len(seed) <= 20

    def test_includes_system(self):
        """At least one system message survives extraction."""
        from tools.warm_session import extract_warm_seed
        seed = extract_warm_seed(_make_messages(count=20))
        assert any(m.get("role") == "system" for m in seed)

    def test_skips_error_results(self):
        """Errored tool results are filtered out of the seed."""
        from tools.warm_session import extract_warm_seed
        seed = extract_warm_seed(_make_messages(error_rate=0.5, count=20))
        for tool_msg in (m for m in seed if m.get("role") == "tool"):
            assert "Error" not in tool_msg.get("content", "")
class TestCaptureTemplate:
    """Template capture: quality gating and on-disk persistence."""

    def test_captures_good_session(self):
        """A healthy session yields a template tagged with category + source."""
        from tools.warm_session import capture_template
        template = capture_template("sess_001", _make_messages(count=50), name="test_coding")
        assert template is not None
        assert template["category"] == "coding"
        assert template["source_session"] == "sess_001"

    def test_rejects_bad_session(self):
        """A short, error-heavy session is rejected."""
        from tools.warm_session import capture_template
        rejected = capture_template("sess_bad", _make_messages(error_rate=0.8, count=10))
        assert rejected is None

    def test_saves_to_disk(self):
        """Captured templates are persisted as <name>.json."""
        from tools.warm_session import capture_template, _warm_sessions_dir
        capture_template("sess_disk", _make_messages(count=50), name="test_disk")
        assert (_warm_sessions_dir() / "test_disk.json").exists()
class TestListAndLoad:
    """Listing and loading saved warm templates."""

    def test_list_templates(self):
        """Captured templates appear in list_templates()."""
        from tools.warm_session import capture_template, list_templates
        capture_template("s1", _make_messages(count=50), name="list_test")
        assert any(t["name"] == "list_test" for t in list_templates())

    def test_list_by_category(self):
        """The category filter includes matches and excludes the rest."""
        from tools.warm_session import capture_template, list_templates
        capture_template(
            "s2",
            _make_messages(["web_search", "web_extract", "read_file"], count=50),
            name="research_test",
        )
        assert any(t["name"] == "research_test" for t in list_templates("research"))
        assert not any(t["name"] == "research_test" for t in list_templates("coding"))

    def test_load_template(self):
        """Templates round-trip through load_template by name."""
        from tools.warm_session import capture_template, load_template
        capture_template("s3", _make_messages(count=50), name="load_test")
        loaded = load_template("load_test")
        assert loaded is not None
        assert loaded["source_session"] == "s3"

    def test_load_nonexistent(self):
        """Unknown template names load as None."""
        from tools.warm_session import load_template
        assert load_template("does_not_exist") is None
class TestProvisionSession:
    """Provisioning a new session from a saved warm template."""

    def test_provision(self):
        """Provisioning returns the captured template's seed and name."""
        from tools.warm_session import capture_template, provision_session
        capture_template("sp1", _make_messages(count=50), name="prov_test")
        ok, seed, name = provision_session()
        assert ok is True
        assert len(seed) > 0
        assert name == "prov_test"

    def test_provision_empty(self):
        """No templates -> provisioning fails with an explanatory message."""
        from tools.warm_session import provision_session
        ok, _, msg = provision_session()
        assert ok is False
        assert "No warm templates" in msg

    def test_provision_by_category(self):
        """Category-specific provisioning succeeds when a match exists."""
        from tools.warm_session import capture_template, provision_session
        capture_template(
            "sp2", _make_messages(["terminal", "cronjob", "process"], count=50), name="ops_prov"
        )
        ok, seed, name = provision_session(category="ops")
        assert ok is True
class TestGetProvisionStats:
    """Aggregate template/provision statistics."""

    def test_empty(self):
        """Fresh state: zero templates and zero provisions."""
        from tools.warm_session import get_provision_stats
        stats = get_provision_stats()
        assert stats["total_templates"] == 0
        assert stats["total_provisions"] == 0

    def test_after_capture(self):
        """Capturing one template bumps total_templates to 1."""
        from tools.warm_session import capture_template, get_provision_stats
        capture_template("stat1", _make_messages(count=50), name="stat_test")
        assert get_provision_stats()["total_templates"] == 1

View File

@@ -201,17 +201,6 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
}
# Health timestamps
if job.get("last_error_at"):
result["last_error_at"] = job["last_error_at"]
if job.get("last_success_at"):
result["last_success_at"] = job["last_success_at"]
if job.get("error_resolved_at"):
result["error_resolved_at"] = job["error_resolved_at"]
if job.get("error_cleared_at"):
result["error_cleared_at"] = job["error_cleared_at"]
if job.get("script"):
result["script"] = job["script"]
return result
@@ -337,13 +326,6 @@ def cronjob(
if result is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps(result, indent=2)
if normalized == "clear_error":
from cron.jobs import clear_job_error
job = clear_job_error(job_id)
if job is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps({"success": True, "job": _format_job(job)}, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}

View File

@@ -1,423 +0,0 @@
"""Warm Session Provisioning — Pre-proficient agent sessions.
Based on empirical finding: marathon sessions (100+ msgs) have LOWER per-tool
error rates (5.7%) than mid-length sessions (9.0% at 51-100 msgs). Agents
improve with experience in a session.
This module captures successful session patterns as "warm templates" and
provisions new sessions from them, giving users a pre-proficient starting
point instead of a cold start.
Architecture:
warm_sessions/
├── templates/ # Saved warm templates
│ ├── coding.json # Code-heavy session template
│ ├── research.json # Research/analysis template
│ ├── ops.json # DevOps/infrastructure template
│ └── general.json # General-purpose template
└── metrics.json # Template performance metrics
"""
import json
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A session needs at least this many messages (plus >=5 tool calls, see
# score_session) to be considered "proficient".
_MIN_PROFICIENT_MESSAGES = 20

# Tool-result error rate above which a session is not "successful".
_MAX_ERROR_RATE = 0.10

# Cap on saved templates per category.
_MAX_TEMPLATES_PER_CATEGORY = 5
def _warm_sessions_dir() -> Path:
"""Get the warm sessions directory."""
home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
d = home / "warm_sessions" / "templates"
d.mkdir(parents=True, exist_ok=True)
return d
def _metrics_path() -> Path:
"""Get the metrics file path."""
home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
d = home / "warm_sessions"
d.mkdir(parents=True, exist_ok=True)
return d / "metrics.json"
def _load_metrics() -> Dict[str, Any]:
    """Load template performance metrics from disk.

    Returns a fresh empty structure when the file is absent, unreadable, or
    not valid JSON.
    """
    path = _metrics_path()
    try:
        if path.exists():
            return json.loads(path.read_text())
    except Exception:
        pass  # corrupt/unreadable metrics -> start fresh
    return {"templates": {}, "provisions": [], "version": 1}
def _save_metrics(metrics: Dict[str, Any]) -> None:
    """Persist metrics as pretty-printed JSON (non-serializable values via str)."""
    payload = json.dumps(metrics, indent=2, default=str)
    _metrics_path().write_text(payload)
def classify_session(messages: List[Dict[str, Any]]) -> str:
    """Classify a session by its dominant tool family.

    Returns 'coding', 'research', or 'ops' when that family accounts for
    more than 40% of all tool calls (checked in that order — note that
    'read_file' belongs to both the coding and research families), and
    'general' otherwise or when no tools were called.
    """
    names = [
        tc.get("function", {}).get("name", "")
        for msg in messages
        if msg.get("role") == "assistant" and msg.get("tool_calls")
        for tc in msg.get("tool_calls", [])
    ]
    if not names:
        return "general"

    # Insertion order fixes the precedence: coding, then research, then ops.
    families = {
        "coding": {"execute_code", "write_file", "read_file", "patch", "search_files"},
        "research": {"web_search", "web_extract", "session_search", "fact_store", "read_file"},
        "ops": {"terminal", "cronjob", "process", "send_message"},
    }
    total = len(names)
    for category, tools in families.items():
        if sum(1 for n in names if n in tools) / total > 0.4:
            return category
    return "general"
def score_session(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Score a session for template-worthiness.
    Returns dict with:
        total_messages: int
        tool_calls: int
        tool_results: int
        error_count: int (tool results containing error indicators)
        error_rate: float (0.0-1.0)
        unique_tools: int
        diverse_tools: list
        is_proficient: bool (meets minimum quality bar)
        is_successful: bool (low error rate)
        score: float (0.0-1.0 overall quality score)
        category: str
    """
    # Substrings that mark a tool result as an error (case-insensitive).
    error_markers = (
        "error", "traceback", "exception", "failed", "permission denied",
        "not found", "command not found", "no such file",
    )

    call_names: List[str] = []
    result_count = 0
    errors = 0
    for msg in messages:
        role = msg.get("role", "")
        if role == "assistant" and msg.get("tool_calls"):
            call_names.extend(
                tc.get("function", {}).get("name", "")
                for tc in msg.get("tool_calls", [])
            )
        elif role == "tool":
            result_count += 1
            lowered = str(msg.get("content", "")).lower()
            if any(marker in lowered for marker in error_markers):
                errors += 1

    total = len(messages)
    tc_count = len(call_names)
    unique_tools = list(set(call_names))
    # max(..., 1) avoids ZeroDivisionError when the session has no tool results.
    error_rate = errors / max(result_count, 1)

    is_proficient = total >= _MIN_PROFICIENT_MESSAGES and tc_count >= 5
    is_successful = error_rate <= _MAX_ERROR_RATE and tc_count >= 3

    # Weighted blend: message volume (20%), tool diversity (30%), success (50%).
    msg_score = min(1.0, total / 100)                 # 100 msgs = perfect
    diversity_score = min(1.0, len(unique_tools) / 8)  # 8 unique tools = perfect
    success_score = 1.0 - error_rate
    overall_score = msg_score * 0.2 + diversity_score * 0.3 + success_score * 0.5

    return {
        "total_messages": total,
        "tool_calls": tc_count,
        "tool_results": result_count,
        "error_count": errors,
        "error_rate": round(error_rate, 3),
        "unique_tools": len(unique_tools),
        "diverse_tools": unique_tools,
        "is_proficient": is_proficient,
        "is_successful": is_successful,
        "score": round(overall_score, 3),
        "category": classify_session(messages),
    }
def extract_warm_seed(messages: List[Dict[str, Any]], max_messages: int = 30) -> List[Dict[str, Any]]:
    """Extract a warm seed from a session's messages.

    Selects the most instructive messages — prioritizing:
      1. System messages that set context.
      2. Successful tool calls with meaningful results.
      3. User-agent exchanges that established patterns.

    Args:
        messages: Full session transcript (OpenAI-style role/content dicts).
        max_messages: Hard cap on the number of seed messages returned.

    Returns:
        A curated subset (at most ``max_messages``) suitable for seeding a
        new session.

    Fixes over the previous revision:
      * When the seed already exceeded the budget, ``remaining`` went
        negative and ``user_exchanges[:remaining // 2]`` (a negative slice)
        ADDED extra exchanges instead of none.
      * An assistant message with several successful tool calls was appended
        once per call, duplicating it in the seed.
      * The final ``[:max_messages]`` trim could orphan an assistant
        tool-call message from its tool result; pairs are now added
        atomically.
    """
    error_markers = ("error", "traceback", "exception", "failed")

    seed: List[Dict[str, Any]] = []
    seen: set = set()  # id()s of messages already seeded — prevents duplicates

    def _add(msg: Dict[str, Any]) -> None:
        """Append *msg* to the seed unless duplicated or over budget."""
        if len(seed) < max_messages and id(msg) not in seen:
            seen.add(id(msg))
            seed.append(msg)

    successful_tool_pairs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []
    user_exchanges: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []

    for i, msg in enumerate(messages):
        role = msg.get("role", "")
        if role == "system":
            # System prompts carry session-wide context — include directly.
            _add(msg)
        elif role == "user":
            # Pair each user message with its immediate assistant response.
            if i + 1 < len(messages) and messages[i + 1].get("role") == "assistant":
                user_exchanges.append((msg, messages[i + 1]))
        elif role == "assistant" and msg.get("tool_calls"):
            for tc in msg.get("tool_calls", []):
                tc_id = tc.get("id", "")
                # Scan a few messages ahead for the matching tool result.
                for j in range(i + 1, min(i + 5, len(messages))):
                    follower = messages[j]
                    if follower.get("role") == "tool" and follower.get("tool_call_id") == tc_id:
                        content = str(follower.get("content", ""))
                        lowered = content.lower()
                        # Keep only substantial, non-error results.
                        if len(content) > 50 and not any(e in lowered for e in error_markers):
                            successful_tool_pairs.append((msg, follower))
                        break

    # Successful tool examples get up to half the budget (counted in messages).
    tool_budget = max_messages // 2
    for assistant_msg, tool_msg in successful_tool_pairs[:tool_budget]:
        # Add the pair atomically: never seed a tool result whose originating
        # assistant call cannot also fit.
        if len(seed) + 2 > max_messages and id(assistant_msg) not in seen:
            break
        _add(assistant_msg)
        _add(tool_msg)

    # Fill leftover budget with user/assistant exchanges. max(0, ...) guards
    # against the negative-slice overfill bug described in the docstring.
    remaining = max(0, max_messages - len(seed))
    for user_msg, assistant_msg in user_exchanges[:remaining // 2]:
        _add(user_msg)
        _add(assistant_msg)

    return seed[:max_messages]
def capture_template(
    session_id: str,
    messages: List[Dict[str, Any]],
    name: str = "",
    description: str = "",
) -> Optional[Dict[str, Any]]:
    """Capture a session as a warm template.
    Returns the template dict if successful, None if session doesn't meet quality bar.
    """
    quality = score_session(messages)

    # Quality gates: reject error-heavy or tool-light sessions up front.
    if not quality["is_successful"]:
        logger.info(
            "Session %s not suitable for template: error_rate=%.1f%% (max %.0f%%)",
            session_id, quality["error_rate"] * 100, _MAX_ERROR_RATE * 100,
        )
        return None
    if quality["tool_calls"] < 3:
        logger.info("Session %s not suitable: only %d tool calls", session_id, quality["tool_calls"])
        return None

    seed = extract_warm_seed(messages)
    if len(seed) < 5:
        logger.info("Session %s: extracted seed too small (%d msgs)", session_id, len(seed))
        return None

    category = quality["category"]
    template_name = name if name else f"{category}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    record = {
        "name": template_name,
        "description": description if description else f"Warm template from session {session_id}",
        "category": category,
        "source_session": session_id,
        "created_at": datetime.now().isoformat(),
        "score_info": quality,
        "seed_messages": seed,
        "seed_message_count": len(seed),
        "version": 1,
    }

    # Persist the template JSON; template_name doubles as the filename stem
    # (NOTE(review): caller-supplied `name` is not sanitized — confirm callers
    # never pass path separators).
    out_path = _warm_sessions_dir() / f"{template_name}.json"
    out_path.write_text(json.dumps(record, indent=2, ensure_ascii=False))

    # Register the new template in the shared metrics file.
    metrics = _load_metrics()
    metrics["templates"][template_name] = {
        "category": category,
        "created_at": record["created_at"],
        "score": quality["score"],
        "source_session": session_id,
        "seed_messages": len(seed),
        "provision_count": 0,
    }
    _save_metrics(metrics)

    logger.info(
        "Captured warm template '%s' (category=%s, score=%.2f, %d seed msgs from session %s)",
        template_name, category, quality["score"], len(seed), session_id,
    )
    return record
def list_templates(category: str = "") -> List[Dict[str, Any]]:
    """List available warm templates, optionally filtered by category."""
    found: List[Dict[str, Any]] = []
    for candidate in sorted(_warm_sessions_dir().glob("*.json")):
        try:
            data = json.loads(candidate.read_text())
        except Exception as e:
            # Unreadable/corrupt templates are skipped, not fatal.
            logger.debug("Failed to load template %s: %s", candidate, e)
            continue
        if category and data.get("category") != category:
            continue
        found.append({
            "name": data.get("name", candidate.stem),
            "category": data.get("category", "unknown"),
            "description": data.get("description", ""),
            "score": data.get("score_info", {}).get("score", 0),
            "seed_messages": data.get("seed_message_count", 0),
            "created_at": data.get("created_at", ""),
            "source_session": data.get("source_session", ""),
            "path": str(candidate),
        })
    # Best-scoring templates first.
    found.sort(key=lambda entry: entry.get("score", 0), reverse=True)
    return found
def load_template(name: str) -> Optional[Dict[str, Any]]:
    """Load a warm template by name."""
    base = _warm_sessions_dir()

    # Exact filename match takes priority.
    exact = base / f"{name}.json"
    if exact.exists():
        try:
            return json.loads(exact.read_text())
        except Exception as e:
            logger.warning("Failed to load template %s: %s", name, e)

    # Fall back to the first template whose category equals *name*.
    for candidate in base.glob("*.json"):
        try:
            parsed = json.loads(candidate.read_text())
        except Exception:
            continue
        if parsed.get("category") == name:
            return parsed
    return None
def provision_session(
    category: str = "",
    task_hint: str = "",
) -> Tuple[bool, List[Dict[str, Any]], str]:
    """Provision a warm session by loading the best matching template.
    Args:
        category: Desired template category (coding, research, ops, general).
            If empty, selects the highest-scored template overall.
        task_hint: Description of the task to help select the best template.
    Returns:
        (success, seed_messages, template_name_or_error)
    """
    candidates = list_templates(category) if category else list_templates()
    if not candidates:
        return False, [], "No warm templates available"

    # list_templates() sorts by score descending, so the head is the best.
    chosen = candidates[0]
    template = load_template(chosen["name"])
    if not template:
        return False, [], f"Failed to load template: {chosen['name']}"

    seeds = template.get("seed_messages", [])

    # Record the provision event and bump the per-template counter.
    metrics = _load_metrics()
    metrics.setdefault("provisions", []).append({
        "template": chosen["name"],
        "category": chosen["category"],
        "timestamp": datetime.now().isoformat(),
        "task_hint": task_hint[:200] if task_hint else "",
    })
    entry = metrics.get("templates", {}).get(chosen["name"])
    if entry is not None:
        entry["provision_count"] = entry.get("provision_count", 0) + 1
    _save_metrics(metrics)

    logger.info(
        "Provisioned warm session from template '%s' (category=%s, %d seed msgs)",
        chosen["name"], chosen["category"], len(seeds),
    )
    return True, seeds, chosen["name"]
def get_provision_stats() -> Dict[str, Any]:
    """Get statistics about warm session provisioning."""
    metrics = _load_metrics()
    template_meta = metrics.get("templates", {})
    provision_log = metrics.get("provisions", [])

    # Tally how many templates exist per category.
    by_category: Dict[str, int] = {}
    for meta in template_meta.values():
        key = meta.get("category", "unknown")
        by_category[key] = by_category.get(key, 0) + 1

    return {
        "total_templates": len(template_meta),
        "total_provisions": len(provision_log),
        "categories": by_category,
        "recent_provisions": provision_log[-10:] if provision_log else [],
    }