diff --git a/cron/jobs.py b/cron/jobs.py index 9c411f0c7..b749c51f0 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -6,6 +6,7 @@ Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md """ import json +import logging import tempfile import os import re @@ -14,6 +15,8 @@ from datetime import datetime, timedelta from pathlib import Path from typing import Optional, Dict, List, Any +logger = logging.getLogger(__name__) + from hermes_time import now as _hermes_now try: @@ -528,10 +531,18 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None): def get_due_jobs() -> List[Dict[str, Any]]: - """Get all jobs that are due to run now.""" + """Get all jobs that are due to run now. + + For recurring jobs (cron/interval), if the scheduled time is stale + (more than one period in the past, e.g. because the gateway was down), + the job is fast-forwarded to the next future run instead of firing + immediately. This prevents a burst of missed jobs on gateway restart. + """ now = _hermes_now() jobs = [_apply_skill_fields(j) for j in load_jobs()] + raw_jobs = load_jobs() # For saving updates due = [] + needs_save = False for job in jobs: if not job.get("enabled", True): @@ -543,8 +554,37 @@ def get_due_jobs() -> List[Dict[str, Any]]: next_run_dt = _ensure_aware(datetime.fromisoformat(next_run)) if next_run_dt <= now: + schedule = job.get("schedule", {}) + kind = schedule.get("kind") + + # For recurring jobs, check if the scheduled time is stale + # (gateway was down and missed the window). Fast-forward to + # the next future occurrence instead of firing a stale run. + if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > 120: + # More than 2 minutes late — this is a missed run, not a current one. + # Recompute next_run_at to the next future occurrence. + new_next = compute_next_run(schedule, now.isoformat()) + if new_next: + logger.info( + "Job '%s' missed its scheduled time (%s). " + "Fast-forwarding to next run: %s", + job.get("name", job["id"]), + next_run, + new_next, + ) + # Update the job in storage + for rj in raw_jobs: + if rj["id"] == job["id"]: + rj["next_run_at"] = new_next + needs_save = True + break + continue # Skip this run + due.append(job) + if needs_save: + save_jobs(raw_jobs) + return due diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index 802a744f3..31565e677 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -304,17 +304,34 @@ class TestMarkJobRun: class TestGetDueJobs: - def test_past_due_returned(self, tmp_cron_dir): + def test_past_due_within_window_returned(self, tmp_cron_dir): + """Jobs less than 2 minutes late are still considered due (not stale).""" job = create_job(prompt="Due now", schedule="every 1h") - # Force next_run_at to the past + # Force next_run_at to just 1 minute ago (within the 2-min window) jobs = load_jobs() - jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat() + jobs[0]["next_run_at"] = (datetime.now() - timedelta(seconds=60)).isoformat() save_jobs(jobs) due = get_due_jobs() assert len(due) == 1 assert due[0]["id"] == job["id"] + def test_stale_past_due_skipped(self, tmp_cron_dir): + """Recurring jobs more than 2 minutes late are fast-forwarded, not fired.""" + job = create_job(prompt="Stale", schedule="every 1h") + # Force next_run_at to 5 minutes ago (beyond the 2-min window) + jobs = load_jobs() + jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat() + save_jobs(jobs) + + due = get_due_jobs() + assert len(due) == 0 + # next_run_at should be fast-forwarded to the future + updated = get_job(job["id"]) + from cron.jobs import _ensure_aware, _hermes_now + next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"])) + assert next_dt > _hermes_now() + def test_future_not_returned(self, tmp_cron_dir): create_job(prompt="Not yet", schedule="every 1h") due = get_due_jobs() diff --git a/tests/test_timezone.py b/tests/test_timezone.py index 728d47dd9..9848212ce 100644 --- a/tests/test_timezone.py +++ b/tests/test_timezone.py @@ -241,7 +241,7 @@ class TestCronTimezone: job = create_job(prompt="Test job", schedule="every 1h") jobs = load_jobs() # Force a naive (no timezone) past timestamp - naive_past = (datetime.now() - timedelta(minutes=5)).isoformat() + naive_past = (datetime.now() - timedelta(seconds=30)).isoformat() jobs[0]["next_run_at"] = naive_past save_jobs(jobs) @@ -318,7 +318,7 @@ class TestCronTimezone: # Simulate a naive timestamp that was written by datetime.now() on a # system running in UTC+5:30 — 5 minutes in the past (local time) - naive_past = (datetime.now() - timedelta(minutes=5)).isoformat() + naive_past = (datetime.now() - timedelta(seconds=30)).isoformat() jobs[0]["next_run_at"] = naive_past save_jobs(jobs) @@ -347,7 +347,7 @@ class TestCronTimezone: jobs = load_jobs() # Force a naive past timestamp (system-local wall time, 10 min ago) - naive_past = (datetime.now() - timedelta(minutes=10)).isoformat() + naive_past = (datetime.now() - timedelta(seconds=30)).isoformat() jobs[0]["next_run_at"] = naive_past save_jobs(jobs)