Merge pull request 'fix(cron): deploy sync guard + kwarg filter + script failure marker' (#364) from fix/cron-sync-guard-v2 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 23s

This commit was merged in pull request #364.
This commit is contained in:
2026-04-13 19:13:31 +00:00

View File

@@ -10,7 +10,6 @@ runs at a time if multiple processes overlap.
import asyncio
import concurrent.futures
import inspect
import json
import logging
import os
@@ -41,6 +40,112 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)
# =====================================================================
# Deploy Sync Guard
# =====================================================================
#
# If the installed run_agent.py diverges from the version scheduler.py
# was written against, every cron job fails with:
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
#
# _validate_agent_interface() catches this at the FIRST job, not the
# 55th. It uses inspect.signature() to verify every kwarg we pass is
# accepted by AIAgent.__init__().
#
# Maintaining this list: if you add a kwarg to the AIAgent() call in
# run_job(), add it here too. The guard catches mismatches.
_SCHEDULER_AGENT_KWARGS: set = frozenset({
"model", "api_key", "base_url", "provider", "api_mode",
"acp_command", "acp_args", "max_iterations", "reasoning_config",
"prefill_messages", "providers_allowed", "providers_ignored",
"providers_order", "provider_sort", "disabled_toolsets",
"tool_choice", "quiet_mode", "skip_memory", "platform",
"session_id", "session_db",
})
_agent_interface_validated: bool = False
def _validate_agent_interface() -> None:
"""Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.
Raises RuntimeError with actionable guidance if params are missing.
Caches result — runs once per gateway process lifetime.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
import inspect
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Cannot import AIAgent: {exc}\n"
"Is hermes-agent installed? Check PYTHONPATH."
) from exc
sig = inspect.signature(AIAgent.__init__)
accepted = set(sig.parameters.keys()) - {"self"}
missing = _SCHEDULER_AGENT_KWARGS - accepted
if missing:
sorted_missing = sorted(missing)
raise RuntimeError(
"Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
f" {', '.join(sorted_missing)}\n"
"This means the installed run_agent.py is out of date.\n"
"Fix: pull latest hermes-agent code and restart the gateway.\n"
" cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
)
_agent_interface_validated = True
logger.debug("Deploy sync guard passed — %d params verified", len(_SCHEDULER_AGENT_KWARGS))
def _safe_agent_kwargs(kwargs: dict) -> dict:
"""Filter kwargs to only those accepted by installed AIAgent.__init__.
More resilient than _validate_agent_interface() alone: instead of
crashing on mismatch, drops unsupported kwargs and logs a warning.
Jobs run with degraded functionality instead of failing entirely.
Args:
kwargs: The kwargs dict the scheduler wants to pass to AIAgent().
Returns:
A new dict containing only kwargs the installed AIAgent accepts.
"""
import inspect
try:
from run_agent import AIAgent
except ImportError:
# Can't import — pass everything through, let the real error surface
return kwargs
sig = inspect.signature(AIAgent.__init__)
accepted = set(sig.parameters.keys()) - {"self"}
safe = {}
dropped = []
for key, value in kwargs.items():
if key in accepted:
safe[key] = value
else:
dropped.append(key)
if dropped:
logger.warning(
"Dropping unsupported AIAgent kwargs (stale install?): %s",
", ".join(sorted(dropped)),
)
return safe
# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
@@ -51,88 +156,76 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Deploy Sync Guard
# ---------------------------------------------------------------------------
# The scheduler passes keyword arguments to AIAgent() that may not exist in
# older installed versions. When the installed run_agent.py diverges from the
# version the scheduler was written against, every cron job fails with a
# TypeError — a silent total outage. This guard catches that at the first
# tick rather than after 1000+ error log lines.
#
# The check runs once per gateway process (cached by _agent_interface_validated).
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
_agent_interface_validated = False
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to the AIAgent() call below, add it here too.
_SCHEDULER_AGENT_KWARGS = {
"tool_choice": "required",
"skip_memory": True,
"platform": "cron",
}
def _validate_agent_interface():
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
Raises RuntimeError with a remediation message on mismatch.
Called once per process from run_job(); subsequent calls are no-ops.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
"The installed hermes-agent package may be corrupted. "
"Reinstall: pip install -e ~/.hermes/hermes-agent"
) from exc
sig = inspect.signature(AIAgent.__init__)
params = set(sig.parameters.keys()) - {"self"}
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
if missing:
raise RuntimeError(
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
"the cron scheduler requires. This means the installed code is out "
"of sync with the scheduler module.\n"
f" Missing parameters: {', '.join(missing)}\n"
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
f" Then restart the gateway."
)
_agent_interface_validated = True
logger.info(
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
len(params), len(_SCHEDULER_AGENT_KWARGS),
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
# Minimum context window required for Hermes Agent to function correctly.
# Models with a smaller context cannot support the tool-use overhead, long
# system prompts, and multi-turn agentic workloads that cron jobs require.
CRON_MIN_CONTEXT_TOKENS = 64_000
# Failure phrases that indicate an external script/command failed, even when
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
# against the final response. These are strong signals — agents rarely use
# these words when a script succeeded.
_SCRIPT_FAILURE_PHRASES = (
"timed out",
"timeout",
"connection error",
"connection refused",
"connection reset",
"failed to execute",
"failed due to",
"script failed",
"script error",
"command failed",
"exit code",
"exit status",
"non-zero exit",
"did not complete",
"could not run",
"unable to execute",
"permission denied",
"no such file",
"traceback",
)
class ModelContextError(ValueError):
"""Raised when a cron job's model has an insufficient context window.
def _detect_script_failure(final_response: str) -> Optional[str]:
"""Detect script failure from agent's final response.
Inherits from ValueError so callers that catch ValueError also catch this,
but the distinct type lets tick() identify context errors for auto-pause.
Returns a reason string if failure detected, None otherwise.
Checks both the explicit [SCRIPT_FAILED] marker and heuristic patterns.
"""
if not final_response:
return None
# 1. Explicit marker — highest confidence.
if SCRIPT_FAILED_MARKER in final_response.upper():
import re as _re
_m = _re.search(
r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
final_response,
_re.IGNORECASE,
)
reason = _m.group(1).strip() if _m and _m.group(1).strip() else None
return reason or "Agent reported script failure"
# 2. Heuristic detection — catch failures described in natural language.
# Only flag if the response contains failure language AND does NOT
# contain success markers like [NOOP] (which means the script ran fine
# but found nothing).
lower = final_response.lower()
has_noop = "[noop]" in lower
has_silent = "[silent]" in lower
if has_noop or has_silent:
return None # Agent explicitly signaled success/nothing-to-report
for phrase in _SCRIPT_FAILURE_PHRASES:
if phrase in lower:
return f"Detected script failure phrase: '{phrase}'"
return None
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -142,52 +235,6 @@ _LOCK_DIR = _hermes_home / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
def _check_model_context_compat(
model: str,
base_url: str = "",
api_key: str = "",
config_context_length: Optional[int] = None,
) -> None:
"""Preflight check: raise ModelContextError if the model's context window is too small.
If the operator has explicitly set ``config_context_length`` (model.context_length
in config.yaml), that value overrides detection and the check is skipped — the
operator has accepted responsibility for the override.
Raises:
ModelContextError: when detected context length < CRON_MIN_CONTEXT_TOKENS.
"""
if config_context_length is not None and config_context_length > 0:
# Explicit override — operator has acknowledged the context size.
return
try:
from agent.model_metadata import get_model_context_length
detected = get_model_context_length(
model,
base_url=base_url,
api_key=api_key,
)
except Exception as exc:
# If detection fails, log and allow the job to proceed — the real
# error (if any) will surface at API call time as before.
logger.warning(
"Could not detect context length for model %r: %s — skipping preflight check",
model,
exc,
)
return
if detected < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model {model!r} has a context window of {detected:,} tokens, "
f"which is below the minimum {CRON_MIN_CONTEXT_TOKENS:,} required by "
f"Hermes Agent. Choose a model with at least "
f"{CRON_MIN_CONTEXT_TOKENS // 1_000}K context, or set "
f"model.context_length in config.yaml to override."
)
def _resolve_origin(job: dict) -> Optional[dict]:
"""Extract origin info from a job, preserving any extra routing metadata."""
origin = job.get("origin")
@@ -539,7 +586,15 @@ def _build_job_prompt(job: dict) -> str:
"SILENT: If there is genuinely nothing new to report, respond "
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
"Never combine [SILENT] with content — either report your "
"findings normally, or say [SILENT] and nothing more.]\n\n"
"findings normally, or say [SILENT] and nothing more. "
"SCRIPT_FAILURE: If an external command or script you ran "
"failed (timeout, crash, connection error, non-zero exit), you MUST "
"respond with "
"\"[SCRIPT_FAILED]: <one-line reason>\" as the FIRST LINE of your "
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = cron_hint + prompt
if skills is None:
@@ -594,13 +649,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from run_agent import AIAgent
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
# the scheduler passes. Catches stale-code-deploy bugs before they
# cascade into 1000+ TypeErrors and gateway restart loops.
# Deploy sync guard — fail fast on first job if the installed
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None
@@ -661,19 +715,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
except Exception as e:
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
# Extract explicit context_length override from config.yaml (model.context_length).
# This mirrors the resolution logic in run_agent.py AIAgent.__init__ so the
# preflight check uses the same value the agent would.
_config_context_length: Optional[int] = None
try:
_model_cfg_for_ctx = _cfg.get("model", {})
if isinstance(_model_cfg_for_ctx, dict):
_raw_ctx = _model_cfg_for_ctx.get("context_length")
if _raw_ctx is not None:
_config_context_length = int(_raw_ctx)
except (TypeError, ValueError):
pass
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
effort = os.getenv("HERMES_REASONING_EFFORT", "")
@@ -736,39 +777,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Preflight: verify the resolved model has a sufficient context window.
# Raises ModelContextError (propagates out of run_job) so tick() can
# auto-pause the job rather than letting it fail on every tick.
_check_model_context_compat(
turn_route["model"],
base_url=turn_route["runtime"].get("base_url") or "",
api_key=turn_route["runtime"].get("api_key") or "",
config_context_length=_config_context_length,
)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
base_url=turn_route["runtime"].get("base_url"),
provider=turn_route["runtime"].get("provider"),
api_mode=turn_route["runtime"].get("api_mode"),
acp_command=turn_route["runtime"].get("command"),
acp_args=turn_route["runtime"].get("args"),
max_iterations=max_iterations,
reasoning_config=reasoning_config,
prefill_messages=prefill_messages,
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
provider_sort=pr.get("sort"),
disabled_toolsets=["cronjob", "messaging", "clarify"],
tool_choice="required",
quiet_mode=True,
skip_memory=True, # Cron system prompts would corrupt user representations
platform="cron",
session_id=_cron_session_id,
session_db=_session_db,
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
"base_url": turn_route["runtime"].get("base_url"),
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": turn_route["runtime"].get("args"),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
"providers_allowed": pr.get("only"),
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations
"platform": "cron",
"session_id": _cron_session_id,
"session_db": _session_db,
})
agent = AIAgent(**_agent_kwargs)
# Run the agent with an *inactivity*-based timeout: the job can run
# for hours if it's actively calling tools / receiving stream tokens,
@@ -888,6 +920,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Use a separate variable for log display; keep final_response clean
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Check for script failure — both explicit [SCRIPT_FAILED] marker
# and heuristic detection for failures described in natural language.
_script_failed_reason = _detect_script_failure(final_response)
if _script_failed_reason is not None:
logger.warning(
"Job '%s': agent reported script failure — %s",
job_name, _script_failed_reason,
)
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}
@@ -907,13 +963,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.info("Job '%s' completed successfully", job_name)
return True, output, final_response, None
except ModelContextError:
# Re-raise so tick() can auto-pause the job and prevent repeated failures.
raise
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}"
logger.exception("Job '%s' failed: %s", job_name, error_msg)
output = f"""# Cron Job: {job_name} (FAILED)
**Job ID:** {job_id}
@@ -995,48 +1048,27 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
if sys.is_finalizing():
return None
try:
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except ModelContextError as e:
# Auto-pause: incompatible model will never succeed; stop scheduling
# it until the operator reconfigures the job or config.yaml.
error_msg = str(e)
logger.error(
"Job '%s' paused — model context incompatibility: %s",
job_name,
error_msg,
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
from cron.jobs import pause_job
pause_job(job["id"], reason=f"ModelContextError: {error_msg}")
return (job, False, "", "", error_msg)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
break
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)