fix(cron): prevent recurring job re-fire on gateway crash/restart loop (#3396)
When a gateway crashes mid-job execution (before mark_job_run can persist the updated next_run_at), the job would fire again on every restart attempt within the grace window. For a daily 6:15 AM job with a 2-hour grace, rapidly restarting the gateway could trigger dozens of duplicate runs. Fix: call advance_next_run() BEFORE run_job() in tick(). For recurring jobs (cron/interval), this preemptively advances next_run_at to the next future occurrence and persists it to disk. If the process then crashes during execution, the job won't be considered due on restart. One-shot jobs are left unchanged — they still retry on restart since there's no future occurrence to advance to. This changes the scheduler from at-least-once to at-most-once semantics for recurring jobs, which is the correct tradeoff: missing one daily message is far better than sending it dozens of times.
This commit is contained in:
28
cron/jobs.py
28
cron/jobs.py
@@ -598,6 +598,34 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
|||||||
save_jobs(jobs)
|
save_jobs(jobs)
|
||||||
|
|
||||||
|
|
||||||
|
def advance_next_run(job_id: str) -> bool:
|
||||||
|
"""Preemptively advance next_run_at for a recurring job before execution.
|
||||||
|
|
||||||
|
Call this BEFORE run_job() so that if the process crashes mid-execution,
|
||||||
|
the job won't re-fire on the next gateway restart. This converts the
|
||||||
|
scheduler from at-least-once to at-most-once for recurring jobs — missing
|
||||||
|
one run is far better than firing dozens of times in a crash loop.
|
||||||
|
|
||||||
|
One-shot jobs are left unchanged so they can still retry on restart.
|
||||||
|
|
||||||
|
Returns True if next_run_at was advanced, False otherwise.
|
||||||
|
"""
|
||||||
|
jobs = load_jobs()
|
||||||
|
for job in jobs:
|
||||||
|
if job["id"] == job_id:
|
||||||
|
kind = job.get("schedule", {}).get("kind")
|
||||||
|
if kind not in ("cron", "interval"):
|
||||||
|
return False
|
||||||
|
now = _hermes_now().isoformat()
|
||||||
|
new_next = compute_next_run(job["schedule"], now)
|
||||||
|
if new_next and new_next != job.get("next_run_at"):
|
||||||
|
job["next_run_at"] = new_next
|
||||||
|
save_jobs(jobs)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_due_jobs() -> List[Dict[str, Any]]:
|
def get_due_jobs() -> List[Dict[str, Any]]:
|
||||||
"""Get all jobs that are due to run now.
|
"""Get all jobs that are due to run now.
|
||||||
|
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ logger = logging.getLogger(__name__)
|
|||||||
# Add parent directory to path for imports
|
# Add parent directory to path for imports
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
from cron.jobs import get_due_jobs, mark_job_run, save_job_output
|
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
||||||
|
|
||||||
# Sentinel: when a cron agent has nothing new to report, it can start its
|
# Sentinel: when a cron agent has nothing new to report, it can start its
|
||||||
# response with this marker to suppress delivery. Output is still saved
|
# response with this marker to suppress delivery. Output is still saved
|
||||||
@@ -524,6 +524,12 @@ def tick(verbose: bool = True) -> int:
|
|||||||
executed = 0
|
executed = 0
|
||||||
for job in due_jobs:
|
for job in due_jobs:
|
||||||
try:
|
try:
|
||||||
|
# For recurring jobs (cron/interval), advance next_run_at to the
|
||||||
|
# next future occurrence BEFORE execution. This way, if the
|
||||||
|
# process crashes mid-run, the job won't re-fire on restart.
|
||||||
|
# One-shot jobs are left alone so they can retry on restart.
|
||||||
|
advance_next_run(job["id"])
|
||||||
|
|
||||||
success, output, final_response, error = run_job(job)
|
success, output, final_response, error = run_job(job)
|
||||||
|
|
||||||
output_file = save_job_output(job["id"], output)
|
output_file = save_job_output(job["id"], output)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ from cron.jobs import (
|
|||||||
resume_job,
|
resume_job,
|
||||||
remove_job,
|
remove_job,
|
||||||
mark_job_run,
|
mark_job_run,
|
||||||
|
advance_next_run,
|
||||||
get_due_jobs,
|
get_due_jobs,
|
||||||
save_job_output,
|
save_job_output,
|
||||||
)
|
)
|
||||||
@@ -339,6 +340,90 @@ class TestMarkJobRun:
|
|||||||
assert updated["last_error"] == "timeout"
|
assert updated["last_error"] == "timeout"
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdvanceNextRun:
|
||||||
|
"""Tests for advance_next_run() — crash-safety for recurring jobs."""
|
||||||
|
|
||||||
|
def test_advances_interval_job(self, tmp_cron_dir):
|
||||||
|
"""Interval jobs should have next_run_at bumped to the next future occurrence."""
|
||||||
|
job = create_job(prompt="Recurring check", schedule="every 1h")
|
||||||
|
# Force next_run_at to 5 minutes ago (i.e. the job is due)
|
||||||
|
jobs = load_jobs()
|
||||||
|
old_next = (datetime.now() - timedelta(minutes=5)).isoformat()
|
||||||
|
jobs[0]["next_run_at"] = old_next
|
||||||
|
save_jobs(jobs)
|
||||||
|
|
||||||
|
result = advance_next_run(job["id"])
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
updated = get_job(job["id"])
|
||||||
|
from cron.jobs import _ensure_aware, _hermes_now
|
||||||
|
new_next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"]))
|
||||||
|
assert new_next_dt > _hermes_now(), "next_run_at should be in the future after advance"
|
||||||
|
|
||||||
|
def test_advances_cron_job(self, tmp_cron_dir):
|
||||||
|
"""Cron-expression jobs should have next_run_at bumped to the next occurrence."""
|
||||||
|
pytest.importorskip("croniter")
|
||||||
|
job = create_job(prompt="Daily wakeup", schedule="15 6 * * *")
|
||||||
|
# Force next_run_at to 30 minutes ago
|
||||||
|
jobs = load_jobs()
|
||||||
|
old_next = (datetime.now() - timedelta(minutes=30)).isoformat()
|
||||||
|
jobs[0]["next_run_at"] = old_next
|
||||||
|
save_jobs(jobs)
|
||||||
|
|
||||||
|
result = advance_next_run(job["id"])
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
updated = get_job(job["id"])
|
||||||
|
from cron.jobs import _ensure_aware, _hermes_now
|
||||||
|
new_next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"]))
|
||||||
|
assert new_next_dt > _hermes_now(), "next_run_at should be in the future after advance"
|
||||||
|
|
||||||
|
def test_skips_oneshot_job(self, tmp_cron_dir):
|
||||||
|
"""One-shot jobs should NOT be advanced — they need to retry on restart."""
|
||||||
|
job = create_job(prompt="Run once", schedule="30m")
|
||||||
|
original_next = get_job(job["id"])["next_run_at"]
|
||||||
|
|
||||||
|
result = advance_next_run(job["id"])
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
updated = get_job(job["id"])
|
||||||
|
assert updated["next_run_at"] == original_next, "one-shot next_run_at should be unchanged"
|
||||||
|
|
||||||
|
def test_nonexistent_job_returns_false(self, tmp_cron_dir):
|
||||||
|
result = advance_next_run("nonexistent-id")
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_already_future_stays_future(self, tmp_cron_dir):
|
||||||
|
"""If next_run_at is already in the future, advance keeps it in the future (no harm)."""
|
||||||
|
job = create_job(prompt="Future job", schedule="every 1h")
|
||||||
|
# next_run_at is already set to ~1h from now by create_job
|
||||||
|
advance_next_run(job["id"])
|
||||||
|
# Regardless of return value, the job should still be in the future
|
||||||
|
updated = get_job(job["id"])
|
||||||
|
from cron.jobs import _ensure_aware, _hermes_now
|
||||||
|
new_next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"]))
|
||||||
|
assert new_next_dt > _hermes_now(), "next_run_at should remain in the future"
|
||||||
|
|
||||||
|
def test_crash_safety_scenario(self, tmp_cron_dir):
|
||||||
|
"""Simulate the crash-loop scenario: after advance, the job should NOT be due."""
|
||||||
|
job = create_job(prompt="Crash test", schedule="every 1h")
|
||||||
|
# Force next_run_at to 5 minutes ago (job is due)
|
||||||
|
jobs = load_jobs()
|
||||||
|
jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat()
|
||||||
|
save_jobs(jobs)
|
||||||
|
|
||||||
|
# Job should be due before advance
|
||||||
|
due_before = get_due_jobs()
|
||||||
|
assert len(due_before) == 1
|
||||||
|
|
||||||
|
# Advance (simulating what tick() does before run_job)
|
||||||
|
advance_next_run(job["id"])
|
||||||
|
|
||||||
|
# Now the job should NOT be due (simulates restart after crash)
|
||||||
|
due_after = get_due_jobs()
|
||||||
|
assert len(due_after) == 0, "Job should not be due after advance_next_run"
|
||||||
|
|
||||||
|
|
||||||
class TestGetDueJobs:
|
class TestGetDueJobs:
|
||||||
def test_past_due_within_window_returned(self, tmp_cron_dir):
|
def test_past_due_within_window_returned(self, tmp_cron_dir):
|
||||||
"""Jobs within the dynamic grace window are still considered due (not stale).
|
"""Jobs within the dynamic grace window are still considered due (not stale).
|
||||||
|
|||||||
@@ -687,3 +687,41 @@ class TestBuildJobPromptMissingSkill:
|
|||||||
result = _build_job_prompt({"skills": ["ghost-skill", "real-skill"], "prompt": "go"})
|
result = _build_job_prompt({"skills": ["ghost-skill", "real-skill"], "prompt": "go"})
|
||||||
assert "Real skill content." in result
|
assert "Real skill content." in result
|
||||||
assert "go" in result
|
assert "go" in result
|
||||||
|
|
||||||
|
|
||||||
|
class TestTickAdvanceBeforeRun:
|
||||||
|
"""Verify that tick() calls advance_next_run before run_job for crash safety."""
|
||||||
|
|
||||||
|
def test_advance_called_before_run_job(self, tmp_path):
|
||||||
|
"""advance_next_run must be called before run_job to prevent crash-loop re-fires."""
|
||||||
|
call_order = []
|
||||||
|
|
||||||
|
def fake_advance(job_id):
|
||||||
|
call_order.append(("advance", job_id))
|
||||||
|
return True
|
||||||
|
|
||||||
|
def fake_run_job(job):
|
||||||
|
call_order.append(("run", job["id"]))
|
||||||
|
return True, "output", "response", None
|
||||||
|
|
||||||
|
fake_job = {
|
||||||
|
"id": "test-advance",
|
||||||
|
"name": "test",
|
||||||
|
"prompt": "hello",
|
||||||
|
"enabled": True,
|
||||||
|
"schedule": {"kind": "cron", "expr": "15 6 * * *"},
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch("cron.scheduler.get_due_jobs", return_value=[fake_job]), \
|
||||||
|
patch("cron.scheduler.advance_next_run", side_effect=fake_advance) as adv_mock, \
|
||||||
|
patch("cron.scheduler.run_job", side_effect=fake_run_job), \
|
||||||
|
patch("cron.scheduler.save_job_output", return_value=tmp_path / "out.md"), \
|
||||||
|
patch("cron.scheduler.mark_job_run"), \
|
||||||
|
patch("cron.scheduler._deliver_result"):
|
||||||
|
from cron.scheduler import tick
|
||||||
|
executed = tick(verbose=False)
|
||||||
|
|
||||||
|
assert executed == 1
|
||||||
|
adv_mock.assert_called_once_with("test-advance")
|
||||||
|
# advance must happen before run
|
||||||
|
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
|
||||||
|
|||||||
Reference in New Issue
Block a user