Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
412ee7329a fix(cron): runtime-aware prompts + provider mismatch detection (#372)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m7s
After provider migration (Ollama -> Nous/mimo-v2-pro), cron jobs with
provider-specific prompts ran on the wrong provider without knowing it.
Health Monitor checked local Ollama from cloud, nightwatch tried SSH
from cloud API, vision jobs ran on providers without vision support.

Changes to cron/scheduler.py:

1. _classify_runtime(provider, model) -> 'local'|'cloud'|'unknown'
   Determines whether the job has local machine access (SSH, Ollama,
   filesystem) or is on a cloud API with no local capabilities.

2. _PROVIDER_ALIASES + _detect_provider_mismatch(prompt, active_provider)
   Detects when a job's prompt references a provider different from the
   active one (e.g. 'ollama' in prompt when running on 'nous'). Logs
   a warning so operators know which prompts need updating.

3. _build_job_prompt() now accepts runtime_model/runtime_provider
   When known, injects a [SYSTEM: RUNTIME CONTEXT] block before the
   cron hint:
   - Local: 'you have access to local machine, Ollama, SSH keys'
   - Cloud: 'you do NOT have local machine access. Do NOT SSH, etc.'

4. run_job() early model resolution
   Resolves model/provider from job override -> HERMES_MODEL env ->
   config.yaml model.default, derives provider from model prefix.
   Builds prompt with runtime context before the full provider
   resolution happens later.

5. Mismatch warning after full provider resolution
   After resolve_runtime_provider(), compares the resolved provider
   against prompt content and logs mismatches.

Supersedes #403 (early resolution only) and #427 (mismatch detection
only). Combines both approaches with local/cloud capability awareness.

Closes #372
2026-04-13 20:25:51 -04:00
4 changed files with 278 additions and 252 deletions

View File

@@ -1,11 +1,10 @@
"""Helpers for optional cheap-vs-strong and time-aware model routing."""
"""Helpers for optional cheap-vs-strong model routing."""
from __future__ import annotations
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from utils import is_truthy_value
@@ -193,104 +192,3 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
tuple(runtime.get("args") or ()),
),
}
# =========================================================================
# Time-aware cron model routing
# =========================================================================
#
# Empirical finding: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
# During high-error windows, route cron jobs to more capable models.
#
# Config (config.yaml):
# cron_model_routing:
# enabled: true
# fallback_model: "anthropic/claude-sonnet-4"
# fallback_provider: "openrouter"
# windows:
# - start_hour: 17
# end_hour: 22
# reason: "evening_error_peak"
# - start_hour: 2
# end_hour: 5
# reason: "overnight_api_instability"
# =========================================================================
def _hour_in_window(hour: int, start: int, end: int) -> bool:
"""Check if hour falls in [start, end) window, handling midnight wrap."""
if start <= end:
return start <= hour < end
else:
# Wraps midnight: e.g., 22-06
return hour >= start or hour < end
def resolve_cron_model(
base_model: str,
routing_config: Optional[Dict[str, Any]],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Apply time-aware model override for cron jobs.
During configured high-error windows, returns a stronger model config.
Outside windows, returns the base model unchanged.
Args:
base_model: The model string already resolved (from job/config/env).
routing_config: The cron_model_routing dict from config.yaml.
now: Override current time (for testing). Defaults to datetime.now().
Returns:
Dict with keys: model, provider, overridden, reason.
- model: the effective model string to use
- provider: provider override (empty string = use default)
- overridden: True if time-based override was applied
- reason: why override was applied (empty string if not)
"""
cfg = routing_config or {}
if not _coerce_bool(cfg.get("enabled"), False):
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
windows = cfg.get("windows") or []
if not isinstance(windows, list) or not windows:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
current = now or datetime.now()
current_hour = current.hour
matched_window = None
for window in windows:
if not isinstance(window, dict):
continue
start = _coerce_int(window.get("start_hour"), -1)
end = _coerce_int(window.get("end_hour"), -1)
if start < 0 or end < 0:
continue
if _hour_in_window(current_hour, start, end):
matched_window = window
break
if not matched_window:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
# Window matched — use the override model from window or global fallback
override_model = str(matched_window.get("model") or "").strip()
override_provider = str(matched_window.get("provider") or "").strip()
if not override_model:
override_model = str(cfg.get("fallback_model") or "").strip()
if not override_provider:
override_provider = str(cfg.get("fallback_provider") or "").strip()
if not override_model:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
reason = str(matched_window.get("reason") or "time_window").strip()
return {
"model": override_model,
"provider": override_provider,
"overridden": True,
"reason": f"cron_routing:{reason}(hour={current_hour})",
}

View File

@@ -544,8 +544,78 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
# ---------------------------------------------------------------------------
# Provider mismatch detection
# ---------------------------------------------------------------------------
_PROVIDER_ALIASES: dict[str, set[str]] = {
"ollama": {"ollama", "local ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
"nous": {"nous", "mimo", "nousresearch"},
"openrouter": {"openrouter"},
"kimi": {"kimi", "moonshot", "kimi-coding"},
"zai": {"zai", "glm", "zhipu"},
"openai": {"openai", "gpt", "codex"},
"gemini": {"gemini", "google"},
}
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown' for a provider/model pair."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
# Explicit cloud providers or prefixed model names → cloud
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in ("nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"):
return "cloud"
# Ollama / local / empty provider with non-prefixed model → local
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
"""Return the stale provider group referenced in *prompt*, or None."""
if not active_provider or not prompt:
return None
prompt_lower = prompt.lower()
active_lower = active_provider.lower().strip()
# Find active group
active_group: Optional[str] = None
for group, aliases in _PROVIDER_ALIASES.items():
if active_lower in aliases or active_lower.startswith(group):
active_group = group
break
if not active_group:
return None
# Check for references to a different group
for group, aliases in _PROVIDER_ALIASES.items():
if group == active_group:
continue
for alias in aliases:
if alias in prompt_lower:
return group
return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -577,6 +647,36 @@ def _build_job_prompt(job: dict) -> str:
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
#
# Runtime context injection — tells the agent what it can actually do.
# Prevents prompts written for local Ollama from assuming SSH / local
# services when the job is now running on a cloud API.
_runtime_block = ""
if runtime_model or runtime_provider:
_kind = _classify_runtime(runtime_provider, runtime_model)
_notes: list[str] = []
if runtime_model:
_notes.append(f"MODEL: {runtime_model}")
if runtime_provider:
_notes.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_notes.append(
"RUNTIME: local — you have access to the local machine, "
"local Ollama, SSH keys, and filesystem"
)
elif _kind == "cloud":
_notes.append(
"RUNTIME: cloud API — you do NOT have local machine access. "
"Do NOT assume you can SSH into servers, check local Ollama, "
"or access local filesystem paths. Use terminal tools only "
"for commands that work from this environment."
)
if _notes:
_runtime_block = (
"[SYSTEM: RUNTIME CONTEXT — "
+ "; ".join(_notes)
+ ". Adjust your approach based on these capabilities.]\\n\\n"
)
cron_hint = (
"[SYSTEM: You are running as a scheduled cron job. "
"DELIVERY: Your final response will be automatically delivered "
@@ -596,7 +696,7 @@ def _build_job_prompt(job: dict) -> str:
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = cron_hint + prompt
prompt = _runtime_block + cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -666,7 +766,36 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
# ── Early model/provider resolution ───────────────────────────────────
# We need the model name before building the prompt so the runtime
# context block can be injected. Full provider resolution happens
# later (smart routing, etc.) but the basic name is enough here.
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
try:
import yaml
_cfg_path = str(_hermes_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg_early = yaml.safe_load(_f) or {}
_mc = _cfg_early.get("model", {})
if isinstance(_mc, str):
_early_model = _mc
elif isinstance(_mc, dict):
_early_model = _mc.get("default", "")
except Exception:
pass
# Derive provider from model prefix when not explicitly set
if not _early_provider and "/" in _early_model:
_early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
job,
runtime_model=_early_model,
runtime_provider=_early_provider,
)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -717,22 +846,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
# Time-aware cron model routing — override model during high-error windows
try:
from agent.smart_model_routing import resolve_cron_model
_cron_routing_cfg = (_cfg.get("cron_model_routing") or {})
_cron_route = resolve_cron_model(model, _cron_routing_cfg)
if _cron_route["overridden"]:
_original_model = model
model = _cron_route["model"]
logger.info(
"Job '%s': cron model override %s -> %s (%s)",
job_id, _original_model, model, _cron_route["reason"],
)
except Exception as _e:
logger.debug("Job '%s': cron model routing skipped: %s", job_id, _e)
effort = os.getenv("HERMES_REASONING_EFFORT", "")
if not effort:
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
@@ -778,6 +891,20 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# ── Provider mismatch warning ─────────────────────────────────
# If the job prompt references a provider different from the one
# we actually resolved, warn so operators know which prompts are stale.
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
logger.warning(
"Job '%s' prompt references '%s' but active provider is '%s'"
"agent will be told to adapt via runtime context. "
"Consider updating this job's prompt.",
job_name, _mismatch, _resolved_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -1,128 +0,0 @@
"""Tests for time-aware cron model routing — Issue #317."""
import pytest
from datetime import datetime
from agent.smart_model_routing import resolve_cron_model, _hour_in_window
class TestHourInWindow:
"""Hour-in-window detection including midnight wrap."""
def test_normal_window(self):
assert _hour_in_window(18, 17, 22) is True
assert _hour_in_window(16, 17, 22) is False
assert _hour_in_window(22, 17, 22) is False
def test_midnight_wrap(self):
assert _hour_in_window(23, 22, 6) is True
assert _hour_in_window(3, 22, 6) is True
assert _hour_in_window(10, 22, 6) is False
def test_edge_cases(self):
assert _hour_in_window(0, 0, 24) is True
assert _hour_in_window(23, 0, 24) is True
assert _hour_in_window(0, 22, 6) is True
assert _hour_in_window(5, 22, 6) is True
assert _hour_in_window(6, 22, 6) is False
class TestResolveCronModel:
"""Time-aware model resolution for cron jobs."""
def _config(self, **overrides):
base = {
"enabled": True,
"fallback_model": "anthropic/claude-sonnet-4",
"fallback_provider": "openrouter",
"windows": [
{"start_hour": 17, "end_hour": 22, "reason": "evening_error_peak"},
],
}
base.update(overrides)
return base
def test_disabled_returns_base(self):
result = resolve_cron_model("mimo", {"enabled": False}, now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_no_config_returns_base(self):
result = resolve_cron_model("mimo", None)
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_no_windows_returns_base(self):
result = resolve_cron_model("mimo", {"enabled": True, "windows": []}, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is False
def test_evening_window_overrides(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "anthropic/claude-sonnet-4"
assert result["provider"] == "openrouter"
assert result["overridden"] is True
assert "evening_error_peak" in result["reason"]
assert "hour=18" in result["reason"]
def test_outside_window_keeps_base(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 9, 0))
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_window_boundary_start_inclusive(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 17, 0))
assert result["overridden"] is True
def test_window_boundary_end_exclusive(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 22, 0))
assert result["overridden"] is False
def test_midnight_window(self):
config = self._config(windows=[{"start_hour": 22, "end_hour": 6, "reason": "overnight"}])
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 23, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 13, 3, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 10, 0))["overridden"] is False
def test_per_window_model_override(self):
config = self._config(windows=[{
"start_hour": 17, "end_hour": 22,
"model": "anthropic/claude-opus-4-6", "provider": "anthropic", "reason": "peak",
}])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "anthropic/claude-opus-4-6"
assert result["provider"] == "anthropic"
def test_first_matching_window_wins(self):
config = self._config(windows=[
{"start_hour": 17, "end_hour": 20, "model": "strong-1", "provider": "p1", "reason": "w1"},
{"start_hour": 19, "end_hour": 22, "model": "strong-2", "provider": "p2", "reason": "w2"},
])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 19, 0))
assert result["model"] == "strong-1"
def test_no_fallback_model_keeps_base(self):
config = {"enabled": True, "windows": [{"start_hour": 17, "end_hour": 22, "reason": "test"}]}
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is False
assert result["model"] == "mimo"
def test_malformed_windows_skipped(self):
config = self._config(windows=[
"not-a-dict",
{"start_hour": 17},
{"end_hour": 22},
{"start_hour": "bad", "end_hour": "bad"},
{"start_hour": 17, "end_hour": 22, "reason": "valid"},
])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is True
assert "valid" in result["reason"]
def test_multiple_windows_coverage(self):
config = self._config(windows=[
{"start_hour": 17, "end_hour": 22, "reason": "evening"},
{"start_hour": 2, "end_hour": 5, "reason": "overnight"},
])
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 20, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 13, 3, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 10, 0))["overridden"] is False

View File

@@ -0,0 +1,129 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification,
and capability-aware prompt building."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
"""Import the scheduler module, bypassing __init__.py re-exports that may
reference symbols not yet merged upstream."""
import importlib.util
spec = importlib.util.spec_from_file_location(
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
)
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except Exception:
pass # some top-level imports may fail in CI; functions are still defined
return mod
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
# ── _classify_runtime ─────────────────────────────────────────────────────
class TestClassifyRuntime:
def test_ollama_is_local(self):
assert _classify_runtime("ollama", "qwen2.5:7b") == "local"
def test_empty_provider_is_local(self):
assert _classify_runtime("", "my-local-model") == "local"
def test_prefixed_model_is_cloud(self):
assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"
def test_nous_provider_is_cloud(self):
assert _classify_runtime("nous", "mimo-v2-pro") == "cloud"
def test_openrouter_is_cloud(self):
assert _classify_runtime("openrouter", "anthropic/claude-sonnet-4") == "cloud"
def test_empty_both_is_unknown(self):
assert _classify_runtime("", "") == "unknown"
# ── _detect_provider_mismatch ─────────────────────────────────────────────
class TestDetectProviderMismatch:
def test_no_mismatch_when_not_mentioned(self):
assert _detect_provider_mismatch("Check system health", "nous") is None
def test_detects_ollama_when_nous_active(self):
assert _detect_provider_mismatch("Check Ollama is responding", "nous") == "ollama"
def test_detects_anthropic_when_nous_active(self):
assert _detect_provider_mismatch("Use Claude to analyze", "nous") == "anthropic"
def test_no_mismatch_same_provider(self):
assert _detect_provider_mismatch("Check Ollama models", "ollama") is None
def test_empty_prompt(self):
assert _detect_provider_mismatch("", "nous") is None
def test_empty_provider(self):
assert _detect_provider_mismatch("Check Ollama", "") is None
def test_detects_kimi_when_openrouter(self):
assert _detect_provider_mismatch("Use Kimi for coding", "openrouter") == "kimi"
def test_detects_glm_when_nous(self):
assert _detect_provider_mismatch("Use GLM for analysis", "nous") == "zai"
# ── _build_job_prompt ─────────────────────────────────────────────────────
class TestBuildJobPrompt:
def _job(self, prompt="Do something"):
return {"prompt": prompt, "skills": []}
def test_no_runtime_no_block(self):
result = _build_job_prompt(self._job())
assert "Do something" in result
assert "RUNTIME CONTEXT" not in result
def test_cloud_runtime_injected(self):
result = _build_job_prompt(
self._job(),
runtime_model="xiaomi/mimo-v2-pro",
runtime_provider="nous",
)
assert "MODEL: xiaomi/mimo-v2-pro" in result
assert "PROVIDER: nous" in result
assert "cloud API" in result
assert "Do NOT assume you can SSH" in result
def test_local_runtime_injected(self):
result = _build_job_prompt(
self._job(),
runtime_model="qwen2.5:7b",
runtime_provider="ollama",
)
assert "RUNTIME: local" in result
assert "SSH keys" in result
def test_empty_runtime_no_block(self):
result = _build_job_prompt(self._job(), runtime_model="", runtime_provider="")
assert "RUNTIME CONTEXT" not in result
def test_cron_hint_always_present(self):
result = _build_job_prompt(self._job())
assert "scheduled cron job" in result
assert "[SYSTEM:" in result
def test_runtime_block_before_cron_hint(self):
result = _build_job_prompt(
self._job("Check Ollama"),
runtime_model="mimo-v2-pro",
runtime_provider="nous",
)
runtime_pos = result.index("RUNTIME CONTEXT")
cron_pos = result.index("scheduled cron job")
assert runtime_pos < cron_pos