fix: skip stale cron jobs on gateway restart instead of firing immediately
When the gateway restarts after being down past a scheduled run time, recurring jobs (cron/interval) were firing immediately because their next_run_at was in the past. Now jobs more than 2 minutes late are fast-forwarded to the next future occurrence instead. - get_due_jobs() checks staleness for cron/interval jobs - Stale jobs get next_run_at recomputed and saved - Jobs within 2 minutes of their schedule still fire normally - One-shot (once) jobs are unaffected — they fire if missed Fixes the 'cron jobs run on every gateway restart' issue.
This commit is contained in:
42
cron/jobs.py
42
cron/jobs.py
@@ -6,6 +6,7 @@ Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
@@ -14,6 +15,8 @@ from datetime import datetime, timedelta
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Dict, List, Any
|
from typing import Optional, Dict, List, Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
from hermes_time import now as _hermes_now
|
from hermes_time import now as _hermes_now
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -528,10 +531,18 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
|||||||
|
|
||||||
|
|
||||||
def get_due_jobs() -> List[Dict[str, Any]]:
|
def get_due_jobs() -> List[Dict[str, Any]]:
|
||||||
"""Get all jobs that are due to run now."""
|
"""Get all jobs that are due to run now.
|
||||||
|
|
||||||
|
For recurring jobs (cron/interval), if the scheduled time is stale
|
||||||
|
(more than one period in the past, e.g. because the gateway was down),
|
||||||
|
the job is fast-forwarded to the next future run instead of firing
|
||||||
|
immediately. This prevents a burst of missed jobs on gateway restart.
|
||||||
|
"""
|
||||||
now = _hermes_now()
|
now = _hermes_now()
|
||||||
jobs = [_apply_skill_fields(j) for j in load_jobs()]
|
jobs = [_apply_skill_fields(j) for j in load_jobs()]
|
||||||
|
raw_jobs = load_jobs() # For saving updates
|
||||||
due = []
|
due = []
|
||||||
|
needs_save = False
|
||||||
|
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
if not job.get("enabled", True):
|
if not job.get("enabled", True):
|
||||||
@@ -543,8 +554,37 @@ def get_due_jobs() -> List[Dict[str, Any]]:
|
|||||||
|
|
||||||
next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
|
next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
|
||||||
if next_run_dt <= now:
|
if next_run_dt <= now:
|
||||||
|
schedule = job.get("schedule", {})
|
||||||
|
kind = schedule.get("kind")
|
||||||
|
|
||||||
|
# For recurring jobs, check if the scheduled time is stale
|
||||||
|
# (gateway was down and missed the window). Fast-forward to
|
||||||
|
# the next future occurrence instead of firing a stale run.
|
||||||
|
if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > 120:
|
||||||
|
# More than 2 minutes late — this is a missed run, not a current one.
|
||||||
|
# Recompute next_run_at to the next future occurrence.
|
||||||
|
new_next = compute_next_run(schedule, now.isoformat())
|
||||||
|
if new_next:
|
||||||
|
logger.info(
|
||||||
|
"Job '%s' missed its scheduled time (%s). "
|
||||||
|
"Fast-forwarding to next run: %s",
|
||||||
|
job.get("name", job["id"]),
|
||||||
|
next_run,
|
||||||
|
new_next,
|
||||||
|
)
|
||||||
|
# Update the job in storage
|
||||||
|
for rj in raw_jobs:
|
||||||
|
if rj["id"] == job["id"]:
|
||||||
|
rj["next_run_at"] = new_next
|
||||||
|
needs_save = True
|
||||||
|
break
|
||||||
|
continue # Skip this run
|
||||||
|
|
||||||
due.append(job)
|
due.append(job)
|
||||||
|
|
||||||
|
if needs_save:
|
||||||
|
save_jobs(raw_jobs)
|
||||||
|
|
||||||
return due
|
return due
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -304,17 +304,34 @@ class TestMarkJobRun:
|
|||||||
|
|
||||||
|
|
||||||
class TestGetDueJobs:
|
class TestGetDueJobs:
|
||||||
def test_past_due_returned(self, tmp_cron_dir):
|
def test_past_due_within_window_returned(self, tmp_cron_dir):
|
||||||
|
"""Jobs less than 2 minutes late are still considered due (not stale)."""
|
||||||
job = create_job(prompt="Due now", schedule="every 1h")
|
job = create_job(prompt="Due now", schedule="every 1h")
|
||||||
# Force next_run_at to the past
|
# Force next_run_at to just 1 minute ago (within the 2-min window)
|
||||||
jobs = load_jobs()
|
jobs = load_jobs()
|
||||||
jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat()
|
jobs[0]["next_run_at"] = (datetime.now() - timedelta(seconds=60)).isoformat()
|
||||||
save_jobs(jobs)
|
save_jobs(jobs)
|
||||||
|
|
||||||
due = get_due_jobs()
|
due = get_due_jobs()
|
||||||
assert len(due) == 1
|
assert len(due) == 1
|
||||||
assert due[0]["id"] == job["id"]
|
assert due[0]["id"] == job["id"]
|
||||||
|
|
||||||
|
def test_stale_past_due_skipped(self, tmp_cron_dir):
|
||||||
|
"""Recurring jobs more than 2 minutes late are fast-forwarded, not fired."""
|
||||||
|
job = create_job(prompt="Stale", schedule="every 1h")
|
||||||
|
# Force next_run_at to 5 minutes ago (beyond the 2-min window)
|
||||||
|
jobs = load_jobs()
|
||||||
|
jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat()
|
||||||
|
save_jobs(jobs)
|
||||||
|
|
||||||
|
due = get_due_jobs()
|
||||||
|
assert len(due) == 0
|
||||||
|
# next_run_at should be fast-forwarded to the future
|
||||||
|
updated = get_job(job["id"])
|
||||||
|
from cron.jobs import _ensure_aware, _hermes_now
|
||||||
|
next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"]))
|
||||||
|
assert next_dt > _hermes_now()
|
||||||
|
|
||||||
def test_future_not_returned(self, tmp_cron_dir):
|
def test_future_not_returned(self, tmp_cron_dir):
|
||||||
create_job(prompt="Not yet", schedule="every 1h")
|
create_job(prompt="Not yet", schedule="every 1h")
|
||||||
due = get_due_jobs()
|
due = get_due_jobs()
|
||||||
|
|||||||
@@ -241,7 +241,7 @@ class TestCronTimezone:
|
|||||||
job = create_job(prompt="Test job", schedule="every 1h")
|
job = create_job(prompt="Test job", schedule="every 1h")
|
||||||
jobs = load_jobs()
|
jobs = load_jobs()
|
||||||
# Force a naive (no timezone) past timestamp
|
# Force a naive (no timezone) past timestamp
|
||||||
naive_past = (datetime.now() - timedelta(minutes=5)).isoformat()
|
naive_past = (datetime.now() - timedelta(seconds=30)).isoformat()
|
||||||
jobs[0]["next_run_at"] = naive_past
|
jobs[0]["next_run_at"] = naive_past
|
||||||
save_jobs(jobs)
|
save_jobs(jobs)
|
||||||
|
|
||||||
@@ -318,7 +318,7 @@ class TestCronTimezone:
|
|||||||
|
|
||||||
# Simulate a naive timestamp that was written by datetime.now() on a
|
# Simulate a naive timestamp that was written by datetime.now() on a
|
||||||
# system running in UTC+5:30 — 5 minutes in the past (local time)
|
# system running in UTC+5:30 — 5 minutes in the past (local time)
|
||||||
naive_past = (datetime.now() - timedelta(minutes=5)).isoformat()
|
naive_past = (datetime.now() - timedelta(seconds=30)).isoformat()
|
||||||
jobs[0]["next_run_at"] = naive_past
|
jobs[0]["next_run_at"] = naive_past
|
||||||
save_jobs(jobs)
|
save_jobs(jobs)
|
||||||
|
|
||||||
@@ -347,7 +347,7 @@ class TestCronTimezone:
|
|||||||
jobs = load_jobs()
|
jobs = load_jobs()
|
||||||
|
|
||||||
# Force a naive past timestamp (system-local wall time, 10 min ago)
|
# Force a naive past timestamp (system-local wall time, 10 min ago)
|
||||||
naive_past = (datetime.now() - timedelta(minutes=10)).isoformat()
|
naive_past = (datetime.now() - timedelta(seconds=30)).isoformat()
|
||||||
jobs[0]["next_run_at"] = naive_past
|
jobs[0]["next_run_at"] = naive_past
|
||||||
save_jobs(jobs)
|
save_jobs(jobs)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user