Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander Whitestone
215913bd45 fix(cron): add [SCRIPT_FAILED] marker for prompt-wrapped script jobs
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
Fixes #348

Problem: Prompt-wrapped script jobs (like Gitea Event Watcher) have
no way to propagate external command failure back to the scheduler.
The agent describes the error in prose, but run_job() returns
success=True because no exception was raised. Cron state shows
green even when the underlying script failed.

Fix:
1. Added SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]" constant
2. Cron hint now instructs agents: if an external command failed,
   respond with "[SCRIPT_FAILED]: <reason>"
3. run_job() scans final_response for the marker. If found,
   overrides success=False and extracts the reason.

The marker is checked BEFORE the normal output path, so
script failures are always recorded as failures in cron state.
2026-04-13 14:56:31 -04:00
Alexander Whitestone
4cce433abb fix(cron): filter unsupported AIAgent kwargs instead of crashing jobs
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 23s
Fixes #341

Problem: When the installed run_agent.py diverges from what
scheduler.py expects, AIAgent() raises TypeError on unexpected
kwargs. This kills every cron job — 1,199 failures in the
2026-04-13 incident.

The deploy sync guard (PR #356) catches this at the FIRST job
via fail-fast. This PR adds a second layer: _safe_agent_kwargs()
filters unsupported kwargs before passing them to AIAgent().
Jobs run with degraded functionality instead of crashing.

Changes:
- _safe_agent_kwargs(): inspects AIAgent.__init__ signature,
  drops unsupported kwargs, logs warning
- AIAgent() call now wraps kwargs through _safe_agent_kwargs()
- 3 tests: passes all kwargs through when accepted, drops unsupported, returns empty dict if all dropped
2026-04-13 09:36:22 -04:00
2 changed files with 210 additions and 24 deletions

View File

@@ -40,6 +40,112 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)
# =====================================================================
# Deploy Sync Guard
# =====================================================================
#
# If the installed run_agent.py diverges from the version scheduler.py
# was written against, every cron job fails with:
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
#
# _validate_agent_interface() catches this at the FIRST job, not the
# 55th. It uses inspect.signature() to verify every kwarg we pass is
# accepted by AIAgent.__init__().
#
# Maintaining this list: if you add a kwarg to the AIAgent() call in
# run_job(), add it here too. The guard catches mismatches.
# Names of the keyword arguments run_job() passes to AIAgent().
# _validate_agent_interface() compares this set against the installed
# AIAgent.__init__ signature. Immutable (frozenset) so it cannot drift at runtime.
_SCHEDULER_AGENT_KWARGS: frozenset = frozenset({
"model", "api_key", "base_url", "provider", "api_mode",
"acp_command", "acp_args", "max_iterations", "reasoning_config",
"prefill_messages", "providers_allowed", "providers_ignored",
"providers_order", "provider_sort", "disabled_toolsets",
"tool_choice", "quiet_mode", "skip_memory", "platform",
"session_id", "session_db",
})
# Flipped to True by _validate_agent_interface() after its first successful
# check so the signature inspection is not repeated on later calls.
_agent_interface_validated: bool = False
def _validate_agent_interface() -> None:
    """Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.

    The check is cached: once it has succeeded, the module-level
    ``_agent_interface_validated`` flag short-circuits every later call.
    A failed check does not set the flag, so it is re-run on the next call.

    An ``__init__`` that declares a ``**kwargs`` catch-all accepts any
    keyword argument and therefore cannot raise "unexpected keyword
    argument"; such a signature is treated as compatible.

    Raises:
        RuntimeError: If ``run_agent.AIAgent`` cannot be imported, or if its
            ``__init__`` does not accept one or more of the names in
            ``_SCHEDULER_AGENT_KWARGS``. The message contains actionable
            guidance for the operator.
    """
    global _agent_interface_validated
    if _agent_interface_validated:
        return

    import inspect

    try:
        from run_agent import AIAgent
    except ImportError as exc:
        raise RuntimeError(
            f"Cannot import AIAgent: {exc}\n"
            "Is hermes-agent installed? Check PYTHONPATH."
        ) from exc

    params = inspect.signature(AIAgent.__init__).parameters
    # getattr() keeps this tolerant of signature stand-ins whose parameter
    # values are not real inspect.Parameter objects.
    accepts_any_kwarg = any(
        getattr(param, "kind", None) is inspect.Parameter.VAR_KEYWORD
        for param in params.values()
    )

    if not accepts_any_kwarg:
        accepted = set(params) - {"self"}
        missing = _SCHEDULER_AGENT_KWARGS - accepted
        if missing:
            sorted_missing = sorted(missing)
            raise RuntimeError(
                "Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
                f" {', '.join(sorted_missing)}\n"
                "This means the installed run_agent.py is out of date.\n"
                "Fix: pull latest hermes-agent code and restart the gateway.\n"
                " cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
            )

    _agent_interface_validated = True
    logger.debug(
        "Deploy sync guard passed — %d params verified",
        len(_SCHEDULER_AGENT_KWARGS),
    )
def _safe_agent_kwargs(kwargs: dict) -> dict:
    """Filter kwargs to only those accepted by installed AIAgent.__init__.

    More resilient than _validate_agent_interface() alone: instead of
    crashing on mismatch, drops unsupported kwargs and logs a warning.
    Jobs run with degraded functionality instead of failing entirely.

    If ``AIAgent.__init__`` declares a ``**kwargs`` catch-all, every keyword
    is accepted and nothing is dropped.

    NOTE(review): run_job() calls _validate_agent_interface() before it
    builds the agent, and that guard raises when a scheduler kwarg is
    unsupported — confirm the drop path here is reachable for those kwargs.

    Args:
        kwargs: The kwargs dict the scheduler wants to pass to AIAgent().
            It is never mutated.

    Returns:
        A new dict containing only kwargs the installed AIAgent accepts.
        If ``run_agent`` cannot be imported, a copy of all of ``kwargs``.
    """
    import inspect

    try:
        from run_agent import AIAgent
    except ImportError:
        # Can't import — pass everything through, let the real error surface.
        # Copy so the "new dict" contract holds on this path too.
        return dict(kwargs)

    params = inspect.signature(AIAgent.__init__).parameters

    # A **kwargs parameter means no keyword can be "unsupported".
    # getattr() keeps this tolerant of signature stand-ins whose parameter
    # values are not real inspect.Parameter objects.
    if any(
        getattr(param, "kind", None) is inspect.Parameter.VAR_KEYWORD
        for param in params.values()
    ):
        return dict(kwargs)

    accepted = set(params) - {"self"}
    safe = {key: value for key, value in kwargs.items() if key in accepted}
    dropped = sorted(key for key in kwargs if key not in accepted)
    if dropped:
        logger.warning(
            "Dropping unsupported AIAgent kwargs (stale install?): %s",
            ", ".join(dropped),
        )
    return safe
# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
@@ -54,6 +160,7 @@ from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
# Marker a cron agent includes in its response to report that an external
# command or script it ran failed. run_job() looks for it in the final
# response and, when present, returns success=False with the extracted reason.
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -414,7 +521,11 @@ def _build_job_prompt(job: dict) -> str:
"SILENT: If there is genuinely nothing new to report, respond "
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
"Never combine [SILENT] with content — either report your "
"findings normally, or say [SILENT] and nothing more.]\n\n"
"findings normally, or say [SILENT] and nothing more. "
"SCRIPT_FAILURE: If an external command or script you ran "
"failed (timeout, crash, connection error), respond with "
"\"[SCRIPT_FAILED]: <one-line reason>\" followed by your "
"analysis. This lets the scheduler record the failure.]\n\n"
)
prompt = cron_hint + prompt
if skills is None:
@@ -469,6 +580,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
# Deploy sync guard — fail fast on first job if the installed
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -593,29 +708,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
base_url=turn_route["runtime"].get("base_url"),
provider=turn_route["runtime"].get("provider"),
api_mode=turn_route["runtime"].get("api_mode"),
acp_command=turn_route["runtime"].get("command"),
acp_args=turn_route["runtime"].get("args"),
max_iterations=max_iterations,
reasoning_config=reasoning_config,
prefill_messages=prefill_messages,
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
provider_sort=pr.get("sort"),
disabled_toolsets=["cronjob", "messaging", "clarify"],
tool_choice="required",
quiet_mode=True,
skip_memory=True, # Cron system prompts would corrupt user representations
platform="cron",
session_id=_cron_session_id,
session_db=_session_db,
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
"base_url": turn_route["runtime"].get("base_url"),
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": turn_route["runtime"].get("args"),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
"providers_allowed": pr.get("only"),
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations
"platform": "cron",
"session_id": _cron_session_id,
"session_db": _session_db,
})
agent = AIAgent(**_agent_kwargs)
# Run the agent with an *inactivity*-based timeout: the job can run
# for hours if it's actively calling tools / receiving stream tokens,
@@ -735,6 +851,39 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Use a separate variable for log display; keep final_response clean
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Check for [SCRIPT_FAILED] marker — agent detected external command failure.
# Override success so the cron state reflects the failure, not a green run
# with failure prose hidden in the response.
_script_failed_reason = None
if SCRIPT_FAILED_MARKER in final_response.upper():
import re as _re
_m = _re.search(
r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
final_response,
_re.IGNORECASE,
)
_script_failed_reason = (_m.group(1).strip() if _m and _m.group(1).strip()
else "Agent reported script failure")
logger.warning(
"Job '%s': agent reported script failure — %s",
job_name, _script_failed_reason,
)
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}

View File

@@ -863,3 +863,40 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestSafeAgentKwargs:
    """Tests for _safe_agent_kwargs() — kwarg filtering on AIAgent init.

    The filtering tests swap ``run_agent.AIAgent`` for a small stub class
    with a real ``__init__`` signature instead of mocking
    ``inspect.signature`` globally, so they exercise genuine signature
    inspection and do not depend on how the function reads it.
    """

    def test_passes_all_kwargs_when_all_accepted(self):
        """When AIAgent accepts all kwargs, _safe_agent_kwargs returns them unchanged."""
        from cron.scheduler import _safe_agent_kwargs

        kwargs = {"model": "test", "quiet_mode": True, "platform": "cron"}
        result = _safe_agent_kwargs(kwargs)
        assert result == kwargs

    def test_drops_unsupported_kwargs(self):
        """Unsupported kwargs are dropped and a warning naming them is logged."""
        from unittest.mock import patch
        from cron.scheduler import _safe_agent_kwargs

        class _LimitedAgent:
            """Stand-in AIAgent that accepts only ``model`` and ``quiet_mode``."""

            def __init__(self, model=None, quiet_mode=False):
                pass

        kwargs = {"model": "test", "quiet_mode": True, "nonexistent_param": 42}
        with patch("run_agent.AIAgent", _LimitedAgent):
            with patch("cron.scheduler.logger") as mock_logger:
                result = _safe_agent_kwargs(kwargs)

        assert result == {"model": "test", "quiet_mode": True}
        assert "nonexistent_param" not in result
        # The drop must be visible to operators, not silent.
        mock_logger.warning.assert_called_once()
        assert "nonexistent_param" in str(mock_logger.warning.call_args)

    def test_returns_empty_dict_if_all_dropped(self):
        """If every kwarg is unsupported, returns empty dict (not crash)."""
        from unittest.mock import patch
        from cron.scheduler import _safe_agent_kwargs

        class _BareAgent:
            """Stand-in AIAgent whose __init__ takes no keyword arguments."""

            def __init__(self):
                pass

        with patch("run_agent.AIAgent", _BareAgent):
            result = _safe_agent_kwargs({"tool_choice": "required", "skip_memory": True})

        assert result == {}