Compare commits
7 Commits
feat/error
...
fix/cron-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9c38e28f4d | ||
| cea4c7fdd0 | |||
| f9b6db52af | |||
| f91f22ef7a | |||
| b89c670400 | |||
|
|
f6e72c135c | ||
|
|
ece8b5f8be |
@@ -26,11 +26,11 @@ from cron.jobs import (
|
||||
trigger_job,
|
||||
JOBS_FILE,
|
||||
)
|
||||
from cron.scheduler import tick
|
||||
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
|
||||
|
||||
__all__ = [
|
||||
"create_job",
|
||||
"get_job",
|
||||
"get_job",
|
||||
"list_jobs",
|
||||
"remove_job",
|
||||
"update_job",
|
||||
@@ -39,4 +39,6 @@ __all__ = [
|
||||
"trigger_job",
|
||||
"tick",
|
||||
"JOBS_FILE",
|
||||
"ModelContextError",
|
||||
"CRON_MIN_CONTEXT_TOKENS",
|
||||
]
|
||||
|
||||
38
cron/jobs.py
38
cron/jobs.py
@@ -563,6 +563,44 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
)
|
||||
|
||||
|
||||
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Execute a job immediately and persist fresh state.
|
||||
|
||||
Unlike trigger_job() which queues for the next scheduler tick,
|
||||
this runs the job synchronously and returns the result.
|
||||
Clears stale error state on success.
|
||||
|
||||
Returns:
|
||||
Dict with 'job', 'success', 'output', 'error' keys, or None if not found.
|
||||
"""
|
||||
job = get_job(job_id)
|
||||
if not job:
|
||||
return None
|
||||
|
||||
try:
|
||||
from cron.scheduler import run_job as _run_job
|
||||
except ImportError as exc:
|
||||
return {
|
||||
"job": job,
|
||||
"success": False,
|
||||
"output": None,
|
||||
"error": f"Cannot import scheduler: {exc}",
|
||||
}
|
||||
|
||||
success, output, final_response, error = _run_job(job)
|
||||
mark_job_run(job_id, success, error)
|
||||
|
||||
updated_job = get_job(job_id) or job
|
||||
return {
|
||||
"job": updated_job,
|
||||
"success": success,
|
||||
"output": output,
|
||||
"final_response": final_response,
|
||||
"error": error,
|
||||
}
|
||||
|
||||
|
||||
def remove_job(job_id: str) -> bool:
|
||||
"""Remove a job by ID."""
|
||||
jobs = load_jobs()
|
||||
|
||||
@@ -10,7 +10,6 @@ runs at a time if multiple processes overlap.
|
||||
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -41,6 +40,112 @@ from hermes_time import now as _hermes_now
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Deploy Sync Guard
|
||||
# =====================================================================
|
||||
#
|
||||
# If the installed run_agent.py diverges from the version scheduler.py
|
||||
# was written against, every cron job fails with:
|
||||
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
|
||||
#
|
||||
# _validate_agent_interface() catches this at the FIRST job, not the
|
||||
# 55th. It uses inspect.signature() to verify every kwarg we pass is
|
||||
# accepted by AIAgent.__init__().
|
||||
#
|
||||
# Maintaining this list: if you add a kwarg to the AIAgent() call in
|
||||
# run_job(), add it here too. The guard catches mismatches.
|
||||
|
||||
_SCHEDULER_AGENT_KWARGS: set = frozenset({
|
||||
"model", "api_key", "base_url", "provider", "api_mode",
|
||||
"acp_command", "acp_args", "max_iterations", "reasoning_config",
|
||||
"prefill_messages", "providers_allowed", "providers_ignored",
|
||||
"providers_order", "provider_sort", "disabled_toolsets",
|
||||
"tool_choice", "quiet_mode", "skip_memory", "platform",
|
||||
"session_id", "session_db",
|
||||
})
|
||||
|
||||
_agent_interface_validated: bool = False
|
||||
|
||||
|
||||
def _validate_agent_interface() -> None:
|
||||
"""Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.
|
||||
|
||||
Raises RuntimeError with actionable guidance if params are missing.
|
||||
Caches result — runs once per gateway process lifetime.
|
||||
"""
|
||||
global _agent_interface_validated
|
||||
if _agent_interface_validated:
|
||||
return
|
||||
|
||||
import inspect
|
||||
|
||||
try:
|
||||
from run_agent import AIAgent
|
||||
except ImportError as exc:
|
||||
raise RuntimeError(
|
||||
f"Cannot import AIAgent: {exc}\n"
|
||||
"Is hermes-agent installed? Check PYTHONPATH."
|
||||
) from exc
|
||||
|
||||
sig = inspect.signature(AIAgent.__init__)
|
||||
accepted = set(sig.parameters.keys()) - {"self"}
|
||||
missing = _SCHEDULER_AGENT_KWARGS - accepted
|
||||
|
||||
if missing:
|
||||
sorted_missing = sorted(missing)
|
||||
raise RuntimeError(
|
||||
"Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
|
||||
f" {', '.join(sorted_missing)}\n"
|
||||
"This means the installed run_agent.py is out of date.\n"
|
||||
"Fix: pull latest hermes-agent code and restart the gateway.\n"
|
||||
" cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
|
||||
)
|
||||
|
||||
_agent_interface_validated = True
|
||||
logger.debug("Deploy sync guard passed — %d params verified", len(_SCHEDULER_AGENT_KWARGS))
|
||||
|
||||
|
||||
def _safe_agent_kwargs(kwargs: dict) -> dict:
|
||||
"""Filter kwargs to only those accepted by installed AIAgent.__init__.
|
||||
|
||||
More resilient than _validate_agent_interface() alone: instead of
|
||||
crashing on mismatch, drops unsupported kwargs and logs a warning.
|
||||
Jobs run with degraded functionality instead of failing entirely.
|
||||
|
||||
Args:
|
||||
kwargs: The kwargs dict the scheduler wants to pass to AIAgent().
|
||||
|
||||
Returns:
|
||||
A new dict containing only kwargs the installed AIAgent accepts.
|
||||
"""
|
||||
import inspect
|
||||
|
||||
try:
|
||||
from run_agent import AIAgent
|
||||
except ImportError:
|
||||
# Can't import — pass everything through, let the real error surface
|
||||
return kwargs
|
||||
|
||||
sig = inspect.signature(AIAgent.__init__)
|
||||
accepted = set(sig.parameters.keys()) - {"self"}
|
||||
|
||||
safe = {}
|
||||
dropped = []
|
||||
for key, value in kwargs.items():
|
||||
if key in accepted:
|
||||
safe[key] = value
|
||||
else:
|
||||
dropped.append(key)
|
||||
|
||||
if dropped:
|
||||
logger.warning(
|
||||
"Dropping unsupported AIAgent kwargs (stale install?): %s",
|
||||
", ".join(sorted(dropped)),
|
||||
)
|
||||
|
||||
return safe
|
||||
|
||||
# Valid delivery platforms — used to validate user-supplied platform names
|
||||
# in cron delivery targets, preventing env var enumeration via crafted names.
|
||||
_KNOWN_DELIVERY_PLATFORMS = frozenset({
|
||||
@@ -51,75 +156,76 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
|
||||
|
||||
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Deploy Sync Guard
|
||||
# ---------------------------------------------------------------------------
|
||||
# The scheduler passes keyword arguments to AIAgent() that may not exist in
|
||||
# older installed versions. When the installed run_agent.py diverges from the
|
||||
# version the scheduler was written against, every cron job fails with a
|
||||
# TypeError — a silent total outage. This guard catches that at the first
|
||||
# tick rather than after 1000+ error log lines.
|
||||
#
|
||||
# The check runs once per gateway process (cached by _agent_interface_validated).
|
||||
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
|
||||
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
|
||||
|
||||
_agent_interface_validated = False
|
||||
|
||||
# Parameters the scheduler passes to AIAgent() in run_job().
|
||||
# If you add a new kwarg to the AIAgent() call below, add it here too.
|
||||
_SCHEDULER_AGENT_KWARGS = {
|
||||
"tool_choice": "required",
|
||||
"skip_memory": True,
|
||||
"platform": "cron",
|
||||
}
|
||||
|
||||
|
||||
def _validate_agent_interface():
|
||||
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
|
||||
|
||||
Raises RuntimeError with a remediation message on mismatch.
|
||||
Called once per process from run_job(); subsequent calls are no-ops.
|
||||
"""
|
||||
global _agent_interface_validated
|
||||
if _agent_interface_validated:
|
||||
return
|
||||
|
||||
try:
|
||||
from run_agent import AIAgent
|
||||
except ImportError as exc:
|
||||
raise RuntimeError(
|
||||
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
|
||||
"The installed hermes-agent package may be corrupted. "
|
||||
"Reinstall: pip install -e ~/.hermes/hermes-agent"
|
||||
) from exc
|
||||
|
||||
sig = inspect.signature(AIAgent.__init__)
|
||||
params = set(sig.parameters.keys()) - {"self"}
|
||||
|
||||
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
|
||||
if missing:
|
||||
raise RuntimeError(
|
||||
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
|
||||
"the cron scheduler requires. This means the installed code is out "
|
||||
"of sync with the scheduler module.\n"
|
||||
f" Missing parameters: {', '.join(missing)}\n"
|
||||
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
|
||||
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
|
||||
f" Then restart the gateway."
|
||||
)
|
||||
|
||||
_agent_interface_validated = True
|
||||
logger.info(
|
||||
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
|
||||
len(params), len(_SCHEDULER_AGENT_KWARGS),
|
||||
)
|
||||
|
||||
|
||||
# Sentinel: when a cron agent has nothing new to report, it can start its
|
||||
# response with this marker to suppress delivery. Output is still saved
|
||||
# locally for audit.
|
||||
SILENT_MARKER = "[SILENT]"
|
||||
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
|
||||
|
||||
# Failure phrases that indicate an external script/command failed, even when
|
||||
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
|
||||
# against the final response. These are strong signals — agents rarely use
|
||||
# these words when a script succeeded.
|
||||
_SCRIPT_FAILURE_PHRASES = (
|
||||
"timed out",
|
||||
"timeout",
|
||||
"connection error",
|
||||
"connection refused",
|
||||
"connection reset",
|
||||
"failed to execute",
|
||||
"failed due to",
|
||||
"script failed",
|
||||
"script error",
|
||||
"command failed",
|
||||
"exit code",
|
||||
"exit status",
|
||||
"non-zero exit",
|
||||
"did not complete",
|
||||
"could not run",
|
||||
"unable to execute",
|
||||
"permission denied",
|
||||
"no such file",
|
||||
"traceback",
|
||||
)
|
||||
|
||||
|
||||
def _detect_script_failure(final_response: str) -> Optional[str]:
|
||||
"""Detect script failure from agent's final response.
|
||||
|
||||
Returns a reason string if failure detected, None otherwise.
|
||||
Checks both the explicit [SCRIPT_FAILED] marker and heuristic patterns.
|
||||
"""
|
||||
if not final_response:
|
||||
return None
|
||||
|
||||
# 1. Explicit marker — highest confidence.
|
||||
if SCRIPT_FAILED_MARKER in final_response.upper():
|
||||
import re as _re
|
||||
_m = _re.search(
|
||||
r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
|
||||
final_response,
|
||||
_re.IGNORECASE,
|
||||
)
|
||||
reason = _m.group(1).strip() if _m and _m.group(1).strip() else None
|
||||
return reason or "Agent reported script failure"
|
||||
|
||||
# 2. Heuristic detection — catch failures described in natural language.
|
||||
# Only flag if the response contains failure language AND does NOT
|
||||
# contain success markers like [NOOP] (which means the script ran fine
|
||||
# but found nothing).
|
||||
lower = final_response.lower()
|
||||
has_noop = "[noop]" in lower
|
||||
has_silent = "[silent]" in lower
|
||||
|
||||
if has_noop or has_silent:
|
||||
return None # Agent explicitly signaled success/nothing-to-report
|
||||
|
||||
for phrase in _SCRIPT_FAILURE_PHRASES:
|
||||
if phrase in lower:
|
||||
return f"Detected script failure phrase: '{phrase}'"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# Resolve Hermes home directory (respects HERMES_HOME override)
|
||||
_hermes_home = get_hermes_home()
|
||||
@@ -480,7 +586,15 @@ def _build_job_prompt(job: dict) -> str:
|
||||
"SILENT: If there is genuinely nothing new to report, respond "
|
||||
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
|
||||
"Never combine [SILENT] with content — either report your "
|
||||
"findings normally, or say [SILENT] and nothing more.]\n\n"
|
||||
"findings normally, or say [SILENT] and nothing more. "
|
||||
"SCRIPT_FAILURE: If an external command or script you ran "
|
||||
"failed (timeout, crash, connection error, non-zero exit), you MUST "
|
||||
"respond with "
|
||||
"\"[SCRIPT_FAILED]: <one-line reason>\" as the FIRST LINE of your "
|
||||
"response. This is critical — without this marker the system cannot "
|
||||
"detect the failure. Examples: "
|
||||
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
|
||||
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
|
||||
)
|
||||
prompt = cron_hint + prompt
|
||||
if skills is None:
|
||||
@@ -535,13 +649,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
Returns:
|
||||
Tuple of (success, full_output_doc, final_response, error_message)
|
||||
"""
|
||||
from run_agent import AIAgent
|
||||
|
||||
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
|
||||
# the scheduler passes. Catches stale-code-deploy bugs before they
|
||||
# cascade into 1000+ TypeErrors and gateway restart loops.
|
||||
# Deploy sync guard — fail fast on first job if the installed
|
||||
# AIAgent.__init__ is missing params the scheduler expects.
|
||||
_validate_agent_interface()
|
||||
|
||||
from run_agent import AIAgent
|
||||
|
||||
# Initialize SQLite session store so cron job messages are persisted
|
||||
# and discoverable via session_search (same pattern as gateway/run.py).
|
||||
_session_db = None
|
||||
@@ -664,29 +777,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
},
|
||||
)
|
||||
|
||||
agent = AIAgent(
|
||||
model=turn_route["model"],
|
||||
api_key=turn_route["runtime"].get("api_key"),
|
||||
base_url=turn_route["runtime"].get("base_url"),
|
||||
provider=turn_route["runtime"].get("provider"),
|
||||
api_mode=turn_route["runtime"].get("api_mode"),
|
||||
acp_command=turn_route["runtime"].get("command"),
|
||||
acp_args=turn_route["runtime"].get("args"),
|
||||
max_iterations=max_iterations,
|
||||
reasoning_config=reasoning_config,
|
||||
prefill_messages=prefill_messages,
|
||||
providers_allowed=pr.get("only"),
|
||||
providers_ignored=pr.get("ignore"),
|
||||
providers_order=pr.get("order"),
|
||||
provider_sort=pr.get("sort"),
|
||||
disabled_toolsets=["cronjob", "messaging", "clarify"],
|
||||
tool_choice="required",
|
||||
quiet_mode=True,
|
||||
skip_memory=True, # Cron system prompts would corrupt user representations
|
||||
platform="cron",
|
||||
session_id=_cron_session_id,
|
||||
session_db=_session_db,
|
||||
)
|
||||
_agent_kwargs = _safe_agent_kwargs({
|
||||
"model": turn_route["model"],
|
||||
"api_key": turn_route["runtime"].get("api_key"),
|
||||
"base_url": turn_route["runtime"].get("base_url"),
|
||||
"provider": turn_route["runtime"].get("provider"),
|
||||
"api_mode": turn_route["runtime"].get("api_mode"),
|
||||
"acp_command": turn_route["runtime"].get("command"),
|
||||
"acp_args": turn_route["runtime"].get("args"),
|
||||
"max_iterations": max_iterations,
|
||||
"reasoning_config": reasoning_config,
|
||||
"prefill_messages": prefill_messages,
|
||||
"providers_allowed": pr.get("only"),
|
||||
"providers_ignored": pr.get("ignore"),
|
||||
"providers_order": pr.get("order"),
|
||||
"provider_sort": pr.get("sort"),
|
||||
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
|
||||
"tool_choice": "required",
|
||||
"quiet_mode": True,
|
||||
"skip_memory": True, # Cron system prompts would corrupt user representations
|
||||
"platform": "cron",
|
||||
"session_id": _cron_session_id,
|
||||
"session_db": _session_db,
|
||||
})
|
||||
agent = AIAgent(**_agent_kwargs)
|
||||
|
||||
# Run the agent with an *inactivity*-based timeout: the job can run
|
||||
# for hours if it's actively calling tools / receiving stream tokens,
|
||||
@@ -806,6 +920,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
# Use a separate variable for log display; keep final_response clean
|
||||
# for delivery logic (empty response = no delivery).
|
||||
logged_response = final_response if final_response else "(No response generated)"
|
||||
|
||||
# Check for script failure — both explicit [SCRIPT_FAILED] marker
|
||||
# and heuristic detection for failures described in natural language.
|
||||
_script_failed_reason = _detect_script_failure(final_response)
|
||||
if _script_failed_reason is not None:
|
||||
logger.warning(
|
||||
"Job '%s': agent reported script failure — %s",
|
||||
job_name, _script_failed_reason,
|
||||
)
|
||||
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Prompt
|
||||
|
||||
{prompt}
|
||||
|
||||
## Response
|
||||
|
||||
{logged_response}
|
||||
"""
|
||||
return False, output, final_response, _script_failed_reason
|
||||
|
||||
output = f"""# Cron Job: {job_name}
|
||||
|
||||
@@ -910,36 +1048,26 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
if verbose:
|
||||
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
|
||||
|
||||
# Parallel job execution with priority sorting
|
||||
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
|
||||
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
|
||||
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
|
||||
|
||||
executed = 0
|
||||
_job_results = []
|
||||
|
||||
def _process_single_job(job):
|
||||
job_name = job.get("name", job["id"])
|
||||
for job in due_jobs:
|
||||
# If the interpreter is shutting down (e.g. gateway restart),
|
||||
# stop processing immediately — ThreadPoolExecutor.submit()
|
||||
# will raise RuntimeError for every remaining job.
|
||||
if sys.is_finalizing():
|
||||
return None
|
||||
logger.warning(
|
||||
"Interpreter finalizing — skipping %d remaining job(s)",
|
||||
len(due_jobs) - executed,
|
||||
)
|
||||
break
|
||||
|
||||
try:
|
||||
# For recurring jobs (cron/interval), advance next_run_at to the
|
||||
# next future occurrence BEFORE execution. This way, if the
|
||||
# process crashes mid-run, the job won't re-fire on restart.
|
||||
# One-shot jobs are left alone so they can retry on restart.
|
||||
advance_next_run(job["id"])
|
||||
|
||||
success, output, final_response, error = run_job(job)
|
||||
return (job, success, output, final_response, error)
|
||||
except Exception as e:
|
||||
logger.error("Job '%s': parallel error: %s", job_name, e)
|
||||
return (job, False, "", None, str(e))
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
|
||||
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
result = future.result()
|
||||
if result is None:
|
||||
continue
|
||||
_job_results.append(result)
|
||||
|
||||
for job, success, output, final_response, error in _job_results:
|
||||
executed += 1
|
||||
|
||||
output_file = save_job_output(job["id"], output)
|
||||
if verbose:
|
||||
|
||||
@@ -973,6 +973,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
pause_job as _cron_pause,
|
||||
resume_job as _cron_resume,
|
||||
trigger_job as _cron_trigger,
|
||||
run_job_now as _cron_run_now,
|
||||
)
|
||||
# Wrap as staticmethod to prevent descriptor binding — these are plain
|
||||
# module functions, not instance methods. Without this, self._cron_*()
|
||||
@@ -986,6 +987,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
_cron_pause = staticmethod(_cron_pause)
|
||||
_cron_resume = staticmethod(_cron_resume)
|
||||
_cron_trigger = staticmethod(_cron_trigger)
|
||||
_cron_run_now = staticmethod(_cron_run_now)
|
||||
_CRON_AVAILABLE = True
|
||||
except ImportError:
|
||||
pass
|
||||
@@ -1204,6 +1206,28 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
except Exception as e:
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
async def _handle_run_job_now(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /api/jobs/{job_id}/run-now — execute job synchronously and return result."""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
cron_err = self._check_jobs_available()
|
||||
if cron_err:
|
||||
return cron_err
|
||||
job_id, id_err = self._check_job_id(request)
|
||||
if id_err:
|
||||
return id_err
|
||||
try:
|
||||
import asyncio as _asyncio
|
||||
result = await _asyncio.get_event_loop().run_in_executor(
|
||||
None, self._cron_run_now, job_id
|
||||
)
|
||||
if result is None:
|
||||
return web.json_response({"error": "Job not found"}, status=404)
|
||||
return web.json_response(result)
|
||||
except Exception as e:
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Output extraction helper
|
||||
# ------------------------------------------------------------------
|
||||
@@ -1565,6 +1589,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
|
||||
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
|
||||
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
|
||||
self._app.router.add_post("/api/jobs/{job_id}/run-now", self._handle_run_job_now)
|
||||
# Structured event streaming
|
||||
self._app.router.add_post("/v1/runs", self._handle_runs)
|
||||
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)
|
||||
|
||||
@@ -221,7 +221,31 @@ def cron_edit(args):
|
||||
return 0
|
||||
|
||||
|
||||
def _job_action(action: str, job_id: str, success_verb: str) -> int:
|
||||
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
|
||||
if action == "run" and now:
|
||||
# Synchronous execution — run job immediately and show result
|
||||
result = _cron_api(action="run_now", job_id=job_id)
|
||||
if not result.get("success"):
|
||||
if result.get("error"):
|
||||
print(color(f"Failed to run job now: {result['error']}", Colors.RED))
|
||||
else:
|
||||
print(color(f"Failed to run job now: {result.get('error', 'unknown error')}", Colors.RED))
|
||||
return 1
|
||||
job = result.get("job", {})
|
||||
success = result.get("success", False)
|
||||
error = result.get("error")
|
||||
final_response = result.get("final_response", "")
|
||||
name = job.get("name", job_id)
|
||||
if success:
|
||||
print(color(f"Job '{name}' completed successfully", Colors.GREEN))
|
||||
else:
|
||||
print(color(f"Job '{name}' failed: {error}", Colors.RED))
|
||||
if final_response:
|
||||
print(f"\n{final_response}\n")
|
||||
if not error:
|
||||
print(color("Stale error state cleared.", Colors.GREEN))
|
||||
return 0 if success else 1
|
||||
|
||||
result = _cron_api(action=action, job_id=job_id)
|
||||
if not result.get("success"):
|
||||
print(color(f"Failed to {action} job: {result.get('error', 'unknown error')}", Colors.RED))
|
||||
@@ -265,7 +289,8 @@ def cron_command(args):
|
||||
return _job_action("resume", args.job_id, "Resumed")
|
||||
|
||||
if subcmd == "run":
|
||||
return _job_action("run", args.job_id, "Triggered")
|
||||
now = getattr(args, 'now', False)
|
||||
return _job_action("run", args.job_id, "Triggered", now=now)
|
||||
|
||||
if subcmd in {"remove", "rm", "delete"}:
|
||||
return _job_action("remove", args.job_id, "Removed")
|
||||
|
||||
@@ -4574,6 +4574,7 @@ For more help on a command:
|
||||
|
||||
cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick")
|
||||
cron_run.add_argument("job_id", help="Job ID to trigger")
|
||||
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
|
||||
|
||||
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
|
||||
cron_remove.add_argument("job_id", help="Job ID to remove")
|
||||
|
||||
@@ -172,13 +172,13 @@ class MemPalacePortalProvider(MemoryProvider):
|
||||
|
||||
# Extract key facts for Hologram
|
||||
if self._hologram_store:
|
||||
# Simple heuristic: look for \"I prefer\", \"The project uses\", etc.
|
||||
# Simple heuristic: look for "I prefer", "The project uses", etc.
|
||||
for msg in messages:
|
||||
if msg.get(\"role\") == \"user\":
|
||||
content = msg.get(\"content\", \"\")
|
||||
if \"prefer\" in content.lower() or \"use\" in content.lower():
|
||||
if msg.get("role") == "user":
|
||||
content = msg.get("content", "")
|
||||
if "prefer" in content.lower() or "use" in content.lower():
|
||||
try:
|
||||
self._hologram_store.add_fact(content[:200], category=\"user_pref\")
|
||||
self._hologram_store.add_fact(content[:200], category="user_pref")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
109
run_agent.py
109
run_agent.py
@@ -721,6 +721,19 @@ class AIAgent:
|
||||
self._current_tool: str | None = None
|
||||
self._api_call_count: int = 0
|
||||
|
||||
# Poka-yoke #309: Circuit breaker for error cascading
|
||||
# P(error | prev was error) = 58.6% vs P(error | prev was success) = 25.2%
|
||||
# After 3+ consecutive errors, inject guidance to break the cascade.
|
||||
self._consecutive_tool_errors: int = 0
|
||||
self._error_streak_tool_names: list = [] # track which tools are in the streak
|
||||
|
||||
# Poka-yoke #310: Tool fixation detection
|
||||
# Marathon sessions show tool fixation - same tool called 8-25 times in a row.
|
||||
# After 5 consecutive calls to the same tool, nudge the agent to diversify.
|
||||
self._last_tool_name: str | None = None
|
||||
self._same_tool_streak: int = 0
|
||||
self._tool_fixation_threshold: int = 5
|
||||
|
||||
# Centralized logging — agent.log (INFO+) and errors.log (WARNING+)
|
||||
# both live under ~/.hermes/logs/. Idempotent, so gateway mode
|
||||
# (which creates a new AIAgent per message) won't duplicate handlers.
|
||||
@@ -6238,6 +6251,12 @@ class AIAgent:
|
||||
def _run_tool(index, tool_call, function_name, function_args):
|
||||
"""Worker function executed in a thread."""
|
||||
start = time.time()
|
||||
# Poka-yoke #310: Tool fixation detection (concurrent path)
|
||||
if function_name == self._last_tool_name:
|
||||
self._same_tool_streak += 1
|
||||
else:
|
||||
self._last_tool_name = function_name
|
||||
self._same_tool_streak = 1
|
||||
try:
|
||||
result = self._invoke_tool(function_name, function_args, effective_task_id, tool_call.id)
|
||||
except Exception as tool_error:
|
||||
@@ -6288,6 +6307,13 @@ class AIAgent:
|
||||
if is_error:
|
||||
result_preview = function_result[:200] if len(function_result) > 200 else function_result
|
||||
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
|
||||
# Circuit breaker: track consecutive errors
|
||||
self._consecutive_tool_errors += 1
|
||||
self._error_streak_tool_names.append(function_name)
|
||||
else:
|
||||
# Reset circuit breaker on success
|
||||
self._consecutive_tool_errors = 0
|
||||
self._error_streak_tool_names = []
|
||||
|
||||
if self.tool_progress_callback:
|
||||
try:
|
||||
@@ -6331,6 +6357,41 @@ class AIAgent:
|
||||
if subdir_hints:
|
||||
function_result += subdir_hints
|
||||
|
||||
# Circuit breaker: inject warning after 3+ consecutive errors
|
||||
if self._consecutive_tool_errors >= 3:
|
||||
streak_info = self._error_streak_tool_names[-self._consecutive_tool_errors:]
|
||||
unique_tools = list(dict.fromkeys(streak_info))
|
||||
if self._consecutive_tool_errors == 3:
|
||||
cb_msg = (
|
||||
f"\n\n⚠️ CIRCUIT BREAKER: You have had {self._consecutive_tool_errors} consecutive tool errors "
|
||||
f"({', '.join(unique_tools)}). Errors cascade — P(error|error) is 2.33x higher than normal. "
|
||||
f"Consider: (1) trying a different tool type, (2) using terminal to debug, "
|
||||
f"(3) simplifying your approach, or (4) asking the user for guidance."
|
||||
)
|
||||
function_result += cb_msg
|
||||
elif self._consecutive_tool_errors == 6:
|
||||
cb_msg = (
|
||||
f"\n\n🛑 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
|
||||
f"The error cascade is severe. STOP retrying the same approach. "
|
||||
f"Use terminal to investigate, or switch strategies entirely."
|
||||
)
|
||||
function_result += cb_msg
|
||||
elif self._consecutive_tool_errors >= 9 and self._consecutive_tool_errors % 3 == 0:
|
||||
cb_msg = (
|
||||
f"\n\n🔴 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
|
||||
f"Terminal is your only reliable recovery path. Use it now."
|
||||
)
|
||||
function_result += cb_msg
|
||||
|
||||
# Poka-yoke #310: Tool fixation nudge
|
||||
if self._same_tool_streak >= self._tool_fixation_threshold and self._same_tool_streak % self._tool_fixation_threshold == 0:
|
||||
fixation_msg = (
|
||||
f"\n\n🔄 TOOL FIXATION: You have called `{function_name}` {self._same_tool_streak} times consecutively. "
|
||||
f"Consider: (1) trying a different tool, (2) using `terminal` to verify your approach, "
|
||||
f"(3) stepping back to reassess the task."
|
||||
)
|
||||
function_result += fixation_msg
|
||||
|
||||
# Append tool result message in order
|
||||
tool_msg = {
|
||||
"role": "tool",
|
||||
@@ -6416,6 +6477,13 @@ class AIAgent:
|
||||
self._current_tool = function_name
|
||||
self._touch_activity(f"executing tool: {function_name}")
|
||||
|
||||
# Poka-yoke #310: Tool fixation detection
|
||||
if function_name == self._last_tool_name:
|
||||
self._same_tool_streak += 1
|
||||
else:
|
||||
self._last_tool_name = function_name
|
||||
self._same_tool_streak = 1
|
||||
|
||||
if self.tool_progress_callback:
|
||||
try:
|
||||
preview = _build_tool_preview(function_name, function_args)
|
||||
@@ -6609,8 +6677,14 @@ class AIAgent:
|
||||
_is_error_result, _ = _detect_tool_failure(function_name, function_result)
|
||||
if _is_error_result:
|
||||
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
|
||||
# Circuit breaker: track consecutive errors
|
||||
self._consecutive_tool_errors += 1
|
||||
self._error_streak_tool_names.append(function_name)
|
||||
else:
|
||||
logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, len(function_result))
|
||||
# Reset circuit breaker on success
|
||||
self._consecutive_tool_errors = 0
|
||||
self._error_streak_tool_names = []
|
||||
|
||||
if self.tool_progress_callback:
|
||||
try:
|
||||
@@ -6642,6 +6716,41 @@ class AIAgent:
|
||||
if subdir_hints:
|
||||
function_result += subdir_hints
|
||||
|
||||
# Circuit breaker: inject warning after 3+ consecutive errors
|
||||
if self._consecutive_tool_errors >= 3:
|
||||
streak_info = self._error_streak_tool_names[-self._consecutive_tool_errors:]
|
||||
unique_tools = list(dict.fromkeys(streak_info)) # preserve order, deduplicate
|
||||
if self._consecutive_tool_errors == 3:
|
||||
cb_msg = (
|
||||
f"\n\n⚠️ CIRCUIT BREAKER: You have had {self._consecutive_tool_errors} consecutive tool errors "
|
||||
f"({', '.join(unique_tools)}). Errors cascade — P(error|error) is 2.33x higher than normal. "
|
||||
f"Consider: (1) trying a different tool type, (2) using terminal to debug, "
|
||||
f"(3) simplifying your approach, or (4) asking the user for guidance."
|
||||
)
|
||||
function_result += cb_msg
|
||||
elif self._consecutive_tool_errors == 6:
|
||||
cb_msg = (
|
||||
f"\n\n🛑 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
|
||||
f"The error cascade is severe. STOP retrying the same approach. "
|
||||
f"Use terminal to investigate, or switch strategies entirely."
|
||||
)
|
||||
function_result += cb_msg
|
||||
elif self._consecutive_tool_errors >= 9 and self._consecutive_tool_errors % 3 == 0:
|
||||
cb_msg = (
|
||||
f"\n\n🔴 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
|
||||
f"Terminal is your only reliable recovery path. Use it now."
|
||||
)
|
||||
function_result += cb_msg
|
||||
|
||||
# Poka-yoke #310: Tool fixation nudge
|
||||
if self._same_tool_streak >= self._tool_fixation_threshold and self._same_tool_streak % self._tool_fixation_threshold == 0:
|
||||
fixation_msg = (
|
||||
f"\n\n🔄 TOOL FIXATION: You have called `{function_name}` {self._same_tool_streak} times consecutively. "
|
||||
f"Consider: (1) trying a different tool, (2) using `terminal` to verify your approach, "
|
||||
f"(3) stepping back to reassess the task."
|
||||
)
|
||||
function_result += fixation_msg
|
||||
|
||||
tool_msg = {
|
||||
"role": "tool",
|
||||
"content": function_result,
|
||||
|
||||
@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
|
||||
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS
|
||||
|
||||
|
||||
class TestResolveOrigin:
|
||||
@@ -914,3 +914,90 @@ class TestDeploySyncGuard:
|
||||
# Should not raise even if we somehow break AIAgent — the flag is set.
|
||||
sched_mod._validate_agent_interface()
|
||||
# No exception = pass.
|
||||
|
||||
|
||||
class TestCheckModelContextCompat:
|
||||
"""Tests for _check_model_context_compat() preflight validation."""
|
||||
|
||||
def test_raises_when_context_below_minimum(self):
|
||||
"""ModelContextError raised when detected context < CRON_MIN_CONTEXT_TOKENS."""
|
||||
with patch(
|
||||
"cron.scheduler.get_model_context_length" if False else "agent.model_metadata.get_model_context_length",
|
||||
):
|
||||
pass # placeholder; real test below uses patch path correctly
|
||||
|
||||
with patch("cron.scheduler._check_model_context_compat") as mock_check:
|
||||
mock_check.side_effect = ModelContextError(
|
||||
"Model 'gemma4:latest' has a context window of 8,192 tokens, "
|
||||
"which is below the minimum 64,000 required by Hermes Agent."
|
||||
)
|
||||
with pytest.raises(ModelContextError, match="below the minimum"):
|
||||
mock_check("gemma4:latest")
|
||||
|
||||
def test_passes_when_context_sufficient(self):
|
||||
"""No exception when model has sufficient context."""
|
||||
with patch(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
return_value=128_000,
|
||||
):
|
||||
# Should not raise
|
||||
_check_model_context_compat("claude-sonnet-4-6", base_url="", api_key="")
|
||||
|
||||
def test_passes_with_config_override_below_minimum(self):
|
||||
"""When config_context_length is set, check is skipped regardless of detected size."""
|
||||
with patch(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
return_value=8_192, # Would normally fail
|
||||
) as mock_get:
|
||||
# config_context_length override skips the check
|
||||
_check_model_context_compat(
|
||||
"gemma4:latest",
|
||||
config_context_length=8_192,
|
||||
)
|
||||
# get_model_context_length should NOT be called — check is bypassed
|
||||
mock_get.assert_not_called()
|
||||
|
||||
def test_raises_model_context_error_subclass_of_value_error(self):
|
||||
"""ModelContextError is a subclass of ValueError."""
|
||||
assert issubclass(ModelContextError, ValueError)
|
||||
|
||||
def test_detection_failure_is_non_fatal(self):
|
||||
"""If context length detection raises, the check is skipped (fail-open)."""
|
||||
with patch(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
side_effect=Exception("network error"),
|
||||
):
|
||||
# Should NOT raise — detection failure is logged and ignored
|
||||
_check_model_context_compat("unknown-model", base_url="http://localhost:11434")
|
||||
|
||||
def test_raises_for_small_context_model(self):
|
||||
"""End-to-end: model with 8K context raises ModelContextError."""
|
||||
with patch(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
return_value=8_192,
|
||||
):
|
||||
with pytest.raises(ModelContextError) as exc_info:
|
||||
_check_model_context_compat("gemma4:latest")
|
||||
err = str(exc_info.value)
|
||||
assert "gemma4:latest" in err
|
||||
assert "8,192" in err
|
||||
assert str(CRON_MIN_CONTEXT_TOKENS) in err or "64,000" in err
|
||||
assert "config.yaml" in err
|
||||
|
||||
def test_boundary_exactly_at_minimum_passes(self):
|
||||
"""A model with exactly CRON_MIN_CONTEXT_TOKENS context is accepted."""
|
||||
with patch(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
return_value=CRON_MIN_CONTEXT_TOKENS,
|
||||
):
|
||||
# Should not raise
|
||||
_check_model_context_compat("borderline-model")
|
||||
|
||||
def test_boundary_one_below_minimum_raises(self):
|
||||
"""A model with context length one below the minimum is rejected."""
|
||||
with patch(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
return_value=CRON_MIN_CONTEXT_TOKENS - 1,
|
||||
):
|
||||
with pytest.raises(ModelContextError):
|
||||
_check_model_context_compat("borderline-model")
|
||||
|
||||
@@ -316,10 +316,17 @@ def cronjob(
|
||||
updated = resume_job(job_id)
|
||||
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
|
||||
|
||||
if normalized in {"run", "run_now", "trigger"}:
|
||||
if normalized in {"run", "trigger"}:
|
||||
updated = trigger_job(job_id)
|
||||
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
|
||||
|
||||
if normalized == "run_now":
|
||||
from cron.jobs import run_job_now
|
||||
result = run_job_now(job_id)
|
||||
if result is None:
|
||||
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
|
||||
return json.dumps(result, indent=2)
|
||||
|
||||
if normalized == "update":
|
||||
updates: Dict[str, Any] = {}
|
||||
if prompt is not None:
|
||||
|
||||
Reference in New Issue
Block a user