Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m9s
Health Monitor prompts say 'Check Ollama is responding' but run on cloud models that cannot reach localhost. Instead of just warning the agent, rewrite the instructions to cloud-compatible equivalents the agent can actually execute. Changes: - Add import re - Add _CLOUD_INCOMPATIBLE_PATTERNS: regex pairs (pattern, replacement) - Add _rewrite_cloud_incompatible_prompt(): rewrites localhost/Ollama references to 'use available tools to check service health' - Wire into run_job() after resolve_turn_route() Closes #378
1202 lines
45 KiB
Python
1202 lines
45 KiB
Python
"""
|
|
Cron job scheduler - executes due jobs.
|
|
|
|
Provides tick() which checks for due jobs and runs them. The gateway
|
|
calls this every 60 seconds from a background thread.
|
|
|
|
Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
|
|
runs at a time if multiple processes overlap.
|
|
"""
|
|
|
|
import asyncio
|
|
import concurrent.futures
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
|
|
# fcntl is Unix-only; on Windows use msvcrt for file locking
|
|
try:
|
|
import fcntl
|
|
except ImportError:
|
|
fcntl = None
|
|
try:
|
|
import msvcrt
|
|
except ImportError:
|
|
msvcrt = None
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Add parent directory to path for imports BEFORE repo-level imports.
|
|
# Without this, standalone invocations (e.g. after `hermes update` reloads
|
|
# the module) fail with ModuleNotFoundError for hermes_time et al.
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from hermes_constants import get_hermes_home
|
|
from hermes_cli.config import load_config
|
|
from hermes_time import now as _hermes_now
|
|
from agent.model_metadata import is_local_endpoint
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =====================================================================
|
|
# Deploy Sync Guard
|
|
# =====================================================================
|
|
#
|
|
# If the installed run_agent.py diverges from the version scheduler.py
|
|
# was written against, every cron job fails with:
|
|
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
|
|
#
|
|
# _validate_agent_interface() catches this at the FIRST job, not the
|
|
# 55th. It uses inspect.signature() to verify every kwarg we pass is
|
|
# accepted by AIAgent.__init__().
|
|
#
|
|
# Maintaining this list: if you add a kwarg to the AIAgent() call in
|
|
# run_job(), add it here too. The guard catches mismatches.
|
|
|
|
_SCHEDULER_AGENT_KWARGS: set = frozenset({
|
|
"model", "api_key", "base_url", "provider", "api_mode",
|
|
"acp_command", "acp_args", "max_iterations", "reasoning_config",
|
|
"prefill_messages", "providers_allowed", "providers_ignored",
|
|
"providers_order", "provider_sort", "disabled_toolsets",
|
|
"tool_choice", "quiet_mode", "skip_memory", "platform",
|
|
"session_id", "session_db",
|
|
})
|
|
|
|
_agent_interface_validated: bool = False
|
|
|
|
|
|
def _validate_agent_interface() -> None:
    """Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.

    Compares ``_SCHEDULER_AGENT_KWARGS`` with the parameters of
    ``run_agent.AIAgent.__init__``. A parameter counts as accepted only if it
    can be passed by keyword (positional-or-keyword or keyword-only); if the
    signature has ``**kwargs``, every keyword is accepted.

    A successful check is cached in ``_agent_interface_validated`` and skipped
    for the rest of the process; a failed check is retried on the next call.

    Raises:
        RuntimeError: ``run_agent.AIAgent`` cannot be imported, or its
            ``__init__`` cannot accept one or more scheduler kwargs. The
            message contains remediation steps.
    """
    global _agent_interface_validated
    if _agent_interface_validated:
        return

    import inspect

    try:
        from run_agent import AIAgent
    except ImportError as exc:
        raise RuntimeError(
            f"Cannot import AIAgent: {exc}\n"
            "Is hermes-agent installed? Check PYTHONPATH."
        ) from exc

    params = list(inspect.signature(AIAgent.__init__).parameters.values())
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        # ``**kwargs`` accepts any keyword, so no scheduler kwarg can be rejected.
        missing: set = set()
    else:
        keyword_kinds = (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        )
        # Positional-only params cannot be passed as keywords, so skip them.
        accepted = {p.name for p in params if p.kind in keyword_kinds} - {"self"}
        missing = set(_SCHEDULER_AGENT_KWARGS - accepted)

    if missing:
        sorted_missing = sorted(missing)
        raise RuntimeError(
            "Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
            f" {', '.join(sorted_missing)}\n"
            "This means the installed run_agent.py is out of date.\n"
            "Fix: pull latest hermes-agent code and restart the gateway.\n"
            " cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
        )

    _agent_interface_validated = True
    logger.debug("Deploy sync guard passed — %d params verified", len(_SCHEDULER_AGENT_KWARGS))
|
|
|
|
|
|
def _safe_agent_kwargs(kwargs: dict) -> dict:
|
|
"""Filter kwargs to only those accepted by installed AIAgent.__init__.
|
|
|
|
More resilient than _validate_agent_interface() alone: instead of
|
|
crashing on mismatch, drops unsupported kwargs and logs a warning.
|
|
Jobs run with degraded functionality instead of failing entirely.
|
|
|
|
Args:
|
|
kwargs: The kwargs dict the scheduler wants to pass to AIAgent().
|
|
|
|
Returns:
|
|
A new dict containing only kwargs the installed AIAgent accepts.
|
|
"""
|
|
import inspect
|
|
|
|
try:
|
|
from run_agent import AIAgent
|
|
except ImportError:
|
|
# Can't import — pass everything through, let the real error surface
|
|
return kwargs
|
|
|
|
sig = inspect.signature(AIAgent.__init__)
|
|
accepted = set(sig.parameters.keys()) - {"self"}
|
|
|
|
safe = {}
|
|
dropped = []
|
|
for key, value in kwargs.items():
|
|
if key in accepted:
|
|
safe[key] = value
|
|
else:
|
|
dropped.append(key)
|
|
|
|
if dropped:
|
|
logger.warning(
|
|
"Dropping unsupported AIAgent kwargs (stale install?): %s",
|
|
", ".join(sorted(dropped)),
|
|
)
|
|
|
|
return safe
|
|
|
|
# Valid delivery platforms — used to validate user-supplied platform names
|
|
# in cron delivery targets, preventing env var enumeration via crafted names.
|
|
_KNOWN_DELIVERY_PLATFORMS = frozenset({
|
|
"telegram", "discord", "slack", "whatsapp", "signal",
|
|
"matrix", "mattermost", "homeassistant", "dingtalk", "feishu",
|
|
"wecom", "sms", "email", "webhook",
|
|
})
|
|
|
|
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
|
|
|
# Sentinel: when a cron agent has nothing new to report, it can start its
|
|
# response with this marker to suppress delivery. Output is still saved
|
|
# locally for audit.
|
|
SILENT_MARKER = "[SILENT]"
|
|
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
|
|
|
|
# Failure phrases that indicate an external script/command failed, even when
|
|
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
|
|
# against the final response. These are strong signals — agents rarely use
|
|
# these words when a script succeeded.
|
|
_SCRIPT_FAILURE_PHRASES = (
|
|
"timed out",
|
|
"timeout",
|
|
"connection error",
|
|
"connection refused",
|
|
"connection reset",
|
|
"failed to execute",
|
|
"failed due to",
|
|
"script failed",
|
|
"script error",
|
|
"command failed",
|
|
"exit code",
|
|
"exit status",
|
|
"non-zero exit",
|
|
"did not complete",
|
|
"could not run",
|
|
"unable to execute",
|
|
"permission denied",
|
|
"no such file",
|
|
"traceback",
|
|
)
|
|
|
|
|
|
def _detect_script_failure(final_response: str) -> Optional[str]:
|
|
"""Detect script failure from agent's final response.
|
|
|
|
Returns a reason string if failure detected, None otherwise.
|
|
Checks both the explicit [SCRIPT_FAILED] marker and heuristic patterns.
|
|
"""
|
|
if not final_response:
|
|
return None
|
|
|
|
# 1. Explicit marker — highest confidence.
|
|
if SCRIPT_FAILED_MARKER in final_response.upper():
|
|
import re as _re
|
|
_m = _re.search(
|
|
r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
|
|
final_response,
|
|
_re.IGNORECASE,
|
|
)
|
|
reason = _m.group(1).strip() if _m and _m.group(1).strip() else None
|
|
return reason or "Agent reported script failure"
|
|
|
|
# 2. Heuristic detection — catch failures described in natural language.
|
|
# Only flag if the response contains failure language AND does NOT
|
|
# contain success markers like [NOOP] (which means the script ran fine
|
|
# but found nothing).
|
|
lower = final_response.lower()
|
|
has_noop = "[noop]" in lower
|
|
has_silent = "[silent]" in lower
|
|
|
|
if has_noop or has_silent:
|
|
return None # Agent explicitly signaled success/nothing-to-report
|
|
|
|
for phrase in _SCRIPT_FAILURE_PHRASES:
|
|
if phrase in lower:
|
|
return f"Detected script failure phrase: '{phrase}'"
|
|
|
|
return None
|
|
|
|
|
|
# Hermes home directory; get_hermes_home() honours a HERMES_HOME override.
_hermes_home = get_hermes_home()

# Tick lock: a file under <hermes home>/cron that keeps the gateway, a daemon
# and a systemd timer from running ticks concurrently.
_LOCK_DIR = _hermes_home.joinpath("cron")
_LOCK_FILE = _LOCK_DIR.joinpath(".tick.lock")
|
|
|
|
|
|
def _resolve_origin(job: dict) -> Optional[dict]:
|
|
"""Extract origin info from a job, preserving any extra routing metadata."""
|
|
origin = job.get("origin")
|
|
if not origin:
|
|
return None
|
|
platform = origin.get("platform")
|
|
chat_id = origin.get("chat_id")
|
|
if platform and chat_id:
|
|
return origin
|
|
return None
|
|
|
|
|
|
def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
|
"""Resolve the concrete auto-delivery target for a cron job, if any."""
|
|
deliver = job.get("deliver", "local")
|
|
origin = _resolve_origin(job)
|
|
|
|
if deliver == "local":
|
|
return None
|
|
|
|
if deliver == "origin":
|
|
if origin:
|
|
return {
|
|
"platform": origin["platform"],
|
|
"chat_id": str(origin["chat_id"]),
|
|
"thread_id": origin.get("thread_id"),
|
|
}
|
|
# Origin missing (e.g. job created via API/script) — try each
|
|
# platform's home channel as a fallback instead of silently dropping.
|
|
for platform_name in ("matrix", "telegram", "discord", "slack"):
|
|
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
|
if chat_id:
|
|
logger.info(
|
|
"Job '%s' has deliver=origin but no origin; falling back to %s home channel",
|
|
job.get("name", job.get("id", "?")),
|
|
platform_name,
|
|
)
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": chat_id,
|
|
"thread_id": None,
|
|
}
|
|
return None
|
|
|
|
if ":" in deliver:
|
|
platform_name, rest = deliver.split(":", 1)
|
|
platform_key = platform_name.lower()
|
|
|
|
from tools.send_message_tool import _parse_target_ref
|
|
|
|
parsed_chat_id, parsed_thread_id, is_explicit = _parse_target_ref(platform_key, rest)
|
|
if is_explicit:
|
|
chat_id, thread_id = parsed_chat_id, parsed_thread_id
|
|
else:
|
|
chat_id, thread_id = rest, None
|
|
|
|
# Resolve human-friendly labels like "Alice (dm)" to real IDs.
|
|
try:
|
|
from gateway.channel_directory import resolve_channel_name
|
|
resolved = resolve_channel_name(platform_key, chat_id)
|
|
if resolved:
|
|
parsed_chat_id, parsed_thread_id, resolved_is_explicit = _parse_target_ref(platform_key, resolved)
|
|
if resolved_is_explicit:
|
|
chat_id, thread_id = parsed_chat_id, parsed_thread_id
|
|
else:
|
|
chat_id = resolved
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": chat_id,
|
|
"thread_id": thread_id,
|
|
}
|
|
|
|
platform_name = deliver
|
|
if origin and origin.get("platform") == platform_name:
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": str(origin["chat_id"]),
|
|
"thread_id": origin.get("thread_id"),
|
|
}
|
|
|
|
if platform_name.lower() not in _KNOWN_DELIVERY_PLATFORMS:
|
|
return None
|
|
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
|
if not chat_id:
|
|
return None
|
|
|
|
return {
|
|
"platform": platform_name,
|
|
"chat_id": chat_id,
|
|
"thread_id": None,
|
|
}
|
|
|
|
|
|
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
    """
    Deliver job output to the configured target (origin chat, specific platform, etc.).

    When ``adapters`` and ``loop`` are provided (gateway is running), tries to
    use the live adapter first — this supports E2EE rooms (e.g. Matrix) where
    the standalone HTTP path cannot encrypt. Falls back to standalone send if
    the adapter path fails or is unavailable.

    Args:
        job: Cron job dict. ``id`` is required; ``name`` and ``deliver`` are
            read when present.
        content: The agent's final output to deliver.
        adapters: Optional mapping from ``Platform`` to a live adapter whose
            ``send(chat_id, text, metadata=...)`` coroutine is used.
        loop: Event loop the live adapters run on; the live path is only
            attempted while ``loop.is_running()`` returns true.

    Send failures (live or standalone) are logged rather than raised.
    """
    target = _resolve_delivery_target(job)
    if not target:
        if job.get("deliver", "local") != "local":
            logger.warning(
                "Job '%s' deliver=%s but no concrete delivery target could be resolved",
                job["id"],
                job.get("deliver", "local"),
            )
        return

    platform_name = target["platform"]
    chat_id = target["chat_id"]
    thread_id = target.get("thread_id")

    from tools.send_message_tool import _send_to_platform
    from gateway.config import load_gateway_config, Platform

    platform_map = {
        "telegram": Platform.TELEGRAM,
        "discord": Platform.DISCORD,
        "slack": Platform.SLACK,
        "whatsapp": Platform.WHATSAPP,
        "signal": Platform.SIGNAL,
        "matrix": Platform.MATRIX,
        "mattermost": Platform.MATTERMOST,
        "homeassistant": Platform.HOMEASSISTANT,
        "dingtalk": Platform.DINGTALK,
        "feishu": Platform.FEISHU,
        "wecom": Platform.WECOM,
        "email": Platform.EMAIL,
        "sms": Platform.SMS,
    }
    platform = platform_map.get(platform_name.lower())
    if not platform:
        logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name)
        return

    try:
        config = load_gateway_config()
    except Exception as e:
        logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e)
        return

    pconfig = config.platforms.get(platform)
    if not pconfig or not pconfig.enabled:
        logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
        return

    # Optionally wrap the content with a header/footer so the user knows this
    # is a cron delivery. Wrapping is on by default; set cron.wrap_response: false
    # in config.yaml for clean output.
    wrap_response = True
    try:
        user_cfg = load_config()
        wrap_response = user_cfg.get("cron", {}).get("wrap_response", True)
    except Exception as e:
        # Best-effort: an unreadable config keeps the default (wrapped) output.
        logger.debug("Job '%s': could not read cron.wrap_response, using default: %s", job["id"], e)

    if wrap_response:
        task_name = job.get("name", job["id"])
        delivery_content = (
            f"Cronjob Response: {task_name}\n"
            "-------------\n\n"
            f"{content}\n\n"
            "Note: The agent cannot see this message, and therefore cannot respond to it."
        )
    else:
        delivery_content = content

    # Extract MEDIA: tags so attachments are forwarded as files, not raw text
    from gateway.platforms.base import BasePlatformAdapter
    media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)

    # Prefer the live adapter when the gateway is running — this supports E2EE
    # rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
    # NOTE(review): the live path sends the unstripped content without
    # media_files; presumably adapters handle MEDIA: tags themselves — confirm.
    runtime_adapter = (adapters or {}).get(platform)
    if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
        send_metadata = {"thread_id": thread_id} if thread_id else None
        future = None
        try:
            future = asyncio.run_coroutine_threadsafe(
                runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata),
                loop,
            )
            send_result = future.result(timeout=60)
            if send_result and not getattr(send_result, "success", True):
                err = getattr(send_result, "error", "unknown")
                logger.warning(
                    "Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
                    job["id"], platform_name, chat_id, err,
                )
            else:
                logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
                return
        except Exception as e:
            # On timeout the coroutine would keep running in the gateway loop
            # and could race the standalone fallback; cancel it first.
            if future is not None:
                future.cancel()
            logger.warning(
                "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone",
                job["id"], platform_name, chat_id, e,
            )

    def _send_standalone():
        # Create the coroutine here so it is always awaited by the loop that
        # asyncio.run() creates for it (no "coroutine was never awaited").
        return asyncio.run(
            _send_to_platform(
                platform,
                pconfig,
                chat_id,
                cleaned_delivery_content,
                thread_id=thread_id,
                media_files=media_files,
            )
        )

    # Decide up front whether this thread already runs an event loop, instead
    # of treating any RuntimeError from the send as "loop already running"
    # (which could re-send a message that failed for another reason).
    try:
        asyncio.get_running_loop()
        in_running_loop = True
    except RuntimeError:
        in_running_loop = False

    # Standalone path. Every failure — including timeouts and errors raised in
    # the worker thread — is caught by the single handler below.
    try:
        if in_running_loop:
            # asyncio.run() refuses to start inside a running loop, so send
            # from a worker thread that has no loop of its own.
            pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                result = pool.submit(_send_standalone).result(timeout=30)
            finally:
                # wait=False so a stuck send cannot block past the timeout.
                pool.shutdown(wait=False)
        else:
            result = _send_standalone()
    except Exception as e:
        logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
        return

    if result and result.get("error"):
        logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
    else:
        logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
|
|
|
|
|
_SCRIPT_TIMEOUT = 120  # seconds


def _run_job_script(script_path: str) -> tuple[bool, str]:
    """Execute a cron job's data-collection script and capture its output.

    Scripts must reside within HERMES_HOME/scripts/. Both relative and
    absolute paths are resolved and validated against this directory to
    prevent arbitrary script execution via path traversal or absolute
    path injection.

    The script is run with the current interpreter (``sys.executable``),
    with its own directory as the working directory, and is killed after
    ``_SCRIPT_TIMEOUT`` seconds.

    Args:
        script_path: Path to a Python script. Relative paths are resolved
            against HERMES_HOME/scripts/. Absolute and ~-prefixed paths
            are also validated to ensure they stay within the scripts dir.

    Returns:
        (success, output) — on failure *output* contains the error message so the
        LLM can report the problem to the user. Script stdout/stderr is passed
        through ``agent.redact.redact_sensitive_text`` (when available) on both
        the success and the non-zero-exit paths, since either is injected into
        the agent prompt.
    """
    from hermes_constants import get_hermes_home

    def _redact(text: str) -> str:
        # Best-effort: if the redactor is unavailable or fails, keep the text
        # rather than failing the job.
        try:
            from agent.redact import redact_sensitive_text
            return redact_sensitive_text(text)
        except Exception:
            return text

    scripts_dir = get_hermes_home() / "scripts"
    scripts_dir.mkdir(parents=True, exist_ok=True)
    scripts_dir_resolved = scripts_dir.resolve()

    raw = Path(script_path).expanduser()
    if raw.is_absolute():
        path = raw.resolve()
    else:
        path = (scripts_dir / raw).resolve()

    # Guard against path traversal, absolute path injection, and symlink
    # escape — scripts MUST reside within HERMES_HOME/scripts/.
    try:
        path.relative_to(scripts_dir_resolved)
    except ValueError:
        return False, (
            f"Blocked: script path resolves outside the scripts directory "
            f"({scripts_dir_resolved}): {script_path!r}"
        )

    if not path.exists():
        return False, f"Script not found: {path}"
    if not path.is_file():
        return False, f"Script path is not a file: {path}"

    try:
        result = subprocess.run(
            [sys.executable, str(path)],
            capture_output=True,
            text=True,
            timeout=_SCRIPT_TIMEOUT,
            cwd=str(path.parent),
        )
        stdout = (result.stdout or "").strip()
        stderr = (result.stderr or "").strip()

        if result.returncode != 0:
            parts = [f"Script exited with code {result.returncode}"]
            if stderr:
                parts.append(f"stderr:\n{stderr}")
            if stdout:
                parts.append(f"stdout:\n{stdout}")
            # Failure output is also shown to the LLM, so redact it too.
            return False, _redact("\n".join(parts))

        # Redact any secrets that may appear in script output before
        # they are injected into the LLM prompt context.
        return True, _redact(stdout)

    except subprocess.TimeoutExpired:
        return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}"
    except Exception as exc:
        return False, f"Script execution failed: {exc}"
|
|
|
|
|
|
def _build_job_prompt(job: dict) -> str:
|
|
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
|
|
prompt = job.get("prompt", "")
|
|
skills = job.get("skills")
|
|
|
|
# Run data-collection script if configured, inject output as context.
|
|
script_path = job.get("script")
|
|
if script_path:
|
|
success, script_output = _run_job_script(script_path)
|
|
if success:
|
|
if script_output:
|
|
prompt = (
|
|
"## Script Output\n"
|
|
"The following data was collected by a pre-run script. "
|
|
"Use it as context for your analysis.\n\n"
|
|
f"```\n{script_output}\n```\n\n"
|
|
f"{prompt}"
|
|
)
|
|
else:
|
|
prompt = (
|
|
"[Script ran successfully but produced no output.]\n\n"
|
|
f"{prompt}"
|
|
)
|
|
else:
|
|
prompt = (
|
|
"## Script Error\n"
|
|
"The data-collection script failed. Report this to the user.\n\n"
|
|
f"```\n{script_output}\n```\n\n"
|
|
f"{prompt}"
|
|
)
|
|
|
|
# Always prepend cron execution guidance so the agent knows how
|
|
# delivery works and can suppress delivery when appropriate.
|
|
cron_hint = (
|
|
"[SYSTEM: You are running as a scheduled cron job. "
|
|
"DELIVERY: Your final response will be automatically delivered "
|
|
"to the user — do NOT use send_message or try to deliver "
|
|
"the output yourself. Just produce your report/output as your "
|
|
"final response and the system handles the rest. "
|
|
"SILENT: If there is genuinely nothing new to report, respond "
|
|
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
|
|
"Never combine [SILENT] with content — either report your "
|
|
"findings normally, or say [SILENT] and nothing more. "
|
|
"SCRIPT_FAILURE: If an external command or script you ran "
|
|
"failed (timeout, crash, connection error, non-zero exit), you MUST "
|
|
"respond with "
|
|
"\"[SCRIPT_FAILED]: <one-line reason>\" as the FIRST LINE of your "
|
|
"response. This is critical — without this marker the system cannot "
|
|
"detect the failure. Examples: "
|
|
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
|
|
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
|
|
)
|
|
prompt = cron_hint + prompt
|
|
if skills is None:
|
|
legacy = job.get("skill")
|
|
skills = [legacy] if legacy else []
|
|
|
|
skill_names = [str(name).strip() for name in skills if str(name).strip()]
|
|
if not skill_names:
|
|
return prompt
|
|
|
|
from tools.skills_tool import skill_view
|
|
|
|
parts = []
|
|
skipped: list[str] = []
|
|
for skill_name in skill_names:
|
|
loaded = json.loads(skill_view(skill_name))
|
|
if not loaded.get("success"):
|
|
error = loaded.get("error") or f"Failed to load skill '{skill_name}'"
|
|
logger.warning("Cron job '%s': skill not found, skipping — %s", job.get("name", job.get("id")), error)
|
|
skipped.append(skill_name)
|
|
continue
|
|
|
|
content = str(loaded.get("content") or "").strip()
|
|
if parts:
|
|
parts.append("")
|
|
parts.extend(
|
|
[
|
|
f'[SYSTEM: The user has invoked the "{skill_name}" skill, indicating they want you to follow its instructions. The full skill content is loaded below.]',
|
|
"",
|
|
content,
|
|
]
|
|
)
|
|
|
|
if skipped:
|
|
notice = (
|
|
f"[SYSTEM: The following skill(s) were listed for this job but could not be found "
|
|
f"and were skipped: {', '.join(skipped)}. "
|
|
f"Start your response with a brief notice so the user is aware, e.g.: "
|
|
f"'⚠️ Skill(s) not found and skipped: {', '.join(skipped)}']"
|
|
)
|
|
parts.insert(0, notice)
|
|
|
|
if prompt:
|
|
parts.extend(["", f"The user has provided the following instruction alongside the skill invocation: {prompt}"])
|
|
return "\n".join(parts)
|
|
|
|
|
|
# Regex patterns for local service references that fail on cloud endpoints
|
|
_CLOUD_INCOMPATIBLE_PATTERNS = [
|
|
(re.compile(r"\b[Cc]heck\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up|available)", re.IGNORECASE),
|
|
"Verify system services are healthy using available tools"),
|
|
(re.compile(r"\b[Vv]erify\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up)", re.IGNORECASE),
|
|
"Verify system services are healthy using available tools"),
|
|
(re.compile(r"\bcurl\s+localhost:\d+", re.IGNORECASE),
|
|
"use available tools to check service health"),
|
|
(re.compile(r"\bcurl\s+127\.0\.0\.1:\d+", re.IGNORECASE),
|
|
"use available tools to check service health"),
|
|
(re.compile(r"\bpoll\s+localhost", re.IGNORECASE),
|
|
"check service health via available tools"),
|
|
]
|
|
|
|
|
|
def _rewrite_cloud_incompatible_prompt(prompt: str, base_url: Optional[str]) -> str:
    """Rewrite prompt instructions that assume local service access when running on cloud.

    When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, Anthropic),
    instructions to "Check Ollama" or "curl localhost:11434" are impossible.
    Instead of just warning, this rewrites the instruction to a cloud-compatible
    equivalent that the agent can actually execute.

    Args:
        prompt: The job prompt to inspect.
        base_url: The runtime inference endpoint; None/empty is passed to
            ``is_local_endpoint`` as ``""``.

    Returns:
        *prompt* unchanged when it is empty, when ``base_url`` is a local
        endpoint, or when no pattern in ``_CLOUD_INCOMPATIBLE_PATTERNS``
        matches. Otherwise the rewritten prompt, prefixed with a ``[NOTE: ...]``
        line and a blank line telling the agent what was adjusted.
    """
    # ``is_local_endpoint`` is imported at module level; the former local
    # re-import inside ``try/except ImportError`` could never fail.
    if not prompt or is_local_endpoint(base_url or ""):
        return prompt  # Nothing to rewrite, or local — no rewrite needed

    rewritten = prompt
    for pattern, replacement in _CLOUD_INCOMPATIBLE_PATTERNS:
        rewritten = pattern.sub(replacement, rewritten)

    if rewritten == prompt:
        return prompt

    # The note ends with "\n\n" escapes; the previous version split this
    # literal across physical lines, which is a SyntaxError.
    return (
        "[NOTE: Some instructions were adjusted for cloud execution. "
        "Local service checks were rewritten to use available tools.]\n\n"
        + rewritten
    )
|
|
|
|
|
|
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|
"""
|
|
Execute a single cron job.
|
|
|
|
Returns:
|
|
Tuple of (success, full_output_doc, final_response, error_message)
|
|
"""
|
|
# Deploy sync guard — fail fast on first job if the installed
|
|
# AIAgent.__init__ is missing params the scheduler expects.
|
|
_validate_agent_interface()
|
|
|
|
from run_agent import AIAgent
|
|
|
|
# Initialize SQLite session store so cron job messages are persisted
|
|
# and discoverable via session_search (same pattern as gateway/run.py).
|
|
_session_db = None
|
|
try:
|
|
from hermes_state import SessionDB
|
|
_session_db = SessionDB()
|
|
except Exception as e:
|
|
logger.debug("Job '%s': SQLite session store not available: %s", job.get("id", "?"), e)
|
|
|
|
job_id = job["id"]
|
|
job_name = job["name"]
|
|
prompt = _build_job_prompt(job)
|
|
origin = _resolve_origin(job)
|
|
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
|
logger.info("Prompt: %s", prompt[:100])
|
|
|
|
try:
|
|
# Inject origin context so the agent's send_message tool knows the chat.
|
|
# Must be INSIDE the try block so the finally cleanup always runs.
|
|
if origin:
|
|
os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"]
|
|
os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
|
|
if origin.get("chat_name"):
|
|
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
|
|
# Re-read .env and config.yaml fresh every run so provider/key
|
|
# changes take effect without a gateway restart.
|
|
from dotenv import load_dotenv
|
|
try:
|
|
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")
|
|
|
|
delivery_target = _resolve_delivery_target(job)
|
|
if delivery_target:
|
|
os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"]
|
|
os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"])
|
|
if delivery_target.get("thread_id") is not None:
|
|
os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"])
|
|
|
|
model = job.get("model") or os.getenv("HERMES_MODEL") or ""
|
|
|
|
# Load config.yaml for model, reasoning, prefill, toolsets, provider routing
|
|
_cfg = {}
|
|
try:
|
|
import yaml
|
|
_cfg_path = str(_hermes_home / "config.yaml")
|
|
if os.path.exists(_cfg_path):
|
|
with open(_cfg_path) as _f:
|
|
_cfg = yaml.safe_load(_f) or {}
|
|
_model_cfg = _cfg.get("model", {})
|
|
if not job.get("model"):
|
|
if isinstance(_model_cfg, str):
|
|
model = _model_cfg
|
|
elif isinstance(_model_cfg, dict):
|
|
model = _model_cfg.get("default", model)
|
|
except Exception as e:
|
|
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
|
|
|
|
# Reasoning config from env or config.yaml
|
|
from hermes_constants import parse_reasoning_effort
|
|
|
|
# Time-aware cron model routing — override model during high-error windows
|
|
try:
|
|
from agent.smart_model_routing import resolve_cron_model
|
|
_cron_routing_cfg = (_cfg.get("cron_model_routing") or {})
|
|
_cron_route = resolve_cron_model(model, _cron_routing_cfg)
|
|
if _cron_route["overridden"]:
|
|
_original_model = model
|
|
model = _cron_route["model"]
|
|
logger.info(
|
|
"Job '%s': cron model override %s -> %s (%s)",
|
|
job_id, _original_model, model, _cron_route["reason"],
|
|
)
|
|
except Exception as _e:
|
|
logger.debug("Job '%s': cron model routing skipped: %s", job_id, _e)
|
|
|
|
effort = os.getenv("HERMES_REASONING_EFFORT", "")
|
|
if not effort:
|
|
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
|
|
reasoning_config = parse_reasoning_effort(effort)
|
|
|
|
# Prefill messages from env or config.yaml
|
|
prefill_messages = None
|
|
prefill_file = os.getenv("HERMES_PREFILL_MESSAGES_FILE", "") or _cfg.get("prefill_messages_file", "")
|
|
if prefill_file:
|
|
import json as _json
|
|
pfpath = Path(prefill_file).expanduser()
|
|
if not pfpath.is_absolute():
|
|
pfpath = _hermes_home / pfpath
|
|
if pfpath.exists():
|
|
try:
|
|
with open(pfpath, "r", encoding="utf-8") as _pf:
|
|
prefill_messages = _json.load(_pf)
|
|
if not isinstance(prefill_messages, list):
|
|
prefill_messages = None
|
|
except Exception as e:
|
|
logger.warning("Job '%s': failed to parse prefill messages file '%s': %s", job_id, pfpath, e)
|
|
prefill_messages = None
|
|
|
|
# Max iterations
|
|
max_iterations = _cfg.get("agent", {}).get("max_turns") or _cfg.get("max_turns") or 90
|
|
|
|
# Provider routing
|
|
pr = _cfg.get("provider_routing", {})
|
|
smart_routing = _cfg.get("smart_model_routing", {}) or {}
|
|
|
|
from hermes_cli.runtime_provider import (
|
|
resolve_runtime_provider,
|
|
format_runtime_provider_error,
|
|
)
|
|
try:
|
|
runtime_kwargs = {
|
|
"requested": job.get("provider") or os.getenv("HERMES_INFERENCE_PROVIDER"),
|
|
}
|
|
if job.get("base_url"):
|
|
runtime_kwargs["explicit_base_url"] = job.get("base_url")
|
|
runtime = resolve_runtime_provider(**runtime_kwargs)
|
|
except Exception as exc:
|
|
message = format_runtime_provider_error(exc)
|
|
raise RuntimeError(message) from exc
|
|
|
|
from agent.smart_model_routing import resolve_turn_route
|
|
turn_route = resolve_turn_route(
|
|
prompt,
|
|
smart_routing,
|
|
{
|
|
"model": model,
|
|
"api_key": runtime.get("api_key"),
|
|
"base_url": runtime.get("base_url"),
|
|
"provider": runtime.get("provider"),
|
|
"api_mode": runtime.get("api_mode"),
|
|
"command": runtime.get("command"),
|
|
"args": list(runtime.get("args") or []),
|
|
},
|
|
)
|
|
|
|
# Build disabled toolsets — always exclude cronjob/messaging/clarify
|
|
# for cron sessions. When the runtime endpoint is cloud (not local),
|
|
# also disable terminal so the agent does not attempt SSH or shell
|
|
# commands that require local infrastructure (keys, filesystem).
|
|
# Jobs that declare requires_local_infra=true also get terminal
|
|
# disabled on cloud endpoints regardless of this check. #379
|
|
_cron_disabled = ["cronjob", "messaging", "clarify"]
|
|
_runtime_base_url = turn_route["runtime"].get("base_url", "")
|
|
_is_cloud = not is_local_endpoint(_runtime_base_url)
|
|
if _is_cloud:
|
|
_cron_disabled.append("terminal")
|
|
logger.info(
|
|
"Job '%s': cloud provider detected (%s), disabling terminal toolset",
|
|
job_name,
|
|
turn_route["runtime"].get("provider", "unknown"),
|
|
)
|
|
if job.get("requires_local_infra") and _is_cloud:
|
|
logger.warning(
|
|
"Job '%s': requires_local_infra=true but running on cloud provider — "
|
|
"terminal-dependent steps will fail gracefully",
|
|
job_name,
|
|
)
|
|
|
|
_agent_kwargs = _safe_agent_kwargs({
|
|
"model": turn_route["model"],
|
|
"api_key": turn_route["runtime"].get("api_key"),
|
|
"base_url": turn_route["runtime"].get("base_url"),
|
|
"provider": turn_route["runtime"].get("provider"),
|
|
"api_mode": turn_route["runtime"].get("api_mode"),
|
|
"acp_command": turn_route["runtime"].get("command"),
|
|
"acp_args": list(turn_route["runtime"].get("args") or []),
|
|
"max_iterations": max_iterations,
|
|
"reasoning_config": reasoning_config,
|
|
"prefill_messages": prefill_messages,
|
|
"providers_allowed": pr.get("only"),
|
|
"providers_ignored": pr.get("ignore"),
|
|
"providers_order": pr.get("order"),
|
|
"provider_sort": pr.get("sort"),
|
|
"disabled_toolsets": _cron_disabled,
|
|
"tool_choice": "required",
|
|
"quiet_mode": True,
|
|
"skip_memory": True, # Cron system prompts would corrupt user representations
|
|
"platform": "cron",
|
|
"session_id": _cron_session_id,
|
|
"session_db": _session_db,
|
|
})
|
|
agent = AIAgent(**_agent_kwargs)
|
|
|
|
# Run the agent with an *inactivity*-based timeout: the job can run
|
|
# for hours if it's actively calling tools / receiving stream tokens,
|
|
# but a hung API call or stuck tool with no activity for the configured
|
|
# duration is caught and killed. Default 600s (10 min inactivity);
|
|
# override via HERMES_CRON_TIMEOUT env var. 0 = unlimited.
|
|
#
|
|
# Uses the agent's built-in activity tracker (updated by
|
|
# _touch_activity() on every tool call, API call, and stream delta).
|
|
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
|
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
|
_POLL_INTERVAL = 5.0
|
|
|
|
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
|
|
# raises RuntimeError("cannot schedule new futures after interpreter
|
|
# shutdown") when Python is finalizing (e.g. gateway restart races).
|
|
# Fall back to synchronous execution so the job at least attempts.
|
|
_cron_pool = None
|
|
try:
|
|
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
|
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
|
except RuntimeError:
|
|
logger.warning(
|
|
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
|
|
"— falling back to synchronous execution",
|
|
job_name,
|
|
)
|
|
if _cron_pool is not None:
|
|
try:
|
|
_cron_pool.shutdown(wait=False)
|
|
except Exception:
|
|
pass
|
|
_cron_pool = None
|
|
result = agent.run_conversation(prompt)
|
|
final_response = result.get("final_response", "") or ""
|
|
logged_response = final_response if final_response else "(No response generated)"
|
|
output = f"""# Cron Job: {job_name}
|
|
|
|
**Job ID:** {job_id}
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
## Prompt
|
|
|
|
{prompt}
|
|
|
|
## Response
|
|
|
|
{logged_response}
|
|
"""
|
|
logger.info("Job '%s' completed (sync fallback)", job_name)
|
|
return True, output, final_response, None
|
|
|
|
_inactivity_timeout = False
|
|
try:
|
|
if _cron_inactivity_limit is None:
|
|
# Unlimited — just wait for the result.
|
|
result = _cron_future.result()
|
|
else:
|
|
result = None
|
|
while True:
|
|
done, _ = concurrent.futures.wait(
|
|
{_cron_future}, timeout=_POLL_INTERVAL,
|
|
)
|
|
if done:
|
|
result = _cron_future.result()
|
|
break
|
|
# Agent still running — check inactivity.
|
|
_idle_secs = 0.0
|
|
if hasattr(agent, "get_activity_summary"):
|
|
try:
|
|
_act = agent.get_activity_summary()
|
|
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
|
except Exception:
|
|
pass
|
|
if _idle_secs >= _cron_inactivity_limit:
|
|
_inactivity_timeout = True
|
|
break
|
|
except Exception:
|
|
if _cron_pool is not None:
|
|
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
|
raise
|
|
finally:
|
|
if _cron_pool is not None:
|
|
_cron_pool.shutdown(wait=False)
|
|
|
|
if _inactivity_timeout:
|
|
# Build diagnostic summary from the agent's activity tracker.
|
|
_activity = {}
|
|
if hasattr(agent, "get_activity_summary"):
|
|
try:
|
|
_activity = agent.get_activity_summary()
|
|
except Exception:
|
|
pass
|
|
_last_desc = _activity.get("last_activity_desc", "unknown")
|
|
_secs_ago = _activity.get("seconds_since_activity", 0)
|
|
_cur_tool = _activity.get("current_tool")
|
|
_iter_n = _activity.get("api_call_count", 0)
|
|
_iter_max = _activity.get("max_iterations", 0)
|
|
|
|
logger.error(
|
|
"Job '%s' idle for %.0fs (inactivity limit %.0fs) "
|
|
"| last_activity=%s | iteration=%s/%s | tool=%s",
|
|
job_name, _secs_ago, _cron_inactivity_limit,
|
|
_last_desc, _iter_n, _iter_max,
|
|
_cur_tool or "none",
|
|
)
|
|
if hasattr(agent, "interrupt"):
|
|
agent.interrupt("Cron job timed out (inactivity)")
|
|
raise TimeoutError(
|
|
f"Cron job '{job_name}' idle for "
|
|
f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) "
|
|
f"— last activity: {_last_desc}"
|
|
)
|
|
|
|
final_response = result.get("final_response", "") or ""
|
|
# Use a separate variable for log display; keep final_response clean
|
|
# for delivery logic (empty response = no delivery).
|
|
logged_response = final_response if final_response else "(No response generated)"
|
|
|
|
# Check for script failure — both explicit [SCRIPT_FAILED] marker
|
|
# and heuristic detection for failures described in natural language.
|
|
_script_failed_reason = _detect_script_failure(final_response)
|
|
if _script_failed_reason is not None:
|
|
logger.warning(
|
|
"Job '%s': agent reported script failure — %s",
|
|
job_name, _script_failed_reason,
|
|
)
|
|
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
|
|
|
|
**Job ID:** {job_id}
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
## Prompt
|
|
|
|
{prompt}
|
|
|
|
## Response
|
|
|
|
{logged_response}
|
|
"""
|
|
return False, output, final_response, _script_failed_reason
|
|
|
|
output = f"""# Cron Job: {job_name}
|
|
|
|
**Job ID:** {job_id}
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
## Prompt
|
|
|
|
{prompt}
|
|
|
|
## Response
|
|
|
|
{logged_response}
|
|
"""
|
|
|
|
logger.info("Job '%s' completed successfully", job_name)
|
|
return True, output, final_response, None
|
|
|
|
except Exception as e:
|
|
error_msg = f"{type(e).__name__}: {str(e)}"
|
|
logger.exception("Job '%s' failed: %s", job_name, error_msg)
|
|
|
|
output = f"""# Cron Job: {job_name} (FAILED)
|
|
|
|
**Job ID:** {job_id}
|
|
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
|
**Schedule:** {job.get('schedule_display', 'N/A')}
|
|
|
|
## Prompt
|
|
|
|
{prompt}
|
|
|
|
## Error
|
|
|
|
```
|
|
{error_msg}
|
|
```
|
|
"""
|
|
return False, output, "", error_msg
|
|
|
|
finally:
|
|
# Clean up injected env vars so they don't leak to other jobs
|
|
for key in (
|
|
"HERMES_SESSION_PLATFORM",
|
|
"HERMES_SESSION_CHAT_ID",
|
|
"HERMES_SESSION_CHAT_NAME",
|
|
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
|
|
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
|
|
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
|
|
):
|
|
os.environ.pop(key, None)
|
|
if _session_db:
|
|
try:
|
|
_session_db.end_session(_cron_session_id, "cron_complete")
|
|
except (Exception, KeyboardInterrupt) as e:
|
|
logger.debug("Job '%s': failed to end session: %s", job_id, e)
|
|
try:
|
|
_session_db.close()
|
|
except (Exception, KeyboardInterrupt) as e:
|
|
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
|
|
|
|
|
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
    """
    Check and run all due jobs.

    Uses a file lock so only one tick runs at a time, even if the gateway's
    in-process ticker and a standalone daemon or manual tick overlap.

    Args:
        verbose: Whether to print status messages
        adapters: Optional dict mapping Platform → live adapter (from gateway)
        loop: Optional asyncio event loop (from gateway) for live adapter sends

    Returns:
        Number of jobs executed (0 if another tick is already running)
    """
    _LOCK_DIR.mkdir(parents=True, exist_ok=True)

    # Cross-platform file locking: fcntl on Unix, msvcrt on Windows.
    # Both locks are requested non-blocking (LOCK_NB / LK_NBLCK), so an
    # overlapping tick fails immediately with OSError and is skipped.
    lock_fd = None
    try:
        lock_fd = open(_LOCK_FILE, "w")
        if fcntl:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        elif msvcrt:
            msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
    except (OSError, IOError):
        logger.debug("Tick skipped — another instance holds the lock")
        if lock_fd is not None:
            lock_fd.close()
        return 0

    try:
        due_jobs = get_due_jobs()

        if verbose and not due_jobs:
            logger.info("%s - No jobs due", _hermes_now().strftime('%H:%M:%S'))
            return 0

        if verbose:
            logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))

        executed = 0
        for index, job in enumerate(due_jobs):
            # If the interpreter is shutting down (e.g. gateway restart),
            # stop processing immediately — ThreadPoolExecutor.submit()
            # will raise RuntimeError for every remaining job.
            if sys.is_finalizing():
                # Use the loop position, not `executed`: `executed` only
                # counts successful jobs, so it would overstate what is left
                # whenever an earlier job raised.
                logger.warning(
                    "Interpreter finalizing — skipping %d remaining job(s)",
                    len(due_jobs) - index,
                )
                break

            try:
                # For recurring jobs (cron/interval), advance next_run_at to the
                # next future occurrence BEFORE execution. This way, if the
                # process crashes mid-run, the job won't re-fire on restart.
                # One-shot jobs are left alone so they can retry on restart.
                advance_next_run(job["id"])

                success, output, final_response, error = run_job(job)

                output_file = save_job_output(job["id"], output)
                if verbose:
                    logger.info("Output saved to: %s", output_file)

                # Deliver the final response to the origin/target chat.
                # If the agent responded with [SILENT], skip delivery (but
                # output is already saved above). Failed jobs always deliver.
                deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
                should_deliver = bool(deliver_content)
                if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
                    logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
                    should_deliver = False

                if should_deliver:
                    try:
                        _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
                    except Exception as de:
                        logger.error("Delivery failed for job %s: %s", job["id"], de)

                mark_job_run(job["id"], success, error)
                executed += 1

            except Exception as e:
                logger.error("Error processing job %s: %s", job['id'], e)
                # Recording the failure can itself fail (e.g. the same
                # persistence error that landed us here). Don't let that
                # abort the tick and starve the remaining due jobs.
                try:
                    mark_job_run(job["id"], False, str(e))
                except Exception as mark_err:
                    logger.error("Failed to record failure for job %s: %s", job['id'], mark_err)

        return executed
    finally:
        # Release the lock, then always close the file handle — even if
        # unlocking raises — so the descriptor is never leaked.
        try:
            if fcntl:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
            elif msvcrt:
                try:
                    msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
                except (OSError, IOError):
                    pass
        finally:
            lock_fd.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Standalone entry point: perform a single scheduler tick with status logging.
    tick(True)
|