Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
e68c4d3e4e feat(cron): add deploy sync guard to catch stale code before cascading failures
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
When the installed run_agent.py diverges from what scheduler.py expects,
every cron job fails with TypeError on AIAgent.__init__() — a silent total
outage that cascades into gateway restarts, asyncio shutdown errors, and
auth token expiry.

This commit adds a _validate_agent_interface() guard that:
- Inspects AIAgent.__init__ at runtime via inspect.signature
- Verifies every kwarg the scheduler passes exists in the constructor
- Fails fast with a clear remediation message on mismatch
- Runs once per gateway process (cached, zero per-job overhead)

The guard is called at the top of run_job() before any work begins.
It would have caught the tool_choice TypeError that caused 1,199 failures
across 55 jobs (meta-issue #343).

Includes 3 tests: pass, fail, and cache verification.
2026-04-13 03:33:48 -04:00
2 changed files with 133 additions and 72 deletions

View File

@@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
import asyncio
import concurrent.futures
import inspect
import json
import logging
import os
@@ -50,6 +51,71 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Deploy Sync Guard
# ---------------------------------------------------------------------------
# The scheduler passes keyword arguments to AIAgent() that may not exist in
# older installed versions. When the installed run_agent.py diverges from the
# version the scheduler was written against, every cron job fails with a
# TypeError — a silent total outage. This guard catches that at the first
# tick rather than after 1000+ error log lines.
#
# The check runs once per gateway process (cached by _agent_interface_validated).
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
_agent_interface_validated = False
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to the AIAgent() call below, add it here too.
_SCHEDULER_AGENT_KWARGS = {
"tool_choice": "required",
"skip_memory": True,
"platform": "cron",
}
def _validate_agent_interface():
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
Raises RuntimeError with a remediation message on mismatch.
Called once per process from run_job(); subsequent calls are no-ops.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
"The installed hermes-agent package may be corrupted. "
"Reinstall: pip install -e ~/.hermes/hermes-agent"
) from exc
sig = inspect.signature(AIAgent.__init__)
params = set(sig.parameters.keys()) - {"self"}
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
if missing:
raise RuntimeError(
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
"the cron scheduler requires. This means the installed code is out "
"of sync with the scheduler module.\n"
f" Missing parameters: {', '.join(missing)}\n"
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
f" Then restart the gateway."
)
_agent_interface_validated = True
logger.info(
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
len(params), len(_SCHEDULER_AGENT_KWARGS),
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
@@ -470,7 +536,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from run_agent import AIAgent
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
# the scheduler passes. Catches stale-code-deploy bugs before they
# cascade into 1000+ TypeErrors and gateway restart loops.
_validate_agent_interface()
# Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None
@@ -628,47 +699,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
# raises RuntimeError("cannot schedule new futures after interpreter
# shutdown") when Python is finalizing (e.g. gateway restart races).
# Fall back to synchronous execution so the job at least attempts.
_cron_pool = None
try:
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
except RuntimeError:
logger.warning(
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
"— falling back to synchronous execution",
job_name,
)
if _cron_pool is not None:
try:
_cron_pool.shutdown(wait=False)
except Exception:
pass
_cron_pool = None
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
logger.info("Job '%s' completed (sync fallback)", job_name)
return True, output, final_response, None
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
_inactivity_timeout = False
try:
if _cron_inactivity_limit is None:
@@ -695,12 +727,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_inactivity_timeout = True
break
except Exception:
if _cron_pool is not None:
_cron_pool.shutdown(wait=False, cancel_futures=True)
_cron_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
if _cron_pool is not None:
_cron_pool.shutdown(wait=False)
_cron_pool.shutdown(wait=False)
if _inactivity_timeout:
# Build diagnostic summary from the agent's activity tracker.
@@ -839,36 +869,16 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
if sys.is_finalizing():
return None
for job in due_jobs:
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
executed += 1
output_file = save_job_output(job["id"], output)
if verbose:

View File

@@ -863,3 +863,54 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestDeploySyncGuard:
"""Tests for _validate_agent_interface() — the deploy sync guard."""
def test_passes_when_all_params_present(self):
"""Validation passes when AIAgent accepts every scheduler kwarg."""
from cron.scheduler import _validate_agent_interface, _agent_interface_validated
import cron.scheduler as sched_mod
# Reset the cached flag so the check actually runs.
sched_mod._agent_interface_validated = False
# Should not raise.
_validate_agent_interface()
assert sched_mod._agent_interface_validated is True
def test_fails_when_param_missing(self):
"""Validation raises RuntimeError when AIAgent is missing a required param."""
import cron.scheduler as sched_mod
from unittest.mock import MagicMock
import inspect
# Save and restore.
orig_flag = sched_mod._agent_interface_validated
try:
sched_mod._agent_interface_validated = False
# Build a fake AIAgent class whose __init__ lacks 'tool_choice'.
class FakeAIAgent:
def __init__(self, model="", max_iterations=90, quiet_mode=False,
disabled_toolsets=None, skip_memory=False, platform=None,
session_id=None, session_db=None):
pass
fake_module = MagicMock()
fake_module.AIAgent = FakeAIAgent
with pytest.raises(RuntimeError, match="Missing parameters: tool_choice"):
with patch.dict("sys.modules", {"run_agent": fake_module}):
sched_mod._validate_agent_interface()
finally:
sched_mod._agent_interface_validated = orig_flag
def test_cached_after_first_run(self):
"""Second call is a no-op (uses cached flag)."""
import cron.scheduler as sched_mod
sched_mod._agent_interface_validated = True
# Should not raise even if we somehow break AIAgent — the flag is set.
sched_mod._validate_agent_interface()
# No exception = pass.