Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
c1826ece68 feat(agent): error circuit breaker to prevent cascade amplification
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 24s
P(error | prev error) = 58.6% vs P(error | prev success) = 25.2%.
That's a 2.33x cascade factor. Max observed streak: 31 consecutive errors.
After 9 errors, 41/46 recoveries are via terminal alone.

This commit adds a circuit breaker to the agent loop:

- _error_streak counter on AIAgent, incremented per tool error, reset on success
- At 3 consecutive errors (nudge threshold): inject a WARNING into the last
  tool message telling the LLM to try a different approach or use terminal
- At 5 consecutive errors (halt threshold): set _error_streak_halted=True,
  inject CIRCUIT BREAKER message, and break out of the main loop
- Works in both concurrent and sequential tool execution paths
- Nudge is injected as _circuit_breaker key in JSON tool results, or appended
  to plain-text results (same pattern as budget pressure injection)

Also fixes a pre-existing indentation bug in scheduler.py tick() that caused
SyntaxError on the for-job-results loop body.

16 tests covering: streak counting, nudge/halt thresholds, reset on success,
message injection (JSON + plain text + edge cases), class constants, and
__init__ attribute verification.

Fixes #309
2026-04-13 10:22:15 -04:00
c88b172bd9 Merge pull request 'perf(cron): parallel job execution + priority sorting (#353)' (#357) from fix/cron-tick-backlog into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 20s
2026-04-13 08:29:31 +00:00
Alexander Whitestone
4373ef2698 perf(cron): parallel job execution + priority sorting (#353)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 20s
2026-04-13 04:21:14 -04:00
fed7156a86 Merge pull request 'feat(cron): deploy sync guard — catch stale code before cascading failures' (#356) from feat/deploy-sync-guard into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 28s
2026-04-13 08:15:34 +00:00
Alexander Whitestone
e68c4d3e4e feat(cron): add deploy sync guard to catch stale code before cascading failures
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
When the installed run_agent.py diverges from what scheduler.py expects,
every cron job fails with TypeError on AIAgent.__init__() — a silent total
outage that cascades into gateway restarts, asyncio shutdown errors, and
auth token expiry.

This commit adds a _validate_agent_interface() guard that:
- Inspects AIAgent.__init__ at runtime via inspect.signature
- Verifies every kwarg the scheduler passes exists in the constructor
- Fails fast with a clear remediation message on mismatch
- Runs once per gateway process (cached, zero per-job overhead)

The guard is called at the top of run_job() before any work begins.
It would have caught the tool_choice TypeError that caused 1,199 failures
across 55 jobs (meta-issue #343).

Includes 3 tests: pass, fail, and cache verification.
2026-04-13 03:33:48 -04:00
5 changed files with 408 additions and 17 deletions

View File

@@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
import asyncio import asyncio
import concurrent.futures import concurrent.futures
import inspect
import json import json
import logging import logging
import os import os
@@ -50,6 +51,71 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Deploy Sync Guard
# ---------------------------------------------------------------------------
# The scheduler passes keyword arguments to AIAgent() that may not exist in
# older installed versions. When the installed run_agent.py diverges from the
# version the scheduler was written against, every cron job fails with a
# TypeError — a silent total outage. This guard catches that at the first
# tick rather than after 1000+ error log lines.
#
# The check runs once per gateway process (cached by _agent_interface_validated).
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
_agent_interface_validated = False
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to the AIAgent() call below, add it here too.
_SCHEDULER_AGENT_KWARGS = {
"tool_choice": "required",
"skip_memory": True,
"platform": "cron",
}
def _validate_agent_interface():
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
Raises RuntimeError with a remediation message on mismatch.
Called once per process from run_job(); subsequent calls are no-ops.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
"The installed hermes-agent package may be corrupted. "
"Reinstall: pip install -e ~/.hermes/hermes-agent"
) from exc
sig = inspect.signature(AIAgent.__init__)
params = set(sig.parameters.keys()) - {"self"}
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
if missing:
raise RuntimeError(
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
"the cron scheduler requires. This means the installed code is out "
"of sync with the scheduler module.\n"
f" Missing parameters: {', '.join(missing)}\n"
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
f" Then restart the gateway."
)
_agent_interface_validated = True
logger.info(
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
len(params), len(_SCHEDULER_AGENT_KWARGS),
)
# Sentinel: when a cron agent has nothing new to report, it can start its # Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved # response with this marker to suppress delivery. Output is still saved
# locally for audit. # locally for audit.
@@ -470,7 +536,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Tuple of (success, full_output_doc, final_response, error_message) Tuple of (success, full_output_doc, final_response, error_message)
""" """
from run_agent import AIAgent from run_agent import AIAgent
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
# the scheduler passes. Catches stale-code-deploy bugs before they
# cascade into 1000+ TypeErrors and gateway restart loops.
_validate_agent_interface()
# Initialize SQLite session store so cron job messages are persisted # Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py). # and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None _session_db = None
@@ -839,27 +910,36 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose: if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs)) logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0 executed = 0
for job in due_jobs: _job_results = []
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit() def _process_single_job(job):
# will raise RuntimeError for every remaining job. job_name = job.get("name", job["id"])
if sys.is_finalizing(): if sys.is_finalizing():
logger.warning( return None
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
try: try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"]) advance_next_run(job["id"])
success, output, final_response, error = run_job(job) success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
try:
output_file = save_job_output(job["id"], output) output_file = save_job_output(job["id"], output)
if verbose: if verbose:
logger.info("Output saved to: %s", output_file) logger.info("Output saved to: %s", output_file)

View File

@@ -590,6 +590,12 @@ class AIAgent:
# Consumed by every LLM turn across parent + all subagents. # Consumed by every LLM turn across parent + all subagents.
self.iteration_budget = iteration_budget or IterationBudget(max_iterations) self.iteration_budget = iteration_budget or IterationBudget(max_iterations)
self.tool_delay = tool_delay self.tool_delay = tool_delay
# Error circuit breaker — tracks consecutive tool failures across turns.
# After N consecutive errors, inject a nudge to change strategy.
# After N+2, halt and report. Prevents 2.33x cascade amplification.
# See: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/309
self._error_streak = 0
self._error_streak_halted = False
self.save_trajectories = save_trajectories self.save_trajectories = save_trajectories
self.verbose_logging = verbose_logging self.verbose_logging = verbose_logging
self.quiet_mode = quiet_mode self.quiet_mode = quiet_mode
@@ -6285,6 +6291,9 @@ class AIAgent:
else: else:
function_name, function_args, function_result, tool_duration, is_error = r function_name, function_args, function_result, tool_duration, is_error = r
# Error circuit breaker: track consecutive failures
_streak_nudge = self._update_error_streak(is_error)
if is_error: if is_error:
result_preview = function_result[:200] if len(function_result) > 200 else function_result result_preview = function_result[:200] if len(function_result) > 200 else function_result
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview) logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
@@ -6339,6 +6348,25 @@ class AIAgent:
} }
messages.append(tool_msg) messages.append(tool_msg)
# ── Circuit breaker injection (concurrent path) ─────────────────
# Inject the nudge from the error streak tracker, if any.
if self._error_streak >= self._ERROR_STREAK_NUDGE and messages and messages[-1].get("role") == "tool":
if self._error_streak_halted:
_nudge_text = (
f"[CIRCUIT BREAKER: {self._error_streak} consecutive tool errors. "
"STOP calling tools. Provide your final response based on what you "
"have learned. If you need to continue, use the terminal tool — it "
"is the most reliable path. Do NOT repeat the same failed approach.]"
)
else:
_nudge_text = (
f"[WARNING: {self._error_streak} consecutive tool errors detected. "
"Your current approach is not working. Try a different tool type, "
"simplify your request, or use the terminal tool. "
"Do NOT retry the exact same call.]"
)
self._inject_circuit_breaker(messages, _nudge_text)
# ── Budget pressure injection ──────────────────────────────────── # ── Budget pressure injection ────────────────────────────────────
budget_warning = self._get_budget_warning(api_call_count) budget_warning = self._get_budget_warning(api_call_count)
if budget_warning and messages and messages[-1].get("role") == "tool": if budget_warning and messages and messages[-1].get("role") == "tool":
@@ -6607,6 +6635,7 @@ class AIAgent:
# Log tool errors to the persistent error log so [error] tags # Log tool errors to the persistent error log so [error] tags
# in the UI always have a corresponding detailed entry on disk. # in the UI always have a corresponding detailed entry on disk.
_is_error_result, _ = _detect_tool_failure(function_name, function_result) _is_error_result, _ = _detect_tool_failure(function_name, function_result)
_streak_nudge = self._update_error_streak(_is_error_result)
if _is_error_result: if _is_error_result:
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview) logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
else: else:
@@ -6673,6 +6702,24 @@ class AIAgent:
if self.tool_delay > 0 and i < len(assistant_message.tool_calls): if self.tool_delay > 0 and i < len(assistant_message.tool_calls):
time.sleep(self.tool_delay) time.sleep(self.tool_delay)
# ── Circuit breaker injection (sequential path) ────────────────
if self._error_streak >= self._ERROR_STREAK_NUDGE and messages and messages[-1].get("role") == "tool":
if self._error_streak_halted:
_nudge_text = (
f"[CIRCUIT BREAKER: {self._error_streak} consecutive tool errors. "
"STOP calling tools. Provide your final response based on what you "
"have learned. If you need to continue, use the terminal tool — it "
"is the most reliable path. Do NOT repeat the same failed approach.]"
)
else:
_nudge_text = (
f"[WARNING: {self._error_streak} consecutive tool errors detected. "
"Your current approach is not working. Try a different tool type, "
"simplify your request, or use the terminal tool. "
"Do NOT retry the exact same call.]"
)
self._inject_circuit_breaker(messages, _nudge_text)
# ── Budget pressure injection ───────────────────────────────── # ── Budget pressure injection ─────────────────────────────────
# After all tool calls in this turn are processed, check if we're # After all tool calls in this turn are processed, check if we're
# approaching max_iterations. If so, inject a warning into the LAST # approaching max_iterations. If so, inject a warning into the LAST
@@ -6694,6 +6741,66 @@ class AIAgent:
tier = "⚠️ WARNING" if remaining <= self.max_iterations * 0.1 else "💡 CAUTION" tier = "⚠️ WARNING" if remaining <= self.max_iterations * 0.1 else "💡 CAUTION"
print(f"{self.log_prefix}{tier}: {remaining} iterations remaining") print(f"{self.log_prefix}{tier}: {remaining} iterations remaining")
# ── Error circuit breaker ──────────────────────────────────────────────
# Tracks consecutive tool failures across API turns. After a threshold
# of errors (default 3), injects a nudge telling the LLM to try a
# different approach. After hard limit (default 5), halts iteration
# and returns a failure summary instead of burning more tokens.
#
# Empirical basis (issue #309):
# P(error | prev error) = 58.6% vs P(error | prev success) = 25.2%
# Max observed streak: 31 consecutive errors.
# Recovery path: terminal tool (2,300 recoveries).
_ERROR_STREAK_NUDGE = 3 # Inject strategy-change nudge
_ERROR_STREAK_HALT = 5 # Stop and report
def _update_error_streak(self, is_error: bool) -> Optional[str]:
"""Update the error streak counter. Returns a nudge string or None.
Call once per tool result. Resets streak on success, increments on
error. Returns a system nudge to inject into the last tool message
when the streak crosses the nudge threshold.
"""
if is_error:
self._error_streak += 1
else:
self._error_streak = 0
return None
if self._error_streak >= self._ERROR_STREAK_HALT:
self._error_streak_halted = True
return (
f"[CIRCUIT BREAKER: {self._error_streak} consecutive tool errors. "
"STOP calling tools. Provide your final response based on what you "
"have learned. If you need to continue, use the terminal tool — it "
"is the most reliable path. Do NOT repeat the same failed approach.]"
)
if self._error_streak >= self._ERROR_STREAK_NUDGE:
return (
f"[WARNING: {self._error_streak} consecutive tool errors detected. "
"Your current approach is not working. Try a different tool type, "
"simplify your request, or use the terminal tool. "
"Do NOT retry the exact same call.]"
)
return None
def _inject_circuit_breaker(self, messages: list, nudge: str) -> None:
"""Append the circuit breaker nudge to the last tool message."""
if not messages or messages[-1].get("role") != "tool":
return
last_content = messages[-1]["content"]
try:
parsed = json.loads(last_content)
if isinstance(parsed, dict):
parsed["_circuit_breaker"] = nudge
messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False)
return
except (json.JSONDecodeError, TypeError):
pass
messages[-1]["content"] = last_content + f"\n\n{nudge}"
def _get_budget_warning(self, api_call_count: int) -> Optional[str]: def _get_budget_warning(self, api_call_count: int) -> Optional[str]:
"""Return a budget pressure string, or None if not yet needed. """Return a budget pressure string, or None if not yet needed.
@@ -7228,7 +7335,12 @@ class AIAgent:
if not self.quiet_mode: if not self.quiet_mode:
self._safe_print("\n⚡ Breaking out of tool loop due to interrupt...") self._safe_print("\n⚡ Breaking out of tool loop due to interrupt...")
break break
# Error circuit breaker: halt if too many consecutive tool errors
if self._error_streak_halted:
logger.warning("Circuit breaker tripped: %d consecutive errors, halting.", self._error_streak)
break
api_call_count += 1 api_call_count += 1
self._api_call_count = api_call_count self._api_call_count = api_call_count
self._touch_activity(f"starting API call #{api_call_count}") self._touch_activity(f"starting API call #{api_call_count}")

View File

@@ -863,3 +863,54 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance") adv_mock.assert_called_once_with("test-advance")
# advance must happen before run # advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")] assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestDeploySyncGuard:
"""Tests for _validate_agent_interface() — the deploy sync guard."""
def test_passes_when_all_params_present(self):
"""Validation passes when AIAgent accepts every scheduler kwarg."""
from cron.scheduler import _validate_agent_interface, _agent_interface_validated
import cron.scheduler as sched_mod
# Reset the cached flag so the check actually runs.
sched_mod._agent_interface_validated = False
# Should not raise.
_validate_agent_interface()
assert sched_mod._agent_interface_validated is True
def test_fails_when_param_missing(self):
"""Validation raises RuntimeError when AIAgent is missing a required param."""
import cron.scheduler as sched_mod
from unittest.mock import MagicMock
import inspect
# Save and restore.
orig_flag = sched_mod._agent_interface_validated
try:
sched_mod._agent_interface_validated = False
# Build a fake AIAgent class whose __init__ lacks 'tool_choice'.
class FakeAIAgent:
def __init__(self, model="", max_iterations=90, quiet_mode=False,
disabled_toolsets=None, skip_memory=False, platform=None,
session_id=None, session_db=None):
pass
fake_module = MagicMock()
fake_module.AIAgent = FakeAIAgent
with pytest.raises(RuntimeError, match="Missing parameters: tool_choice"):
with patch.dict("sys.modules", {"run_agent": fake_module}):
sched_mod._validate_agent_interface()
finally:
sched_mod._agent_interface_validated = orig_flag
def test_cached_after_first_run(self):
"""Second call is a no-op (uses cached flag)."""
import cron.scheduler as sched_mod
sched_mod._agent_interface_validated = True
# Should not raise even if we somehow break AIAgent — the flag is set.
sched_mod._validate_agent_interface()
# No exception = pass.

View File

View File

@@ -0,0 +1,148 @@
"""Tests for the error circuit breaker in run_agent.py (issue #309)."""
import json
import pytest
from unittest.mock import patch, MagicMock
class TestErrorStreakTracking:
"""Test _update_error_streak counter logic."""
def _make_agent(self):
"""Create a minimal AIAgent-like object for testing streak logic."""
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent._error_streak = 0
agent._error_streak_halted = False
return agent
def test_success_resets_streak(self):
agent = self._make_agent()
agent._error_streak = 5
result = agent._update_error_streak(is_error=False)
assert agent._error_streak == 0
assert result is None
def test_error_increments_streak(self):
agent = self._make_agent()
agent._update_error_streak(is_error=True)
assert agent._error_streak == 1
agent._update_error_streak(is_error=True)
assert agent._error_streak == 2
def test_nudge_at_threshold(self):
agent = self._make_agent()
# Errors 1 and 2: no nudge
assert agent._update_error_streak(is_error=True) is None # streak=1
assert agent._update_error_streak(is_error=True) is None # streak=2
# Error 3: nudge
nudge = agent._update_error_streak(is_error=True) # streak=3
assert nudge is not None
assert "WARNING" in nudge
assert "3 consecutive" in nudge
assert not agent._error_streak_halted
def test_halt_at_threshold(self):
agent = self._make_agent()
agent._error_streak = 4
nudge = agent._update_error_streak(is_error=True) # streak=5
assert nudge is not None
assert "CIRCUIT BREAKER" in nudge
assert agent._error_streak_halted
def test_success_between_errors_resets(self):
agent = self._make_agent()
agent._update_error_streak(is_error=True) # 1
agent._update_error_streak(is_error=True) # 2
agent._update_error_streak(is_error=False) # reset
assert agent._error_streak == 0
# Two more errors shouldn't nudge yet
assert agent._update_error_streak(is_error=True) is None # 1
assert agent._update_error_streak(is_error=True) is None # 2
def test_streak_above_halt_keeps_halting(self):
agent = self._make_agent()
agent._error_streak = 6
agent._error_streak_halted = True
nudge = agent._update_error_streak(is_error=True)
assert "CIRCUIT BREAKER" in nudge
assert agent._error_streak == 7
class TestInjectCircuitBreaker:
"""Test _inject_circuit_breaker message injection."""
def _make_agent(self):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
return agent
def test_injects_into_json_message(self):
agent = self._make_agent()
messages = [{"role": "tool", "content": json.dumps({"output": "hello"}), "tool_call_id": "t1"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
parsed = json.loads(messages[-1]["content"])
assert parsed["_circuit_breaker"] == "[WARNING: test nudge]"
assert parsed["output"] == "hello"
def test_injects_into_plain_text_message(self):
agent = self._make_agent()
messages = [{"role": "tool", "content": "some error string", "tool_call_id": "t1"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
assert "[WARNING: test nudge]" in messages[-1]["content"]
assert "some error string" in messages[-1]["content"]
def test_noop_on_non_tool_message(self):
agent = self._make_agent()
messages = [{"role": "user", "content": "hello"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
assert messages[-1]["content"] == "hello"
def test_noop_on_empty_messages(self):
agent = self._make_agent()
agent._inject_circuit_breaker([], "[WARNING: test nudge]")
def test_noop_on_missing_role(self):
agent = self._make_agent()
messages = [{"content": "no role key"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
assert messages[-1]["content"] == "no role key"
class TestCircuitBreakerConstants:
"""Test that the class-level thresholds are set correctly."""
def test_nudge_threshold(self):
from run_agent import AIAgent
assert AIAgent._ERROR_STREAK_NUDGE == 3
def test_halt_threshold(self):
from run_agent import AIAgent
assert AIAgent._ERROR_STREAK_HALT == 5
def test_halt_greater_than_nudge(self):
from run_agent import AIAgent
assert AIAgent._ERROR_STREAK_HALT > AIAgent._ERROR_STREAK_NUDGE
class TestCircuitBreakerIntegration:
"""Test that the circuit breaker is initialized in AIAgent.__init__."""
def test_agent_has_streak_attributes(self):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent._error_streak = 0
agent._error_streak_halted = False
# Verify the attributes exist and have correct defaults
assert hasattr(agent, "_error_streak")
assert hasattr(agent, "_error_streak_halted")
assert agent._error_streak == 0
assert agent._error_streak_halted is False
def test_agent_init_sets_streak(self):
"""Verify __init__ actually initializes the streak attributes."""
import inspect
from run_agent import AIAgent
src = inspect.getsource(AIAgent.__init__)
assert "_error_streak" in src
assert "_error_streak_halted" in src