fix(cron): recover recent one-shot jobs (#1918)
Co-authored-by: Frederico Ribeiro <fr@tecompanytea.com>
This commit is contained in:
53
cron/jobs.py
53
cron/jobs.py
@@ -34,6 +34,7 @@ HERMES_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
CRON_DIR = HERMES_DIR / "cron"
|
||||
JOBS_FILE = CRON_DIR / "jobs.json"
|
||||
OUTPUT_DIR = CRON_DIR / "output"
|
||||
ONESHOT_GRACE_SECONDS = 120
|
||||
|
||||
|
||||
def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]:
|
||||
@@ -220,6 +221,33 @@ def _ensure_aware(dt: datetime) -> datetime:
|
||||
return dt.astimezone(target_tz)
|
||||
|
||||
|
||||
def _recoverable_oneshot_run_at(
|
||||
schedule: Dict[str, Any],
|
||||
now: datetime,
|
||||
*,
|
||||
last_run_at: Optional[str] = None,
|
||||
) -> Optional[str]:
|
||||
"""Return a one-shot run time if it is still eligible to fire.
|
||||
|
||||
One-shot jobs get a small grace window so jobs created a few seconds after
|
||||
their requested minute still run on the next tick. Once a one-shot has
|
||||
already run, it is never eligible again.
|
||||
"""
|
||||
if schedule.get("kind") != "once":
|
||||
return None
|
||||
if last_run_at:
|
||||
return None
|
||||
|
||||
run_at = schedule.get("run_at")
|
||||
if not run_at:
|
||||
return None
|
||||
|
||||
run_at_dt = _ensure_aware(datetime.fromisoformat(run_at))
|
||||
if run_at_dt >= now - timedelta(seconds=ONESHOT_GRACE_SECONDS):
|
||||
return run_at
|
||||
return None
|
||||
|
||||
|
||||
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
|
||||
"""
|
||||
Compute the next run time for a schedule.
|
||||
@@ -229,9 +257,7 @@ def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None
|
||||
now = _hermes_now()
|
||||
|
||||
if schedule["kind"] == "once":
|
||||
run_at = _ensure_aware(datetime.fromisoformat(schedule["run_at"]))
|
||||
# If in the future, return it; if in the past, no more runs
|
||||
return schedule["run_at"] if run_at > now else None
|
||||
return _recoverable_oneshot_run_at(schedule, now, last_run_at=last_run_at)
|
||||
|
||||
elif schedule["kind"] == "interval":
|
||||
minutes = schedule["minutes"]
|
||||
@@ -555,7 +581,26 @@ def get_due_jobs() -> List[Dict[str, Any]]:
|
||||
|
||||
next_run = job.get("next_run_at")
|
||||
if not next_run:
|
||||
continue
|
||||
recovered_next = _recoverable_oneshot_run_at(
|
||||
job.get("schedule", {}),
|
||||
now,
|
||||
last_run_at=job.get("last_run_at"),
|
||||
)
|
||||
if not recovered_next:
|
||||
continue
|
||||
|
||||
job["next_run_at"] = recovered_next
|
||||
next_run = recovered_next
|
||||
logger.info(
|
||||
"Job '%s' had no next_run_at; recovering one-shot run at %s",
|
||||
job.get("name", job["id"]),
|
||||
recovered_next,
|
||||
)
|
||||
for rj in raw_jobs:
|
||||
if rj["id"] == job["id"]:
|
||||
rj["next_run_at"] = recovered_next
|
||||
needs_save = True
|
||||
break
|
||||
|
||||
next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
|
||||
if next_run_dt <= now:
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -122,11 +122,29 @@ class TestComputeNextRun:
|
||||
schedule = {"kind": "once", "run_at": future}
|
||||
assert compute_next_run(schedule) == future
|
||||
|
||||
def test_once_recent_past_within_grace_returns_time(self, monkeypatch):
|
||||
now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
|
||||
run_at = "2026-03-18T04:22:00+00:00"
|
||||
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
||||
|
||||
schedule = {"kind": "once", "run_at": run_at}
|
||||
|
||||
assert compute_next_run(schedule) == run_at
|
||||
|
||||
def test_once_past_returns_none(self):
|
||||
past = (datetime.now() - timedelta(hours=1)).isoformat()
|
||||
schedule = {"kind": "once", "run_at": past}
|
||||
assert compute_next_run(schedule) is None
|
||||
|
||||
def test_once_with_last_run_returns_none_even_within_grace(self, monkeypatch):
|
||||
now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
|
||||
run_at = "2026-03-18T04:22:00+00:00"
|
||||
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
||||
|
||||
schedule = {"kind": "once", "run_at": run_at}
|
||||
|
||||
assert compute_next_run(schedule, last_run_at=now.isoformat()) is None
|
||||
|
||||
def test_interval_first_run(self):
|
||||
schedule = {"kind": "interval", "minutes": 60}
|
||||
result = compute_next_run(schedule)
|
||||
@@ -347,6 +365,67 @@ class TestGetDueJobs:
|
||||
due = get_due_jobs()
|
||||
assert len(due) == 0
|
||||
|
||||
def test_broken_recent_one_shot_without_next_run_is_recovered(self, tmp_cron_dir, monkeypatch):
|
||||
now = datetime(2026, 3, 18, 4, 22, 30, tzinfo=timezone.utc)
|
||||
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
||||
|
||||
run_at = "2026-03-18T04:22:00+00:00"
|
||||
save_jobs(
|
||||
[{
|
||||
"id": "oneshot-recover",
|
||||
"name": "Recover me",
|
||||
"prompt": "Word of the day",
|
||||
"schedule": {"kind": "once", "run_at": run_at, "display": "once at 2026-03-18 04:22"},
|
||||
"schedule_display": "once at 2026-03-18 04:22",
|
||||
"repeat": {"times": 1, "completed": 0},
|
||||
"enabled": True,
|
||||
"state": "scheduled",
|
||||
"paused_at": None,
|
||||
"paused_reason": None,
|
||||
"created_at": "2026-03-18T04:21:00+00:00",
|
||||
"next_run_at": None,
|
||||
"last_run_at": None,
|
||||
"last_status": None,
|
||||
"last_error": None,
|
||||
"deliver": "local",
|
||||
"origin": None,
|
||||
}]
|
||||
)
|
||||
|
||||
due = get_due_jobs()
|
||||
|
||||
assert [job["id"] for job in due] == ["oneshot-recover"]
|
||||
assert get_job("oneshot-recover")["next_run_at"] == run_at
|
||||
|
||||
def test_broken_stale_one_shot_without_next_run_is_not_recovered(self, tmp_cron_dir, monkeypatch):
|
||||
now = datetime(2026, 3, 18, 4, 30, 0, tzinfo=timezone.utc)
|
||||
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
||||
|
||||
save_jobs(
|
||||
[{
|
||||
"id": "oneshot-stale",
|
||||
"name": "Too old",
|
||||
"prompt": "Word of the day",
|
||||
"schedule": {"kind": "once", "run_at": "2026-03-18T04:22:00+00:00", "display": "once at 2026-03-18 04:22"},
|
||||
"schedule_display": "once at 2026-03-18 04:22",
|
||||
"repeat": {"times": 1, "completed": 0},
|
||||
"enabled": True,
|
||||
"state": "scheduled",
|
||||
"paused_at": None,
|
||||
"paused_reason": None,
|
||||
"created_at": "2026-03-18T04:21:00+00:00",
|
||||
"next_run_at": None,
|
||||
"last_run_at": None,
|
||||
"last_status": None,
|
||||
"last_error": None,
|
||||
"deliver": "local",
|
||||
"origin": None,
|
||||
}]
|
||||
)
|
||||
|
||||
assert get_due_jobs() == []
|
||||
assert get_job("oneshot-stale")["next_run_at"] is None
|
||||
|
||||
|
||||
class TestSaveJobOutput:
|
||||
def test_creates_output_file(self, tmp_cron_dir):
|
||||
|
||||
Reference in New Issue
Block a user