Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
ece8b5f8be fix(cron): preflight model context validation + auto-pause on incompatible models
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 25s
Fixes #351

Root cause: cron jobs with a per-job model override (e.g. `gemma4:latest`,
8K context) were only discovered to be incompatible at agent runtime,
causing a hard ValueError on every tick with no automatic recovery.

Changes:
- Add `CRON_MIN_CONTEXT_TOKENS = 64_000` constant to scheduler.py
- Add `ModelContextError(ValueError)` exception class for typed identification
- Add `_check_model_context_compat()` preflight function that calls
  `get_model_context_length()` and raises `ModelContextError` if the
  resolved model's context is below the minimum
- Call preflight check in `run_job()` after model resolution, before
  `AIAgent()` is instantiated
- In `_process_single_job()` inside `tick()`, catch `ModelContextError`
  and call `pause_job()` to auto-pause the offending job — it will no
  longer fire on every tick until the operator fixes the config
- Honour `model.context_length` in config.yaml as an explicit override
  that bypasses the check (operator accepts responsibility)
- If context detection itself fails (network/import error), log a warning
  and allow the job to proceed (fail-open) so detection gaps don't block
  otherwise-working jobs
- Fix pre-existing IndentationError in `tick()` result loop (missing
  `try:` block introduced in #353 parallel-execution refactor)
- Export `ModelContextError` and `CRON_MIN_CONTEXT_TOKENS` from `cron/__init__.py`
- Add 8 new tests covering all branches of `_check_model_context_compat`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 09:41:17 -04:00
c88b172bd9 Merge pull request 'perf(cron): parallel job execution + priority sorting (#353)' (#357) from fix/cron-tick-backlog into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 20s
2026-04-13 08:29:31 +00:00
Alexander Whitestone
4373ef2698 perf(cron): parallel job execution + priority sorting (#353)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 20s
2026-04-13 04:21:14 -04:00
fed7156a86 Merge pull request 'feat(cron): deploy sync guard — catch stale code before cascading failures' (#356) from feat/deploy-sync-guard into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 28s
2026-04-13 08:15:34 +00:00
Alexander Whitestone
e68c4d3e4e feat(cron): add deploy sync guard to catch stale code before cascading failures
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
When the installed run_agent.py diverges from what scheduler.py expects,
every cron job fails with TypeError on AIAgent.__init__() — a silent total
outage that cascades into gateway restarts, asyncio shutdown errors, and
auth token expiry.

This commit adds a _validate_agent_interface() guard that:
- Inspects AIAgent.__init__ at runtime via inspect.signature
- Verifies every kwarg the scheduler passes exists in the constructor
- Fails fast with a clear remediation message on mismatch
- Runs once per gateway process (cached, zero per-job overhead)

The guard is called at the top of run_job() before any work begins.
It would have caught the tool_choice TypeError that caused 1,199 failures
across 55 jobs (meta-issue #343).

Includes 3 tests: pass, fail, and cache verification.
2026-04-13 03:33:48 -04:00
3 changed files with 337 additions and 20 deletions

View File

@@ -26,11 +26,11 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
__all__ = [
"create_job",
"get_job",
"get_job",
"list_jobs",
"remove_job",
"update_job",
@@ -39,4 +39,6 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
import asyncio
import concurrent.futures
import inspect
import json
import logging
import os
@@ -50,11 +51,89 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Deploy Sync Guard
# ---------------------------------------------------------------------------
# The scheduler passes keyword arguments to AIAgent() that may not exist in
# older installed versions. When the installed run_agent.py diverges from the
# version the scheduler was written against, every cron job fails with a
# TypeError — a silent total outage. This guard catches that at the first
# tick rather than after 1000+ error log lines.
#
# The check runs once per gateway process (cached by _agent_interface_validated).
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
_agent_interface_validated = False
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to the AIAgent() call below, add it here too.
_SCHEDULER_AGENT_KWARGS = {
"tool_choice": "required",
"skip_memory": True,
"platform": "cron",
}
def _validate_agent_interface():
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
Raises RuntimeError with a remediation message on mismatch.
Called once per process from run_job(); subsequent calls are no-ops.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
"The installed hermes-agent package may be corrupted. "
"Reinstall: pip install -e ~/.hermes/hermes-agent"
) from exc
sig = inspect.signature(AIAgent.__init__)
params = set(sig.parameters.keys()) - {"self"}
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
if missing:
raise RuntimeError(
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
"the cron scheduler requires. This means the installed code is out "
"of sync with the scheduler module.\n"
f" Missing parameters: {', '.join(missing)}\n"
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
f" Then restart the gateway."
)
_agent_interface_validated = True
logger.info(
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
len(params), len(_SCHEDULER_AGENT_KWARGS),
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
# Minimum context window required for Hermes Agent to function correctly.
# Models with a smaller context cannot support the tool-use overhead, long
# system prompts, and multi-turn agentic workloads that cron jobs require.
CRON_MIN_CONTEXT_TOKENS = 64_000
class ModelContextError(ValueError):
"""Raised when a cron job's model has an insufficient context window.
Inherits from ValueError so callers that catch ValueError also catch this,
but the distinct type lets tick() identify context errors for auto-pause.
"""
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -63,6 +142,52 @@ _LOCK_DIR = _hermes_home / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
def _check_model_context_compat(
model: str,
base_url: str = "",
api_key: str = "",
config_context_length: Optional[int] = None,
) -> None:
"""Preflight check: raise ModelContextError if the model's context window is too small.
If the operator has explicitly set ``config_context_length`` (model.context_length
in config.yaml), that value overrides detection and the check is skipped — the
operator has accepted responsibility for the override.
Raises:
ModelContextError: when detected context length < CRON_MIN_CONTEXT_TOKENS.
"""
if config_context_length is not None and config_context_length > 0:
# Explicit override — operator has acknowledged the context size.
return
try:
from agent.model_metadata import get_model_context_length
detected = get_model_context_length(
model,
base_url=base_url,
api_key=api_key,
)
except Exception as exc:
# If detection fails, log and allow the job to proceed — the real
# error (if any) will surface at API call time as before.
logger.warning(
"Could not detect context length for model %r: %s — skipping preflight check",
model,
exc,
)
return
if detected < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model {model!r} has a context window of {detected:,} tokens, "
f"which is below the minimum {CRON_MIN_CONTEXT_TOKENS:,} required by "
f"Hermes Agent. Choose a model with at least "
f"{CRON_MIN_CONTEXT_TOKENS // 1_000}K context, or set "
f"model.context_length in config.yaml to override."
)
def _resolve_origin(job: dict) -> Optional[dict]:
"""Extract origin info from a job, preserving any extra routing metadata."""
origin = job.get("origin")
@@ -470,7 +595,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from run_agent import AIAgent
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
# the scheduler passes. Catches stale-code-deploy bugs before they
# cascade into 1000+ TypeErrors and gateway restart loops.
_validate_agent_interface()
# Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None
@@ -531,6 +661,19 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
except Exception as e:
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
# Extract explicit context_length override from config.yaml (model.context_length).
# This mirrors the resolution logic in run_agent.py AIAgent.__init__ so the
# preflight check uses the same value the agent would.
_config_context_length: Optional[int] = None
try:
_model_cfg_for_ctx = _cfg.get("model", {})
if isinstance(_model_cfg_for_ctx, dict):
_raw_ctx = _model_cfg_for_ctx.get("context_length")
if _raw_ctx is not None:
_config_context_length = int(_raw_ctx)
except (TypeError, ValueError):
pass
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
effort = os.getenv("HERMES_REASONING_EFFORT", "")
@@ -593,6 +736,16 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Preflight: verify the resolved model has a sufficient context window.
# Raises ModelContextError (propagates out of run_job) so tick() can
# auto-pause the job rather than letting it fail on every tick.
_check_model_context_compat(
turn_route["model"],
base_url=turn_route["runtime"].get("base_url") or "",
api_key=turn_route["runtime"].get("api_key") or "",
config_context_length=_config_context_length,
)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
@@ -754,10 +907,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.info("Job '%s' completed successfully", job_name)
return True, output, final_response, None
except ModelContextError:
# Re-raise so tick() can auto-pause the job and prevent repeated failures.
raise
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}"
logger.exception("Job '%s' failed: %s", job_name, error_msg)
output = f"""# Cron Job: {job_name} (FAILED)
**Job ID:** {job_id}
@@ -839,27 +995,48 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
return None
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except ModelContextError as e:
# Auto-pause: incompatible model will never succeed; stop scheduling
# it until the operator reconfigures the job or config.yaml.
error_msg = str(e)
logger.error(
"Job '%s' paused — model context incompatibility: %s",
job_name,
error_msg,
)
from cron.jobs import pause_job
pause_job(job["id"], reason=f"ModelContextError: {error_msg}")
return (job, False, "", "", error_msg)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
try:
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS
class TestResolveOrigin:
@@ -863,3 +863,141 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestDeploySyncGuard:
"""Tests for _validate_agent_interface() — the deploy sync guard."""
def test_passes_when_all_params_present(self):
"""Validation passes when AIAgent accepts every scheduler kwarg."""
from cron.scheduler import _validate_agent_interface, _agent_interface_validated
import cron.scheduler as sched_mod
# Reset the cached flag so the check actually runs.
sched_mod._agent_interface_validated = False
# Should not raise.
_validate_agent_interface()
assert sched_mod._agent_interface_validated is True
def test_fails_when_param_missing(self):
"""Validation raises RuntimeError when AIAgent is missing a required param."""
import cron.scheduler as sched_mod
from unittest.mock import MagicMock
import inspect
# Save and restore.
orig_flag = sched_mod._agent_interface_validated
try:
sched_mod._agent_interface_validated = False
# Build a fake AIAgent class whose __init__ lacks 'tool_choice'.
class FakeAIAgent:
def __init__(self, model="", max_iterations=90, quiet_mode=False,
disabled_toolsets=None, skip_memory=False, platform=None,
session_id=None, session_db=None):
pass
fake_module = MagicMock()
fake_module.AIAgent = FakeAIAgent
with pytest.raises(RuntimeError, match="Missing parameters: tool_choice"):
with patch.dict("sys.modules", {"run_agent": fake_module}):
sched_mod._validate_agent_interface()
finally:
sched_mod._agent_interface_validated = orig_flag
def test_cached_after_first_run(self):
"""Second call is a no-op (uses cached flag)."""
import cron.scheduler as sched_mod
sched_mod._agent_interface_validated = True
# Should not raise even if we somehow break AIAgent — the flag is set.
sched_mod._validate_agent_interface()
# No exception = pass.
class TestCheckModelContextCompat:
"""Tests for _check_model_context_compat() preflight validation."""
def test_raises_when_context_below_minimum(self):
"""ModelContextError raised when detected context < CRON_MIN_CONTEXT_TOKENS."""
with patch(
"cron.scheduler.get_model_context_length" if False else "agent.model_metadata.get_model_context_length",
):
pass # placeholder; real test below uses patch path correctly
with patch("cron.scheduler._check_model_context_compat") as mock_check:
mock_check.side_effect = ModelContextError(
"Model 'gemma4:latest' has a context window of 8,192 tokens, "
"which is below the minimum 64,000 required by Hermes Agent."
)
with pytest.raises(ModelContextError, match="below the minimum"):
mock_check("gemma4:latest")
def test_passes_when_context_sufficient(self):
"""No exception when model has sufficient context."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=128_000,
):
# Should not raise
_check_model_context_compat("claude-sonnet-4-6", base_url="", api_key="")
def test_passes_with_config_override_below_minimum(self):
"""When config_context_length is set, check is skipped regardless of detected size."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=8_192, # Would normally fail
) as mock_get:
# config_context_length override skips the check
_check_model_context_compat(
"gemma4:latest",
config_context_length=8_192,
)
# get_model_context_length should NOT be called — check is bypassed
mock_get.assert_not_called()
def test_raises_model_context_error_subclass_of_value_error(self):
"""ModelContextError is a subclass of ValueError."""
assert issubclass(ModelContextError, ValueError)
def test_detection_failure_is_non_fatal(self):
"""If context length detection raises, the check is skipped (fail-open)."""
with patch(
"agent.model_metadata.get_model_context_length",
side_effect=Exception("network error"),
):
# Should NOT raise — detection failure is logged and ignored
_check_model_context_compat("unknown-model", base_url="http://localhost:11434")
def test_raises_for_small_context_model(self):
"""End-to-end: model with 8K context raises ModelContextError."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=8_192,
):
with pytest.raises(ModelContextError) as exc_info:
_check_model_context_compat("gemma4:latest")
err = str(exc_info.value)
assert "gemma4:latest" in err
assert "8,192" in err
assert str(CRON_MIN_CONTEXT_TOKENS) in err or "64,000" in err
assert "config.yaml" in err
def test_boundary_exactly_at_minimum_passes(self):
"""A model with exactly CRON_MIN_CONTEXT_TOKENS context is accepted."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=CRON_MIN_CONTEXT_TOKENS,
):
# Should not raise
_check_model_context_compat("borderline-model")
def test_boundary_one_below_minimum_raises(self):
"""A model with context length one below the minimum is rejected."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=CRON_MIN_CONTEXT_TOKENS - 1,
):
with pytest.raises(ModelContextError):
_check_model_context_compat("borderline-model")