Replace hardcoded 120-second grace period with a dynamic window that scales with the job's scheduling frequency (half the period, clamped to [120s, 2h]). Daily jobs now catch up if missed by up to 2 hours instead of being silently skipped after just 2 minutes.
442 lines
17 KiB
Python
442 lines
17 KiB
Python
"""Tests for cron/jobs.py — schedule parsing, job CRUD, and due-job detection."""
|
|
|
|
import json
|
|
import pytest
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from cron.jobs import (
|
|
parse_duration,
|
|
parse_schedule,
|
|
compute_next_run,
|
|
create_job,
|
|
load_jobs,
|
|
save_jobs,
|
|
get_job,
|
|
list_jobs,
|
|
update_job,
|
|
pause_job,
|
|
resume_job,
|
|
remove_job,
|
|
mark_job_run,
|
|
get_due_jobs,
|
|
save_job_output,
|
|
)
|
|
|
|
|
|
# =========================================================================
|
|
# parse_duration
|
|
# =========================================================================
|
|
|
|
class TestParseDuration:
|
|
def test_minutes(self):
|
|
assert parse_duration("30m") == 30
|
|
assert parse_duration("1min") == 1
|
|
assert parse_duration("5mins") == 5
|
|
assert parse_duration("10minute") == 10
|
|
assert parse_duration("120minutes") == 120
|
|
|
|
def test_hours(self):
|
|
assert parse_duration("2h") == 120
|
|
assert parse_duration("1hr") == 60
|
|
assert parse_duration("3hrs") == 180
|
|
assert parse_duration("1hour") == 60
|
|
assert parse_duration("24hours") == 1440
|
|
|
|
def test_days(self):
|
|
assert parse_duration("1d") == 1440
|
|
assert parse_duration("7day") == 7 * 1440
|
|
assert parse_duration("2days") == 2 * 1440
|
|
|
|
def test_whitespace_tolerance(self):
|
|
assert parse_duration(" 30m ") == 30
|
|
assert parse_duration("2 h") == 120
|
|
|
|
def test_invalid_raises(self):
|
|
with pytest.raises(ValueError):
|
|
parse_duration("abc")
|
|
with pytest.raises(ValueError):
|
|
parse_duration("30x")
|
|
with pytest.raises(ValueError):
|
|
parse_duration("")
|
|
with pytest.raises(ValueError):
|
|
parse_duration("m30")
|
|
|
|
|
|
# =========================================================================
|
|
# parse_schedule
|
|
# =========================================================================
|
|
|
|
class TestParseSchedule:
|
|
def test_duration_becomes_once(self):
|
|
result = parse_schedule("30m")
|
|
assert result["kind"] == "once"
|
|
assert "run_at" in result
|
|
# run_at should be a valid ISO timestamp string ~30 minutes from now
|
|
run_at_str = result["run_at"]
|
|
assert isinstance(run_at_str, str)
|
|
run_at = datetime.fromisoformat(run_at_str)
|
|
now = datetime.now().astimezone()
|
|
assert run_at > now
|
|
assert run_at < now + timedelta(minutes=31)
|
|
|
|
def test_every_becomes_interval(self):
|
|
result = parse_schedule("every 2h")
|
|
assert result["kind"] == "interval"
|
|
assert result["minutes"] == 120
|
|
|
|
def test_every_case_insensitive(self):
|
|
result = parse_schedule("Every 30m")
|
|
assert result["kind"] == "interval"
|
|
assert result["minutes"] == 30
|
|
|
|
def test_cron_expression(self):
|
|
pytest.importorskip("croniter")
|
|
result = parse_schedule("0 9 * * *")
|
|
assert result["kind"] == "cron"
|
|
assert result["expr"] == "0 9 * * *"
|
|
|
|
def test_iso_timestamp(self):
|
|
result = parse_schedule("2030-01-15T14:00:00")
|
|
assert result["kind"] == "once"
|
|
assert "2030-01-15" in result["run_at"]
|
|
|
|
def test_invalid_schedule_raises(self):
|
|
with pytest.raises(ValueError):
|
|
parse_schedule("not_a_schedule")
|
|
|
|
def test_invalid_cron_raises(self):
|
|
pytest.importorskip("croniter")
|
|
with pytest.raises(ValueError):
|
|
parse_schedule("99 99 99 99 99")
|
|
|
|
|
|
# =========================================================================
|
|
# compute_next_run
|
|
# =========================================================================
|
|
|
|
class TestComputeNextRun:
|
|
def test_once_future_returns_time(self):
|
|
future = (datetime.now() + timedelta(hours=1)).isoformat()
|
|
schedule = {"kind": "once", "run_at": future}
|
|
assert compute_next_run(schedule) == future
|
|
|
|
def test_once_recent_past_within_grace_returns_time(self, monkeypatch):
|
|
now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
|
|
run_at = "2026-03-18T04:22:00+00:00"
|
|
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
|
|
|
schedule = {"kind": "once", "run_at": run_at}
|
|
|
|
assert compute_next_run(schedule) == run_at
|
|
|
|
def test_once_past_returns_none(self):
|
|
past = (datetime.now() - timedelta(hours=1)).isoformat()
|
|
schedule = {"kind": "once", "run_at": past}
|
|
assert compute_next_run(schedule) is None
|
|
|
|
def test_once_with_last_run_returns_none_even_within_grace(self, monkeypatch):
|
|
now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
|
|
run_at = "2026-03-18T04:22:00+00:00"
|
|
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
|
|
|
schedule = {"kind": "once", "run_at": run_at}
|
|
|
|
assert compute_next_run(schedule, last_run_at=now.isoformat()) is None
|
|
|
|
def test_interval_first_run(self):
|
|
schedule = {"kind": "interval", "minutes": 60}
|
|
result = compute_next_run(schedule)
|
|
next_dt = datetime.fromisoformat(result)
|
|
# Should be ~60 minutes from now
|
|
assert next_dt > datetime.now().astimezone() + timedelta(minutes=59)
|
|
|
|
def test_interval_subsequent_run(self):
|
|
schedule = {"kind": "interval", "minutes": 30}
|
|
last = datetime.now().astimezone().isoformat()
|
|
result = compute_next_run(schedule, last_run_at=last)
|
|
next_dt = datetime.fromisoformat(result)
|
|
# Should be ~30 minutes from last run
|
|
assert next_dt > datetime.now().astimezone() + timedelta(minutes=29)
|
|
|
|
def test_cron_returns_future(self):
|
|
pytest.importorskip("croniter")
|
|
schedule = {"kind": "cron", "expr": "* * * * *"} # every minute
|
|
result = compute_next_run(schedule)
|
|
assert isinstance(result, str), f"Expected ISO timestamp string, got {type(result)}"
|
|
assert len(result) > 0
|
|
next_dt = datetime.fromisoformat(result)
|
|
assert isinstance(next_dt, datetime)
|
|
assert next_dt > datetime.now().astimezone()
|
|
|
|
def test_unknown_kind_returns_none(self):
|
|
assert compute_next_run({"kind": "unknown"}) is None
|
|
|
|
|
|
# =========================================================================
|
|
# Job CRUD (with tmp file storage)
|
|
# =========================================================================
|
|
|
|
@pytest.fixture()
|
|
def tmp_cron_dir(tmp_path, monkeypatch):
|
|
"""Redirect cron storage to a temp directory."""
|
|
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path / "cron")
|
|
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "cron" / "jobs.json")
|
|
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "cron" / "output")
|
|
return tmp_path
|
|
|
|
|
|
class TestJobCRUD:
|
|
def test_create_and_get(self, tmp_cron_dir):
|
|
job = create_job(prompt="Check server status", schedule="30m")
|
|
assert job["id"]
|
|
assert job["prompt"] == "Check server status"
|
|
assert job["enabled"] is True
|
|
assert job["schedule"]["kind"] == "once"
|
|
|
|
fetched = get_job(job["id"])
|
|
assert fetched is not None
|
|
assert fetched["prompt"] == "Check server status"
|
|
|
|
def test_list_jobs(self, tmp_cron_dir):
|
|
create_job(prompt="Job 1", schedule="every 1h")
|
|
create_job(prompt="Job 2", schedule="every 2h")
|
|
jobs = list_jobs()
|
|
assert len(jobs) == 2
|
|
|
|
def test_remove_job(self, tmp_cron_dir):
|
|
job = create_job(prompt="Temp job", schedule="30m")
|
|
assert remove_job(job["id"]) is True
|
|
assert get_job(job["id"]) is None
|
|
|
|
def test_remove_nonexistent_returns_false(self, tmp_cron_dir):
|
|
assert remove_job("nonexistent") is False
|
|
|
|
def test_auto_repeat_for_once(self, tmp_cron_dir):
|
|
job = create_job(prompt="One-shot", schedule="1h")
|
|
assert job["repeat"]["times"] == 1
|
|
|
|
def test_interval_no_auto_repeat(self, tmp_cron_dir):
|
|
job = create_job(prompt="Recurring", schedule="every 1h")
|
|
assert job["repeat"]["times"] is None
|
|
|
|
def test_default_delivery_origin(self, tmp_cron_dir):
|
|
job = create_job(
|
|
prompt="Test", schedule="30m",
|
|
origin={"platform": "telegram", "chat_id": "123"},
|
|
)
|
|
assert job["deliver"] == "origin"
|
|
|
|
def test_default_delivery_local_no_origin(self, tmp_cron_dir):
|
|
job = create_job(prompt="Test", schedule="30m")
|
|
assert job["deliver"] == "local"
|
|
|
|
|
|
class TestUpdateJob:
|
|
def test_update_name(self, tmp_cron_dir):
|
|
job = create_job(prompt="Check server status", schedule="every 1h", name="Old Name")
|
|
assert job["name"] == "Old Name"
|
|
updated = update_job(job["id"], {"name": "New Name"})
|
|
assert updated is not None
|
|
assert isinstance(updated, dict)
|
|
assert updated["name"] == "New Name"
|
|
# Verify other fields are preserved
|
|
assert updated["prompt"] == "Check server status"
|
|
assert updated["id"] == job["id"]
|
|
assert updated["schedule"] == job["schedule"]
|
|
# Verify persisted to disk
|
|
fetched = get_job(job["id"])
|
|
assert fetched["name"] == "New Name"
|
|
|
|
def test_update_schedule(self, tmp_cron_dir):
|
|
job = create_job(prompt="Daily report", schedule="every 1h")
|
|
assert job["schedule"]["kind"] == "interval"
|
|
assert job["schedule"]["minutes"] == 60
|
|
old_next_run = job["next_run_at"]
|
|
new_schedule = parse_schedule("every 2h")
|
|
updated = update_job(job["id"], {"schedule": new_schedule, "schedule_display": new_schedule["display"]})
|
|
assert updated is not None
|
|
assert updated["schedule"]["kind"] == "interval"
|
|
assert updated["schedule"]["minutes"] == 120
|
|
assert updated["schedule_display"] == "every 120m"
|
|
assert updated["next_run_at"] != old_next_run
|
|
# Verify persisted to disk
|
|
fetched = get_job(job["id"])
|
|
assert fetched["schedule"]["minutes"] == 120
|
|
assert fetched["schedule_display"] == "every 120m"
|
|
|
|
def test_update_enable_disable(self, tmp_cron_dir):
|
|
job = create_job(prompt="Toggle me", schedule="every 1h")
|
|
assert job["enabled"] is True
|
|
updated = update_job(job["id"], {"enabled": False})
|
|
assert updated["enabled"] is False
|
|
fetched = get_job(job["id"])
|
|
assert fetched["enabled"] is False
|
|
|
|
def test_update_nonexistent_returns_none(self, tmp_cron_dir):
|
|
result = update_job("nonexistent_id", {"name": "X"})
|
|
assert result is None
|
|
|
|
|
|
class TestPauseResumeJob:
|
|
def test_pause_sets_state(self, tmp_cron_dir):
|
|
job = create_job(prompt="Pause me", schedule="every 1h")
|
|
paused = pause_job(job["id"], reason="user paused")
|
|
assert paused is not None
|
|
assert paused["enabled"] is False
|
|
assert paused["state"] == "paused"
|
|
assert paused["paused_reason"] == "user paused"
|
|
|
|
def test_resume_reenables_job(self, tmp_cron_dir):
|
|
job = create_job(prompt="Resume me", schedule="every 1h")
|
|
pause_job(job["id"], reason="user paused")
|
|
resumed = resume_job(job["id"])
|
|
assert resumed is not None
|
|
assert resumed["enabled"] is True
|
|
assert resumed["state"] == "scheduled"
|
|
assert resumed["paused_at"] is None
|
|
assert resumed["paused_reason"] is None
|
|
|
|
|
|
class TestMarkJobRun:
|
|
def test_increments_completed(self, tmp_cron_dir):
|
|
job = create_job(prompt="Test", schedule="every 1h")
|
|
mark_job_run(job["id"], success=True)
|
|
updated = get_job(job["id"])
|
|
assert updated["repeat"]["completed"] == 1
|
|
assert updated["last_status"] == "ok"
|
|
|
|
def test_repeat_limit_removes_job(self, tmp_cron_dir):
|
|
job = create_job(prompt="Once", schedule="30m", repeat=1)
|
|
mark_job_run(job["id"], success=True)
|
|
# Job should be removed after hitting repeat limit
|
|
assert get_job(job["id"]) is None
|
|
|
|
def test_error_status(self, tmp_cron_dir):
|
|
job = create_job(prompt="Fail", schedule="every 1h")
|
|
mark_job_run(job["id"], success=False, error="timeout")
|
|
updated = get_job(job["id"])
|
|
assert updated["last_status"] == "error"
|
|
assert updated["last_error"] == "timeout"
|
|
|
|
|
|
class TestGetDueJobs:
|
|
def test_past_due_within_window_returned(self, tmp_cron_dir):
|
|
"""Jobs within the dynamic grace window are still considered due (not stale).
|
|
|
|
For an hourly job, grace = 30 min (half the period, clamped to [120s, 2h]).
|
|
"""
|
|
job = create_job(prompt="Due now", schedule="every 1h")
|
|
# Force next_run_at to 10 minutes ago (within the 30-min grace for hourly)
|
|
jobs = load_jobs()
|
|
jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=10)).isoformat()
|
|
save_jobs(jobs)
|
|
|
|
due = get_due_jobs()
|
|
assert len(due) == 1
|
|
assert due[0]["id"] == job["id"]
|
|
|
|
def test_stale_past_due_skipped(self, tmp_cron_dir):
|
|
"""Recurring jobs past their dynamic grace window are fast-forwarded, not fired.
|
|
|
|
For an hourly job, grace = 30 min. Setting 35 min late exceeds the window.
|
|
"""
|
|
job = create_job(prompt="Stale", schedule="every 1h")
|
|
# Force next_run_at to 35 minutes ago (beyond the 30-min grace for hourly)
|
|
jobs = load_jobs()
|
|
jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=35)).isoformat()
|
|
save_jobs(jobs)
|
|
|
|
due = get_due_jobs()
|
|
assert len(due) == 0
|
|
# next_run_at should be fast-forwarded to the future
|
|
updated = get_job(job["id"])
|
|
from cron.jobs import _ensure_aware, _hermes_now
|
|
next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"]))
|
|
assert next_dt > _hermes_now()
|
|
|
|
def test_future_not_returned(self, tmp_cron_dir):
|
|
create_job(prompt="Not yet", schedule="every 1h")
|
|
due = get_due_jobs()
|
|
assert len(due) == 0
|
|
|
|
def test_disabled_not_returned(self, tmp_cron_dir):
|
|
job = create_job(prompt="Disabled", schedule="every 1h")
|
|
jobs = load_jobs()
|
|
jobs[0]["enabled"] = False
|
|
jobs[0]["next_run_at"] = (datetime.now() - timedelta(minutes=5)).isoformat()
|
|
save_jobs(jobs)
|
|
|
|
due = get_due_jobs()
|
|
assert len(due) == 0
|
|
|
|
def test_broken_recent_one_shot_without_next_run_is_recovered(self, tmp_cron_dir, monkeypatch):
|
|
now = datetime(2026, 3, 18, 4, 22, 30, tzinfo=timezone.utc)
|
|
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
|
|
|
run_at = "2026-03-18T04:22:00+00:00"
|
|
save_jobs(
|
|
[{
|
|
"id": "oneshot-recover",
|
|
"name": "Recover me",
|
|
"prompt": "Word of the day",
|
|
"schedule": {"kind": "once", "run_at": run_at, "display": "once at 2026-03-18 04:22"},
|
|
"schedule_display": "once at 2026-03-18 04:22",
|
|
"repeat": {"times": 1, "completed": 0},
|
|
"enabled": True,
|
|
"state": "scheduled",
|
|
"paused_at": None,
|
|
"paused_reason": None,
|
|
"created_at": "2026-03-18T04:21:00+00:00",
|
|
"next_run_at": None,
|
|
"last_run_at": None,
|
|
"last_status": None,
|
|
"last_error": None,
|
|
"deliver": "local",
|
|
"origin": None,
|
|
}]
|
|
)
|
|
|
|
due = get_due_jobs()
|
|
|
|
assert [job["id"] for job in due] == ["oneshot-recover"]
|
|
assert get_job("oneshot-recover")["next_run_at"] == run_at
|
|
|
|
def test_broken_stale_one_shot_without_next_run_is_not_recovered(self, tmp_cron_dir, monkeypatch):
|
|
now = datetime(2026, 3, 18, 4, 30, 0, tzinfo=timezone.utc)
|
|
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
|
|
|
|
save_jobs(
|
|
[{
|
|
"id": "oneshot-stale",
|
|
"name": "Too old",
|
|
"prompt": "Word of the day",
|
|
"schedule": {"kind": "once", "run_at": "2026-03-18T04:22:00+00:00", "display": "once at 2026-03-18 04:22"},
|
|
"schedule_display": "once at 2026-03-18 04:22",
|
|
"repeat": {"times": 1, "completed": 0},
|
|
"enabled": True,
|
|
"state": "scheduled",
|
|
"paused_at": None,
|
|
"paused_reason": None,
|
|
"created_at": "2026-03-18T04:21:00+00:00",
|
|
"next_run_at": None,
|
|
"last_run_at": None,
|
|
"last_status": None,
|
|
"last_error": None,
|
|
"deliver": "local",
|
|
"origin": None,
|
|
}]
|
|
)
|
|
|
|
assert get_due_jobs() == []
|
|
assert get_job("oneshot-stale")["next_run_at"] is None
|
|
|
|
|
|
class TestSaveJobOutput:
|
|
def test_creates_output_file(self, tmp_cron_dir):
|
|
output_file = save_job_output("test123", "# Results\nEverything ok.")
|
|
assert output_file.exists()
|
|
assert output_file.read_text() == "# Results\nEverything ok."
|
|
assert "test123" in str(output_file)
|