User-provided ISO timestamps like '2026-02-03T14:00' (no timezone) were stored naive. The _ensure_aware() helper at check time interprets naive datetimes using the current system timezone, but if the system timezone changes between job creation and checking, the job fires at the wrong time. Fix: call dt.astimezone() at parse time to immediately stamp the datetime with the local timezone. The stored value is now always timezone-aware, so it's stable regardless of later timezone changes.
622 lines · 20 KiB · Python
"""
|
|
Cron job storage and management.
|
|
|
|
Jobs are stored in ~/.hermes/cron/jobs.json
|
|
Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md
|
|
"""
|
|
|
|
import copy
|
|
import json
|
|
import logging
|
|
import tempfile
|
|
import os
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, List, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from hermes_time import now as _hermes_now
|
|
|
|
# croniter is an optional dependency: cron-expression schedules are rejected
# with a helpful error at parse time when it is not installed.
try:
    from croniter import croniter
    HAS_CRONITER = True
except ImportError:
    HAS_CRONITER = False
|
|
|
|
# =============================================================================
|
|
# Configuration
|
|
# =============================================================================
|
|
|
|
# Root of Hermes state; overridable via HERMES_HOME (useful for tests).
HERMES_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
CRON_DIR = HERMES_DIR / "cron"       # cron-specific state lives here
JOBS_FILE = CRON_DIR / "jobs.json"   # single JSON document holding all jobs
OUTPUT_DIR = CRON_DIR / "output"     # per-job run output ({job_id}/{timestamp}.md)
|
|
|
|
|
|
def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]:
|
|
"""Normalize legacy/single-skill and multi-skill inputs into a unique ordered list."""
|
|
if skills is None:
|
|
raw_items = [skill] if skill else []
|
|
elif isinstance(skills, str):
|
|
raw_items = [skills]
|
|
else:
|
|
raw_items = list(skills)
|
|
|
|
normalized: List[str] = []
|
|
for item in raw_items:
|
|
text = str(item or "").strip()
|
|
if text and text not in normalized:
|
|
normalized.append(text)
|
|
return normalized
|
|
|
|
|
|
def _apply_skill_fields(job: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *job* whose canonical `skills` list and legacy `skill` field agree."""
    result = dict(job)
    merged = _normalize_skill_list(result.get("skill"), result.get("skills"))
    result["skills"] = merged
    # Legacy consumers read a single `skill`; expose the first entry (or None).
    result["skill"] = merged[0] if merged else None
    return result
|
|
|
|
|
|
def _secure_dir(path: Path):
|
|
"""Set directory to owner-only access (0700). No-op on Windows."""
|
|
try:
|
|
os.chmod(path, 0o700)
|
|
except (OSError, NotImplementedError):
|
|
pass # Windows or other platforms where chmod is not supported
|
|
|
|
|
|
def _secure_file(path: Path):
|
|
"""Set file to owner-only read/write (0600). No-op on Windows."""
|
|
try:
|
|
if path.exists():
|
|
os.chmod(path, 0o600)
|
|
except (OSError, NotImplementedError):
|
|
pass
|
|
|
|
|
|
def ensure_dirs():
    """Create the cron storage directories (idempotent) and tighten their permissions."""
    for directory in (CRON_DIR, OUTPUT_DIR):
        directory.mkdir(parents=True, exist_ok=True)
        _secure_dir(directory)
|
|
|
|
|
|
# =============================================================================
|
|
# Schedule Parsing
|
|
# =============================================================================
|
|
|
|
def parse_duration(s: str) -> int:
    """
    Convert a human-readable duration string into whole minutes.

    Accepted units (case-insensitive): minutes (m/min/...), hours (h/hr/...),
    days (d/day/days). Whitespace between number and unit is allowed.

    Examples:
        "30m" → 30
        "2h"  → 120
        "1d"  → 1440

    Raises:
        ValueError: if the string is not a recognizable duration.
    """
    s = s.strip().lower()
    match = re.match(r'^(\d+)\s*(m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)$', s)
    if not match:
        raise ValueError(f"Invalid duration: '{s}'. Use format like '30m', '2h', or '1d'")

    amount = int(match.group(1))
    unit_char = match.group(2)[0]  # first char disambiguates: m, h, or d
    return amount * {'m': 1, 'h': 60, 'd': 1440}[unit_char]
|
|
|
|
|
|
def parse_schedule(schedule: str) -> Dict[str, Any]:
    """
    Parse schedule string into structured format.

    Returns dict with:
        - kind: "once" | "interval" | "cron"
        - For "once": "run_at" (ISO timestamp)
        - For "interval": "minutes" (int)
        - For "cron": "expr" (cron expression)
    Every result also carries a human-readable "display" string.

    Examples:
        "30m" → once in 30 minutes
        "2h" → once in 2 hours
        "every 30m" → recurring every 30 minutes
        "every 2h" → recurring every 2 hours
        "0 9 * * *" → cron expression
        "2026-02-03T14:00" → once at timestamp

    Raises:
        ValueError: if the string matches none of the supported formats, or
            matches one but is invalid (bad cron expression, bad timestamp).
    """
    schedule = schedule.strip()
    original = schedule  # keep the untouched text for display/error messages
    schedule_lower = schedule.lower()

    # "every X" pattern → recurring interval
    if schedule_lower.startswith("every "):
        duration_str = schedule[6:].strip()
        minutes = parse_duration(duration_str)
        return {
            "kind": "interval",
            "minutes": minutes,
            "display": f"every {minutes}m"
        }

    # Check for cron expression (5 or 6 space-separated fields)
    # Cron fields: minute hour day month weekday [year]
    parts = schedule.split()
    if len(parts) >= 5 and all(
        re.match(r'^[\d\*\-,/]+$', p) for p in parts[:5]
    ):
        if not HAS_CRONITER:
            raise ValueError("Cron expressions require 'croniter' package. Install with: pip install croniter")
        # Validate cron expression up front so bad jobs are rejected at creation.
        try:
            croniter(schedule)
        except Exception as e:
            raise ValueError(f"Invalid cron expression '{schedule}': {e}")
        return {
            "kind": "cron",
            "expr": schedule,
            "display": schedule
        }

    # ISO timestamp (contains T or looks like date)
    if 'T' in schedule or re.match(r'^\d{4}-\d{2}-\d{2}', schedule):
        try:
            # Parse and validate ('Z' suffix rewritten for fromisoformat compatibility)
            dt = datetime.fromisoformat(schedule.replace('Z', '+00:00'))
            # Make naive timestamps timezone-aware at parse time so the stored
            # value doesn't depend on the system timezone matching at check time.
            if dt.tzinfo is None:
                dt = dt.astimezone()  # Interpret as local timezone
            return {
                "kind": "once",
                "run_at": dt.isoformat(),
                "display": f"once at {dt.strftime('%Y-%m-%d %H:%M')}"
            }
        except ValueError as e:
            raise ValueError(f"Invalid timestamp '{schedule}': {e}")

    # Duration like "30m", "2h", "1d" → one-shot from now
    try:
        minutes = parse_duration(schedule)
        run_at = _hermes_now() + timedelta(minutes=minutes)
        return {
            "kind": "once",
            "run_at": run_at.isoformat(),
            "display": f"once in {original}"
        }
    except ValueError:
        pass  # not a duration either — fall through to the catch-all error

    raise ValueError(
        f"Invalid schedule '{original}'. Use:\n"
        f" - Duration: '30m', '2h', '1d' (one-shot)\n"
        f" - Interval: 'every 30m', 'every 2h' (recurring)\n"
        f" - Cron: '0 9 * * *' (cron expression)\n"
        f" - Timestamp: '2026-02-03T14:00:00' (one-shot at time)"
    )
|
|
|
|
|
|
def _ensure_aware(dt: datetime) -> datetime:
    """Return *dt* as a timezone-aware datetime in the configured Hermes timezone.

    Backward compatibility: older stored timestamps may be naive. Those are
    interpreted as system-local wall time (the zone `datetime.now()` used when
    they were written), then rebased onto the Hermes timezone. This keeps
    legacy timestamps ordered correctly across timezone changes and avoids
    false not-due results.
    """
    target_tz = _hermes_now().tzinfo
    if dt.tzinfo is not None:
        return dt.astimezone(target_tz)
    # Naive value: stamp it with the current system-local zone first.
    system_tz = datetime.now().astimezone().tzinfo
    return dt.replace(tzinfo=system_tz).astimezone(target_tz)
|
|
|
|
|
|
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
    """
    Determine when a schedule should fire next.

    Args:
        schedule: Parsed schedule dict (see parse_schedule).
        last_run_at: ISO timestamp of the previous run, if any.

    Returns:
        ISO timestamp string of the next run, or None when no run remains
        (past one-shot, unknown kind, or cron without croniter installed).
    """
    now = _hermes_now()
    kind = schedule["kind"]

    if kind == "once":
        run_at = _ensure_aware(datetime.fromisoformat(schedule["run_at"]))
        # Future timestamp → returned verbatim; past → no more runs.
        return schedule["run_at"] if run_at > now else None

    if kind == "interval":
        step = timedelta(minutes=schedule["minutes"])
        if last_run_at:
            # Next run is one interval after the previous run.
            anchor = _ensure_aware(datetime.fromisoformat(last_run_at))
        else:
            # Never ran: first run is one interval from now.
            anchor = now
        return (anchor + step).isoformat()

    if kind == "cron":
        if not HAS_CRONITER:
            return None
        return croniter(schedule["expr"], now).get_next(datetime).isoformat()

    return None
|
|
|
|
|
|
# =============================================================================
|
|
# Job CRUD Operations
|
|
# =============================================================================
|
|
|
|
def load_jobs() -> List[Dict[str, Any]]:
    """Read all jobs from jobs.json; missing or unreadable storage yields []."""
    ensure_dirs()
    if not JOBS_FILE.exists():
        return []

    try:
        with open(JOBS_FILE, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
            return payload.get("jobs", [])
    except (json.JSONDecodeError, IOError):
        # Corrupt or unreadable file: treat as empty rather than crashing.
        return []
|
|
|
|
|
|
def save_jobs(jobs: List[Dict[str, Any]]):
    """Save all jobs to storage.

    Atomic write: serialize to a temp file in the same directory, flush and
    fsync it, then os.replace() over jobs.json so readers never observe a
    partially written file. On any failure the temp file is removed and the
    original exception re-raised.
    """
    ensure_dirs()
    # Temp file must live in the same directory as JOBS_FILE for os.replace
    # to be an atomic same-filesystem rename.
    fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp_path, JOBS_FILE)
        _secure_file(JOBS_FILE)
    except BaseException:
        # Best-effort cleanup of the orphaned temp file; propagate the error.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def create_job(
    prompt: str,
    schedule: str,
    name: Optional[str] = None,
    repeat: Optional[int] = None,
    deliver: Optional[str] = None,
    origin: Optional[Dict[str, Any]] = None,
    skill: Optional[str] = None,
    skills: Optional[List[str]] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Create a new cron job and persist it to storage.

    Args:
        prompt: The prompt to run (must be self-contained, or a task instruction when skill is set)
        schedule: Schedule string (see parse_schedule)
        name: Optional friendly name (defaults to a truncated prompt/skill label)
        repeat: How many times to run (None = forever, 1 = once)
        deliver: Where to deliver output ("origin", "local", "telegram", etc.)
        origin: Source info where job was created (for "origin" delivery)
        skill: Optional legacy single skill name to load before running the prompt
        skills: Optional ordered list of skills to load before running the prompt
        model: Optional per-job model override
        provider: Optional per-job provider override
        base_url: Optional per-job base URL override

    Returns:
        The created job dict.

    Raises:
        ValueError: if the schedule string cannot be parsed.
    """
    parsed_schedule = parse_schedule(schedule)

    # Auto-set repeat=1 for one-shot schedules if not specified
    if parsed_schedule["kind"] == "once" and repeat is None:
        repeat = 1

    # Default delivery to origin if available, otherwise local
    if deliver is None:
        deliver = "origin" if origin else "local"

    job_id = uuid.uuid4().hex[:12]
    now = _hermes_now().isoformat()

    normalized_skills = _normalize_skill_list(skill, skills)
    # Non-string overrides are ignored; blank strings collapse to None below.
    normalized_model = str(model).strip() if isinstance(model, str) else None
    normalized_provider = str(provider).strip() if isinstance(provider, str) else None
    normalized_base_url = str(base_url).strip().rstrip("/") if isinstance(base_url, str) else None
    normalized_model = normalized_model or None
    normalized_provider = normalized_provider or None
    normalized_base_url = normalized_base_url or None

    # Auto-name falls back to the prompt, then the first skill, then a generic label.
    label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
    job = {
        "id": job_id,
        "name": name or label_source[:50].strip(),
        "prompt": prompt,
        "skills": normalized_skills,
        "skill": normalized_skills[0] if normalized_skills else None,
        "model": normalized_model,
        "provider": normalized_provider,
        "base_url": normalized_base_url,
        "schedule": parsed_schedule,
        "schedule_display": parsed_schedule.get("display", schedule),
        "repeat": {
            "times": repeat,  # None = forever
            "completed": 0
        },
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "created_at": now,
        "next_run_at": compute_next_run(parsed_schedule),
        "last_run_at": None,
        "last_status": None,
        "last_error": None,
        # Delivery configuration
        "deliver": deliver,
        "origin": origin,  # Tracks where job was created for "origin" delivery
    }

    jobs = load_jobs()
    jobs.append(job)
    save_jobs(jobs)

    return job
|
|
|
|
|
|
def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a single job by its ID, or None when no job matches."""
    matches = (j for j in load_jobs() if j["id"] == job_id)
    found = next(matches, None)
    return _apply_skill_fields(found) if found is not None else None
|
|
|
|
|
|
def list_jobs(include_disabled: bool = False) -> List[Dict[str, Any]]:
    """Return all stored jobs; disabled ones are filtered out unless requested."""
    normalized = [_apply_skill_fields(j) for j in load_jobs()]
    if include_disabled:
        return normalized
    return [j for j in normalized if j.get("enabled", True)]
|
|
|
|
|
|
def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update a job by ID, refreshing derived schedule fields when needed.

    Args:
        job_id: ID of the job to modify.
        updates: Field overrides merged on top of the stored job.

    Returns:
        The updated, normalized job dict, or None when no job matches.
    """
    jobs = load_jobs()
    for i, job in enumerate(jobs):
        if job["id"] != job_id:
            continue

        updated = _apply_skill_fields({**job, **updates})
        schedule_changed = "schedule" in updates

        # Re-normalize whenever either skill field was touched so the canonical
        # `skills` list and the legacy `skill` field stay in sync.
        if "skills" in updates or "skill" in updates:
            normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills"))
            updated["skills"] = normalized_skills
            updated["skill"] = normalized_skills[0] if normalized_skills else None

        if schedule_changed:
            updated_schedule = updated["schedule"]
            # Prefer an explicit display override; otherwise derive from the
            # new schedule, falling back to whatever display was stored.
            updated["schedule_display"] = updates.get(
                "schedule_display",
                updated_schedule.get("display", updated.get("schedule_display")),
            )
            # Don't reschedule paused jobs from here; resume_job handles that.
            if updated.get("state") != "paused":
                updated["next_run_at"] = compute_next_run(updated_schedule)

        # Safety net: an enabled, non-paused job must always have a next run.
        if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"):
            updated["next_run_at"] = compute_next_run(updated["schedule"])

        jobs[i] = updated
        save_jobs(jobs)
        return _apply_skill_fields(jobs[i])
    return None
|
|
|
|
|
|
def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Disable a job without deleting it, recording when and why it was paused."""
    changes = {
        "enabled": False,
        "state": "paused",
        "paused_at": _hermes_now().isoformat(),
        "paused_reason": reason,
    }
    return update_job(job_id, changes)
|
|
|
|
|
|
def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Re-enable a paused job, scheduling its next future run from now."""
    job = get_job(job_id)
    if not job:
        return None

    changes = {
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        # Recompute from the current time so we don't fire a stale slot.
        "next_run_at": compute_next_run(job["schedule"]),
    }
    return update_job(job_id, changes)
|
|
|
|
|
|
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Force a job to fire on the next scheduler tick; returns the updated job."""
    if not get_job(job_id):
        return None

    # Setting next_run_at to "now" makes the job immediately due.
    changes = {
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "next_run_at": _hermes_now().isoformat(),
    }
    return update_job(job_id, changes)
|
|
|
|
|
|
def remove_job(job_id: str) -> bool:
    """Delete a job by ID; returns True when a job was actually removed."""
    jobs = load_jobs()
    remaining = [j for j in jobs if j["id"] != job_id]
    if len(remaining) == len(jobs):
        return False  # nothing matched; leave storage untouched
    save_jobs(remaining)
    return True
|
|
|
|
|
|
def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
    """
    Record the outcome of a job run.

    Updates last_run_at/last_status/last_error, increments the repeat counter,
    recomputes next_run_at, and auto-deletes the job once its repeat limit is
    reached. A one-shot job with no further runs is disabled and marked
    "completed". Unknown job IDs are ignored.

    Args:
        job_id: ID of the job that just ran.
        success: Whether the run succeeded.
        error: Error message; only recorded when success is False.
    """
    jobs = load_jobs()
    for i, job in enumerate(jobs):
        if job["id"] != job_id:
            continue

        now = _hermes_now().isoformat()
        job["last_run_at"] = now
        job["last_status"] = "ok" if success else "error"
        job["last_error"] = error if not success else None

        # Increment completed count and enforce the repeat limit.
        if job.get("repeat"):
            job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1

            times = job["repeat"].get("times")
            completed = job["repeat"]["completed"]
            if times is not None and completed >= times:
                # Limit reached — remove the job entirely.
                jobs.pop(i)
                save_jobs(jobs)
                return

        # Compute next run
        job["next_run_at"] = compute_next_run(job["schedule"], now)

        if job["next_run_at"] is None:
            # One-shot finished: keep the record but stop scheduling it.
            job["enabled"] = False
            job["state"] = "completed"
        elif job.get("state") != "paused":
            job["state"] = "scheduled"

        save_jobs(jobs)
        return
    # Bug fix: previously an unmatched job_id still triggered save_jobs(),
    # rewriting jobs.json (and bumping updated_at) as a pointless no-op.
|
|
|
|
|
|
def get_due_jobs() -> List[Dict[str, Any]]:
    """Get all jobs that are due to run now.

    For recurring jobs (cron/interval), if the scheduled time is stale
    (more than two minutes in the past, e.g. because the gateway was down),
    the job is fast-forwarded to the next future run instead of firing
    immediately. This prevents a burst of missed jobs on gateway restart.

    Returns:
        Normalized job dicts whose next_run_at is at or before now.
    """
    now = _hermes_now()
    raw_jobs = load_jobs()
    # Deep-copy before normalizing so raw_jobs stays unmodified for saving.
    jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)]
    due = []
    needs_save = False

    for job in jobs:
        if not job.get("enabled", True):
            continue

        next_run = job.get("next_run_at")
        if not next_run:
            continue

        next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
        if next_run_dt <= now:
            schedule = job.get("schedule", {})
            kind = schedule.get("kind")

            # For recurring jobs, check if the scheduled time is stale
            # (gateway was down and missed the window). Fast-forward to
            # the next future occurrence instead of firing a stale run.
            if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > 120:
                # More than 2 minutes late — this is a missed run, not a current one.
                # Recompute next_run_at to the next future occurrence.
                new_next = compute_next_run(schedule, now.isoformat())
                if new_next:
                    logger.info(
                        "Job '%s' missed its scheduled time (%s). "
                        "Fast-forwarding to next run: %s",
                        job.get("name", job["id"]),
                        next_run,
                        new_next,
                    )
                    # Update the job in storage (raw_jobs is what gets saved).
                    for rj in raw_jobs:
                        if rj["id"] == job["id"]:
                            rj["next_run_at"] = new_next
                            needs_save = True
                            break
                    continue  # Skip this run

            due.append(job)

    if needs_save:
        save_jobs(raw_jobs)

    return due
|
|
|
|
|
|
def save_job_output(job_id: str, output: str) -> Path:
    """Save job output to file.

    Writes atomically (temp file + fsync + rename) into
    OUTPUT_DIR/{job_id}/{timestamp}.md with owner-only permissions.

    Args:
        job_id: ID of the job the output belongs to.
        output: The text to persist.

    Returns:
        Path of the written output file.
    """
    ensure_dirs()
    job_output_dir = OUTPUT_DIR / job_id
    job_output_dir.mkdir(parents=True, exist_ok=True)
    _secure_dir(job_output_dir)

    timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = job_output_dir / f"{timestamp}.md"

    # Temp file in the same directory so os.replace is an atomic rename.
    fd, tmp_path = tempfile.mkstemp(dir=str(job_output_dir), suffix='.tmp', prefix='.output_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write(output)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp_path, output_file)
        _secure_file(output_file)
    except BaseException:
        # Best-effort cleanup of the temp file; re-raise the original error.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return output_file
|