diff --git a/cron/jobs.py b/cron/jobs.py index 841f5633..5e3d7067 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -598,6 +598,34 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None): save_jobs(jobs) +def advance_next_run(job_id: str) -> bool: + """Preemptively advance next_run_at for a recurring job before execution. + + Call this BEFORE run_job() so that if the process crashes mid-execution, + the job won't re-fire on the next gateway restart. This converts the + scheduler from at-least-once to at-most-once for recurring jobs — missing + one run is far better than firing dozens of times in a crash loop. + + One-shot jobs are left unchanged so they can still retry on restart. + + Returns True if next_run_at was advanced, False otherwise. + """ + jobs = load_jobs() + for job in jobs: + if job["id"] == job_id: + kind = job.get("schedule", {}).get("kind") + if kind not in ("cron", "interval"): + return False + now = _hermes_now().isoformat() + new_next = compute_next_run(job["schedule"], now) + if new_next and new_next != job.get("next_run_at"): + job["next_run_at"] = new_next + save_jobs(jobs) + return True + return False + return False + + def get_due_jobs() -> List[Dict[str, Any]]: """Get all jobs that are due to run now. diff --git a/cron/scheduler.py b/cron/scheduler.py index e6313cd7..55a03878 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -35,7 +35,7 @@ logger = logging.getLogger(__name__) # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) -from cron.jobs import get_due_jobs, mark_job_run, save_job_output +from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run # Sentinel: when a cron agent has nothing new to report, it can start its # response with this marker to suppress delivery. Output is still saved @@ -524,6 +524,12 @@ def tick(verbose: bool = True) -> int: executed = 0 for job in due_jobs: try: + # For recurring jobs (cron/interval), advance next_run_at to the + # next future occurrence BEFORE execution. This way, if the + # process crashes mid-run, the job won't re-fire on restart. + # One-shot jobs are left alone so they can retry on restart. + advance_next_run(job["id"]) + success, output, final_response, error = run_job(job) output_file = save_job_output(job["id"], output) diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index 71883d15..cca46010 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -20,6 +20,7 @@ from cron.jobs import ( resume_job, remove_job, mark_job_run, + advance_next_run, get_due_jobs, save_job_output, ) @@ -339,6 +340,90 @@ class TestMarkJobRun: assert updated["last_error"] == "timeout" +class TestAdvanceNextRun: + """Tests for advance_next_run() — crash-safety for recurring jobs.""" + + def test_advances_interval_job(self, tmp_cron_dir): + """Interval jobs should have next_run_at bumped to the next future occurrence.""" + job = create_job(prompt="Recurring check", schedule="every 1h") + # Force next_run_at to 5 minutes ago (i.e. the job is due) + jobs = load_jobs() + old_next = (datetime.now() - timedelta(minutes=5)).isoformat() + jobs[0]["next_run_at"] = old_next + save_jobs(jobs) + + result = advance_next_run(job["id"]) + assert result is True + + updated = get_job(job["id"]) + from cron.jobs import _ensure_aware, _hermes_now + new_next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"])) + assert new_next_dt > _hermes_now(), "next_run_at should be in the future after advance" + + def test_advances_cron_job(self, tmp_cron_dir): + """Cron-expression jobs should have next_run_at bumped to the next occurrence.""" + pytest.importorskip("croniter") + job = create_job(prompt="Daily wakeup", schedule="15 6 * * *") + # Force next_run_at to 30 minutes ago + jobs = load_jobs() + old_next = (datetime.now() - timedelta(minutes=30)).isoformat() + jobs[0]["next_run_at"] = old_next + save_jobs(jobs) + + result = advance_next_run(job["id"]) + assert result is True + + updated = get_job(job["id"]) + from cron.jobs import _ensure_aware, _hermes_now + new_next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"])) + assert new_next_dt > _hermes_now(), "next_run_at should be in the future after advance" + + def test_skips_oneshot_job(self, tmp_cron_dir): + """One-shot jobs should NOT be advanced — they need to retry on restart.""" + job = create_job(prompt="Run once", schedule="30m") + original_next = get_job(job["id"])["next_run_at"] + + result = advance_next_run(job["id"]) + assert result is False + + updated = get_job(job["id"]) + assert updated["next_run_at"] == original_next, "one-shot next_run_at should be unchanged" + + def test_nonexistent_job_returns_false(self, tmp_cron_dir): + result = advance_next_run("nonexistent-id") + assert result is False + + def test_already_future_stays_future(self, tmp_cron_dir): + """If next_run_at is already in the future, advance keeps it in the future (no harm).""" + job = create_job(prompt="Future job", schedule="every 1h") + # next_run_at is already set to ~1h from now by create_job + advance_next_run(job["id"]) + # Regardless of return value, the job should still be in the future + updated = get_job(job["id"]) + from cron.jobs import _ensure_aware, _hermes_now + new_next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"])) + assert new_next_dt > _hermes_now(), "next_run_at should remain in the future" + + def test_crash_safety_scenario(self, tmp_cron_dir): + """Simulate the crash-loop scenario: after advance, the job should NOT be due.""" + job = create_job(prompt="Crash test", schedule="every 1h") + # Force next_run_at to 5 minutes ago (job is due) + jobs = load_jobs() + jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat() + save_jobs(jobs) + + # Job should be due before advance + due_before = get_due_jobs() + assert len(due_before) == 1 + + # Advance (simulating what tick() does before run_job) + advance_next_run(job["id"]) + + # Now the job should NOT be due (simulates restart after crash) + due_after = get_due_jobs() + assert len(due_after) == 0, "Job should not be due after advance_next_run" + + class TestGetDueJobs: def test_past_due_within_window_returned(self, tmp_cron_dir): """Jobs within the dynamic grace window are still considered due (not stale). diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index c5f244a1..25bc202c 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -687,3 +687,41 @@ class TestBuildJobPromptMissingSkill: result = _build_job_prompt({"skills": ["ghost-skill", "real-skill"], "prompt": "go"}) assert "Real skill content." in result assert "go" in result + + +class TestTickAdvanceBeforeRun: + """Verify that tick() calls advance_next_run before run_job for crash safety.""" + + def test_advance_called_before_run_job(self, tmp_path): + """advance_next_run must be called before run_job to prevent crash-loop re-fires.""" + call_order = [] + + def fake_advance(job_id): + call_order.append(("advance", job_id)) + return True + + def fake_run_job(job): + call_order.append(("run", job["id"])) + return True, "output", "response", None + + fake_job = { + "id": "test-advance", + "name": "test", + "prompt": "hello", + "enabled": True, + "schedule": {"kind": "cron", "expr": "15 6 * * *"}, + } + + with patch("cron.scheduler.get_due_jobs", return_value=[fake_job]), \ + patch("cron.scheduler.advance_next_run", side_effect=fake_advance) as adv_mock, \ + patch("cron.scheduler.run_job", side_effect=fake_run_job), \ + patch("cron.scheduler.save_job_output", return_value=tmp_path / "out.md"), \ + patch("cron.scheduler.mark_job_run"), \ + patch("cron.scheduler._deliver_result"): + from cron.scheduler import tick + executed = tick(verbose=False) + + assert executed == 1 + adv_mock.assert_called_once_with("test-advance") + # advance must happen before run + assert call_order == [("advance", "test-advance"), ("run", "test-advance")]