[claude] Poka-yoke cron heartbeats: write, check, and report (#1096) #1107

Merged
claude merged 1 commits from claude/issue-1096 into main 2026-04-07 14:44:06 +00:00
6 changed files with 843 additions and 1 deletions

326
bin/bezalel_heartbeat_check.py Executable file
View File

@@ -0,0 +1,326 @@
#!/usr/bin/env python3
"""
Bezalel Meta-Heartbeat Checker — stale cron detection (poka-yoke #1096)
Monitors all cron job heartbeat files and alerts P1 when any job has been
silent for more than 2× its declared interval.
POKA-YOKE design:
Prevention — cron-heartbeat-write.sh writes a .last file atomically after
every successful cron job completion, stamping its interval.
Detection — this script runs every 15 minutes (via systemd timer) and
raises P1 on stderr + writes an alert file for any stale job.
Correction — alerts are loud enough (P1 stderr + alert files) for
monitoring/humans to intervene before the next run window.
ZERO DEPENDENCIES
=================
Pure stdlib. No pip installs.
USAGE
=====
# One-shot check (default dir)
python bin/bezalel_heartbeat_check.py
# Override heartbeat dir
python bin/bezalel_heartbeat_check.py --heartbeat-dir /tmp/test-beats
# Dry-run (check + report, don't write alert files)
python bin/bezalel_heartbeat_check.py --dry-run
# JSON output (for piping into other tools)
python bin/bezalel_heartbeat_check.py --json
EXIT CODES
==========
0 — all jobs healthy (or no .last files found yet)
1 — one or more stale beats detected
2 — heartbeat dir unreadable
IMPORTABLE API
==============
from bin.bezalel_heartbeat_check import check_cron_heartbeats
result = check_cron_heartbeats("/var/run/bezalel/heartbeats")
# Returns dict with keys: checked_at, jobs, stale_count, healthy_count
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
"""
from __future__ import annotations
import argparse
import json
import logging
import math
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# Configure root logging so CLI runs emit timestamped, level-tagged lines.
# NOTE(review): this runs at import time, so it also executes when the module
# is loaded as a library (the morning report imports it); basicConfig does
# nothing if the root logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger shared by every function in this file.
logger = logging.getLogger("bezalel.heartbeat")
# ── Configuration ────────────────────────────────────────────────────
DEFAULT_HEARTBEAT_DIR = "/var/run/bezalel/heartbeats"
# ── Core checker ─────────────────────────────────────────────────────
def _corrupt_job(job_name: str, reason: str) -> Dict[str, Any]:
    """Build the job record for a heartbeat file whose content cannot be trusted."""
    return {
        "job": job_name,
        "healthy": False,
        "age_secs": float("inf"),
        "interval": 3600,
        "last_seen": None,
        "message": f"CORRUPT: {reason}",
    }


def check_cron_heartbeats(heartbeat_dir: str = DEFAULT_HEARTBEAT_DIR) -> Dict[str, Any]:
    """
    Scan all .last files in heartbeat_dir and determine which jobs are stale.

    Returns a dict:
        {
            "checked_at": "<ISO 8601 timestamp>",
            "jobs": [
                {
                    "job": str,
                    "healthy": bool,
                    "age_secs": float,
                    "interval": int,
                    "last_seen": str or None,  # ISO timestamp of last heartbeat
                    "message": str,
                },
                ...
            ],
            "stale_count": int,
            "healthy_count": int,
        }

    On a missing or empty dir (no .last files), returns jobs=[] with
    stale_count=0.

    A heartbeat file is reported as stale with a "CORRUPT: ..." message
    (age_secs=inf, interval=3600, last_seen=None) when it:
      * cannot be read or is not valid UTF-8 / JSON,
      * holds JSON that is not an object,
      * has a timestamp/interval that cannot be converted to a number, or
      * has a non-finite timestamp (NaN / Infinity).
    One bad file never aborts the scan of the remaining files.

    Refs: #1096
    """
    now_ts = time.time()
    checked_at = datetime.fromtimestamp(now_ts, tz=timezone.utc).isoformat()
    hb_path = Path(heartbeat_dir)
    jobs: List[Dict[str, Any]] = []

    if not hb_path.exists():
        return {
            "checked_at": checked_at,
            "jobs": [],
            "stale_count": 0,
            "healthy_count": 0,
        }

    for last_file in sorted(hb_path.glob("*.last")):
        job_name = last_file.stem  # filename without .last extension

        # Read and parse the heartbeat file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (binary garbage).
        try:
            raw = last_file.read_text(encoding="utf-8")
            data = json.loads(raw)
        except (OSError, ValueError) as exc:
            jobs.append(
                _corrupt_job(job_name, f"cannot read/parse heartbeat file: {exc}")
            )
            continue

        # Valid JSON is not necessarily a JSON object ("[]", "42", ...).
        if not isinstance(data, dict):
            jobs.append(
                _corrupt_job(
                    job_name,
                    "cannot read/parse heartbeat file: expected a JSON object, "
                    f"got {type(data).__name__}",
                )
            )
            continue

        # Extract fields with safe defaults; reject values that are not numeric.
        try:
            beat_timestamp = float(data.get("timestamp", 0))
            interval = int(data.get("interval", 3600))
        except (TypeError, ValueError, OverflowError) as exc:
            jobs.append(
                _corrupt_job(job_name, f"invalid timestamp/interval value: {exc}")
            )
            continue

        # NaN compares False against every threshold, which would silently
        # mark the job healthy — treat non-finite timestamps as corrupt.
        if not math.isfinite(beat_timestamp):
            jobs.append(
                _corrupt_job(job_name, f"non-finite timestamp: {beat_timestamp}")
            )
            continue

        age_secs = now_ts - beat_timestamp

        # Convert beat_timestamp to a readable ISO string
        try:
            last_seen = datetime.fromtimestamp(beat_timestamp, tz=timezone.utc).isoformat()
        except (OSError, OverflowError, ValueError):
            last_seen = None

        # Stale = silent for more than 2× the declared interval
        threshold = 2 * interval
        is_stale = age_secs > threshold
        if is_stale:
            message = (
                f"STALE (last {age_secs:.0f}s ago, interval {interval}s"
                f" — exceeds 2x threshold of {threshold}s)"
            )
        else:
            message = f"OK (last {age_secs:.0f}s ago, interval {interval}s)"

        jobs.append({
            "job": job_name,
            "healthy": not is_stale,
            "age_secs": age_secs,
            "interval": interval,
            "last_seen": last_seen,
            "message": message,
        })

    healthy_count = sum(1 for j in jobs if j["healthy"])
    return {
        "checked_at": checked_at,
        "jobs": jobs,
        "stale_count": len(jobs) - healthy_count,
        "healthy_count": healthy_count,
    }
# ── Alert file writer ────────────────────────────────────────────────
def write_alert(heartbeat_dir: str, job_info: Dict[str, Any]) -> None:
    """
    Write an alert file for a stale job to <heartbeat_dir>/alerts/<job>.alert

    Args:
        heartbeat_dir: Directory holding the .last files; the "alerts"
            subdirectory is created beneath it if needed.
        job_info: A job record with the keys "job", "message", "age_secs",
            "interval" and "last_seen" (the shape produced by
            check_cron_heartbeats).

    The file is JSON with alert_level "P1" plus the job fields and a
    detected_at UTC timestamp. A non-finite age_secs (used for corrupt
    heartbeat files) is written as null so the file stays strictly valid
    JSON instead of containing the non-standard token "Infinity".

    This function never raises on filesystem failure: problems are logged as
    warnings so one unwritable alert cannot abort the check cycle.

    Refs: #1096
    """
    alerts_dir = Path(heartbeat_dir) / "alerts"
    try:
        alerts_dir.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        logger.warning("Cannot create alerts dir %s: %s", alerts_dir, exc)
        return

    alert_file = alerts_dir / f"{job_info['job']}.alert"
    now_str = datetime.now(tz=timezone.utc).isoformat()

    # json.dumps would render inf/nan as Infinity/NaN, which strict JSON
    # parsers used by external monitoring may reject.
    age_secs = job_info["age_secs"]
    if isinstance(age_secs, float) and not math.isfinite(age_secs):
        age_secs = None

    content = {
        "alert_level": "P1",
        "job": job_info["job"],
        "message": job_info["message"],
        "age_secs": age_secs,
        "interval": job_info["interval"],
        "last_seen": job_info["last_seen"],
        "detected_at": now_str,
    }

    # Atomic write via temp + rename (same poka-yoke pattern as the writer).
    # The temp name is built from the full file name so dots in the job name
    # are preserved; os.replace overwrites an existing alert on every platform.
    tmp_file = alerts_dir / f"{alert_file.name}.tmp.{os.getpid()}"
    try:
        tmp_file.write_text(json.dumps(content, indent=2), encoding="utf-8")
        os.replace(tmp_file, alert_file)
    except OSError as exc:
        logger.warning("Failed to write alert file %s: %s", alert_file, exc)
        # Best-effort cleanup; a failure here must not escape the handler.
        try:
            tmp_file.unlink()
        except OSError:
            pass
# ── Main runner ──────────────────────────────────────────────────────
def _clear_alert(heartbeat_dir: str, job_name: str) -> None:
    """Remove the alert file of a job that is healthy again, if one exists."""
    alert_file = Path(heartbeat_dir) / "alerts" / f"{job_name}.alert"
    try:
        alert_file.unlink()
    except FileNotFoundError:
        pass  # no outstanding alert — the common case
    except OSError as exc:
        logger.warning("Failed to clear alert file %s: %s", alert_file, exc)


def run_check(heartbeat_dir: str, dry_run: bool = False, output_json: bool = False) -> int:
    """
    Run a full heartbeat check cycle. Returns exit code (0/1/2).

    Args:
        heartbeat_dir: Directory containing the .last heartbeat files.
        dry_run: Report only; neither write nor clear alert files.
        output_json: Print the check result as JSON on stdout and return
            without logging, P1 stderr lines, or touching alert files.

    Exit codes:
        0 — all healthy (or no .last files found yet)
        1 — stale beats detected
        2 — heartbeat dir unreadable (permissions, or not a directory)

    In the default (human-readable) mode each stale job gets a P1 line on
    stderr and an alert file; each healthy job has any leftover alert file
    removed, so alerts only persist while the job is actually stale.

    Refs: #1096
    """
    hb_path = Path(heartbeat_dir)

    # An existing path that is not a readable directory cannot be scanned.
    if hb_path.exists() and (
        not hb_path.is_dir() or not os.access(heartbeat_dir, os.R_OK)
    ):
        logger.error("Heartbeat dir unreadable: %s", heartbeat_dir)
        return 2

    result = check_cron_heartbeats(heartbeat_dir)

    if output_json:
        # Corrupt jobs carry age_secs=inf, which json.dumps would emit as the
        # non-standard token Infinity; emit null so downstream tools can parse.
        printable_jobs = [
            {
                **job,
                "age_secs": job["age_secs"] if math.isfinite(job["age_secs"]) else None,
            }
            for job in result["jobs"]
        ]
        print(json.dumps({**result, "jobs": printable_jobs}, indent=2))
        return 1 if result["stale_count"] > 0 else 0

    # Human-readable output
    if not result["jobs"]:
        logger.warning(
            "No .last files found in %s — bezalel not yet provisioned or no jobs registered.",
            heartbeat_dir,
        )
        return 0

    for job in result["jobs"]:
        if job["healthy"]:
            logger.info(" + %s: %s", job["job"], job["message"])
            if not dry_run:
                # The job recovered: drop the alert raised by an earlier cycle.
                _clear_alert(heartbeat_dir, job["job"])
        else:
            logger.error(" - %s: %s", job["job"], job["message"])

    if result["stale_count"] > 0:
        for job in result["jobs"]:
            if job["healthy"]:
                continue
            # P1 alert to stderr
            print(
                f"[P1-ALERT] STALE CRON JOB: {job['job']} — {job['message']}",
                file=sys.stderr,
            )
            if not dry_run:
                write_alert(heartbeat_dir, job)
            else:
                logger.info("DRY RUN — would write alert for stale job: %s", job["job"])
        logger.error(
            "Heartbeat check FAILED: %d stale, %d healthy",
            result["stale_count"],
            result["healthy_count"],
        )
        return 1

    logger.info(
        "Heartbeat check PASSED: %d healthy, %d stale",
        result["healthy_count"],
        result["stale_count"],
    )
    return 0
# ── CLI entrypoint ───────────────────────────────────────────────────
def main() -> None:
    """Parse the command line, run one heartbeat check, and exit with its status code."""
    cli = argparse.ArgumentParser(
        description="Bezalel Meta-Heartbeat Checker — detect silent cron failures (poka-yoke #1096)",
    )
    # Where the .last files live.
    cli.add_argument(
        "--heartbeat-dir",
        default=DEFAULT_HEARTBEAT_DIR,
        help=f"Directory containing .last heartbeat files (default: {DEFAULT_HEARTBEAT_DIR})",
    )
    # Report-only mode.
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Check and report but do not write alert files",
    )
    # Machine-readable mode; stored as ``output_json`` on the namespace.
    cli.add_argument(
        "--json",
        action="store_true",
        dest="output_json",
        help="Output results as JSON (for integration with other tools)",
    )
    options = cli.parse_args()

    status = run_check(
        heartbeat_dir=options.heartbeat_dir,
        dry_run=options.dry_run,
        output_json=options.output_json,
    )
    sys.exit(status)


if __name__ == "__main__":
    main()

View File

@@ -2,11 +2,17 @@
Morning Report Generator — runs at 0600 to compile overnight activity.
Gathers: cycles executed, issues closed, PRs merged, commits pushed.
Outputs a structured report for delivery to the main channel.
Includes a HEARTBEAT PANEL that checks all cron job heartbeats via
bezalel_heartbeat_check.py (poka-yoke #1096). Any stale jobs surface
as blockers in the report.
"""
import importlib.util
import json
import os
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
@@ -118,7 +124,46 @@ def generate_morning_report():
if not report["highlights"] and not report["blockers"]:
print("No significant activity or blockers detected.")
print("")
# ── Heartbeat panel (poka-yoke #1096) ────────────────────────────────────
# Import bezalel_heartbeat_check via importlib so we don't need __init__.py
# or a sys.path hack. If the module is missing or the dir doesn't exist,
# we print a "not provisioned" notice and continue — never crash the report.
_hb_result = None
try:
_project_root = Path(__file__).parent.parent
_hb_spec = importlib.util.spec_from_file_location(
"bezalel_heartbeat_check",
_project_root / "bin" / "bezalel_heartbeat_check.py",
)
if _hb_spec is not None:
_hb_mod = importlib.util.module_from_spec(_hb_spec)
sys.modules.setdefault("bezalel_heartbeat_check", _hb_mod)
_hb_spec.loader.exec_module(_hb_mod) # type: ignore[union-attr]
_hb_result = _hb_mod.check_cron_heartbeats()
except Exception:
_hb_result = None
print("HEARTBEAT PANEL:")
if _hb_result is None or not _hb_result.get("jobs"):
print(" HEARTBEAT PANEL: no data (bezalel not provisioned)")
report["heartbeat_panel"] = {"status": "not_provisioned"}
else:
for _job in _hb_result["jobs"]:
_prefix = "+" if _job["healthy"] else "-"
print(f" {_prefix} {_job['job']}: {_job['message']}")
if not _job["healthy"]:
report["blockers"].append(
f"Stale heartbeat: {_job['job']}{_job['message']}"
)
print("")
report["heartbeat_panel"] = {
"checked_at": _hb_result.get("checked_at"),
"healthy_count": _hb_result.get("healthy_count", 0),
"stale_count": _hb_result.get("stale_count", 0),
"jobs": _hb_result.get("jobs", []),
}
# Save report
report_dir = Path(os.path.expanduser("~/.local/timmy/reports"))
report_dir.mkdir(parents=True, exist_ok=True)

115
scripts/cron-heartbeat-write.sh Executable file
View File

@@ -0,0 +1,115 @@
#!/usr/bin/env bash
# cron-heartbeat-write.sh — Bezalel Cron Heartbeat Writer (poka-yoke #1096)
# Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
#
# POKA-YOKE design:
# Prevention — Cron jobs declare their identity + expected interval up front.
# Detection — bezalel_heartbeat_check.py reads these files every 15 min and
# alerts P1 if any job is silent for > 2× its interval.
# Correction — Alerts fire fast enough for manual intervention or auto-restart
# before the next scheduled run window expires.
#
# Usage:
# cron-heartbeat-write.sh <job-name> [interval-seconds]
#
# <job-name> Unique identifier for this cron job (e.g. "morning-report")
# [interval-seconds] Expected run interval in seconds (default: 3600)
#
# The heartbeat file is written to:
# /var/run/bezalel/heartbeats/<job-name>.last
#
# File format (JSON):
# {"job":"<name>","timestamp":<epoch_float>,"interval":<secs>,"pid":<pid>}
#
# This script ALWAYS exits 0 — it must never crash the calling cron job.
#
# Typical crontab usage:
# 0 * * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh hourly-job 3600
# 0 6 * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh morning-report 86400
# No `-e`: this script must keep going and exit 0 even when a step fails.
set -uo pipefail

# ── Configuration ─────────────────────────────────────────────────────────────
HEARTBEAT_DIR="${BEZALEL_HEARTBEAT_DIR:-/var/run/bezalel/heartbeats}"

# ── Helpers ───────────────────────────────────────────────────────────────────
# log  → timestamped line on stdout; warn → timestamped line on stderr.
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT: $*"; }
warn() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT WARNING: $*" >&2; }

# ── Input validation ──────────────────────────────────────────────────────────
if [[ $# -lt 1 ]]; then
    warn "Usage: $0 <job-name> [interval-seconds]"
    warn "No job name provided — heartbeat not written."
    exit 0
fi

JOB_NAME="$1"
INTERVAL_SECS="${2:-3600}"

# Sanitize job name to prevent path traversal / weird filenames
# Allow alphanumeric, dash, underscore, dot only
SAFE_JOB_NAME="${JOB_NAME//[^a-zA-Z0-9_.-]/}"
if [[ -z "$SAFE_JOB_NAME" ]]; then
    warn "Job name '${JOB_NAME}' contains only unsafe characters — heartbeat not written."
    exit 0
fi
if [[ "$SAFE_JOB_NAME" != "$JOB_NAME" ]]; then
    warn "Job name sanitized: '${JOB_NAME}' → '${SAFE_JOB_NAME}'"
fi

# Normalise an all-digit interval to base 10 first. Without this, a leading
# zero ("0900", "010") is treated as octal by bash arithmetic — either an
# error or the wrong value — and would be copied verbatim into the fallback
# JSON payload below, where a leading zero is not a valid number.
if [[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]]; then
    INTERVAL_SECS=$((10#$INTERVAL_SECS))
fi

# Validate interval is a positive integer
if ! [[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]] || (( INTERVAL_SECS < 1 )); then
    warn "Invalid interval '${INTERVAL_SECS}' — using default 3600."
    INTERVAL_SECS=3600
fi

# ── Create heartbeat directory ────────────────────────────────────────────────
if ! mkdir -p "$HEARTBEAT_DIR" 2>/dev/null; then
    warn "Cannot create heartbeat dir '${HEARTBEAT_DIR}' — heartbeat not written."
    exit 0
fi

# ── Build JSON payload ────────────────────────────────────────────────────────
# Use python3 for reliable epoch float and JSON encoding.
# Falls back to date-based approach if python3 unavailable.
TIMESTAMP=$(python3 -c "import time; print(time.time())" 2>/dev/null \
    || date +%s)
CURRENT_PID=$$
PAYLOAD=$(python3 -c "
import json, sys
print(json.dumps({
'job': sys.argv[1],
'timestamp': float(sys.argv[2]),
'interval': int(sys.argv[3]),
'pid': int(sys.argv[4]),
}))
" "$SAFE_JOB_NAME" "$TIMESTAMP" "$INTERVAL_SECS" "$CURRENT_PID" 2>/dev/null)
if [[ -z "$PAYLOAD" ]]; then
    # Minimal fallback if python3 fails. Safe to interpolate: the job name is
    # restricted to [a-zA-Z0-9_.-] and the numbers were validated above.
    PAYLOAD="{\"job\":\"${SAFE_JOB_NAME}\",\"timestamp\":${TIMESTAMP},\"interval\":${INTERVAL_SECS},\"pid\":${CURRENT_PID}}"
fi

# ── Atomic write via temp + rename ────────────────────────────────────────────
# Writes to a temp file first then renames, so bezalel_heartbeat_check.py
# never sees a partial file mid-write. This is the poka-yoke atomic guarantee.
# The temp file lives in the same directory as the target and does not end in
# ".last", so the checker's "*.last" scan never picks it up.
TARGET_FILE="${HEARTBEAT_DIR}/${SAFE_JOB_NAME}.last"
TMP_FILE="${HEARTBEAT_DIR}/.${SAFE_JOB_NAME}.last.tmp.$$"
if printf '%s\n' "$PAYLOAD" > "$TMP_FILE" 2>/dev/null; then
    if mv "$TMP_FILE" "$TARGET_FILE" 2>/dev/null; then
        log "Heartbeat written: ${TARGET_FILE} (job=${SAFE_JOB_NAME}, interval=${INTERVAL_SECS}s)"
    else
        warn "mv failed for '${TMP_FILE}' → '${TARGET_FILE}' — heartbeat not committed."
        rm -f "$TMP_FILE" 2>/dev/null || true
    fi
else
    warn "Write to temp file '${TMP_FILE}' failed — heartbeat not written."
    rm -f "$TMP_FILE" 2>/dev/null || true
fi

# Always exit 0 — never crash the calling cron job.
exit 0

View File

@@ -0,0 +1,11 @@
[Unit]
Description=Bezalel Meta-Heartbeat — stale cron detection (poka-yoke #1096)
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
After=network.target
[Service]
Type=oneshot
ExecStart=/root/wizards/the-nexus/bin/bezalel_heartbeat_check.py
StandardOutput=journal
StandardError=journal
Environment=HOME=/root

View File

@@ -0,0 +1,11 @@
[Unit]
Description=Bezalel Meta-Heartbeat — fires every 15 minutes (poka-yoke #1096)
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
[Timer]
OnBootSec=5min
OnUnitActiveSec=15min
Persistent=true
[Install]
WantedBy=timers.target

View File

@@ -0,0 +1,334 @@
"""
Tests for Bezalel Cron Heartbeat system (poka-yoke #1096).
Validates:
- check_cron_heartbeats() with healthy and stale jobs
- Empty heartbeat dir (no .last files) returns safely
- Corrupt JSON in a .last file is handled gracefully
- Mixed healthy/stale jobs
- Alert file writing (write_alert)
- The 2× interval staleness threshold is applied correctly
Uses importlib to load bin/bezalel_heartbeat_check.py without __init__.py,
following the same pattern as test_nexus_watchdog.py.
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
"""
from __future__ import annotations
import importlib.util
import json
import sys
import time
from pathlib import Path
from unittest.mock import patch
import pytest
# ── Load module under test ────────────────────────────────────────────────────
# Load the checker directly from its file path: it is imported without an
# __init__.py, so importlib is used instead of a regular import statement.
PROJECT_ROOT = Path(__file__).parent.parent
_hb_spec = importlib.util.spec_from_file_location(
    "bezalel_heartbeat_check_test",
    PROJECT_ROOT / "bin" / "bezalel_heartbeat_check.py",
)
_hb_mod = importlib.util.module_from_spec(_hb_spec)
# Register the module under its test-specific name before executing its body.
sys.modules["bezalel_heartbeat_check_test"] = _hb_mod
_hb_spec.loader.exec_module(_hb_mod)
# Bind the two functions under test to module-level names used by the tests.
check_cron_heartbeats = _hb_mod.check_cron_heartbeats
write_alert = _hb_mod.write_alert
# ── Helpers ───────────────────────────────────────────────────────────────────
def write_heartbeat_file(
    directory: Path,
    job: str,
    timestamp: float,
    interval: int = 3600,
    pid: int = 12345,
) -> Path:
    """Create ``<directory>/<job>.last`` with a well-formed heartbeat payload.

    The directory is created if it does not exist. Returns the path of the
    file that was written.
    """
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{job}.last"
    serialized = json.dumps(
        {"job": job, "timestamp": timestamp, "interval": interval, "pid": pid}
    )
    target.write_text(serialized, encoding="utf-8")
    return target
# ── Tests ─────────────────────────────────────────────────────────────────────
class TestCheckCronHeartbeats:
    """Behavioural tests for check_cron_heartbeats()."""

    def test_healthy_job(self, tmp_path: Path) -> None:
        """A heartbeat written 100s ago with an hourly interval is healthy."""
        current = time.time()
        write_heartbeat_file(tmp_path, "morning-report", timestamp=current - 100, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 1
        assert len(report["jobs"]) == 1
        entry = report["jobs"][0]
        assert entry["job"] == "morning-report"
        assert entry["healthy"] is True
        assert entry["age_secs"] == pytest.approx(100, abs=5)
        assert "OK" in entry["message"]

    def test_stale_job(self, tmp_path: Path) -> None:
        """Silence longer than 2× the interval marks the job stale."""
        current = time.time()
        # Three hours of silence against a one-hour interval: 3 > 2×1.
        write_heartbeat_file(tmp_path, "hourly-sync", timestamp=current - 10800, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        assert report["healthy_count"] == 0
        entry = report["jobs"][0]
        assert entry["job"] == "hourly-sync"
        assert entry["healthy"] is False
        assert "STALE" in entry["message"]
        assert "exceeds 2x threshold" in entry["message"]

    def test_just_within_threshold(self, tmp_path: Path) -> None:
        """An age of exactly 2× the interval is still healthy (comparison is strict)."""
        frozen = 1700000000.0
        # age == threshold == 7200, and 7200 > 7200 is False.
        write_heartbeat_file(tmp_path, "edge-job", timestamp=frozen - 7200, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0

    def test_stale_threshold_just_over(self, tmp_path: Path) -> None:
        """One second past 2× the interval is stale."""
        current = time.time()
        # age = 7201 against a threshold of 7200.
        write_heartbeat_file(tmp_path, "edge-job", timestamp=current - 7201, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1

    def test_empty_dir_returns_safely(self, tmp_path: Path) -> None:
        """A directory with no .last files yields an empty, error-free result."""
        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 0
        assert report["jobs"] == []
        assert "checked_at" in report

    def test_nonexistent_dir_returns_safely(self, tmp_path: Path) -> None:
        """A heartbeat directory that does not exist yields an empty result."""
        absent_dir = str(tmp_path / "does-not-exist")

        report = check_cron_heartbeats(absent_dir)

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 0
        assert report["jobs"] == []

    def test_corrupt_json_handled_gracefully(self, tmp_path: Path) -> None:
        """Unparseable JSON is surfaced as a stale job flagged CORRUPT."""
        garbage = tmp_path / "broken-job.last"
        garbage.write_text("{this is not valid json!}", encoding="utf-8")

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        assert report["healthy_count"] == 0
        entry = report["jobs"][0]
        assert entry["job"] == "broken-job"
        assert entry["healthy"] is False
        assert "CORRUPT" in entry["message"]
        assert entry["last_seen"] is None

    def test_multiple_jobs_mixed(self, tmp_path: Path) -> None:
        """Healthy and stale jobs in one directory are counted separately."""
        current = time.time()
        # Recent beats: all well inside the 7200s threshold.
        write_heartbeat_file(tmp_path, "job-a", timestamp=current - 60, interval=3600)
        write_heartbeat_file(tmp_path, "job-b", timestamp=current - 1800, interval=3600)
        write_heartbeat_file(tmp_path, "job-c", timestamp=current - 3599, interval=3600)
        # Old beats: both beyond the threshold.
        write_heartbeat_file(tmp_path, "job-d", timestamp=current - 10000, interval=3600)
        write_heartbeat_file(tmp_path, "job-e", timestamp=current - 86400, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 2
        assert report["healthy_count"] == 3
        assert len(report["jobs"]) == 5
        stale_names = {e["job"] for e in report["jobs"] if not e["healthy"]}
        healthy_names = {e["job"] for e in report["jobs"] if e["healthy"]}
        assert stale_names == {"job-d", "job-e"}
        assert healthy_names == {"job-a", "job-b", "job-c"}

    def test_result_contains_required_keys(self, tmp_path: Path) -> None:
        """Both the result and each job record expose the documented keys."""
        current = time.time()
        write_heartbeat_file(tmp_path, "test-job", timestamp=current - 100, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert "checked_at" in report
        assert "jobs" in report
        assert "stale_count" in report
        assert "healthy_count" in report
        entry = report["jobs"][0]
        assert "job" in entry
        assert "healthy" in entry
        assert "age_secs" in entry
        assert "interval" in entry
        assert "last_seen" in entry
        assert "message" in entry

    def test_job_last_seen_is_iso_timestamp(self, tmp_path: Path) -> None:
        """last_seen can be parsed back with datetime.fromisoformat."""
        from datetime import datetime
        current = time.time()
        write_heartbeat_file(tmp_path, "ts-job", timestamp=current - 100, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["last_seen"] is not None
        parsed = datetime.fromisoformat(entry["last_seen"])
        assert parsed is not None

    def test_checked_at_is_iso_timestamp(self, tmp_path: Path) -> None:
        """checked_at can be parsed back with datetime.fromisoformat."""
        from datetime import datetime

        report = check_cron_heartbeats(str(tmp_path))

        parsed = datetime.fromisoformat(report["checked_at"])
        assert parsed is not None

    def test_custom_interval_applied(self, tmp_path: Path) -> None:
        """The interval stored in the file drives the stale threshold."""
        current = time.time()
        # 25h of silence against a 12h interval: 25 > 2×12.
        write_heartbeat_file(tmp_path, "daily-job", timestamp=current - 90000, interval=43200)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        entry = report["jobs"][0]
        assert entry["interval"] == 43200
        assert not entry["healthy"]

    def test_custom_interval_healthy(self, tmp_path: Path) -> None:
        """A beat inside 2× a custom interval is healthy."""
        current = time.time()
        # 23h of silence against a 12h interval: 23 < 2×12.
        write_heartbeat_file(tmp_path, "daily-job", timestamp=current - 82800, interval=43200)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 1

    def test_deterministic_with_mocked_time(self, tmp_path: Path) -> None:
        """With time.time() frozen, the reported age is exact."""
        frozen = 1700000000.0
        write_heartbeat_file(tmp_path, "frozen-job", timestamp=frozen - 500, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["age_secs"] == pytest.approx(500.0, abs=0.01)
        assert entry["healthy"] is True  # 500 < 7200

    def test_stale_with_mocked_time(self, tmp_path: Path) -> None:
        """With time.time() frozen, stale detection is exact."""
        frozen = 1700000000.0
        # 8000s of silence against a 3600s interval: 8000 > 7200.
        write_heartbeat_file(tmp_path, "frozen-stale", timestamp=frozen - 8000, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["age_secs"] == pytest.approx(8000.0, abs=0.01)
        assert entry["healthy"] is False
class TestWriteAlert:
    """Behavioural tests for write_alert()."""

    def test_alert_file_created(self, tmp_path: Path) -> None:
        """The alert lands at <dir>/alerts/<job>.alert."""
        stale_record = {
            "job": "test-job",
            "healthy": False,
            "age_secs": 8000.0,
            "interval": 3600,
            "last_seen": "2024-01-01T00:00:00+00:00",
            "message": "STALE (last 8000s ago, interval 3600s — exceeds 2x threshold of 7200s)",
        }

        write_alert(str(tmp_path), stale_record)

        expected_path = tmp_path / "alerts" / "test-job.alert"
        assert expected_path.exists()

    def test_alert_file_content(self, tmp_path: Path) -> None:
        """The alert is JSON carrying the P1 level and the job's fields."""
        stale_record = {
            "job": "my-job",
            "healthy": False,
            "age_secs": 9000.0,
            "interval": 3600,
            "last_seen": "2024-06-01T12:00:00+00:00",
            "message": "STALE",
        }

        write_alert(str(tmp_path), stale_record)

        expected_path = tmp_path / "alerts" / "my-job.alert"
        payload = json.loads(expected_path.read_text())
        assert payload["alert_level"] == "P1"
        assert payload["job"] == "my-job"
        assert payload["age_secs"] == 9000.0
        assert payload["interval"] == 3600
        assert "detected_at" in payload

    def test_alert_no_partial_files_left(self, tmp_path: Path) -> None:
        """A successful write leaves exactly one file — the .alert itself."""
        stale_record = {
            "job": "clean-job",
            "healthy": False,
            "age_secs": 8000.0,
            "interval": 3600,
            "last_seen": None,
            "message": "STALE",
        }

        write_alert(str(tmp_path), stale_record)

        # Any leftover temp file would show up as a second directory entry.
        leftovers = list((tmp_path / "alerts").iterdir())
        assert len(leftovers) == 1
        assert leftovers[0].suffix == ".alert"