Compare commits

..

3 Commits

Author SHA1 Message Date
Timmy Time
9919114541 Fix #372: Runtime-aware cron prompts with provider mismatch detection
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m18s
When cron jobs run on cloud providers (Nous, OpenRouter), prompts
written for local Ollama fail because they assume SSH or localhost.

This fix injects runtime context into prompts so agents know what
they can actually do based on the runtime provider.

Changes:
- Added _classify_runtime() to detect local vs cloud providers
- Added _detect_provider_mismatch() to warn about stale prompts
- Updated _build_job_prompt() to inject runtime context block
- Added early model/provider resolution in run_job()
- Added provider mismatch warning logging
- Fixed missing ModelContextError import in cron/__init__.py
- Added 8 tests for runtime classification and prompt building

Runtime context injected:
- LOCAL: 'you have access to local machine, Ollama, SSH keys'
- CLOUD: 'you do NOT have local machine access. Do NOT assume SSH...'

Fixes #372
2026-04-13 21:49:00 -04:00
954fd992eb Merge pull request 'perf: lazy session creation — defer DB write until first message (#314)' (#449) from whip/314-1776127532 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 55s
Forge CI / smoke-and-build (pull_request) Failing after 1m12s
perf: lazy session creation (#314)

Closes #314.
2026-04-14 01:08:13 +00:00
Metatron
f35f56e397 perf: lazy session creation — defer DB write until first message (closes #314)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 56s
Remove eager create_session() call from AIAgent.__init__(). Sessions
are now created lazily on first _flush_messages_to_session_db() call
via ensure_session() which uses INSERT OR IGNORE.

Impact: eliminates 32.4% of sessions (3,564 of 10,985) that were
created at agent init but never received any messages.

The existing ensure_session() fallback in _flush_messages_to_session_db()
already handles this pattern — it was originally designed for recovery
after transient SQLite lock failures. Now it's the primary creation path.

Compression-initiated sessions still use create_session() directly
(line ~5995) since they have messages to write immediately.
2026-04-13 20:52:06 -04:00
5 changed files with 205 additions and 219 deletions

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import tick
__all__ = [
"create_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -13,7 +13,6 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -546,8 +545,75 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
# ---------------------------------------------------------------------------
# Runtime classification & provider mismatch detection
# ---------------------------------------------------------------------------
_PROVIDER_ALIASES: dict[str, set[str]] = {
"ollama": {"ollama", "local ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
"nous": {"nous", "mimo", "nousresearch"},
"openrouter": {"openrouter"},
"kimi": {"kimi", "moonshot"},
"openai": {"openai", "gpt", "codex"},
"gemini": {"gemini", "google"},
}
_CLOUD_PREFIXES = frozenset({"nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"})
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown'."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in _CLOUD_PREFIXES:
return "cloud"
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
"""Return stale provider group referenced in prompt, or None."""
if not active_provider or not prompt:
return None
prompt_lower = prompt.lower()
active_lower = active_provider.lower().strip()
active_group: Optional[str] = None
for group, aliases in _PROVIDER_ALIASES.items():
if active_lower in aliases or active_lower.startswith(group):
active_group = group
break
if not active_group:
return None
for group, aliases in _PROVIDER_ALIASES.items():
if group == active_group:
continue
for alias in aliases:
if alias in prompt_lower:
return group
return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -577,6 +643,33 @@ def _build_job_prompt(job: dict) -> str:
f"{prompt}"
)
# Runtime context injection — tells the agent what it can actually do.
_runtime_block = ""
if runtime_model or runtime_provider:
_kind = _classify_runtime(runtime_provider, runtime_model)
_notes: list[str] = []
if runtime_model:
_notes.append(f"MODEL: {runtime_model}")
if runtime_provider:
_notes.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_notes.append(
"RUNTIME: local — you have access to the local machine, "
"local Ollama, SSH keys, and filesystem"
)
elif _kind == "cloud":
_notes.append(
"RUNTIME: cloud API — you do NOT have local machine access. "
"Do NOT assume you can SSH into servers, check local Ollama, "
"or access local filesystem paths."
)
if _notes:
_runtime_block = (
"[SYSTEM: RUNTIME CONTEXT — "
+ "; ".join(_notes)
+ ". Adjust your approach based on these capabilities.]\\n\\n"
)
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
cron_hint = (
@@ -596,9 +689,9 @@ def _build_job_prompt(job: dict) -> str:
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
"\\\"[SCRIPT_FAILED]: script exited with code 1\\\".]\\\\n\\\\n"
)
prompt = cron_hint + prompt
prompt = _runtime_block + cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -644,66 +737,6 @@ def _build_job_prompt(job: dict) -> str:
return "\n".join(parts)
# Well-known local inference endpoints to probe for prefer_local jobs
_LOCAL_ENDPOINTS = [
{"name": "ollama", "base_url": "http://localhost:11434/v1", "health": "http://localhost:11434/api/tags"},
{"name": "llama-cpp", "base_url": "http://localhost:8080/v1", "health": "http://localhost:8080/health"},
{"name": "vllm", "base_url": "http://localhost:8000/v1", "health": "http://localhost:8000/v1/models"},
]
def _probe_local_endpoint(url: str, timeout: float = 2.0) -> bool:
"""Quick probe to check if a local inference server is running."""
import urllib.request
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.status == 200
except Exception:
return False
def _resolve_prefer_local(job: dict) -> tuple[Optional[str], Optional[str], str]:
"""For jobs with prefer_local=true, find a running local inference server.
Returns (provider_override, base_url_override, status_message).
None values mean "use default resolution".
"""
if not job.get("prefer_local"):
return None, None, ""
# If the job already specifies an explicit base_url and it's local, honor it
explicit_url = job.get("base_url", "")
if explicit_url:
from agent.model_metadata import is_local_endpoint
if is_local_endpoint(explicit_url):
return None, None, f"prefer_local: explicit base_url {explicit_url} is already local"
# Probe well-known local endpoints
for ep in _LOCAL_ENDPOINTS:
if _probe_local_endpoint(ep["health"]):
logger.info(
"Job '%s': prefer_local → found %s at %s",
job.get("name", "?"), ep["name"], ep["base_url"],
)
return None, ep["base_url"], (
f"prefer_local: using {ep['name']} at {ep['base_url']}"
)
# No local server found — warn and fall back to default
logger.warning(
"Job '%s': prefer_local=true but no local inference server found "
"(probed: %s). Falling back to default provider.",
job.get("name", "?"),
", ".join(ep["name"] for ep in _LOCAL_ENDPOINTS),
)
return None, None, (
"prefer_local: no local server found (tried: "
+ ", ".join(ep["name"] for ep in _LOCAL_ENDPOINTS)
+ "). Using default provider."
)
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.
@@ -728,7 +761,32 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
# Early model/provider resolution for runtime context injection
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
try:
import yaml as _y
_cfg_path = str(_hermes_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg_early = _y.safe_load(_f) or {}
_mc = _cfg_early.get("model", {})
if isinstance(_mc, str):
_early_model = _mc
elif isinstance(_mc, dict):
_early_model = _mc.get("default", "")
except Exception:
pass
if not _early_provider and "/" in _early_model:
_early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
job,
runtime_model=_early_model,
runtime_provider=_early_provider,
)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -825,12 +883,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
pr = _cfg.get("provider_routing", {})
smart_routing = _cfg.get("smart_model_routing", {}) or {}
# prefer_local: if the job declares prefer_local=true, probe for a
# local inference server and override the base_url when found. (#378)
_pl_provider, _pl_base_url, _pl_status = _resolve_prefer_local(job)
if _pl_status:
logger.info("Job '%s': %s", job_name, _pl_status)
from hermes_cli.runtime_provider import (
resolve_runtime_provider,
format_runtime_provider_error,
@@ -839,16 +891,24 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
runtime_kwargs = {
"requested": job.get("provider") or os.getenv("HERMES_INFERENCE_PROVIDER"),
}
# prefer_local override: use the discovered local endpoint
if _pl_base_url:
runtime_kwargs["explicit_base_url"] = _pl_base_url
elif job.get("base_url"):
if job.get("base_url"):
runtime_kwargs["explicit_base_url"] = job.get("base_url")
runtime = resolve_runtime_provider(**runtime_kwargs)
except Exception as exc:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Provider mismatch warning
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
logger.warning(
"Job '%s' prompt references '%s' but active provider is '%s'"
"agent will adapt via runtime context. Consider updating prompt.",
job_name, _mismatch, _resolved_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -1001,30 +1001,10 @@ class AIAgent:
self._session_db = session_db
self._parent_session_id = parent_session_id
self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
if self._session_db:
try:
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
model=self.model,
model_config={
"max_iterations": self.max_iterations,
"reasoning_config": reasoning_config,
"max_tokens": max_tokens,
},
user_id=None,
parent_session_id=self._parent_session_id,
)
except Exception as e:
# Transient SQLite lock contention (e.g. CLI and gateway writing
# concurrently) must NOT permanently disable session_search for
# this agent. Keep _session_db alive — subsequent message
# flushes and session_search calls will still work once the
# lock clears. The session row may be missing from the index
# for this run, but that is recoverable (flushes upsert rows).
logger.warning(
"Session DB create_session failed (session_search still available): %s", e
)
# Lazy session creation: defer until first message flush (#314).
# _flush_messages_to_session_db() calls ensure_session() which uses
# INSERT OR IGNORE — creating the row only when messages arrive.
# This eliminates 32% of sessions that are created but never used.
# In-memory todo list for task planning (one per agent/session)
from tools.todo_tool import TodoStore

View File

@@ -1,116 +0,0 @@
"""Tests for cron prefer_local auto-routing (#378).
Jobs with prefer_local=true should automatically route to a local inference
server (Ollama, llama.cpp, vllm) when one is available, instead of falling
through to the cloud default.
"""
import re
import pytest
# Patterns mirrored from scheduler for test isolation
_LOCAL_ENDPOINTS = [
{"name": "ollama", "base_url": "http://localhost:11434/v1", "health": "http://localhost:11434/api/tags"},
{"name": "llama-cpp", "base_url": "http://localhost:8080/v1", "health": "http://localhost:8080/health"},
{"name": "vllm", "base_url": "http://localhost:8000/v1", "health": "http://localhost:8000/v1/models"},
]
def _probe_local_endpoint(url: str, timeout: float = 2.0) -> bool:
import urllib.request
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.status == 200
except Exception:
return False
def _is_local_endpoint(base_url: str) -> bool:
if not base_url:
return False
from urllib.parse import urlparse
parsed = urlparse(base_url)
host = (parsed.hostname or "").lower()
return host in ("localhost", "127.0.0.1", "0.0.0.0") or (
host.startswith("10.") or host.startswith("192.168.") or
any(host.startswith(f"172.{i}.") for i in range(16, 32))
)
def _resolve_prefer_local(job: dict):
if not job.get("prefer_local"):
return None, None, ""
explicit_url = job.get("base_url", "")
if explicit_url and _is_local_endpoint(explicit_url):
return None, None, f"prefer_local: explicit base_url {explicit_url} is already local"
for ep in _LOCAL_ENDPOINTS:
if _probe_local_endpoint(ep["health"], timeout=0.5):
return None, ep["base_url"], f"prefer_local: using {ep['name']} at {ep['base_url']}"
return None, None, "prefer_local: no local server found"
class TestProbeLocalEndpoint:
def test_unreachable_returns_false(self):
"""A port with nothing listening should return False."""
assert _probe_local_endpoint("http://localhost:19999/api/tags", timeout=0.5) is False
def test_invalid_url_returns_false(self):
assert _probe_local_endpoint("not-a-url", timeout=0.5) is False
class TestResolvePreferLocal:
def test_no_prefer_local(self):
"""When prefer_local is not set, return empty overrides."""
job = {"name": "test", "prompt": "hello"}
prov, url, status = _resolve_prefer_local(job)
assert prov is None
assert url is None
assert status == ""
def test_prefer_local_with_explicit_local_url(self):
"""When base_url is already local, skip probing."""
job = {"name": "test", "prefer_local": True, "base_url": "http://localhost:11434/v1"}
prov, url, status = _resolve_prefer_local(job)
assert prov is None
assert url is None # Don't override — already local
assert "already local" in status
def test_prefer_local_no_server_found(self):
"""When no local server is running, status indicates fallback."""
job = {"name": "test", "prefer_local": True}
prov, url, status = _resolve_prefer_local(job)
# Unless Ollama happens to be running, this should fail
if url is None:
assert "no local server" in status
def test_prefer_local_false(self):
"""prefer_local=false should act like unset."""
job = {"name": "test", "prefer_local": False}
prov, url, status = _resolve_prefer_local(job)
assert prov is None
assert url is None
assert status == ""
class TestLocalEndpointsConfig:
"""Verify the well-known endpoints list covers expected servers."""
def test_ollama_in_endpoints(self):
names = [ep["name"] for ep in _LOCAL_ENDPOINTS]
assert "ollama" in names
def test_llama_cpp_in_endpoints(self):
names = [ep["name"] for ep in _LOCAL_ENDPOINTS]
assert "llama-cpp" in names
def test_all_endpoints_have_health(self):
for ep in _LOCAL_ENDPOINTS:
assert "health" in ep
assert ep["health"].startswith("http")
def test_all_endpoints_have_base_url(self):
for ep in _LOCAL_ENDPOINTS:
assert "base_url" in ep
assert "/v1" in ep["base_url"]

View File

@@ -0,0 +1,64 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
import importlib.util
spec = importlib.util.spec_from_file_location(
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
)
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except Exception:
pass
return mod
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
class TestClassifyRuntime:
def test_ollama_is_local(self):
assert _classify_runtime("ollama", "qwen2.5:7b") == "local"
def test_prefixed_model_is_cloud(self):
assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"
def test_nous_provider_is_cloud(self):
assert _classify_runtime("nous", "mimo-v2-pro") == "cloud"
def test_empty_both_is_unknown(self):
assert _classify_runtime("", "") == "unknown"
class TestDetectProviderMismatch:
def test_detects_ollama_reference_on_cloud(self):
assert _detect_provider_mismatch("Check Ollama is responding", "nous") == "ollama"
def test_no_mismatch_when_prompt_matches(self):
assert _detect_provider_mismatch("Check Nous model", "nous") is None
class TestBuildJobPrompt:
def test_includes_runtime_context_for_cloud(self):
job = {"prompt": "Check server"}
prompt = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="nous")
assert "RUNTIME: cloud API" in prompt
def test_includes_runtime_context_for_local(self):
job = {"prompt": "Check server"}
prompt = _build_job_prompt(job, runtime_model="qwen2.5:7b", runtime_provider="ollama")
assert "RUNTIME: local" in prompt
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])