Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
412ee7329a fix(cron): runtime-aware prompts + provider mismatch detection (#372)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m7s
After provider migration (Ollama -> Nous/mimo-v2-pro), cron jobs with
provider-specific prompts ran on the wrong provider without knowing it.
Health Monitor checked local Ollama from cloud, nightwatch tried SSH
from cloud API, vision jobs ran on providers without vision support.

Changes to cron/scheduler.py:

1. _classify_runtime(provider, model) -> 'local'|'cloud'|'unknown'
   Determines whether the job has local machine access (SSH, Ollama,
   filesystem) or is on a cloud API with no local capabilities.

2. _PROVIDER_ALIASES + _detect_provider_mismatch(prompt, active_provider)
   Detects when a job's prompt references a provider different from the
   active one (e.g. 'ollama' in prompt when running on 'nous'). Logs
   a warning so operators know which prompts need updating.

3. _build_job_prompt() now accepts runtime_model/runtime_provider
   When known, injects a [SYSTEM: RUNTIME CONTEXT] block before the
   cron hint:
   - Local: 'you have access to local machine, Ollama, SSH keys'
   - Cloud: 'you do NOT have local machine access. Do NOT SSH, etc.'

4. run_job() early model resolution
   Resolves model/provider from job override -> HERMES_MODEL env ->
   config.yaml model.default, derives provider from model prefix.
   Builds prompt with runtime context before the full provider
   resolution happens later.

5. Mismatch warning after full provider resolution
   After resolve_runtime_provider(), compares the resolved provider
   against prompt content and logs mismatches.

Supersedes #403 (early resolution only) and #427 (mismatch detection
only). Combines both approaches with local/cloud capability awareness.

Closes #372
2026-04-13 20:25:51 -04:00
3 changed files with 268 additions and 197 deletions

View File

@@ -13,7 +13,6 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -157,27 +156,6 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Model context guard
# ---------------------------------------------------------------------------
CRON_MIN_CONTEXT_TOKENS = 4096
class ModelContextError(ValueError):
"""Raised when a job's model has insufficient context for cron execution."""
pass
def _check_model_context_compat(model: str, context_length: int) -> None:
"""Raise ModelContextError if the model context is below the cron minimum."""
if context_length < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model '{model}' context ({context_length} tokens) is below the "
f"minimum {CRON_MIN_CONTEXT_TOKENS} tokens required for cron jobs."
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
@@ -567,56 +545,77 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
# ---------------------------------------------------------------------------
# Cloud context warning — detect local service refs in cloud cron prompts
# Provider mismatch detection
# ---------------------------------------------------------------------------
# Regex fragments that indicate a prompt expects local services to be
# reachable: localhost ports, loopback/bind addresses, or Ollama mentions.
_LOCAL_SERVICE_PATTERNS = [
    r'localhost:\d{2,5}',
    r'127\.0\.0\.\d{1,3}:\d{2,5}',
    r'0\.0\.0\.0:\d{2,5}',
    r'\bollama\b',
    r'curl\s+.*localhost',
    r'wget\s+.*localhost',
    r'http://localhost',
    r'https?://127\.',
    r'https?://0\.0\.0\.0',
    # Looser phrase patterns for prompts that describe the action in prose.
    r'check.*ollama',
    r'connect.*local',
    r'hermes.*gateway.*local',
]
# Compiled once at import time; all matching is case-insensitive.
_LOCAL_SERVICE_RE = [re.compile(p, re.IGNORECASE) for p in _LOCAL_SERVICE_PATTERNS]
# Alias groups for provider detection. Keys are canonical provider group
# names; values are lowercase substrings that identify that group inside a
# prompt or provider string. _detect_provider_mismatch iterates this dict
# in insertion order, so when several groups match, the earliest one wins.
_PROVIDER_ALIASES: dict[str, set[str]] = {
    "ollama": {"ollama", "local ollama", "localhost:11434"},
    "anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
    "nous": {"nous", "mimo", "nousresearch"},
    "openrouter": {"openrouter"},
    "kimi": {"kimi", "moonshot", "kimi-coding"},
    "zai": {"zai", "glm", "zhipu"},
    "openai": {"openai", "gpt", "codex"},
    "gemini": {"gemini", "google"},
}
def _detect_local_service_refs(prompt: str) -> list[str]:
"""Scan a prompt for references to local services (Ollama, localhost, etc.).
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown' for a provider/model pair."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
# Explicit cloud providers or prefixed model names → cloud
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in ("nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"):
return "cloud"
# Ollama / local / empty provider with non-prefixed model → local
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
Returns list of matched patterns for logging.
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
    """Return the stale provider group referenced in *prompt*, or None."""
    if not prompt or not active_provider:
        return None
    text = prompt.lower()
    active = active_provider.lower().strip()
    # Map the active provider string onto one of the alias groups.
    active_group = next(
        (
            group
            for group, aliases in _PROVIDER_ALIASES.items()
            if active in aliases or active.startswith(group)
        ),
        None,
    )
    if active_group is None:
        return None
    # First *other* group with any alias appearing in the prompt wins.
    for group, aliases in _PROVIDER_ALIASES.items():
        if group == active_group:
            continue
        if any(alias in text for alias in aliases):
            return group
    return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
matches = []
for pattern_re in _LOCAL_SERVICE_RE:
if pattern_re.search(prompt):
matches.append(pattern_re.pattern)
return matches
def _inject_cloud_context(prompt: str, local_refs: list[str]) -> str:
"""Prepend a warning when cron runs on cloud but prompt refs local services.
The agent reports the limitation instead of wasting iterations on doomed connections.
"""
warning = (
"[SYSTEM NOTE: You are running on a cloud endpoint, but your prompt references "
"local services (localhost/Ollama). You cannot reach localhost from a cloud "
"endpoint. Report this limitation to the user and suggest running the job on "
"a local endpoint instead. Do NOT attempt to connect to localhost — it will "
"timeout and waste your iteration budget.]\n\n"
)
return warning + prompt
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -648,6 +647,36 @@ def _build_job_prompt(job: dict) -> str:
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
#
# Runtime context injection — tells the agent what it can actually do.
# Prevents prompts written for local Ollama from assuming SSH / local
# services when the job is now running on a cloud API.
_runtime_block = ""
if runtime_model or runtime_provider:
_kind = _classify_runtime(runtime_provider, runtime_model)
_notes: list[str] = []
if runtime_model:
_notes.append(f"MODEL: {runtime_model}")
if runtime_provider:
_notes.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_notes.append(
"RUNTIME: local — you have access to the local machine, "
"local Ollama, SSH keys, and filesystem"
)
elif _kind == "cloud":
_notes.append(
"RUNTIME: cloud API — you do NOT have local machine access. "
"Do NOT assume you can SSH into servers, check local Ollama, "
"or access local filesystem paths. Use terminal tools only "
"for commands that work from this environment."
)
if _notes:
_runtime_block = (
"[SYSTEM: RUNTIME CONTEXT — "
+ "; ".join(_notes)
+ ". Adjust your approach based on these capabilities.]\\n\\n"
)
cron_hint = (
"[SYSTEM: You are running as a scheduled cron job. "
"DELIVERY: Your final response will be automatically delivered "
@@ -667,7 +696,7 @@ def _build_job_prompt(job: dict) -> str:
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = cron_hint + prompt
prompt = _runtime_block + cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -737,7 +766,36 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
# ── Early model/provider resolution ───────────────────────────────────
# We need the model name before building the prompt so the runtime
# context block can be injected. Full provider resolution happens
# later (smart routing, etc.) but the basic name is enough here.
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
try:
import yaml
_cfg_path = str(_hermes_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg_early = yaml.safe_load(_f) or {}
_mc = _cfg_early.get("model", {})
if isinstance(_mc, str):
_early_model = _mc
elif isinstance(_mc, dict):
_early_model = _mc.get("default", "")
except Exception:
pass
# Derive provider from model prefix when not explicitly set
if not _early_provider and "/" in _early_model:
_early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
job,
runtime_model=_early_model,
runtime_provider=_early_provider,
)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -833,15 +891,19 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Cloud context warning: if running on cloud but prompt refs local services,
# inject a warning so the agent reports the limitation instead of wasting
# iterations on doomed connections. (Fixes #378, #456)
base_url = runtime.get("base_url") or ""
is_cloud = not any(h in base_url for h in ("localhost", "127.0.0.1", "0.0.0.0", "::1"))
local_refs = _detect_local_service_refs(prompt)
if is_cloud and local_refs:
logger.info("Job '%s': cloud endpoint + local service refs detected, injecting warning", job_name)
prompt = _inject_cloud_context(prompt, local_refs)
# ── Provider mismatch warning ─────────────────────────────────
# If the job prompt references a provider different from the one
# we actually resolved, warn so operators know which prompts are stale.
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
logger.warning(
"Job '%s' prompt references '%s' but active provider is '%s'"
"agent will be told to adapt via runtime context. "
"Consider updating this job's prompt.",
job_name, _mismatch, _resolved_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(

View File

@@ -1,120 +0,0 @@
"""Tests for cron cloud context warning injection (fix #378, #456).
When a cron job runs on a cloud endpoint but its prompt references local
services (Ollama, localhost, etc.), inject a warning so the agent reports
the limitation instead of wasting iterations on doomed connections.
"""
import pytest
from cron.scheduler import (
_detect_local_service_refs,
_inject_cloud_context,
_LOCAL_SERVICE_PATTERNS,
)
# ---------------------------------------------------------------------------
# Pattern detection
# ---------------------------------------------------------------------------
class TestDetectLocalServiceRefs:
    """_detect_local_service_refs flags prompts referencing local services
    (localhost ports, loopback addresses, Ollama) and returns an empty list
    for remote URLs or empty input."""

    def test_localhost_with_port(self):
        refs = _detect_local_service_refs("Check http://localhost:8080/status")
        assert len(refs) > 0
        assert any("localhost" in r for r in refs)

    def test_127_address(self):
        refs = _detect_local_service_refs("Connect to 127.0.0.1:11434")
        assert len(refs) > 0

    def test_ollama_reference(self):
        refs = _detect_local_service_refs("Run this on Ollama with gemma3")
        assert len(refs) > 0
        assert any("ollama" in r.lower() for r in refs)

    def test_curl_localhost(self):
        refs = _detect_local_service_refs("curl localhost:3000/api/data")
        assert len(refs) > 0

    def test_wget_localhost(self):
        refs = _detect_local_service_refs("wget http://localhost/file.txt")
        assert len(refs) > 0

    def test_http_localhost(self):
        refs = _detect_local_service_refs("http://localhost:8642/health")
        assert len(refs) > 0

    def test_https_127(self):
        refs = _detect_local_service_refs("https://127.0.0.1:443/secure")
        assert len(refs) > 0

    def test_0000_address(self):
        refs = _detect_local_service_refs("Bind to 0.0.0.0:9090")
        assert len(refs) > 0

    # Negative cases: remote hosts must not trigger the warning.
    def test_no_match_for_remote(self):
        refs = _detect_local_service_refs("Check https://api.openai.com/v1/models")
        assert len(refs) == 0

    def test_no_match_for_gitea(self):
        refs = _detect_local_service_refs("Query forge.alexanderwhitestone.com for issues")
        assert len(refs) == 0

    def test_no_match_empty(self):
        refs = _detect_local_service_refs("")
        assert len(refs) == 0

    # Prose-style phrasings should also be detected.
    def test_check_ollama_phrase(self):
        refs = _detect_local_service_refs("First check Ollama is running")
        assert len(refs) > 0

    def test_connect_local_phrase(self):
        refs = _detect_local_service_refs("Connect to local Ollama server")
        assert len(refs) > 0
# ---------------------------------------------------------------------------
# Warning injection
# ---------------------------------------------------------------------------
class TestInjectCloudContext:
    """_inject_cloud_context must prepend the SYSTEM NOTE warning while
    keeping the original prompt text intact at the end."""

    def test_prepends_warning(self):
        original = "Run a health check on localhost:8080"
        refs = _detect_local_service_refs(original)
        result = _inject_cloud_context(original, refs)
        assert "SYSTEM NOTE" in result
        assert "cloud endpoint" in result
        assert original in result

    def test_warning_is_first(self):
        original = "Check localhost:11434"
        refs = _detect_local_service_refs(original)
        result = _inject_cloud_context(original, refs)
        assert result.startswith("[SYSTEM NOTE")

    def test_preserves_original_prompt(self):
        original = "Do something with Ollama and then report results"
        refs = _detect_local_service_refs(original)
        result = _inject_cloud_context(original, refs)
        assert "Do something with Ollama" in result

    def test_mentions_cannot_reach(self):
        original = "curl localhost:8080"
        refs = _detect_local_service_refs(original)
        result = _inject_cloud_context(original, refs)
        assert "cannot reach" in result.lower() or "cannot" in result.lower()
# ---------------------------------------------------------------------------
# Pattern coverage
# ---------------------------------------------------------------------------
class TestPatternCoverage:
    """Sanity checks on the _LOCAL_SERVICE_PATTERNS list itself."""

    def test_at_least_10_patterns(self):
        assert len(_LOCAL_SERVICE_PATTERNS) >= 10

    def test_patterns_are_strings(self):
        for p in _LOCAL_SERVICE_PATTERNS:
            assert isinstance(p, str)
            assert len(p) > 0

View File

@@ -0,0 +1,129 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification,
and capability-aware prompt building."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
"""Import the scheduler module, bypassing __init__.py re-exports that may
reference symbols not yet merged upstream."""
import importlib.util
spec = importlib.util.spec_from_file_location(
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
)
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except Exception:
pass # some top-level imports may fail in CI; functions are still defined
return mod
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
# ── _classify_runtime ─────────────────────────────────────────────────────
class TestClassifyRuntime:
    """_classify_runtime maps provider/model pairs to 'local' | 'cloud' | 'unknown'."""

    def test_ollama_is_local(self):
        assert _classify_runtime("ollama", "qwen2.5:7b") == "local"

    def test_empty_provider_is_local(self):
        # No provider + bare model name defaults to local.
        assert _classify_runtime("", "my-local-model") == "local"

    def test_prefixed_model_is_cloud(self):
        # A namespaced model name implies a cloud API even without a provider.
        assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"

    def test_nous_provider_is_cloud(self):
        assert _classify_runtime("nous", "mimo-v2-pro") == "cloud"

    def test_openrouter_is_cloud(self):
        assert _classify_runtime("openrouter", "anthropic/claude-sonnet-4") == "cloud"

    def test_empty_both_is_unknown(self):
        assert _classify_runtime("", "") == "unknown"
# ── _detect_provider_mismatch ─────────────────────────────────────────────
class TestDetectProviderMismatch:
    """_detect_provider_mismatch returns the stale provider group referenced
    in a prompt, or None when the prompt matches the active provider."""

    def test_no_mismatch_when_not_mentioned(self):
        assert _detect_provider_mismatch("Check system health", "nous") is None

    def test_detects_ollama_when_nous_active(self):
        assert _detect_provider_mismatch("Check Ollama is responding", "nous") == "ollama"

    def test_detects_anthropic_when_nous_active(self):
        # "Claude" is an alias of the anthropic group.
        assert _detect_provider_mismatch("Use Claude to analyze", "nous") == "anthropic"

    def test_no_mismatch_same_provider(self):
        assert _detect_provider_mismatch("Check Ollama models", "ollama") is None

    def test_empty_prompt(self):
        assert _detect_provider_mismatch("", "nous") is None

    def test_empty_provider(self):
        assert _detect_provider_mismatch("Check Ollama", "") is None

    def test_detects_kimi_when_openrouter(self):
        assert _detect_provider_mismatch("Use Kimi for coding", "openrouter") == "kimi"

    def test_detects_glm_when_nous(self):
        # "GLM" is an alias of the zai group.
        assert _detect_provider_mismatch("Use GLM for analysis", "nous") == "zai"
# ── _build_job_prompt ─────────────────────────────────────────────────────
class TestBuildJobPrompt:
    """_build_job_prompt should inject a [SYSTEM: RUNTIME CONTEXT] block only
    when runtime_model/runtime_provider are supplied, placed before the cron
    hint, which itself is always present."""

    def _job(self, prompt="Do something"):
        # Minimal job dict accepted by _build_job_prompt.
        return {"prompt": prompt, "skills": []}

    def test_no_runtime_no_block(self):
        result = _build_job_prompt(self._job())
        assert "Do something" in result
        assert "RUNTIME CONTEXT" not in result

    def test_cloud_runtime_injected(self):
        result = _build_job_prompt(
            self._job(),
            runtime_model="xiaomi/mimo-v2-pro",
            runtime_provider="nous",
        )
        assert "MODEL: xiaomi/mimo-v2-pro" in result
        assert "PROVIDER: nous" in result
        assert "cloud API" in result
        assert "Do NOT assume you can SSH" in result

    def test_local_runtime_injected(self):
        result = _build_job_prompt(
            self._job(),
            runtime_model="qwen2.5:7b",
            runtime_provider="ollama",
        )
        assert "RUNTIME: local" in result
        assert "SSH keys" in result

    def test_empty_runtime_no_block(self):
        result = _build_job_prompt(self._job(), runtime_model="", runtime_provider="")
        assert "RUNTIME CONTEXT" not in result

    def test_cron_hint_always_present(self):
        result = _build_job_prompt(self._job())
        assert "scheduled cron job" in result
        assert "[SYSTEM:" in result

    def test_runtime_block_before_cron_hint(self):
        # Ordering matters: runtime context must precede the cron hint.
        result = _build_job_prompt(
            self._job("Check Ollama"),
            runtime_model="mimo-v2-pro",
            runtime_provider="nous",
        )
        runtime_pos = result.index("RUNTIME CONTEXT")
        cron_pos = result.index("scheduled cron job")
        assert runtime_pos < cron_pos