Add an optional 'script' parameter to cron jobs that references a Python script. The script runs before each agent turn, and its stdout is injected into the prompt as context. This enables stateful monitoring — the script handles data collection and change detection, the LLM analyzes and reports. - cron/jobs.py: add script field to create_job(), stored in job dict - cron/scheduler.py: add _run_job_script() executor with timeout handling, inject script output/errors into _build_job_prompt() - tools/cronjob_tools.py: add script to tool schema, create/update handlers, _format_job display - hermes_cli/cron.py: add --script to create/edit, display in list/edit output - hermes_cli/main.py: add --script argparse for cron create/edit subcommands - tests/cron/test_cron_script.py: 20 tests covering job CRUD, script execution, path resolution, error handling, prompt injection, tool API Script paths can be absolute or relative (resolved against ~/.hermes/scripts/). Scripts run with a 120s timeout. Failures are injected as error context so the LLM can report the problem. Empty string clears an attached script.
709 lines
25 KiB
Python
709 lines
25 KiB
Python
"""
|
|
Cron job scheduler - executes due jobs.
|
|
|
|
Provides tick() which checks for due jobs and runs them. The gateway
|
|
calls this every 60 seconds from a background thread.
|
|
|
|
Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
|
|
runs at a time if multiple processes overlap.
|
|
"""
|
|
|
|
import asyncio
|
|
import concurrent.futures
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
|
|
# fcntl is Unix-only; on Windows use msvcrt for file locking
|
|
try:
|
|
import fcntl
|
|
except ImportError:
|
|
fcntl = None
|
|
try:
|
|
import msvcrt
|
|
except ImportError:
|
|
msvcrt = None
|
|
from pathlib import Path
|
|
from hermes_constants import get_hermes_home
|
|
from hermes_cli.config import load_config
|
|
from typing import Optional
|
|
|
|
from hermes_time import now as _hermes_now
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
|
|
|
# Sentinel: when a cron agent has nothing new to report, it can start its
|
|
# response with this marker to suppress delivery. Output is still saved
|
|
# locally for audit.
|
|
SILENT_MARKER = "[SILENT]"
|
|
|
|
# Resolve Hermes home directory (respects HERMES_HOME override)
|
|
_hermes_home = get_hermes_home()
|
|
|
|
# File-based lock prevents concurrent ticks from gateway + daemon + systemd timer
|
|
_LOCK_DIR = _hermes_home / "cron"
|
|
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
|
|
|
|
|
|
def _resolve_origin(job: dict) -> Optional[dict]:
|
|
"""Extract origin info from a job, preserving any extra routing metadata."""
|
|
origin = job.get("origin")
|
|
if not origin:
|
|
return None
|
|
platform = origin.get("platform")
|
|
chat_id = origin.get("chat_id")
|
|
if platform and chat_id:
|
|
return origin
|
|
return None
|
|
|
|
|
|
def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
|
"""Resolve the concrete auto-delivery target for a cron job, if any."""
|
|
deliver = job.get("deliver", "local")
|
|
origin = _resolve_origin(job)
|
|
|
|
if deliver == "local":
|
|
return None
|
|
|
|
if deliver == "origin":
|
|
if not origin:
|
|
return None
|
|
return {
|
|
"platform": origin["platform"],
|
|
"chat_id": str(origin["chat_id"]),
|
|
"thread_id": origin.get("thread_id"),
|
|
}
|
|
|
|
if ":" in deliver:
|
|
platform_name, rest = deliver.split(":", 1)
|
|
# Check for thread_id suffix (e.g. "telegram:-1003724596514:17")
|
|
if ":" in rest:
|
|
chat_id, thread_id = rest.split(":", 1)
|
|
else:
|
|
chat_id, thread_id = rest, None
|
|
|
|
# Resolve human-friendly labels like "Alice (dm)" to real IDs.
|
|
# send_message(action="list") shows labels with display suffixes
|
|
# that aren't valid platform IDs (e.g. WhatsApp JIDs).
|
|
try:
|
|
from gateway.channel_directory import resolve_channel_name
|
|
target = chat_id
|
|
# Strip display suffix like " (dm)" or " (group)"
|
|
if target.endswith(")") and " (" in target:
|
|
target = target.rsplit(" (", 1)[0].strip()
|
|
resolved = resolve_channel_name(platform_name.lower(), target)
|
|
if resolved:
|
|
chat_id = resolved
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": chat_id,
|
|
"thread_id": thread_id,
|
|
}
|
|
|
|
platform_name = deliver
|
|
if origin and origin.get("platform") == platform_name:
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": str(origin["chat_id"]),
|
|
"thread_id": origin.get("thread_id"),
|
|
}
|
|
|
|
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
|
if not chat_id:
|
|
return None
|
|
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": chat_id,
|
|
"thread_id": None,
|
|
}
|
|
|
|
|
|
def _deliver_result(job: dict, content: str) -> None:
|
|
"""
|
|
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
|
|
|
Uses the standalone platform send functions from send_message_tool so delivery
|
|
works whether or not the gateway is running.
|
|
"""
|
|
target = _resolve_delivery_target(job)
|
|
if not target:
|
|
if job.get("deliver", "local") != "local":
|
|
logger.warning(
|
|
"Job '%s' deliver=%s but no concrete delivery target could be resolved",
|
|
job["id"],
|
|
job.get("deliver", "local"),
|
|
)
|
|
return
|
|
|
|
platform_name = target["platform"]
|
|
chat_id = target["chat_id"]
|
|
thread_id = target.get("thread_id")
|
|
|
|
from tools.send_message_tool import _send_to_platform
|
|
from gateway.config import load_gateway_config, Platform
|
|
|
|
platform_map = {
|
|
"telegram": Platform.TELEGRAM,
|
|
"discord": Platform.DISCORD,
|
|
"slack": Platform.SLACK,
|
|
"whatsapp": Platform.WHATSAPP,
|
|
"signal": Platform.SIGNAL,
|
|
"matrix": Platform.MATRIX,
|
|
"mattermost": Platform.MATTERMOST,
|
|
"homeassistant": Platform.HOMEASSISTANT,
|
|
"dingtalk": Platform.DINGTALK,
|
|
"feishu": Platform.FEISHU,
|
|
"wecom": Platform.WECOM,
|
|
"email": Platform.EMAIL,
|
|
"sms": Platform.SMS,
|
|
}
|
|
platform = platform_map.get(platform_name.lower())
|
|
if not platform:
|
|
logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name)
|
|
return
|
|
|
|
try:
|
|
config = load_gateway_config()
|
|
except Exception as e:
|
|
logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e)
|
|
return
|
|
|
|
pconfig = config.platforms.get(platform)
|
|
if not pconfig or not pconfig.enabled:
|
|
logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
|
|
return
|
|
|
|
# Optionally wrap the content with a header/footer so the user knows this
|
|
# is a cron delivery. Wrapping is on by default; set cron.wrap_response: false
|
|
# in config.yaml for clean output.
|
|
wrap_response = True
|
|
try:
|
|
user_cfg = load_config()
|
|
wrap_response = user_cfg.get("cron", {}).get("wrap_response", True)
|
|
except Exception:
|
|
pass
|
|
|
|
if wrap_response:
|
|
task_name = job.get("name", job["id"])
|
|
delivery_content = (
|
|
f"Cronjob Response: {task_name}\n"
|
|
f"-------------\n\n"
|
|
f"{content}\n\n"
|
|
f"Note: The agent cannot see this message, and therefore cannot respond to it."
|
|
)
|
|
else:
|
|
delivery_content = content
|
|
|
|
# Run the async send in a fresh event loop (safe from any thread)
|
|
coro = _send_to_platform(platform, pconfig, chat_id, delivery_content, thread_id=thread_id)
|
|
try:
|
|
result = asyncio.run(coro)
|
|
except RuntimeError:
|
|
# asyncio.run() checks for a running loop before awaiting the coroutine;
|
|
# when it raises, the original coro was never started — close it to
|
|
# prevent "coroutine was never awaited" RuntimeWarning, then retry in a
|
|
# fresh thread that has no running loop.
|
|
coro.close()
|
|
import concurrent.futures
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
|
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, delivery_content, thread_id=thread_id))
|
|
result = future.result(timeout=30)
|
|
except Exception as e:
|
|
logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
|
|
return
|
|
|
|
if result and result.get("error"):
|
|
logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
|
|
else:
|
|
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
|
|
|
|
|
_SCRIPT_TIMEOUT = 120 # seconds
|
|
|
|
|
|
def _run_job_script(script_path: str) -> tuple[bool, str]:
|
|
"""Execute a cron job's data-collection script and capture its output.
|
|
|
|
Args:
|
|
script_path: Path to a Python script (resolved via HERMES_HOME/scripts/ or absolute).
|
|
|
|
Returns:
|
|
(success, output) — on failure *output* contains the error message so the
|
|
LLM can report the problem to the user.
|
|
"""
|
|
from hermes_constants import get_hermes_home
|
|
|
|
path = Path(script_path).expanduser()
|
|
if not path.is_absolute():
|
|
# Resolve relative paths against HERMES_HOME/scripts/
|
|
path = get_hermes_home() / "scripts" / path
|
|
|
|
if not path.exists():
|
|
return False, f"Script not found: {path}"
|
|
if not path.is_file():
|
|
return False, f"Script path is not a file: {path}"
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[sys.executable, str(path)],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=_SCRIPT_TIMEOUT,
|
|
cwd=str(path.parent),
|
|
)
|
|
stdout = (result.stdout or "").strip()
|
|
stderr = (result.stderr or "").strip()
|
|
|
|
if result.returncode != 0:
|
|
parts = [f"Script exited with code {result.returncode}"]
|
|
if stderr:
|
|
parts.append(f"stderr:\n{stderr}")
|
|
if stdout:
|
|
parts.append(f"stdout:\n{stdout}")
|
|
return False, "\n".join(parts)
|
|
|
|
return True, stdout
|
|
|
|
except subprocess.TimeoutExpired:
|
|
return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}"
|
|
except Exception as exc:
|
|
return False, f"Script execution failed: {exc}"
|
|
|
|
|
|
def _build_job_prompt(job: dict) -> str:
|
|
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
|
|
prompt = job.get("prompt", "")
|
|
skills = job.get("skills")
|
|
|
|
# Run data-collection script if configured, inject output as context.
|
|
script_path = job.get("script")
|
|
if script_path:
|
|
success, script_output = _run_job_script(script_path)
|
|
if success:
|
|
if script_output:
|
|
prompt = (
|
|
"## Script Output\n"
|
|
"The following data was collected by a pre-run script. "
|
|
"Use it as context for your analysis.\n\n"
|
|
f"```\n{script_output}\n```\n\n"
|
|
f"{prompt}"
|
|
)
|
|
else:
|
|
prompt = (
|
|
"[Script ran successfully but produced no output.]\n\n"
|
|
f"{prompt}"
|
|
)
|
|
else:
|
|
prompt = (
|
|
"## Script Error\n"
|
|
"The data-collection script failed. Report this to the user.\n\n"
|
|
f"```\n{script_output}\n```\n\n"
|
|
f"{prompt}"
|
|
)
|
|
|
|
# Always prepend [SILENT] guidance so the cron agent can suppress
|
|
# delivery when it has nothing new or noteworthy to report.
|
|
silent_hint = (
|
|
"[SYSTEM: If you have a meaningful status report or findings, "
|
|
"send them — that is the whole point of this job. Only respond "
|
|
"with exactly \"[SILENT]\" (nothing else) when there is genuinely "
|
|
"nothing new to report. [SILENT] suppresses delivery to the user. "
|
|
"Never combine [SILENT] with content — either report your "
|
|
"findings normally, or say [SILENT] and nothing more.]\n\n"
|
|
)
|
|
prompt = silent_hint + prompt
|
|
if skills is None:
|
|
legacy = job.get("skill")
|
|
skills = [legacy] if legacy else []
|
|
|
|
skill_names = [str(name).strip() for name in skills if str(name).strip()]
|
|
if not skill_names:
|
|
return prompt
|
|
|
|
from tools.skills_tool import skill_view
|
|
|
|
parts = []
|
|
skipped: list[str] = []
|
|
for skill_name in skill_names:
|
|
loaded = json.loads(skill_view(skill_name))
|
|
if not loaded.get("success"):
|
|
error = loaded.get("error") or f"Failed to load skill '{skill_name}'"
|
|
logger.warning("Cron job '%s': skill not found, skipping — %s", job.get("name", job.get("id")), error)
|
|
skipped.append(skill_name)
|
|
continue
|
|
|
|
content = str(loaded.get("content") or "").strip()
|
|
if parts:
|
|
parts.append("")
|
|
parts.extend(
|
|
[
|
|
f'[SYSTEM: The user has invoked the "{skill_name}" skill, indicating they want you to follow its instructions. The full skill content is loaded below.]',
|
|
"",
|
|
content,
|
|
]
|
|
)
|
|
|
|
if skipped:
|
|
notice = (
|
|
f"[SYSTEM: The following skill(s) were listed for this job but could not be found "
|
|
f"and were skipped: {', '.join(skipped)}. "
|
|
f"Start your response with a brief notice so the user is aware, e.g.: "
|
|
f"'⚠️ Skill(s) not found and skipped: {', '.join(skipped)}']"
|
|
)
|
|
parts.insert(0, notice)
|
|
|
|
if prompt:
|
|
parts.extend(["", f"The user has provided the following instruction alongside the skill invocation: {prompt}"])
|
|
return "\n".join(parts)
|
|
|
|
|
|
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|
"""
|
|
Execute a single cron job.
|
|
|
|
Returns:
|
|
Tuple of (success, full_output_doc, final_response, error_message)
|
|
"""
|
|
from run_agent import AIAgent
|
|
|
|
# Initialize SQLite session store so cron job messages are persisted
|
|
# and discoverable via session_search (same pattern as gateway/run.py).
|
|
_session_db = None
|
|
try:
|
|
from hermes_state import SessionDB
|
|
_session_db = SessionDB()
|
|
except Exception as e:
|
|
logger.debug("Job '%s': SQLite session store not available: %s", job.get("id", "?"), e)
|
|
|
|
job_id = job["id"]
|
|
job_name = job["name"]
|
|
prompt = _build_job_prompt(job)
|
|
origin = _resolve_origin(job)
|
|
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
|
logger.info("Prompt: %s", prompt[:100])
|
|
|
|
# Inject origin context so the agent's send_message tool knows the chat
|
|
if origin:
|
|
os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"]
|
|
os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
|
|
if origin.get("chat_name"):
|
|
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
|
|
|
|
try:
|
|
# Re-read .env and config.yaml fresh every run so provider/key
|
|
# changes take effect without a gateway restart.
|
|
from dotenv import load_dotenv
|
|
try:
|
|
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")
|
|
|
|
delivery_target = _resolve_delivery_target(job)
|
|
if delivery_target:
|
|
os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"]
|
|
os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"])
|
|
if delivery_target.get("thread_id") is not None:
|
|
os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"])
|
|
|
|
model = job.get("model") or os.getenv("HERMES_MODEL") or ""
|
|
|
|
# Load config.yaml for model, reasoning, prefill, toolsets, provider routing
|
|
_cfg = {}
|
|
try:
|
|
import yaml
|
|
_cfg_path = str(_hermes_home / "config.yaml")
|
|
if os.path.exists(_cfg_path):
|
|
with open(_cfg_path) as _f:
|
|
_cfg = yaml.safe_load(_f) or {}
|
|
_model_cfg = _cfg.get("model", {})
|
|
if not job.get("model"):
|
|
if isinstance(_model_cfg, str):
|
|
model = _model_cfg
|
|
elif isinstance(_model_cfg, dict):
|
|
model = _model_cfg.get("default", model)
|
|
except Exception as e:
|
|
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
|
|
|
|
# Reasoning config from env or config.yaml
|
|
from hermes_constants import parse_reasoning_effort
|
|
effort = os.getenv("HERMES_REASONING_EFFORT", "")
|
|
if not effort:
|
|
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
|
|
reasoning_config = parse_reasoning_effort(effort)
|
|
|
|
# Prefill messages from env or config.yaml
|
|
prefill_messages = None
|
|
prefill_file = os.getenv("HERMES_PREFILL_MESSAGES_FILE", "") or _cfg.get("prefill_messages_file", "")
|
|
if prefill_file:
|
|
import json as _json
|
|
pfpath = Path(prefill_file).expanduser()
|
|
if not pfpath.is_absolute():
|
|
pfpath = _hermes_home / pfpath
|
|
if pfpath.exists():
|
|
try:
|
|
with open(pfpath, "r", encoding="utf-8") as _pf:
|
|
prefill_messages = _json.load(_pf)
|
|
if not isinstance(prefill_messages, list):
|
|
prefill_messages = None
|
|
except Exception as e:
|
|
logger.warning("Job '%s': failed to parse prefill messages file '%s': %s", job_id, pfpath, e)
|
|
prefill_messages = None
|
|
|
|
# Max iterations
|
|
max_iterations = _cfg.get("agent", {}).get("max_turns") or _cfg.get("max_turns") or 90
|
|
|
|
# Provider routing
|
|
pr = _cfg.get("provider_routing", {})
|
|
smart_routing = _cfg.get("smart_model_routing", {}) or {}
|
|
|
|
from hermes_cli.runtime_provider import (
|
|
resolve_runtime_provider,
|
|
format_runtime_provider_error,
|
|
)
|
|
try:
|
|
runtime_kwargs = {
|
|
"requested": job.get("provider") or os.getenv("HERMES_INFERENCE_PROVIDER"),
|
|
}
|
|
if job.get("base_url"):
|
|
runtime_kwargs["explicit_base_url"] = job.get("base_url")
|
|
runtime = resolve_runtime_provider(**runtime_kwargs)
|
|
except Exception as exc:
|
|
message = format_runtime_provider_error(exc)
|
|
raise RuntimeError(message) from exc
|
|
|
|
from agent.smart_model_routing import resolve_turn_route
|
|
turn_route = resolve_turn_route(
|
|
prompt,
|
|
smart_routing,
|
|
{
|
|
"model": model,
|
|
"api_key": runtime.get("api_key"),
|
|
"base_url": runtime.get("base_url"),
|
|
"provider": runtime.get("provider"),
|
|
"api_mode": runtime.get("api_mode"),
|
|
"command": runtime.get("command"),
|
|
"args": list(runtime.get("args") or []),
|
|
},
|
|
)
|
|
|
|
agent = AIAgent(
|
|
model=turn_route["model"],
|
|
api_key=turn_route["runtime"].get("api_key"),
|
|
base_url=turn_route["runtime"].get("base_url"),
|
|
provider=turn_route["runtime"].get("provider"),
|
|
api_mode=turn_route["runtime"].get("api_mode"),
|
|
acp_command=turn_route["runtime"].get("command"),
|
|
acp_args=turn_route["runtime"].get("args"),
|
|
max_iterations=max_iterations,
|
|
reasoning_config=reasoning_config,
|
|
prefill_messages=prefill_messages,
|
|
providers_allowed=pr.get("only"),
|
|
providers_ignored=pr.get("ignore"),
|
|
providers_order=pr.get("order"),
|
|
provider_sort=pr.get("sort"),
|
|
disabled_toolsets=["cronjob", "messaging", "clarify"],
|
|
quiet_mode=True,
|
|
skip_memory=True, # Cron system prompts would corrupt user representations
|
|
platform="cron",
|
|
session_id=_cron_session_id,
|
|
session_db=_session_db,
|
|
)
|
|
|
|
# Run the agent with a timeout so a hung API call or tool doesn't
|
|
# block the cron ticker thread indefinitely. Default 10 minutes;
|
|
# override via env var. Uses a separate thread because
|
|
# run_conversation is synchronous.
|
|
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
|
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
|
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
|
try:
|
|
result = _cron_future.result(timeout=_cron_timeout)
|
|
except concurrent.futures.TimeoutError:
|
|
logger.error(
|
|
"Job '%s' timed out after %.0fs — interrupting agent",
|
|
job_name, _cron_timeout,
|
|
)
|
|
if hasattr(agent, "interrupt"):
|
|
agent.interrupt("Cron job timed out")
|
|
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
|
raise TimeoutError(
|
|
f"Cron job '{job_name}' timed out after "
|
|
f"{int(_cron_timeout // 60)} minutes"
|
|
)
|
|
finally:
|
|
_cron_pool.shutdown(wait=False)
|
|
|
|
final_response = result.get("final_response", "") or ""
|
|
# Use a separate variable for log display; keep final_response clean
|
|
# for delivery logic (empty response = no delivery).
|
|
logged_response = final_response if final_response else "(No response generated)"
|
|
|
|
output = f"""# Cron Job: {job_name}
|
|
|
|
**Job ID:** {job_id}
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
## Prompt
|
|
|
|
{prompt}
|
|
|
|
## Response
|
|
|
|
{logged_response}
|
|
"""
|
|
|
|
logger.info("Job '%s' completed successfully", job_name)
|
|
return True, output, final_response, None
|
|
|
|
except Exception as e:
|
|
error_msg = f"{type(e).__name__}: {str(e)}"
|
|
logger.error("Job '%s' failed: %s", job_name, error_msg)
|
|
|
|
output = f"""# Cron Job: {job_name} (FAILED)
|
|
|
|
**Job ID:** {job_id}
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
## Prompt
|
|
|
|
{prompt}
|
|
|
|
## Error
|
|
|
|
```
|
|
{error_msg}
|
|
|
|
{traceback.format_exc()}
|
|
```
|
|
"""
|
|
return False, output, "", error_msg
|
|
|
|
finally:
|
|
# Clean up injected env vars so they don't leak to other jobs
|
|
for key in (
|
|
"HERMES_SESSION_PLATFORM",
|
|
"HERMES_SESSION_CHAT_ID",
|
|
"HERMES_SESSION_CHAT_NAME",
|
|
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
|
|
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
|
|
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
|
|
):
|
|
os.environ.pop(key, None)
|
|
if _session_db:
|
|
try:
|
|
_session_db.end_session(_cron_session_id, "cron_complete")
|
|
except (Exception, KeyboardInterrupt) as e:
|
|
logger.debug("Job '%s': failed to end session: %s", job_id, e)
|
|
try:
|
|
_session_db.close()
|
|
except (Exception, KeyboardInterrupt) as e:
|
|
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
|
|
|
|
|
def tick(verbose: bool = True) -> int:
|
|
"""
|
|
Check and run all due jobs.
|
|
|
|
Uses a file lock so only one tick runs at a time, even if the gateway's
|
|
in-process ticker and a standalone daemon or manual tick overlap.
|
|
|
|
Args:
|
|
verbose: Whether to print status messages
|
|
|
|
Returns:
|
|
Number of jobs executed (0 if another tick is already running)
|
|
"""
|
|
_LOCK_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Cross-platform file locking: fcntl on Unix, msvcrt on Windows
|
|
lock_fd = None
|
|
try:
|
|
lock_fd = open(_LOCK_FILE, "w")
|
|
if fcntl:
|
|
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
elif msvcrt:
|
|
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
|
|
except (OSError, IOError):
|
|
logger.debug("Tick skipped — another instance holds the lock")
|
|
if lock_fd is not None:
|
|
lock_fd.close()
|
|
return 0
|
|
|
|
try:
|
|
due_jobs = get_due_jobs()
|
|
|
|
if verbose and not due_jobs:
|
|
logger.info("%s - No jobs due", _hermes_now().strftime('%H:%M:%S'))
|
|
return 0
|
|
|
|
if verbose:
|
|
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
|
|
|
|
executed = 0
|
|
for job in due_jobs:
|
|
try:
|
|
# For recurring jobs (cron/interval), advance next_run_at to the
|
|
# next future occurrence BEFORE execution. This way, if the
|
|
# process crashes mid-run, the job won't re-fire on restart.
|
|
# One-shot jobs are left alone so they can retry on restart.
|
|
advance_next_run(job["id"])
|
|
|
|
success, output, final_response, error = run_job(job)
|
|
|
|
output_file = save_job_output(job["id"], output)
|
|
if verbose:
|
|
logger.info("Output saved to: %s", output_file)
|
|
|
|
# Deliver the final response to the origin/target chat.
|
|
# If the agent responded with [SILENT], skip delivery (but
|
|
# output is already saved above). Failed jobs always deliver.
|
|
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
|
|
should_deliver = bool(deliver_content)
|
|
if should_deliver and success and deliver_content.strip().upper().startswith(SILENT_MARKER):
|
|
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
|
|
should_deliver = False
|
|
|
|
if should_deliver:
|
|
try:
|
|
_deliver_result(job, deliver_content)
|
|
except Exception as de:
|
|
logger.error("Delivery failed for job %s: %s", job["id"], de)
|
|
|
|
mark_job_run(job["id"], success, error)
|
|
executed += 1
|
|
|
|
except Exception as e:
|
|
logger.error("Error processing job %s: %s", job['id'], e)
|
|
mark_job_run(job["id"], False, str(e))
|
|
|
|
return executed
|
|
finally:
|
|
if fcntl:
|
|
fcntl.flock(lock_fd, fcntl.LOCK_UN)
|
|
elif msvcrt:
|
|
try:
|
|
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
|
|
except (OSError, IOError):
|
|
pass
|
|
lock_fd.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
tick(verbose=True)
|