Add an optional 'script' parameter to cron jobs that references a Python script. The script runs before each agent turn, and its stdout is injected into the prompt as context. This enables stateful monitoring — the script handles data collection and change detection, the LLM analyzes and reports. - cron/jobs.py: add script field to create_job(), stored in job dict - cron/scheduler.py: add _run_job_script() executor with timeout handling, inject script output/errors into _build_job_prompt() - tools/cronjob_tools.py: add script to tool schema, create/update handlers, _format_job display - hermes_cli/cron.py: add --script to create/edit, display in list/edit output - hermes_cli/main.py: add --script argparse for cron create/edit subcommands - tests/cron/test_cron_script.py: 20 tests covering job CRUD, script execution, path resolution, error handling, prompt injection, tool API Script paths can be absolute or relative (resolved against ~/.hermes/scripts/). Scripts run with a 120s timeout. Failures are injected as error context so the LLM can report the problem. Empty string clears an attached script.
754 lines · 25 KiB · Python
"""
|
|
Cron job storage and management.
|
|
|
|
Jobs are stored in ~/.hermes/cron/jobs.json
|
|
Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md
|
|
"""
|
|
|
|
import copy
|
|
import json
|
|
import logging
|
|
import tempfile
|
|
import os
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from hermes_constants import get_hermes_home
|
|
from typing import Optional, Dict, List, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from hermes_time import now as _hermes_now
|
|
|
|
try:
    # croniter is an optional dependency: without it, cron-expression
    # schedules are rejected at parse time (parse_schedule raises) and
    # compute_next_run returns None for "cron" kinds.
    from croniter import croniter
    HAS_CRONITER = True
except ImportError:
    HAS_CRONITER = False
|
|
|
|
# =============================================================================
# Configuration
# =============================================================================

HERMES_DIR = get_hermes_home()
CRON_DIR = HERMES_DIR / "cron"        # Root directory for all cron state
JOBS_FILE = CRON_DIR / "jobs.json"    # Job definitions (see load_jobs/save_jobs)
OUTPUT_DIR = CRON_DIR / "output"      # Run output, one subdirectory per job id
ONESHOT_GRACE_SECONDS = 120           # Window in which a late one-shot may still fire
|
|
|
|
|
|
def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]:
|
|
"""Normalize legacy/single-skill and multi-skill inputs into a unique ordered list."""
|
|
if skills is None:
|
|
raw_items = [skill] if skill else []
|
|
elif isinstance(skills, str):
|
|
raw_items = [skills]
|
|
else:
|
|
raw_items = list(skills)
|
|
|
|
normalized: List[str] = []
|
|
for item in raw_items:
|
|
text = str(item or "").strip()
|
|
if text and text not in normalized:
|
|
normalized.append(text)
|
|
return normalized
|
|
|
|
|
|
def _apply_skill_fields(job: Dict[str, Any]) -> Dict[str, Any]:
    """Copy `job` with the canonical `skills` list and legacy `skill` field kept in sync."""
    result = {**job}
    resolved = _normalize_skill_list(result.get("skill"), result.get("skills"))
    result["skill"] = resolved[0] if resolved else None
    result["skills"] = resolved
    return result
|
|
|
|
|
|
def _secure_dir(path: Path):
|
|
"""Set directory to owner-only access (0700). No-op on Windows."""
|
|
try:
|
|
os.chmod(path, 0o700)
|
|
except (OSError, NotImplementedError):
|
|
pass # Windows or other platforms where chmod is not supported
|
|
|
|
|
|
def _secure_file(path: Path):
|
|
"""Set file to owner-only read/write (0600). No-op on Windows."""
|
|
try:
|
|
if path.exists():
|
|
os.chmod(path, 0o600)
|
|
except (OSError, NotImplementedError):
|
|
pass
|
|
|
|
|
|
def ensure_dirs():
    """Create the cron storage directories (idempotent) and lock each to 0700."""
    for directory in (CRON_DIR, OUTPUT_DIR):
        directory.mkdir(parents=True, exist_ok=True)
        _secure_dir(directory)
|
|
|
|
|
|
# =============================================================================
|
|
# Schedule Parsing
|
|
# =============================================================================
|
|
|
|
def parse_duration(s: str) -> int:
    """
    Convert a human-readable duration string into whole minutes.

    Accepts short and long unit spellings ("30m", "45 mins", "2 hours", "1d").

    Examples:
        "30m" → 30
        "2h"  → 120
        "1d"  → 1440

    Raises:
        ValueError: if the string is not a number followed by a known unit.
    """
    text = s.strip().lower()
    parsed = re.match(
        r'^(\d+)\s*(m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)$',
        text,
    )
    if parsed is None:
        raise ValueError(f"Invalid duration: '{text}'. Use format like '30m', '2h', or '1d'")

    amount, unit = parsed.groups()
    # First character of the unit disambiguates all spellings: m, h, or d.
    per_unit = {'m': 1, 'h': 60, 'd': 1440}[unit[0]]
    return int(amount) * per_unit
|
|
|
|
|
|
def parse_schedule(schedule: str) -> Dict[str, Any]:
    """
    Parse schedule string into structured format.

    Returns dict with:
    - kind: "once" | "interval" | "cron"
    - For "once": "run_at" (ISO timestamp)
    - For "interval": "minutes" (int)
    - For "cron": "expr" (cron expression)
    - "display": human-readable summary of the schedule

    Examples:
        "30m" → once in 30 minutes
        "2h" → once in 2 hours
        "every 30m" → recurring every 30 minutes
        "every 2h" → recurring every 2 hours
        "0 9 * * *" → cron expression
        "2026-02-03T14:00" → once at timestamp

    Raises:
        ValueError: if the schedule matches none of the supported formats, or
            matches one but fails validation (bad cron expression / timestamp).
            Validation errors are chained (`from e`) to preserve the cause.
    """
    schedule = schedule.strip()
    original = schedule
    schedule_lower = schedule.lower()

    # "every X" pattern → recurring interval
    if schedule_lower.startswith("every "):
        duration_str = schedule[6:].strip()
        minutes = parse_duration(duration_str)
        return {
            "kind": "interval",
            "minutes": minutes,
            "display": f"every {minutes}m"
        }

    # Check for cron expression (5 or 6 space-separated fields)
    # Cron fields: minute hour day month weekday [year]
    # NOTE: only numeric fields and * - , / are detected here; named fields
    # like "MON" or "JAN" are not recognized by this detector.
    parts = schedule.split()
    if len(parts) >= 5 and all(
        re.match(r'^[\d\*\-,/]+$', p) for p in parts[:5]
    ):
        if not HAS_CRONITER:
            raise ValueError("Cron expressions require 'croniter' package. Install with: pip install croniter")
        # Validate cron expression (chain the original cause for debuggability)
        try:
            croniter(schedule)
        except Exception as e:
            raise ValueError(f"Invalid cron expression '{schedule}': {e}") from e
        return {
            "kind": "cron",
            "expr": schedule,
            "display": schedule
        }

    # ISO timestamp (contains T or looks like date)
    if 'T' in schedule or re.match(r'^\d{4}-\d{2}-\d{2}', schedule):
        try:
            # Parse and validate
            dt = datetime.fromisoformat(schedule.replace('Z', '+00:00'))
            # Make naive timestamps timezone-aware at parse time so the stored
            # value doesn't depend on the system timezone matching at check time.
            if dt.tzinfo is None:
                dt = dt.astimezone()  # Interpret as local timezone
            return {
                "kind": "once",
                "run_at": dt.isoformat(),
                "display": f"once at {dt.strftime('%Y-%m-%d %H:%M')}"
            }
        except ValueError as e:
            raise ValueError(f"Invalid timestamp '{schedule}': {e}") from e

    # Duration like "30m", "2h", "1d" → one-shot from now
    try:
        minutes = parse_duration(schedule)
        run_at = _hermes_now() + timedelta(minutes=minutes)
        return {
            "kind": "once",
            "run_at": run_at.isoformat(),
            "display": f"once in {original}"
        }
    except ValueError:
        pass

    raise ValueError(
        f"Invalid schedule '{original}'. Use:\n"
        f" - Duration: '30m', '2h', '1d' (one-shot)\n"
        f" - Interval: 'every 30m', 'every 2h' (recurring)\n"
        f" - Cron: '0 9 * * *' (cron expression)\n"
        f" - Timestamp: '2026-02-03T14:00:00' (one-shot at time)"
    )
|
|
|
|
|
|
def _ensure_aware(dt: datetime) -> datetime:
    """Return `dt` as an aware datetime expressed in the configured Hermes timezone.

    Backward compatibility: legacy storage may contain naive timestamps. Those
    are assumed to be system-local wall time (the timezone `datetime.now()`
    used when they were written) and converted from there, which preserves
    relative ordering across timezone changes and avoids false not-due results.
    """
    hermes_tz = _hermes_now().tzinfo
    if dt.tzinfo is not None:
        return dt.astimezone(hermes_tz)
    system_tz = datetime.now().astimezone().tzinfo
    return dt.replace(tzinfo=system_tz).astimezone(hermes_tz)
|
|
|
|
|
|
def _recoverable_oneshot_run_at(
    schedule: Dict[str, Any],
    now: datetime,
    *,
    last_run_at: Optional[str] = None,
) -> Optional[str]:
    """Return the stored one-shot run time if it may still fire, else None.

    A small grace window (ONESHOT_GRACE_SECONDS) lets a one-shot created a few
    seconds after its requested minute still run on the next tick. A one-shot
    that has already run is never eligible again.
    """
    ineligible = (
        schedule.get("kind") != "once"
        or bool(last_run_at)
        or not schedule.get("run_at")
    )
    if ineligible:
        return None

    run_at = schedule["run_at"]
    cutoff = now - timedelta(seconds=ONESHOT_GRACE_SECONDS)
    if _ensure_aware(datetime.fromisoformat(run_at)) >= cutoff:
        return run_at
    return None
|
|
|
|
|
|
def _compute_grace_seconds(schedule: dict) -> int:
|
|
"""Compute how late a job can be and still catch up instead of fast-forwarding.
|
|
|
|
Uses half the schedule period, clamped between 120 seconds and 2 hours.
|
|
This ensures daily jobs can catch up if missed by up to 2 hours,
|
|
while frequent jobs (every 5-10 min) still fast-forward quickly.
|
|
"""
|
|
MIN_GRACE = 120
|
|
MAX_GRACE = 7200 # 2 hours
|
|
|
|
kind = schedule.get("kind")
|
|
|
|
if kind == "interval":
|
|
period_seconds = schedule.get("minutes", 1) * 60
|
|
grace = period_seconds // 2
|
|
return max(MIN_GRACE, min(grace, MAX_GRACE))
|
|
|
|
if kind == "cron" and HAS_CRONITER:
|
|
try:
|
|
now = _hermes_now()
|
|
cron = croniter(schedule["expr"], now)
|
|
first = cron.get_next(datetime)
|
|
second = cron.get_next(datetime)
|
|
period_seconds = int((second - first).total_seconds())
|
|
grace = period_seconds // 2
|
|
return max(MIN_GRACE, min(grace, MAX_GRACE))
|
|
except Exception:
|
|
pass
|
|
|
|
return MIN_GRACE
|
|
|
|
|
|
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
    """
    Compute the next run time for a schedule.

    Args:
        schedule: Parsed schedule dict (see parse_schedule) with "kind" of
            "once", "interval", or "cron".
        last_run_at: ISO timestamp of the most recent run, or None if the
            job has never run.

    Returns ISO timestamp string, or None if no more runs.
    """
    now = _hermes_now()

    if schedule["kind"] == "once":
        # One-shots delegate to the grace-window helper; yields None once run.
        return _recoverable_oneshot_run_at(schedule, now, last_run_at=last_run_at)

    elif schedule["kind"] == "interval":
        minutes = schedule["minutes"]
        if last_run_at:
            # Next run is last_run + interval
            last = _ensure_aware(datetime.fromisoformat(last_run_at))
            next_run = last + timedelta(minutes=minutes)
        else:
            # First run is now + interval
            next_run = now + timedelta(minutes=minutes)
        return next_run.isoformat()

    elif schedule["kind"] == "cron":
        if not HAS_CRONITER:
            # Cron schedules cannot be evaluated without croniter installed.
            return None
        cron = croniter(schedule["expr"], now)
        next_run = cron.get_next(datetime)
        return next_run.isoformat()

    # Unknown schedule kind: no runs.
    return None
|
|
|
|
|
|
# =============================================================================
|
|
# Job CRUD Operations
|
|
# =============================================================================
|
|
|
|
def load_jobs() -> List[Dict[str, Any]]:
    """Load all jobs from storage.

    Returns:
        List of job dicts, or [] when jobs.json is missing or unreadable.

    If strict JSON parsing fails, a second pass with strict=False tolerates
    bare control characters inside string values; a successful recovery is
    written back immediately so the file is valid again.
    """
    ensure_dirs()
    if not JOBS_FILE.exists():
        return []

    try:
        with open(JOBS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
            return data.get("jobs", [])
    except json.JSONDecodeError:
        # Retry with strict=False to handle bare control chars in string values
        try:
            with open(JOBS_FILE, 'r', encoding='utf-8') as f:
                data = json.loads(f.read(), strict=False)
                jobs = data.get("jobs", [])
                if jobs:
                    # Auto-repair: rewrite with proper escaping
                    save_jobs(jobs)
                    logger.warning("Auto-repaired jobs.json (had invalid control characters)")
                return jobs
        except Exception:
            # Unrecoverable corruption: behave as if no jobs exist.
            return []
    except IOError:
        return []
|
|
|
|
|
|
def save_jobs(jobs: List[Dict[str, Any]]):
    """Save all jobs to storage.

    Uses a write-to-temp / fsync / atomic-rename sequence so readers never
    observe a partially written jobs.json; the temp file is removed on any
    failure. Also stamps "updated_at" and re-applies 0600 permissions.
    """
    ensure_dirs()
    fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # Bytes must reach disk before the rename commits them
        os.replace(tmp_path, JOBS_FILE)
        _secure_file(JOBS_FILE)
    except BaseException:
        # Clean up the orphaned temp file, then re-raise (incl. KeyboardInterrupt).
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def create_job(
    prompt: str,
    schedule: str,
    name: Optional[str] = None,
    repeat: Optional[int] = None,
    deliver: Optional[str] = None,
    origin: Optional[Dict[str, Any]] = None,
    skill: Optional[str] = None,
    skills: Optional[List[str]] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
    script: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Create a new cron job.

    Args:
        prompt: The prompt to run (must be self-contained, or a task instruction when skill is set)
        schedule: Schedule string (see parse_schedule)
        name: Optional friendly name
        repeat: How many times to run (None = forever, 1 = once)
        deliver: Where to deliver output ("origin", "local", "telegram", etc.)
        origin: Source info where job was created (for "origin" delivery)
        skill: Optional legacy single skill name to load before running the prompt
        skills: Optional ordered list of skills to load before running the prompt
        model: Optional per-job model override
        provider: Optional per-job provider override
        base_url: Optional per-job base URL override
        script: Optional path to a Python script whose stdout is injected into the
            prompt each run. The script runs before the agent turn, and its output
            is prepended as context. Useful for data collection / change detection.

    Returns:
        The created job dict

    Raises:
        ValueError: if `schedule` cannot be parsed (propagated from parse_schedule).
    """
    parsed_schedule = parse_schedule(schedule)

    # Normalize repeat: treat 0 or negative values as None (infinite)
    if repeat is not None and repeat <= 0:
        repeat = None

    # Auto-set repeat=1 for one-shot schedules if not specified
    if parsed_schedule["kind"] == "once" and repeat is None:
        repeat = 1

    # Default delivery to origin if available, otherwise local
    if deliver is None:
        deliver = "origin" if origin else "local"

    job_id = uuid.uuid4().hex[:12]
    now = _hermes_now().isoformat()

    # Normalize string overrides: strip whitespace, then coerce empty strings
    # to None so a blank value is stored as "unset" rather than "".
    normalized_skills = _normalize_skill_list(skill, skills)
    normalized_model = str(model).strip() if isinstance(model, str) else None
    normalized_provider = str(provider).strip() if isinstance(provider, str) else None
    normalized_base_url = str(base_url).strip().rstrip("/") if isinstance(base_url, str) else None
    normalized_model = normalized_model or None
    normalized_provider = normalized_provider or None
    normalized_base_url = normalized_base_url or None
    normalized_script = str(script).strip() if isinstance(script, str) else None
    normalized_script = normalized_script or None

    # Default display name comes from the prompt, else the first skill name.
    label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
    job = {
        "id": job_id,
        "name": name or label_source[:50].strip(),
        "prompt": prompt,
        "skills": normalized_skills,
        "skill": normalized_skills[0] if normalized_skills else None,  # legacy mirror of skills[0]
        "model": normalized_model,
        "provider": normalized_provider,
        "base_url": normalized_base_url,
        "script": normalized_script,
        "schedule": parsed_schedule,
        "schedule_display": parsed_schedule.get("display", schedule),
        "repeat": {
            "times": repeat,  # None = forever
            "completed": 0
        },
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "created_at": now,
        "next_run_at": compute_next_run(parsed_schedule),
        "last_run_at": None,
        "last_status": None,
        "last_error": None,
        # Delivery configuration
        "deliver": deliver,
        "origin": origin,  # Tracks where job was created for "origin" delivery
    }

    jobs = load_jobs()
    jobs.append(job)
    save_jobs(jobs)

    return job
|
|
|
|
|
|
def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single job by ID, returning a normalized copy or None."""
    matches = (j for j in load_jobs() if j["id"] == job_id)
    found = next(matches, None)
    return _apply_skill_fields(found) if found is not None else None
|
|
|
|
|
|
def list_jobs(include_disabled: bool = False) -> List[Dict[str, Any]]:
    """Return all jobs with skills fields normalized, hiding disabled ones by default."""
    normalized = [_apply_skill_fields(job) for job in load_jobs()]
    if include_disabled:
        return normalized
    return [job for job in normalized if job.get("enabled", True)]
|
|
|
|
|
|
def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update a job by ID, refreshing derived schedule fields when needed.

    Args:
        job_id: ID of the job to update.
        updates: Partial job dict merged over the stored job.

    Returns:
        The updated (skills-normalized) job dict, or None if no job matched.
    """
    jobs = load_jobs()
    for i, job in enumerate(jobs):
        if job["id"] != job_id:
            continue

        updated = _apply_skill_fields({**job, **updates})
        schedule_changed = "schedule" in updates

        # Re-normalize when either the legacy or canonical skills field is touched.
        if "skills" in updates or "skill" in updates:
            normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills"))
            updated["skills"] = normalized_skills
            updated["skill"] = normalized_skills[0] if normalized_skills else None

        if schedule_changed:
            updated_schedule = updated["schedule"]
            # Keep the human-readable display in sync unless the caller supplied one.
            updated["schedule_display"] = updates.get(
                "schedule_display",
                updated_schedule.get("display", updated.get("schedule_display")),
            )
            # Paused jobs keep next_run_at frozen until resumed.
            if updated.get("state") != "paused":
                updated["next_run_at"] = compute_next_run(updated_schedule)

        # Active jobs that lost their next run time get it recomputed.
        if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"):
            updated["next_run_at"] = compute_next_run(updated["schedule"])

        jobs[i] = updated
        save_jobs(jobs)
        return _apply_skill_fields(jobs[i])
    return None
|
|
|
|
|
|
def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Pause a job without deleting it; records when and why it was paused."""
    pause_fields = {
        "enabled": False,
        "state": "paused",
        "paused_at": _hermes_now().isoformat(),
        "paused_reason": reason,
    }
    return update_job(job_id, pause_fields)
|
|
|
|
|
|
def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Resume a paused job and compute the next future run from now."""
    job = get_job(job_id)
    if job is None:
        return None

    resume_fields = {
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "next_run_at": compute_next_run(job["schedule"]),
    }
    return update_job(job_id, resume_fields)
|
|
|
|
|
|
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Schedule a job to run on the next scheduler tick by moving next_run_at to now."""
    if get_job(job_id) is None:
        return None

    trigger_fields = {
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "next_run_at": _hermes_now().isoformat(),
    }
    return update_job(job_id, trigger_fields)
|
|
|
|
|
|
def remove_job(job_id: str) -> bool:
    """Delete a job by ID; returns True when something was actually removed."""
    jobs = load_jobs()
    remaining = [job for job in jobs if job["id"] != job_id]
    if len(remaining) == len(jobs):
        return False
    save_jobs(remaining)
    return True
|
|
|
|
|
|
def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
    """
    Mark a job as having been run.

    Updates last_run_at, last_status, increments completed count,
    computes next_run_at, and auto-deletes if repeat limit reached.

    Args:
        job_id: ID of the job that just ran.
        success: Whether the run succeeded.
        error: Error message recorded when success is False.
    """
    jobs = load_jobs()
    for i, job in enumerate(jobs):
        if job["id"] == job_id:
            now = _hermes_now().isoformat()
            job["last_run_at"] = now
            job["last_status"] = "ok" if success else "error"
            job["last_error"] = error if not success else None

            # Increment completed count
            if job.get("repeat"):
                job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1

                # Check if we've hit the repeat limit
                times = job["repeat"].get("times")
                completed = job["repeat"]["completed"]
                if times is not None and times > 0 and completed >= times:
                    # Remove the job (limit reached)
                    jobs.pop(i)
                    save_jobs(jobs)
                    return

            # Compute next run
            job["next_run_at"] = compute_next_run(job["schedule"], now)

            # If no next run (one-shot completed), disable
            if job["next_run_at"] is None:
                job["enabled"] = False
                job["state"] = "completed"
            elif job.get("state") != "paused":
                job["state"] = "scheduled"

            save_jobs(jobs)
            return

    # Job not found: persist the list unchanged (save_jobs rewrites updated_at).
    save_jobs(jobs)
|
|
|
|
|
|
def advance_next_run(job_id: str) -> bool:
    """Preemptively advance next_run_at for a recurring job before execution.

    Calling this BEFORE run_job() means a crash mid-execution cannot cause
    the job to re-fire on the next gateway restart: recurring jobs become
    at-most-once rather than at-least-once, and missing one run is far
    better than firing dozens of times in a crash loop. One-shot jobs are
    left untouched so they can still retry on restart.

    Returns:
        True if next_run_at was advanced, False otherwise.
    """
    jobs = load_jobs()
    for job in jobs:
        if job["id"] != job_id:
            continue
        if job.get("schedule", {}).get("kind") not in ("cron", "interval"):
            return False  # one-shots keep their slot
        candidate = compute_next_run(job["schedule"], _hermes_now().isoformat())
        if candidate and candidate != job.get("next_run_at"):
            job["next_run_at"] = candidate
            save_jobs(jobs)
            return True
        return False
    return False
|
|
|
|
|
|
def get_due_jobs() -> List[Dict[str, Any]]:
    """Get all jobs that are due to run now.

    For recurring jobs (cron/interval), if the scheduled time is stale
    (more than one period in the past, e.g. because the gateway was down),
    the job is fast-forwarded to the next future run instead of firing
    immediately. This prevents a burst of missed jobs on gateway restart.

    Returns:
        Normalized copies of the due jobs; storage is updated in place
        when run times are recovered or fast-forwarded.
    """
    now = _hermes_now()
    raw_jobs = load_jobs()
    # Work on deep copies so due-job mutations don't leak into raw storage.
    jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)]
    due = []
    needs_save = False

    for job in jobs:
        if not job.get("enabled", True):
            continue

        next_run = job.get("next_run_at")
        if not next_run:
            # Missing next_run_at: a one-shot may still be recoverable
            # within its grace window.
            recovered_next = _recoverable_oneshot_run_at(
                job.get("schedule", {}),
                now,
                last_run_at=job.get("last_run_at"),
            )
            if not recovered_next:
                continue

            job["next_run_at"] = recovered_next
            next_run = recovered_next
            logger.info(
                "Job '%s' had no next_run_at; recovering one-shot run at %s",
                job.get("name", job["id"]),
                recovered_next,
            )
            # Mirror the recovery into the raw list so it gets persisted.
            for rj in raw_jobs:
                if rj["id"] == job["id"]:
                    rj["next_run_at"] = recovered_next
                    needs_save = True
                    break

        next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
        if next_run_dt <= now:
            schedule = job.get("schedule", {})
            kind = schedule.get("kind")

            # For recurring jobs, check if the scheduled time is stale
            # (gateway was down and missed the window). Fast-forward to
            # the next future occurrence instead of firing a stale run.
            grace = _compute_grace_seconds(schedule)
            if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > grace:
                # Job is past its catch-up grace window — this is a stale missed run.
                # Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m.
                new_next = compute_next_run(schedule, now.isoformat())
                if new_next:
                    logger.info(
                        "Job '%s' missed its scheduled time (%s, grace=%ds). "
                        "Fast-forwarding to next run: %s",
                        job.get("name", job["id"]),
                        next_run,
                        grace,
                        new_next,
                    )
                    # Update the job in storage
                    for rj in raw_jobs:
                        if rj["id"] == job["id"]:
                            rj["next_run_at"] = new_next
                            needs_save = True
                            break
                continue  # Skip this run

            due.append(job)

    if needs_save:
        save_jobs(raw_jobs)

    return due
|
|
|
|
|
|
def save_job_output(job_id: str, output: str) -> Path:
    """Save job output to file.

    Writes OUTPUT_DIR/{job_id}/{timestamp}.md atomically (temp file + fsync +
    rename), with owner-only permissions on both directory and file.

    Args:
        job_id: ID of the job the output belongs to.
        output: Markdown text to persist.

    Returns:
        Path of the written output file.
    """
    ensure_dirs()
    job_output_dir = OUTPUT_DIR / job_id
    job_output_dir.mkdir(parents=True, exist_ok=True)
    _secure_dir(job_output_dir)

    timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = job_output_dir / f"{timestamp}.md"

    fd, tmp_path = tempfile.mkstemp(dir=str(job_output_dir), suffix='.tmp', prefix='.output_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write(output)
            f.flush()
            os.fsync(f.fileno())  # Flush to disk before the rename makes it visible
        os.replace(tmp_path, output_file)
        _secure_file(output_file)
    except BaseException:
        # Remove the orphaned temp file, then re-raise.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return output_file
|