|
|
|
|
@@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
import concurrent.futures
|
|
|
|
|
import inspect
|
|
|
|
|
import json
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
@@ -50,6 +51,71 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
|
|
|
|
|
|
|
|
|
|
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Deploy Sync Guard
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# The scheduler passes keyword arguments to AIAgent() that may not exist in
|
|
|
|
|
# older installed versions. When the installed run_agent.py diverges from the
|
|
|
|
|
# version the scheduler was written against, every cron job fails with a
|
|
|
|
|
# TypeError — a silent total outage. This guard catches that at the first
|
|
|
|
|
# tick rather than after 1000+ error log lines.
|
|
|
|
|
#
|
|
|
|
|
# The check is cached per gateway process by _agent_interface_validated once
# it passes; until it passes, it re-runs on every run_job() call.
|
|
|
|
|
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
|
|
|
|
|
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
|
|
|
|
|
|
|
|
|
|
_agent_interface_validated = False
|
|
|
|
|
|
|
|
|
|
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to that AIAgent() call, add it here too.
# Only the keys are checked by _validate_agent_interface().
|
|
|
|
|
_SCHEDULER_AGENT_KWARGS = {
|
|
|
|
|
"tool_choice": "required",
|
|
|
|
|
"skip_memory": True,
|
|
|
|
|
"platform": "cron",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_agent_interface():
    """Verify that AIAgent.__init__ accepts every kwarg the scheduler passes.

    The outcome is cached in the module-level ``_agent_interface_validated``
    flag: once a check succeeds, later calls return immediately. A failed
    check leaves the flag unset, so the next call validates again.

    A kwarg counts as accepted when ``AIAgent.__init__`` declares it as a
    keyword-capable parameter (positional-or-keyword or keyword-only), or
    when the signature has a ``**kwargs`` catch-all. Positional-only
    parameters and the ``*args`` / ``**kwargs`` placeholder names themselves
    are not treated as matches, because they cannot receive a keyword
    argument of that name.

    Raises:
        RuntimeError: If ``AIAgent`` cannot be imported from ``run_agent``,
            or if its ``__init__`` would reject any key of
            ``_SCHEDULER_AGENT_KWARGS``. The message includes a remediation
            command.
    """
    global _agent_interface_validated
    if _agent_interface_validated:
        return

    try:
        from run_agent import AIAgent
    except ImportError as exc:
        raise RuntimeError(
            f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
            "The installed hermes-agent package may be corrupted. "
            "Reinstall: pip install -e ~/.hermes/hermes-agent"
        ) from exc

    parameters = inspect.signature(AIAgent.__init__).parameters
    # Total declared parameters (minus self); only used for the success log.
    params = set(parameters) - {"self"}

    # A **kwargs catch-all means any keyword is accepted, so nothing can be
    # missing even if the names are not declared explicitly.
    accepts_any_kwarg = any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in parameters.values()
    )

    # Only these kinds can actually be bound by keyword at call time.
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    keyword_params = {
        name
        for name, param in parameters.items()
        if name != "self" and param.kind in keyword_kinds
    }

    if accepts_any_kwarg:
        missing = []
    else:
        missing = [
            kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in keyword_params
        ]

    if missing:
        raise RuntimeError(
            "Deploy sync guard: AIAgent.__init__() is missing parameters that "
            "the cron scheduler requires. This means the installed code is out "
            "of sync with the scheduler module.\n"
            f"  Missing parameters: {', '.join(missing)}\n"
            "  Expected by: cron/scheduler.py (run_job → AIAgent())\n"
            "  Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
            "  Then restart the gateway."
        )

    # Set the flag only after a successful check so failures keep surfacing.
    _agent_interface_validated = True
    logger.info(
        "Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
        len(params), len(_SCHEDULER_AGENT_KWARGS),
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sentinel: when a cron agent has nothing new to report, it can start its
|
|
|
|
|
# response with this marker to suppress delivery. Output is still saved
|
|
|
|
|
# locally for audit.
|
|
|
|
|
@@ -470,7 +536,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|
|
|
|
Tuple of (success, full_output_doc, final_response, error_message)
|
|
|
|
|
"""
|
|
|
|
|
from run_agent import AIAgent
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
|
|
|
|
|
# the scheduler passes. Catches stale-code-deploy bugs before they
|
|
|
|
|
# cascade into 1000+ TypeErrors and gateway restart loops.
|
|
|
|
|
_validate_agent_interface()
|
|
|
|
|
|
|
|
|
|
# Initialize SQLite session store so cron job messages are persisted
|
|
|
|
|
# and discoverable via session_search (same pattern as gateway/run.py).
|
|
|
|
|
_session_db = None
|
|
|
|
|
@@ -628,47 +699,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|
|
|
|
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
|
|
|
|
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
|
|
|
|
_POLL_INTERVAL = 5.0
|
|
|
|
|
|
|
|
|
|
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
|
|
|
|
|
# raises RuntimeError("cannot schedule new futures after interpreter
|
|
|
|
|
# shutdown") when Python is finalizing (e.g. gateway restart races).
|
|
|
|
|
# Fall back to synchronous execution so the job at least attempts.
|
|
|
|
|
_cron_pool = None
|
|
|
|
|
try:
|
|
|
|
|
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
|
|
|
|
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
|
|
|
|
except RuntimeError:
|
|
|
|
|
logger.warning(
|
|
|
|
|
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
|
|
|
|
|
"— falling back to synchronous execution",
|
|
|
|
|
job_name,
|
|
|
|
|
)
|
|
|
|
|
if _cron_pool is not None:
|
|
|
|
|
try:
|
|
|
|
|
_cron_pool.shutdown(wait=False)
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
_cron_pool = None
|
|
|
|
|
result = agent.run_conversation(prompt)
|
|
|
|
|
final_response = result.get("final_response", "") or ""
|
|
|
|
|
logged_response = final_response if final_response else "(No response generated)"
|
|
|
|
|
output = f"""# Cron Job: {job_name}
|
|
|
|
|
|
|
|
|
|
**Job ID:** {job_id}
|
|
|
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
|
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
|
|
|
|
|
|
## Prompt
|
|
|
|
|
|
|
|
|
|
{prompt}
|
|
|
|
|
|
|
|
|
|
## Response
|
|
|
|
|
|
|
|
|
|
{logged_response}
|
|
|
|
|
"""
|
|
|
|
|
logger.info("Job '%s' completed (sync fallback)", job_name)
|
|
|
|
|
return True, output, final_response, None
|
|
|
|
|
|
|
|
|
|
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
|
|
|
|
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
|
|
|
|
_inactivity_timeout = False
|
|
|
|
|
try:
|
|
|
|
|
if _cron_inactivity_limit is None:
|
|
|
|
|
@@ -695,12 +727,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|
|
|
|
_inactivity_timeout = True
|
|
|
|
|
break
|
|
|
|
|
except Exception:
|
|
|
|
|
if _cron_pool is not None:
|
|
|
|
|
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
|
|
|
|
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
|
|
|
|
raise
|
|
|
|
|
finally:
|
|
|
|
|
if _cron_pool is not None:
|
|
|
|
|
_cron_pool.shutdown(wait=False)
|
|
|
|
|
_cron_pool.shutdown(wait=False)
|
|
|
|
|
|
|
|
|
|
if _inactivity_timeout:
|
|
|
|
|
# Build diagnostic summary from the agent's activity tracker.
|
|
|
|
|
@@ -839,36 +869,16 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
|
|
|
|
if verbose:
|
|
|
|
|
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
|
|
|
|
|
|
|
|
|
|
# Parallel job execution with priority sorting
|
|
|
|
|
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
|
|
|
|
|
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
|
|
|
|
|
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
|
|
|
|
|
|
|
|
|
|
executed = 0
|
|
|
|
|
_job_results = []
|
|
|
|
|
|
|
|
|
|
def _process_single_job(job):
|
|
|
|
|
job_name = job.get("name", job["id"])
|
|
|
|
|
if sys.is_finalizing():
|
|
|
|
|
return None
|
|
|
|
|
for job in due_jobs:
|
|
|
|
|
try:
|
|
|
|
|
# For recurring jobs (cron/interval), advance next_run_at to the
|
|
|
|
|
# next future occurrence BEFORE execution. This way, if the
|
|
|
|
|
# process crashes mid-run, the job won't re-fire on restart.
|
|
|
|
|
# One-shot jobs are left alone so they can retry on restart.
|
|
|
|
|
advance_next_run(job["id"])
|
|
|
|
|
|
|
|
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
|
return (job, success, output, final_response, error)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error("Job '%s': parallel error: %s", job_name, e)
|
|
|
|
|
return (job, False, "", None, str(e))
|
|
|
|
|
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
|
|
|
|
|
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
|
|
|
|
|
for future in concurrent.futures.as_completed(futures):
|
|
|
|
|
result = future.result()
|
|
|
|
|
if result is None:
|
|
|
|
|
continue
|
|
|
|
|
_job_results.append(result)
|
|
|
|
|
|
|
|
|
|
for job, success, output, final_response, error in _job_results:
|
|
|
|
|
executed += 1
|
|
|
|
|
|
|
|
|
|
output_file = save_job_output(job["id"], output)
|
|
|
|
|
if verbose:
|
|
|
|
|
|