Compare commits

...

13 Commits

Author SHA1 Message Date
cea4c7fdd0 fix(poka-yoke): circuit breaker for error cascading (#309) + tool fixation detection (#310) (#362)
Some checks failed
Forge CI / smoke-and-build (push) Failing after 26s
Merged poka-yoke #309 and #310
2026-04-13 14:18:35 +00:00
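The two poka-yoke counters this commit adds to AIAgent can be sketched in isolation. The class below is a hypothetical stand-in, not the project's code: `ToolStreakTracker` and the nudge strings are invented for illustration. Only the thresholds (warnings at 3 and 6 consecutive errors, then every third error from 9 on, and a fixation nudge every 5 same-tool calls) are taken from the diff.

```python
from typing import List, Optional


class ToolStreakTracker:
    """Hypothetical helper: counts error streaks and same-tool streaks."""

    def __init__(self, fixation_threshold: int = 5) -> None:
        self.consecutive_errors = 0
        self.last_tool: Optional[str] = None
        self.same_tool_streak = 0
        self.fixation_threshold = fixation_threshold

    def record(self, tool_name: str, is_error: bool) -> List[str]:
        """Record one tool call and return any nudges it triggers."""
        # Fixation: count consecutive calls to the same tool.
        if tool_name == self.last_tool:
            self.same_tool_streak += 1
        else:
            self.last_tool = tool_name
            self.same_tool_streak = 1

        # Circuit breaker: any success resets the error streak.
        self.consecutive_errors = self.consecutive_errors + 1 if is_error else 0

        nudges: List[str] = []
        n = self.consecutive_errors
        if n in (3, 6) or (n >= 9 and n % 3 == 0):
            nudges.append(f"circuit-breaker:{n}")
        s = self.same_tool_streak
        if s >= self.fixation_threshold and s % self.fixation_threshold == 0:
            nudges.append(f"fixation:{s}")
        return nudges
```

Keeping both counters in one small object is only a presentation choice here; in the diff they are plain attributes on the agent, updated in both the sequential and the concurrent tool paths.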
f9b6db52af fix: unescape corrupted quotes in mempalace __init__.py (#360)
Some checks failed
Forge CI / smoke-and-build (push) Failing after 29s
Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-13 14:03:30 +00:00
f91f22ef7a Merge pull request '[claude] fix(cron): preflight model context validation + auto-pause (#351)' (#359) from claude/issue-351 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
Merged by Timmy overnight cycle
2026-04-13 14:03:12 +00:00
b89c670400 Merge pull request 'feat: add hermes cron run --now for immediate job execution (closes #347)' (#361) from feat/cron-run-now into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
Merged by Timmy overnight cycle
2026-04-13 14:03:08 +00:00
Timmy
f6e72c135c feat: add hermes cron run --now for immediate job execution (closes #347)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 24s
Problem: 'hermes cron run JOBID' only queues for next scheduler tick.
Stale error state (like tool_choice TypeError residue) persists forever
because there's no way to execute a job immediately and get fresh results.

Solution: Synchronous execution path through three layers, plus a tool action:
- cron/jobs.py: run_job_now() calls scheduler.run_job() then mark_job_run()
- gateway: POST /api/jobs/{id}/run-now endpoint (runs in thread executor)
- CLI: hermes cron run JOBID --now executes and prints result immediately
- tools/cronjob_tools.py: 'run_now' action routes to new function

Also fixes #346, #349 (same stale error pattern).
2026-04-13 09:58:47 -04:00
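The difference between queueing a job and running it now can be illustrated with a toy in-memory store. This is a minimal sketch under stated assumptions: the `JOBS` dict, the `runner` callback and the `last_error`/`queued` fields are hypothetical, and the real project persists jobs to disk and calls the scheduler instead.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical in-memory job store carrying a stale error from an earlier run.
JOBS: Dict[str, Dict[str, Any]] = {
    "job-1": {"id": "job-1", "last_error": "TypeError: stale", "queued": False},
}


def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Queue the job for the next tick; the stale error stays in place."""
    job = JOBS.get(job_id)
    if job is None:
        return None
    job["queued"] = True
    return job


def run_job_now(
    job_id: str, runner: Callable[[Dict[str, Any]], str]
) -> Optional[Dict[str, Any]]:
    """Run the job synchronously and overwrite the error state with the fresh outcome."""
    job = JOBS.get(job_id)
    if job is None:
        return None
    try:
        output = runner(job)
    except Exception as exc:
        job["last_error"] = f"{type(exc).__name__}: {exc}"
        return {"job": job, "success": False, "output": None, "error": job["last_error"]}
    job["last_error"] = None
    return {"job": job, "success": True, "output": output, "error": None}
```

Calling `trigger_job("job-1")` leaves `last_error` untouched, while `run_job_now("job-1", runner)` replaces it with whatever the run actually produced.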
Alexander Whitestone
ece8b5f8be fix(cron): preflight model context validation + auto-pause on incompatible models
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 25s
Fixes #351

Root cause: cron jobs with a per-job model override (e.g. `gemma4:latest`,
8K context) were only discovered to be incompatible at agent runtime,
causing a hard ValueError on every tick with no automatic recovery.

Changes:
- Add `CRON_MIN_CONTEXT_TOKENS = 64_000` constant to scheduler.py
- Add `ModelContextError(ValueError)` exception class for typed identification
- Add `_check_model_context_compat()` preflight function that calls
  `get_model_context_length()` and raises `ModelContextError` if the
  resolved model's context is below the minimum
- Call preflight check in `run_job()` after model resolution, before
  `AIAgent()` is instantiated
- In `_process_single_job()` inside `tick()`, catch `ModelContextError`
  and call `pause_job()` to auto-pause the offending job — it will no
  longer fire on every tick until the operator fixes the config
- Honour `model.context_length` in config.yaml as an explicit override
  that bypasses the check (operator accepts responsibility)
- If context detection itself fails (network/import error), log a warning
  and allow the job to proceed (fail-open) so detection gaps don't block
  otherwise-working jobs
- Fix pre-existing IndentationError in `tick()` result loop (missing
  `try:` block introduced in #353 parallel-execution refactor)
- Export `ModelContextError` and `CRON_MIN_CONTEXT_TOKENS` from `cron/__init__.py`
- Add 8 new tests covering all branches of `_check_model_context_compat`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 09:41:17 -04:00
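The preflight-plus-auto-pause flow described above can be sketched as follows. The 64,000-token threshold, the `ModelContextError(ValueError)` type, the override and the fail-open behaviour come from the commit message; the `detect` callback, `process_job` and the `paused` list are hypothetical stand-ins for `get_model_context_length()`, `_process_single_job()` and `pause_job()`.

```python
from typing import Callable, Dict, List, Optional

MIN_CONTEXT_TOKENS = 64_000  # same threshold as CRON_MIN_CONTEXT_TOKENS


class ModelContextError(ValueError):
    """Typed error so the caller can tell 'wrong model' from other failures."""


def check_context(
    model: str, detect: Callable[[str], int], override: Optional[int] = None
) -> None:
    if override is not None and override > 0:
        return  # explicit operator override skips detection
    try:
        detected = detect(model)
    except Exception:
        return  # fail-open: a detection gap must not block the job
    if detected < MIN_CONTEXT_TOKENS:
        raise ModelContextError(
            f"{model!r} has {detected:,} tokens, need {MIN_CONTEXT_TOKENS:,}"
        )


def process_job(job: Dict, detect: Callable[[str], int], paused: List[str]) -> bool:
    """Return True if the job may run; pause it when the model can never work."""
    try:
        check_context(job["model"], detect, job.get("context_length"))
    except ModelContextError:
        paused.append(job["id"])  # stand-in for pause_job()
        return False
    return True
```

Subclassing `ValueError` keeps existing `except ValueError` handlers working, while the distinct type lets the scheduler single out the one failure that retrying will never fix.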
c88b172bd9 Merge pull request 'perf(cron): parallel job execution + priority sorting (#353)' (#357) from fix/cron-tick-backlog into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 20s
2026-04-13 08:29:31 +00:00
Alexander Whitestone
4373ef2698 perf(cron): parallel job execution + priority sorting (#353)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 20s
2026-04-13 04:21:14 -04:00
fed7156a86 Merge pull request 'feat(cron): deploy sync guard — catch stale code before cascading failures' (#356) from feat/deploy-sync-guard into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 28s
2026-04-13 08:15:34 +00:00
Alexander Whitestone
e68c4d3e4e feat(cron): add deploy sync guard to catch stale code before cascading failures
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
When the installed run_agent.py diverges from what scheduler.py expects,
every cron job fails with TypeError on AIAgent.__init__() — a silent total
outage that cascades into gateway restarts, asyncio shutdown errors, and
auth token expiry.

This commit adds a _validate_agent_interface() guard that:
- Inspects AIAgent.__init__ at runtime via inspect.signature
- Verifies every kwarg the scheduler passes exists in the constructor
- Fails fast with a clear remediation message on mismatch
- Runs once per gateway process (cached, zero per-job overhead)

The guard is called at the top of run_job() before any work begins.
It would have caught the tool_choice TypeError that caused 1,199 failures
across 55 jobs (meta-issue #343).

Includes 3 tests: pass, fail, and cache verification.
2026-04-13 03:33:48 -04:00
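The core of the guard is a signature comparison, which can be shown with two toy classes. This is a sketch, assuming hypothetical `OldAgent` and `NewAgent` classes and a hypothetical `missing_kwargs` helper; only `inspect.signature` and the three kwarg names come from the commit.

```python
import inspect
from typing import Iterable, List


def missing_kwargs(cls: type, required: Iterable[str]) -> List[str]:
    """Return the required keyword names that cls.__init__ does not accept."""
    params = set(inspect.signature(cls.__init__).parameters) - {"self"}
    return [name for name in required if name not in params]


class OldAgent:
    """Stale install: predates the tool_choice parameter."""

    def __init__(self, model="", skip_memory=False, platform=None):
        pass


class NewAgent:
    """Current install: accepts everything the scheduler passes."""

    def __init__(self, model="", skip_memory=False, platform=None, tool_choice=None):
        pass


SCHEDULER_KWARGS = ["tool_choice", "skip_memory", "platform"]
```

Here `missing_kwargs(OldAgent, SCHEDULER_KWARGS)` reports `tool_choice`, which is the point at which a guard like the one in this commit would raise instead of letting every job hit a `TypeError`.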
a547552ff7 Merge pull request 'fix(cron): guard against interpreter shutdown in run_job() and tick()' (#355) from fix/cron-interpreter-shutdown-352 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 27s
Merge PR #355: fix(cron): guard against interpreter shutdown in run_job() and tick()
2026-04-13 07:32:06 +00:00
Alexander Whitestone
d6bd3bc10a fix(cron): guard against interpreter shutdown in run_job() and tick()
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
Fixes #352

Problem: When the gateway restarts, Python's interpreter enters
shutdown phase while the last cron tick is still processing jobs.
ThreadPoolExecutor.submit() raises RuntimeError("cannot schedule
new futures after interpreter shutdown") for every remaining job.
This cascades through the entire tick queue.

Fix (two-part):
1. run_job(): Wrap ThreadPoolExecutor creation + submit in try/except.
   On RuntimeError, fall back to synchronous execution (same thread)
   so the job at least attempts to run instead of dying silently.
2. tick(): Check sys.is_finalizing() before each job. If the
   interpreter is shutting down, stop processing immediately
   instead of wasting time on doomed ThreadPoolExecutor.submit() calls.
2026-04-13 03:22:10 -04:00
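Both parts of the fix can be sketched in a few lines. This is a minimal illustration, not the scheduler's code: `run_with_fallback`, `process_all` and the `pool_factory` parameter are hypothetical, and the parameter exists only so the fallback can be exercised without actually shutting down the interpreter.

```python
import concurrent.futures
import sys
from typing import Any, Callable, Iterable, List


def run_with_fallback(
    fn: Callable[..., Any],
    *args: Any,
    pool_factory: Callable[..., Any] = concurrent.futures.ThreadPoolExecutor,
) -> Any:
    """Run fn in a worker thread; fall back to the current thread on RuntimeError."""
    pool = None
    try:
        pool = pool_factory(max_workers=1)
        future = pool.submit(fn, *args)
    except RuntimeError:
        # submit() refuses new work once the executor or interpreter is shutting down.
        if pool is not None:
            pool.shutdown(wait=False)
        return fn(*args)
    try:
        return future.result()
    finally:
        pool.shutdown(wait=False)


def process_all(jobs: Iterable[Any], fn: Callable[[Any], Any]) -> List[Any]:
    """Process jobs in order, stopping as soon as the interpreter is finalizing."""
    results = []
    for job in jobs:
        if sys.is_finalizing():
            break
        results.append(run_with_fallback(fn, job))
    return results
```

An executor that has already been shut down raises the same kind of `RuntimeError` from `submit()`, so passing a factory that returns one is a convenient way to test the synchronous path.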
7a577068f0 Merge pull request 'fix(cron): ensure ticker thread starts and monitor for death (#342)' (#345) from fix/cron-ticker-startup into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 25s
Auto-merge #345
2026-04-13 07:15:28 +00:00
10 changed files with 596 additions and 23 deletions

@@ -26,11 +26,11 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
__all__ = [
"create_job",
"get_job",
"get_job",
"list_jobs",
"remove_job",
"update_job",
@@ -39,4 +39,6 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

@@ -563,6 +563,44 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
"""
Execute a job immediately and persist fresh state.
Unlike trigger_job() which queues for the next scheduler tick,
this runs the job synchronously and returns the result.
Clears stale error state on success.
Returns:
Dict with 'job', 'success', 'output', 'error' keys, or None if not found.
"""
job = get_job(job_id)
if not job:
return None
try:
from cron.scheduler import run_job as _run_job
except ImportError as exc:
return {
"job": job,
"success": False,
"output": None,
"error": f"Cannot import scheduler: {exc}",
}
success, output, final_response, error = _run_job(job)
mark_job_run(job_id, success, error)
updated_job = get_job(job_id) or job
return {
"job": updated_job,
"success": success,
"output": output,
"final_response": final_response,
"error": error,
}
def remove_job(job_id: str) -> bool:
"""Remove a job by ID."""
jobs = load_jobs()

@@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
import asyncio
import concurrent.futures
import inspect
import json
import logging
import os
@@ -50,11 +51,89 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Deploy Sync Guard
# ---------------------------------------------------------------------------
# The scheduler passes keyword arguments to AIAgent() that may not exist in
# older installed versions. When the installed run_agent.py diverges from the
# version the scheduler was written against, every cron job fails with a
# TypeError — a silent total outage. This guard catches that at the first
# tick rather than after 1000+ error log lines.
#
# The check runs once per gateway process (cached by _agent_interface_validated).
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
_agent_interface_validated = False
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to the AIAgent() call below, add it here too.
_SCHEDULER_AGENT_KWARGS = {
"tool_choice": "required",
"skip_memory": True,
"platform": "cron",
}
def _validate_agent_interface():
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
Raises RuntimeError with a remediation message on mismatch.
Called once per process from run_job(); subsequent calls are no-ops.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
"The installed hermes-agent package may be corrupted. "
"Reinstall: pip install -e ~/.hermes/hermes-agent"
) from exc
sig = inspect.signature(AIAgent.__init__)
params = set(sig.parameters.keys()) - {"self"}
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
if missing:
raise RuntimeError(
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
"the cron scheduler requires. This means the installed code is out "
"of sync with the scheduler module.\n"
f" Missing parameters: {', '.join(missing)}\n"
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
f" Then restart the gateway."
)
_agent_interface_validated = True
logger.info(
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
len(params), len(_SCHEDULER_AGENT_KWARGS),
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
# Minimum context window required for Hermes Agent to function correctly.
# Models with a smaller context cannot support the tool-use overhead, long
# system prompts, and multi-turn agentic workloads that cron jobs require.
CRON_MIN_CONTEXT_TOKENS = 64_000
class ModelContextError(ValueError):
"""Raised when a cron job's model has an insufficient context window.
Inherits from ValueError so callers that catch ValueError also catch this,
but the distinct type lets tick() identify context errors for auto-pause.
"""
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -63,6 +142,52 @@ _LOCK_DIR = _hermes_home / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
def _check_model_context_compat(
model: str,
base_url: str = "",
api_key: str = "",
config_context_length: Optional[int] = None,
) -> None:
"""Preflight check: raise ModelContextError if the model's context window is too small.
If the operator has explicitly set ``config_context_length`` (model.context_length
in config.yaml), that value overrides detection and the check is skipped — the
operator has accepted responsibility for the override.
Raises:
ModelContextError: when detected context length < CRON_MIN_CONTEXT_TOKENS.
"""
if config_context_length is not None and config_context_length > 0:
# Explicit override — operator has acknowledged the context size.
return
try:
from agent.model_metadata import get_model_context_length
detected = get_model_context_length(
model,
base_url=base_url,
api_key=api_key,
)
except Exception as exc:
# If detection fails, log and allow the job to proceed — the real
# error (if any) will surface at API call time as before.
logger.warning(
"Could not detect context length for model %r: %s — skipping preflight check",
model,
exc,
)
return
if detected < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model {model!r} has a context window of {detected:,} tokens, "
f"which is below the minimum {CRON_MIN_CONTEXT_TOKENS:,} required by "
f"Hermes Agent. Choose a model with at least "
f"{CRON_MIN_CONTEXT_TOKENS // 1_000}K context, or set "
f"model.context_length in config.yaml to override."
)
def _resolve_origin(job: dict) -> Optional[dict]:
"""Extract origin info from a job, preserving any extra routing metadata."""
origin = job.get("origin")
@@ -470,7 +595,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from run_agent import AIAgent
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
# the scheduler passes. Catches stale-code-deploy bugs before they
# cascade into 1000+ TypeErrors and gateway restart loops.
_validate_agent_interface()
# Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None
@@ -531,6 +661,19 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
except Exception as e:
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
# Extract explicit context_length override from config.yaml (model.context_length).
# This mirrors the resolution logic in run_agent.py AIAgent.__init__ so the
# preflight check uses the same value the agent would.
_config_context_length: Optional[int] = None
try:
_model_cfg_for_ctx = _cfg.get("model", {})
if isinstance(_model_cfg_for_ctx, dict):
_raw_ctx = _model_cfg_for_ctx.get("context_length")
if _raw_ctx is not None:
_config_context_length = int(_raw_ctx)
except (TypeError, ValueError):
pass
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
effort = os.getenv("HERMES_REASONING_EFFORT", "")
@@ -593,6 +736,16 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Preflight: verify the resolved model has a sufficient context window.
# Raises ModelContextError (propagates out of run_job) so tick() can
# auto-pause the job rather than letting it fail on every tick.
_check_model_context_compat(
turn_route["model"],
base_url=turn_route["runtime"].get("base_url") or "",
api_key=turn_route["runtime"].get("api_key") or "",
config_context_length=_config_context_length,
)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
@@ -628,8 +781,47 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
# raises RuntimeError("cannot schedule new futures after interpreter
# shutdown") when Python is finalizing (e.g. gateway restart races).
# Fall back to synchronous execution so the job at least attempts.
_cron_pool = None
try:
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
except RuntimeError:
logger.warning(
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
"— falling back to synchronous execution",
job_name,
)
if _cron_pool is not None:
try:
_cron_pool.shutdown(wait=False)
except Exception:
pass
_cron_pool = None
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
logger.info("Job '%s' completed (sync fallback)", job_name)
return True, output, final_response, None
_inactivity_timeout = False
try:
if _cron_inactivity_limit is None:
@@ -656,10 +848,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_inactivity_timeout = True
break
except Exception:
_cron_pool.shutdown(wait=False, cancel_futures=True)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
_cron_pool.shutdown(wait=False)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False)
if _inactivity_timeout:
# Build diagnostic summary from the agent's activity tracker.
@@ -713,10 +907,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.info("Job '%s' completed successfully", job_name)
return True, output, final_response, None
except ModelContextError:
# Re-raise so tick() can auto-pause the job and prevent repeated failures.
raise
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}"
logger.exception("Job '%s' failed: %s", job_name, error_msg)
output = f"""# Cron Job: {job_name} (FAILED)
**Job ID:** {job_id}
@@ -798,17 +995,48 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
for job in due_jobs:
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
if sys.is_finalizing():
return None
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except ModelContextError as e:
# Auto-pause: incompatible model will never succeed; stop scheduling
# it until the operator reconfigures the job or config.yaml.
error_msg = str(e)
logger.error(
"Job '%s' paused — model context incompatibility: %s",
job_name,
error_msg,
)
from cron.jobs import pause_job
pause_job(job["id"], reason=f"ModelContextError: {error_msg}")
return (job, False, "", "", error_msg)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
try:
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
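The priority sort and bounded thread pool used in `tick()` can be tried out on their own. The sketch below reuses the `PRIORITY_ORDER` mapping from the hunk above; `sort_by_priority`, `run_parallel` and the `worker` callback are hypothetical names introduced for illustration.

```python
import concurrent.futures
from typing import Any, Callable, Dict, List, Optional

PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}


def sort_by_priority(jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Stable sort: missing or unknown priorities are treated as 'normal'."""
    return sorted(jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))


def run_parallel(
    jobs: List[Dict[str, Any]],
    worker: Callable[[Dict[str, Any]], Optional[Any]],
    max_parallel: int = 10,
) -> List[Any]:
    """Submit jobs in priority order and collect non-None results as they finish."""
    results: List[Any] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_parallel) as executor:
        futures = [executor.submit(worker, job) for job in sort_by_priority(jobs)]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                results.append(result)
    return results
```

Note that priority here only controls submission order. With more workers than jobs everything starts at once, and `as_completed` yields results in completion order, so the sort matters mainly when the number of due jobs exceeds `max_parallel`.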

@@ -973,6 +973,7 @@ class APIServerAdapter(BasePlatformAdapter):
pause_job as _cron_pause,
resume_job as _cron_resume,
trigger_job as _cron_trigger,
run_job_now as _cron_run_now,
)
# Wrap as staticmethod to prevent descriptor binding — these are plain
# module functions, not instance methods. Without this, self._cron_*()
@@ -986,6 +987,7 @@ class APIServerAdapter(BasePlatformAdapter):
_cron_pause = staticmethod(_cron_pause)
_cron_resume = staticmethod(_cron_resume)
_cron_trigger = staticmethod(_cron_trigger)
_cron_run_now = staticmethod(_cron_run_now)
_CRON_AVAILABLE = True
except ImportError:
pass
@@ -1204,6 +1206,28 @@ class APIServerAdapter(BasePlatformAdapter):
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
async def _handle_run_job_now(self, request: "web.Request") -> "web.Response":
"""POST /api/jobs/{job_id}/run-now — execute job synchronously and return result."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
import asyncio as _asyncio
result = await _asyncio.get_running_loop().run_in_executor(
None, self._cron_run_now, job_id
)
if result is None:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response(result)
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
# ------------------------------------------------------------------
# Output extraction helper
# ------------------------------------------------------------------
@@ -1565,6 +1589,7 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
self._app.router.add_post("/api/jobs/{job_id}/run-now", self._handle_run_job_now)
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)
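The run-now handler offloads a blocking call to the default thread executor so the event loop stays responsive. A minimal sketch of that pattern, using plain asyncio rather than the gateway's web framework; `blocking_job` and `handle_run_now` are hypothetical stand-ins for the cron call and the HTTP handler.

```python
import asyncio
import time
from typing import Any, Dict


def blocking_job(job_id: str) -> Dict[str, Any]:
    """Stand-in for a synchronous cron run that blocks its thread."""
    time.sleep(0.05)
    return {"job_id": job_id, "success": True}


async def handle_run_now(job_id: str) -> Dict[str, Any]:
    """Run the blocking call in the default executor and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_job, job_id)


async def main() -> Dict[str, Any]:
    # Other coroutines keep running while the job occupies a worker thread.
    result, _ = await asyncio.gather(handle_run_now("job-1"), asyncio.sleep(0.01))
    return result
```

Passing `None` as the executor selects the loop's default `ThreadPoolExecutor`, so a long-running job still ties up one of its worker threads until it finishes.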

@@ -221,7 +221,31 @@ def cron_edit(args):
return 0
def _job_action(action: str, job_id: str, success_verb: str) -> int:
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
print(color(f"Failed to run job now: {result.get('error') or 'unknown error'}", Colors.RED))
return 1
job = result.get("job", {})
success = result.get("success", False)
error = result.get("error")
final_response = result.get("final_response", "")
name = job.get("name", job_id)
if success:
print(color(f"Job '{name}' completed successfully", Colors.GREEN))
else:
print(color(f"Job '{name}' failed: {error}", Colors.RED))
if final_response:
print(f"\n{final_response}\n")
if not error:
print(color("Stale error state cleared.", Colors.GREEN))
return 0 if success else 1
result = _cron_api(action=action, job_id=job_id)
if not result.get("success"):
print(color(f"Failed to {action} job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -265,7 +289,8 @@ def cron_command(args):
return _job_action("resume", args.job_id, "Resumed")
if subcmd == "run":
return _job_action("run", args.job_id, "Triggered")
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")

@@ -4574,6 +4574,7 @@ For more help on a command:
cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick")
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")

@@ -172,13 +172,13 @@ class MemPalacePortalProvider(MemoryProvider):
# Extract key facts for Hologram
if self._hologram_store:
# Simple heuristic: look for \"I prefer\", \"The project uses\", etc.
# Simple heuristic: look for "I prefer", "The project uses", etc.
for msg in messages:
if msg.get(\"role\") == \"user\":
content = msg.get(\"content\", \"\")
if \"prefer\" in content.lower() or \"use\" in content.lower():
if msg.get("role") == "user":
content = msg.get("content", "")
if "prefer" in content.lower() or "use" in content.lower():
try:
self._hologram_store.add_fact(content[:200], category=\"user_pref\")
self._hologram_store.add_fact(content[:200], category="user_pref")
except Exception:
pass

@@ -721,6 +721,19 @@ class AIAgent:
self._current_tool: str | None = None
self._api_call_count: int = 0
# Poka-yoke #309: Circuit breaker for error cascading
# P(error | prev was error) = 58.6% vs P(error | prev was success) = 25.2%
# After 3+ consecutive errors, inject guidance to break the cascade.
self._consecutive_tool_errors: int = 0
self._error_streak_tool_names: list = [] # track which tools are in the streak
# Poka-yoke #310: Tool fixation detection
# Marathon sessions show tool fixation: the same tool called 8-25 times in a row.
# After 5 consecutive calls to the same tool, nudge the agent to diversify.
self._last_tool_name: str | None = None
self._same_tool_streak: int = 0
self._tool_fixation_threshold: int = 5
# Centralized logging — agent.log (INFO+) and errors.log (WARNING+)
# both live under ~/.hermes/logs/. Idempotent, so gateway mode
# (which creates a new AIAgent per message) won't duplicate handlers.
@@ -6238,6 +6251,12 @@ class AIAgent:
def _run_tool(index, tool_call, function_name, function_args):
"""Worker function executed in a thread."""
start = time.time()
# Poka-yoke #310: Tool fixation detection (concurrent path)
if function_name == self._last_tool_name:
self._same_tool_streak += 1
else:
self._last_tool_name = function_name
self._same_tool_streak = 1
try:
result = self._invoke_tool(function_name, function_args, effective_task_id, tool_call.id)
except Exception as tool_error:
@@ -6288,6 +6307,13 @@ class AIAgent:
if is_error:
result_preview = function_result[:200] if len(function_result) > 200 else function_result
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
# Circuit breaker: track consecutive errors
self._consecutive_tool_errors += 1
self._error_streak_tool_names.append(function_name)
else:
# Reset circuit breaker on success
self._consecutive_tool_errors = 0
self._error_streak_tool_names = []
if self.tool_progress_callback:
try:
@@ -6331,6 +6357,41 @@ class AIAgent:
if subdir_hints:
function_result += subdir_hints
# Circuit breaker: inject warning after 3+ consecutive errors
if self._consecutive_tool_errors >= 3:
streak_info = self._error_streak_tool_names[-self._consecutive_tool_errors:]
unique_tools = list(dict.fromkeys(streak_info))
if self._consecutive_tool_errors == 3:
cb_msg = (
f"\n\n⚠️ CIRCUIT BREAKER: You have had {self._consecutive_tool_errors} consecutive tool errors "
f"({', '.join(unique_tools)}). Errors cascade — P(error|error) is 2.33x higher than normal. "
f"Consider: (1) trying a different tool type, (2) using terminal to debug, "
f"(3) simplifying your approach, or (4) asking the user for guidance."
)
function_result += cb_msg
elif self._consecutive_tool_errors == 6:
cb_msg = (
f"\n\n🛑 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"The error cascade is severe. STOP retrying the same approach. "
f"Use terminal to investigate, or switch strategies entirely."
)
function_result += cb_msg
elif self._consecutive_tool_errors >= 9 and self._consecutive_tool_errors % 3 == 0:
cb_msg = (
f"\n\n🔴 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"Terminal is your only reliable recovery path. Use it now."
)
function_result += cb_msg
# Poka-yoke #310: Tool fixation nudge
if self._same_tool_streak >= self._tool_fixation_threshold and self._same_tool_streak % self._tool_fixation_threshold == 0:
fixation_msg = (
f"\n\n🔄 TOOL FIXATION: You have called `{function_name}` {self._same_tool_streak} times consecutively. "
f"Consider: (1) trying a different tool, (2) using `terminal` to verify your approach, "
f"(3) stepping back to reassess the task."
)
function_result += fixation_msg
# Append tool result message in order
tool_msg = {
"role": "tool",
@@ -6416,6 +6477,13 @@ class AIAgent:
self._current_tool = function_name
self._touch_activity(f"executing tool: {function_name}")
# Poka-yoke #310: Tool fixation detection
if function_name == self._last_tool_name:
self._same_tool_streak += 1
else:
self._last_tool_name = function_name
self._same_tool_streak = 1
if self.tool_progress_callback:
try:
preview = _build_tool_preview(function_name, function_args)
@@ -6609,8 +6677,14 @@ class AIAgent:
_is_error_result, _ = _detect_tool_failure(function_name, function_result)
if _is_error_result:
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
# Circuit breaker: track consecutive errors
self._consecutive_tool_errors += 1
self._error_streak_tool_names.append(function_name)
else:
logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, len(function_result))
# Reset circuit breaker on success
self._consecutive_tool_errors = 0
self._error_streak_tool_names = []
if self.tool_progress_callback:
try:
@@ -6642,6 +6716,41 @@ class AIAgent:
if subdir_hints:
function_result += subdir_hints
# Circuit breaker: inject warning after 3+ consecutive errors
if self._consecutive_tool_errors >= 3:
streak_info = self._error_streak_tool_names[-self._consecutive_tool_errors:]
unique_tools = list(dict.fromkeys(streak_info)) # preserve order, deduplicate
if self._consecutive_tool_errors == 3:
cb_msg = (
f"\n\n⚠️ CIRCUIT BREAKER: You have had {self._consecutive_tool_errors} consecutive tool errors "
f"({', '.join(unique_tools)}). Errors cascade — P(error|error) is 2.33x higher than normal. "
f"Consider: (1) trying a different tool type, (2) using terminal to debug, "
f"(3) simplifying your approach, or (4) asking the user for guidance."
)
function_result += cb_msg
elif self._consecutive_tool_errors == 6:
cb_msg = (
f"\n\n🛑 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"The error cascade is severe. STOP retrying the same approach. "
f"Use terminal to investigate, or switch strategies entirely."
)
function_result += cb_msg
elif self._consecutive_tool_errors >= 9 and self._consecutive_tool_errors % 3 == 0:
cb_msg = (
f"\n\n🔴 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"Terminal is your only reliable recovery path. Use it now."
)
function_result += cb_msg
# Poka-yoke #310: Tool fixation nudge
if self._same_tool_streak >= self._tool_fixation_threshold and self._same_tool_streak % self._tool_fixation_threshold == 0:
fixation_msg = (
f"\n\n🔄 TOOL FIXATION: You have called `{function_name}` {self._same_tool_streak} times consecutively. "
f"Consider: (1) trying a different tool, (2) using `terminal` to verify your approach, "
f"(3) stepping back to reassess the task."
)
function_result += fixation_msg
tool_msg = {
"role": "tool",
"content": function_result,

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS
class TestResolveOrigin:
@@ -863,3 +863,141 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestDeploySyncGuard:
"""Tests for _validate_agent_interface() — the deploy sync guard."""
def test_passes_when_all_params_present(self):
"""Validation passes when AIAgent accepts every scheduler kwarg."""
from cron.scheduler import _validate_agent_interface, _agent_interface_validated
import cron.scheduler as sched_mod
# Reset the cached flag so the check actually runs.
sched_mod._agent_interface_validated = False
# Should not raise.
_validate_agent_interface()
assert sched_mod._agent_interface_validated is True
def test_fails_when_param_missing(self):
"""Validation raises RuntimeError when AIAgent is missing a required param."""
import cron.scheduler as sched_mod
from unittest.mock import MagicMock
import inspect
# Save and restore.
orig_flag = sched_mod._agent_interface_validated
try:
sched_mod._agent_interface_validated = False
# Build a fake AIAgent class whose __init__ lacks 'tool_choice'.
class FakeAIAgent:
def __init__(self, model="", max_iterations=90, quiet_mode=False,
disabled_toolsets=None, skip_memory=False, platform=None,
session_id=None, session_db=None):
pass
fake_module = MagicMock()
fake_module.AIAgent = FakeAIAgent
with pytest.raises(RuntimeError, match="Missing parameters: tool_choice"):
with patch.dict("sys.modules", {"run_agent": fake_module}):
sched_mod._validate_agent_interface()
finally:
sched_mod._agent_interface_validated = orig_flag
def test_cached_after_first_run(self):
"""Second call is a no-op (uses cached flag)."""
import cron.scheduler as sched_mod
sched_mod._agent_interface_validated = True
# Should not raise even if we somehow break AIAgent — the flag is set.
sched_mod._validate_agent_interface()
# No exception = pass.
class TestCheckModelContextCompat:
"""Tests for _check_model_context_compat() preflight validation."""
def test_raises_when_context_below_minimum(self):
"""A mocked ModelContextError is matched on its 'below the minimum' message."""
# NOTE: this patches _check_model_context_compat itself, so it only verifies
# the exception type and message matching. The real check is exercised by
# test_raises_for_small_context_model below.
with patch("cron.scheduler._check_model_context_compat") as mock_check:
mock_check.side_effect = ModelContextError(
"Model 'gemma4:latest' has a context window of 8,192 tokens, "
"which is below the minimum 64,000 required by Hermes Agent."
)
with pytest.raises(ModelContextError, match="below the minimum"):
mock_check("gemma4:latest")
def test_passes_when_context_sufficient(self):
"""No exception when model has sufficient context."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=128_000,
):
# Should not raise
_check_model_context_compat("claude-sonnet-4-6", base_url="", api_key="")
def test_passes_with_config_override_below_minimum(self):
"""When config_context_length is set, check is skipped regardless of detected size."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=8_192, # Would normally fail
) as mock_get:
# config_context_length override skips the check
_check_model_context_compat(
"gemma4:latest",
config_context_length=8_192,
)
# get_model_context_length should NOT be called — check is bypassed
mock_get.assert_not_called()
def test_raises_model_context_error_subclass_of_value_error(self):
"""ModelContextError is a subclass of ValueError."""
assert issubclass(ModelContextError, ValueError)
def test_detection_failure_is_non_fatal(self):
"""If context length detection raises, the check is skipped (fail-open)."""
with patch(
"agent.model_metadata.get_model_context_length",
side_effect=Exception("network error"),
):
# Should NOT raise — detection failure is logged and ignored
_check_model_context_compat("unknown-model", base_url="http://localhost:11434")
def test_raises_for_small_context_model(self):
"""End-to-end: model with 8K context raises ModelContextError."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=8_192,
):
with pytest.raises(ModelContextError) as exc_info:
_check_model_context_compat("gemma4:latest")
err = str(exc_info.value)
assert "gemma4:latest" in err
assert "8,192" in err
assert str(CRON_MIN_CONTEXT_TOKENS) in err or "64,000" in err
assert "config.yaml" in err
def test_boundary_exactly_at_minimum_passes(self):
"""A model with exactly CRON_MIN_CONTEXT_TOKENS context is accepted."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=CRON_MIN_CONTEXT_TOKENS,
):
# Should not raise
_check_model_context_compat("borderline-model")
def test_boundary_one_below_minimum_raises(self):
"""A model with context length one below the minimum is rejected."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=CRON_MIN_CONTEXT_TOKENS - 1,
):
with pytest.raises(ModelContextError):
_check_model_context_compat("borderline-model")
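
The tests above pin down the observable contract of the preflight check without showing its body. The following is a minimal sketch of a function that would satisfy that contract, not the implementation in `cron/scheduler.py`. Assumptions: the name `check_model_context_compat`, the injected `detect` callable (standing in for `get_model_context_length`), the value `64_000` for the minimum (implied by the "64,000" assertion), and the exact wording of the error message are all hypothetical.

```python
from typing import Callable, Optional

# Assumed value; the tests above only imply it through the "64,000" string.
CRON_MIN_CONTEXT_TOKENS = 64_000


class ModelContextError(ValueError):
    """Raised when a model's context window is too small for a cron job."""


def check_model_context_compat(
    model: str,
    detect: Callable[[str], int],
    config_context_length: Optional[int] = None,
) -> None:
    """Raise ModelContextError if the detected context is below the minimum."""
    # An explicit override skips detection entirely.
    if config_context_length is not None:
        return
    try:
        detected = detect(model)
    except Exception:
        # Fail-open: a detection failure must not block the job.
        return
    if detected < CRON_MIN_CONTEXT_TOKENS:
        raise ModelContextError(
            f"Model '{model}' has a context window of {detected:,} tokens, "
            f"which is below the minimum {CRON_MIN_CONTEXT_TOKENS:,}. "
            f"Set a context length override in config.yaml to bypass this check."
        )
```

Passing the detector in as a parameter is a design choice made for this sketch only; it lets the boundary cases be checked without patching a module path.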


@@ -316,10 +316,17 @@ def cronjob(
         updated = resume_job(job_id)
         return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)

-    if normalized in {"run", "run_now", "trigger"}:
+    if normalized in {"run", "trigger"}:
         updated = trigger_job(job_id)
         return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)

+    if normalized == "run_now":
+        from cron.jobs import run_job_now
+        result = run_job_now(job_id)
+        if result is None:
+            return json.dumps({"success": False, "error": "Job not found"}, indent=2)
+        return json.dumps(result, indent=2)
+
     if normalized == "update":
         updates: Dict[str, Any] = {}
         if prompt is not None:
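
The behavioural change in this hunk is that `run_now` no longer shares the queue-only path with `run` and `trigger`. The following is a minimal sketch of that routing, assuming hypothetical names: `dispatch_run_action`, the injected `trigger_job` and `run_job_now` callables, the `strip().lower()` normalisation, and the fallback error message are illustrative and do not appear in the diff.

```python
import json
from typing import Any, Callable, Dict, Optional


def dispatch_run_action(
    action: str,
    job_id: str,
    trigger_job: Callable[[str], Dict[str, Any]],
    run_job_now: Callable[[str], Optional[Dict[str, Any]]],
) -> str:
    """Route 'run'/'trigger' to the queue and 'run_now' to immediate execution."""
    normalized = action.strip().lower()  # assumed normalisation
    if normalized in {"run", "trigger"}:
        # Queue the job for the next scheduler tick.
        updated = trigger_job(job_id)
        return json.dumps({"success": True, "job": updated}, indent=2)
    if normalized == "run_now":
        # Execute synchronously and return the fresh result.
        result = run_job_now(job_id)
        if result is None:
            return json.dumps({"success": False, "error": "Job not found"}, indent=2)
        return json.dumps(result, indent=2)
    return json.dumps({"success": False, "error": f"Unknown action: {action}"}, indent=2)
```

A caller would see the difference in the payload: the queue path returns the job record, while the immediate path returns whatever the execution function produced.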