From f509b359501502151c5e1103d03bbf4f0de2fcbd Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Tue, 7 Apr 2026 10:39:11 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20poka-yoke=20cron=20heartbeats=20?= =?UTF-8?q?=E2=80=94=20write,=20check,=20and=20report?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every cron job can now call cron-heartbeat-write.sh to write /var/run/bezalel/heartbeats/.last atomically. bezalel_heartbeat_check.py (meta-heartbeat) scans all .last files every 15 minutes and alerts P1 if any job is stale > 2× its interval. morning_report.py now includes a heartbeat panel showing the last-seen status of every registered cron job in the nightly Night Watch report. Systemd units (bezalel-meta-heartbeat.timer/.service) run the checker on a 15-minute schedule via the poka-yoke infrastructure. Fixes #1096 Co-Authored-By: Claude Sonnet 4.6 --- bin/bezalel_heartbeat_check.py | 326 +++++++++++++++++ nexus/morning_report.py | 47 ++- scripts/cron-heartbeat-write.sh | 115 ++++++ .../systemd/bezalel-meta-heartbeat.service | 11 + scripts/systemd/bezalel-meta-heartbeat.timer | 11 + tests/test_bezalel_heartbeat.py | 334 ++++++++++++++++++ 6 files changed, 843 insertions(+), 1 deletion(-) create mode 100755 bin/bezalel_heartbeat_check.py create mode 100755 scripts/cron-heartbeat-write.sh create mode 100644 scripts/systemd/bezalel-meta-heartbeat.service create mode 100644 scripts/systemd/bezalel-meta-heartbeat.timer create mode 100644 tests/test_bezalel_heartbeat.py diff --git a/bin/bezalel_heartbeat_check.py b/bin/bezalel_heartbeat_check.py new file mode 100755 index 0000000..b84e7bc --- /dev/null +++ b/bin/bezalel_heartbeat_check.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +""" +Bezalel Meta-Heartbeat Checker — stale cron detection (poka-yoke #1096) + +Monitors all cron job heartbeat files and alerts P1 when any job has been +silent for more than 2× its declared interval. 
+ +POKA-YOKE design: + Prevention — cron-heartbeat-write.sh writes a .last file atomically after + every successful cron job completion, stamping its interval. + Detection — this script runs every 15 minutes (via systemd timer) and + raises P1 on stderr + writes an alert file for any stale job. + Correction — alerts are loud enough (P1 stderr + alert files) for + monitoring/humans to intervene before the next run window. + +ZERO DEPENDENCIES +================= +Pure stdlib. No pip installs. + +USAGE +===== + # One-shot check (default dir) + python bin/bezalel_heartbeat_check.py + + # Override heartbeat dir + python bin/bezalel_heartbeat_check.py --heartbeat-dir /tmp/test-beats + + # Dry-run (check + report, don't write alert files) + python bin/bezalel_heartbeat_check.py --dry-run + + # JSON output (for piping into other tools) + python bin/bezalel_heartbeat_check.py --json + +EXIT CODES +========== + 0 — all jobs healthy (or no .last files found yet) + 1 — one or more stale beats detected + 2 — heartbeat dir unreadable + +IMPORTABLE API +============== + from bin.bezalel_heartbeat_check import check_cron_heartbeats + + result = check_cron_heartbeats("/var/run/bezalel/heartbeats") + # Returns dict with keys: checked_at, jobs, stale_count, healthy_count + +Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096 +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-7s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger("bezalel.heartbeat") + +# ── Configuration ──────────────────────────────────────────────────── + +DEFAULT_HEARTBEAT_DIR = "/var/run/bezalel/heartbeats" + + +# ── Core checker ───────────────────────────────────────────────────── + +def 
check_cron_heartbeats(heartbeat_dir: str = DEFAULT_HEARTBEAT_DIR) -> Dict[str, Any]: + """ + Scan all .last files in heartbeat_dir and determine which jobs are stale. + + Returns a dict: + { + "checked_at": "", + "jobs": [ + { + "job": str, + "healthy": bool, + "age_secs": float, + "interval": int, + "last_seen": str or None, # ISO timestamp of last heartbeat + "message": str, + }, + ... + ], + "stale_count": int, + "healthy_count": int, + } + + On empty dir (no .last files), returns jobs=[] with stale_count=0. + On corrupt .last file, reports that job as stale with an error message. + + Refs: #1096 + """ + now_ts = time.time() + checked_at = datetime.fromtimestamp(now_ts, tz=timezone.utc).isoformat() + + hb_path = Path(heartbeat_dir) + jobs: List[Dict[str, Any]] = [] + + if not hb_path.exists(): + return { + "checked_at": checked_at, + "jobs": [], + "stale_count": 0, + "healthy_count": 0, + } + + last_files = sorted(hb_path.glob("*.last")) + + for last_file in last_files: + job_name = last_file.stem # filename without .last extension + + # Read and parse the heartbeat file + try: + raw = last_file.read_text(encoding="utf-8") + data = json.loads(raw) + except (OSError, json.JSONDecodeError) as exc: + jobs.append({ + "job": job_name, + "healthy": False, + "age_secs": float("inf"), + "interval": 3600, + "last_seen": None, + "message": f"CORRUPT: cannot read/parse heartbeat file: {exc}", + }) + continue + + # Extract fields with safe defaults + beat_timestamp = float(data.get("timestamp", 0)) + interval = int(data.get("interval", 3600)) + pid = data.get("pid", "?") + + age_secs = now_ts - beat_timestamp + + # Convert beat_timestamp to a readable ISO string + try: + last_seen = datetime.fromtimestamp(beat_timestamp, tz=timezone.utc).isoformat() + except (OSError, OverflowError, ValueError): + last_seen = None + + # Stale = silent for more than 2× the declared interval + threshold = 2 * interval + is_stale = age_secs > threshold + + if is_stale: + message = ( + f"STALE 
(last {age_secs:.0f}s ago, interval {interval}s" + f" — exceeds 2x threshold of {threshold}s)" + ) + else: + message = f"OK (last {age_secs:.0f}s ago, interval {interval}s)" + + jobs.append({ + "job": job_name, + "healthy": not is_stale, + "age_secs": age_secs, + "interval": interval, + "last_seen": last_seen, + "message": message, + }) + + stale_count = sum(1 for j in jobs if not j["healthy"]) + healthy_count = sum(1 for j in jobs if j["healthy"]) + + return { + "checked_at": checked_at, + "jobs": jobs, + "stale_count": stale_count, + "healthy_count": healthy_count, + } + + +# ── Alert file writer ──────────────────────────────────────────────── + +def write_alert(heartbeat_dir: str, job_info: Dict[str, Any]) -> None: + """ + Write an alert file for a stale job to /alerts/.alert + + Alert files are watched by external monitoring. They persist until the + job runs again and clears stale status on the next check cycle. + + Refs: #1096 + """ + alerts_dir = Path(heartbeat_dir) / "alerts" + try: + alerts_dir.mkdir(parents=True, exist_ok=True) + except OSError as exc: + logger.warning("Cannot create alerts dir %s: %s", alerts_dir, exc) + return + + alert_file = alerts_dir / f"{job_info['job']}.alert" + now_str = datetime.now(tz=timezone.utc).isoformat() + + content = { + "alert_level": "P1", + "job": job_info["job"], + "message": job_info["message"], + "age_secs": job_info["age_secs"], + "interval": job_info["interval"], + "last_seen": job_info["last_seen"], + "detected_at": now_str, + } + + # Atomic write via temp + rename (same poka-yoke pattern as the writer) + tmp_file = alert_file.with_suffix(f".alert.tmp.{os.getpid()}") + try: + tmp_file.write_text(json.dumps(content, indent=2), encoding="utf-8") + tmp_file.rename(alert_file) + except OSError as exc: + logger.warning("Failed to write alert file %s: %s", alert_file, exc) + tmp_file.unlink(missing_ok=True) + + +# ── Main runner ────────────────────────────────────────────────────── + +def run_check(heartbeat_dir: 
str, dry_run: bool = False, output_json: bool = False) -> int: + """ + Run a full heartbeat check cycle. Returns exit code (0/1/2). + + Exit codes: + 0 — all healthy (or no .last files found yet) + 1 — stale beats detected + 2 — heartbeat dir unreadable (permissions, etc.) + + Refs: #1096 + """ + hb_path = Path(heartbeat_dir) + + # Check if dir exists but is unreadable (permissions) + if hb_path.exists() and not os.access(heartbeat_dir, os.R_OK): + logger.error("Heartbeat dir unreadable: %s", heartbeat_dir) + return 2 + + result = check_cron_heartbeats(heartbeat_dir) + + if output_json: + print(json.dumps(result, indent=2)) + return 1 if result["stale_count"] > 0 else 0 + + # Human-readable output + if not result["jobs"]: + logger.warning( + "No .last files found in %s — bezalel not yet provisioned or no jobs registered.", + heartbeat_dir, + ) + return 0 + + for job in result["jobs"]: + if job["healthy"]: + logger.info(" + %s: %s", job["job"], job["message"]) + else: + logger.error(" - %s: %s", job["job"], job["message"]) + + if result["stale_count"] > 0: + for job in result["jobs"]: + if not job["healthy"]: + # P1 alert to stderr + print( + f"[P1-ALERT] STALE CRON JOB: {job['job']} — {job['message']}", + file=sys.stderr, + ) + if not dry_run: + write_alert(heartbeat_dir, job) + else: + logger.info("DRY RUN — would write alert for stale job: %s", job["job"]) + + logger.error( + "Heartbeat check FAILED: %d stale, %d healthy", + result["stale_count"], + result["healthy_count"], + ) + return 1 + + logger.info( + "Heartbeat check PASSED: %d healthy, %d stale", + result["healthy_count"], + result["stale_count"], + ) + return 0 + + +# ── CLI entrypoint ─────────────────────────────────────────────────── + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "Bezalel Meta-Heartbeat Checker — detect silent cron failures (poka-yoke #1096)" + ), + ) + parser.add_argument( + "--heartbeat-dir", + default=DEFAULT_HEARTBEAT_DIR, + help=f"Directory 
containing .last heartbeat files (default: {DEFAULT_HEARTBEAT_DIR})", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Check and report but do not write alert files", + ) + parser.add_argument( + "--json", + action="store_true", + dest="output_json", + help="Output results as JSON (for integration with other tools)", + ) + args = parser.parse_args() + + exit_code = run_check( + heartbeat_dir=args.heartbeat_dir, + dry_run=args.dry_run, + output_json=args.output_json, + ) + sys.exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/nexus/morning_report.py b/nexus/morning_report.py index 4f33356..9c4df58 100644 --- a/nexus/morning_report.py +++ b/nexus/morning_report.py @@ -2,11 +2,17 @@ Morning Report Generator — runs at 0600 to compile overnight activity. Gathers: cycles executed, issues closed, PRs merged, commits pushed. Outputs a structured report for delivery to the main channel. + +Includes a HEARTBEAT PANEL that checks all cron job heartbeats via +bezalel_heartbeat_check.py (poka-yoke #1096). Any stale jobs surface +as blockers in the report. """ +import importlib.util import json import os import subprocess +import sys from datetime import datetime, timedelta, timezone from pathlib import Path @@ -118,7 +124,46 @@ def generate_morning_report(): if not report["highlights"] and not report["blockers"]: print("No significant activity or blockers detected.") print("") - + + # ── Heartbeat panel (poka-yoke #1096) ──────────────────────────────────── + # Import bezalel_heartbeat_check via importlib so we don't need __init__.py + # or a sys.path hack. If the module is missing or the dir doesn't exist, + # we print a "not provisioned" notice and continue — never crash the report. 
+ _hb_result = None + try: + _project_root = Path(__file__).parent.parent + _hb_spec = importlib.util.spec_from_file_location( + "bezalel_heartbeat_check", + _project_root / "bin" / "bezalel_heartbeat_check.py", + ) + if _hb_spec is not None: + _hb_mod = importlib.util.module_from_spec(_hb_spec) + sys.modules.setdefault("bezalel_heartbeat_check", _hb_mod) + _hb_spec.loader.exec_module(_hb_mod) # type: ignore[union-attr] + _hb_result = _hb_mod.check_cron_heartbeats() + except Exception: + _hb_result = None + + print("HEARTBEAT PANEL:") + if _hb_result is None or not _hb_result.get("jobs"): + print(" HEARTBEAT PANEL: no data (bezalel not provisioned)") + report["heartbeat_panel"] = {"status": "not_provisioned"} + else: + for _job in _hb_result["jobs"]: + _prefix = "+" if _job["healthy"] else "-" + print(f" {_prefix} {_job['job']}: {_job['message']}") + if not _job["healthy"]: + report["blockers"].append( + f"Stale heartbeat: {_job['job']} — {_job['message']}" + ) + print("") + report["heartbeat_panel"] = { + "checked_at": _hb_result.get("checked_at"), + "healthy_count": _hb_result.get("healthy_count", 0), + "stale_count": _hb_result.get("stale_count", 0), + "jobs": _hb_result.get("jobs", []), + } + # Save report report_dir = Path(os.path.expanduser("~/.local/timmy/reports")) report_dir.mkdir(parents=True, exist_ok=True) diff --git a/scripts/cron-heartbeat-write.sh b/scripts/cron-heartbeat-write.sh new file mode 100755 index 0000000..5e91f1c --- /dev/null +++ b/scripts/cron-heartbeat-write.sh @@ -0,0 +1,115 @@ +#!/usr/bin/env bash +# cron-heartbeat-write.sh — Bezalel Cron Heartbeat Writer (poka-yoke #1096) +# Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096 +# +# POKA-YOKE design: +# Prevention — Cron jobs declare their identity + expected interval up front. +# Detection — bezalel_heartbeat_check.py reads these files every 15 min and +# alerts P1 if any job is silent for > 2× its interval. 
+# Correction — Alerts fire fast enough for manual intervention or auto-restart
+#              before the next scheduled run window expires.
+#
+# Usage:
+#   cron-heartbeat-write.sh <job-name> [interval-seconds]
+#
+#   <job-name>          Unique identifier for this cron job (e.g. "morning-report")
+#   [interval-seconds]  Expected run interval in seconds (default: 3600)
+#
+# The heartbeat file is written to:
+#   /var/run/bezalel/heartbeats/<job-name>.last
+#
+# File format (JSON):
+#   {"job":"<job-name>","timestamp":<epoch-seconds>,"interval":<interval-seconds>,"pid":<writer-pid>}
+#
+# This script ALWAYS exits 0 — it must never crash the calling cron job.
+#
+# Typical crontab usage:
+#   0 * * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh hourly-job 3600
+#   0 6 * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh morning-report 86400
+
+set -uo pipefail
+
+# ── Configuration ─────────────────────────────────────────────────────────────
+HEARTBEAT_DIR="${BEZALEL_HEARTBEAT_DIR:-/var/run/bezalel/heartbeats}"
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT: $*"; }
+warn() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT WARNING: $*" >&2; }
+
+# ── Input validation ──────────────────────────────────────────────────────────
+if [[ $# -lt 1 ]]; then
+  warn "Usage: $0 <job-name> [interval-seconds]"
+  warn "No job name provided — heartbeat not written."
+  exit 0
+fi
+
+JOB_NAME="$1"
+INTERVAL_SECS="${2:-3600}"
+
+# Sanitize job name to prevent path traversal / weird filenames
+# Allow alphanumeric, dash, underscore, dot only
+SAFE_JOB_NAME="${JOB_NAME//[^a-zA-Z0-9_.-]/}"
+if [[ -z "$SAFE_JOB_NAME" ]]; then
+  warn "Job name '${JOB_NAME}' contains only unsafe characters — heartbeat not written."
+  exit 0
+fi
+
+if [[ "$SAFE_JOB_NAME" != "$JOB_NAME" ]]; then
+  warn "Job name sanitized: '${JOB_NAME}' → '${SAFE_JOB_NAME}'"
+fi
+
+# Validate interval is a positive integer
+if ! [[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]] || (( INTERVAL_SECS < 1 )); then
+  warn "Invalid interval '${INTERVAL_SECS}' — using default 3600."
+  INTERVAL_SECS=3600
+fi
+
+# ── Create heartbeat directory ────────────────────────────────────────────────
+if ! mkdir -p "$HEARTBEAT_DIR" 2>/dev/null; then
+  warn "Cannot create heartbeat dir '${HEARTBEAT_DIR}' — heartbeat not written."
+  exit 0
+fi
+
+# ── Build JSON payload ────────────────────────────────────────────────────────
+# Use python3 for reliable epoch float and JSON encoding.
+# Falls back to date-based approach if python3 unavailable.
+TIMESTAMP=$(python3 -c "import time; print(time.time())" 2>/dev/null \
+  || date +%s)
+
+CURRENT_PID=$$
+
+PAYLOAD=$(python3 -c "
+import json, sys
+print(json.dumps({
+    'job': sys.argv[1],
+    'timestamp': float(sys.argv[2]),
+    'interval': int(sys.argv[3]),
+    'pid': int(sys.argv[4]),
+}))
+" "$SAFE_JOB_NAME" "$TIMESTAMP" "$INTERVAL_SECS" "$CURRENT_PID" 2>/dev/null)
+
+if [[ -z "$PAYLOAD" ]]; then
+  # Minimal fallback if python3 fails
+  PAYLOAD="{\"job\":\"${SAFE_JOB_NAME}\",\"timestamp\":${TIMESTAMP},\"interval\":${INTERVAL_SECS},\"pid\":${CURRENT_PID}}"
+fi
+
+# ── Atomic write via temp + rename ────────────────────────────────────────────
+# Writes to a temp file first then renames, so bezalel_heartbeat_check.py
+# never sees a partial file mid-write. This is the poka-yoke atomic guarantee.
+TARGET_FILE="${HEARTBEAT_DIR}/${SAFE_JOB_NAME}.last"
+TMP_FILE="${HEARTBEAT_DIR}/.${SAFE_JOB_NAME}.last.tmp.$$"
+
+if printf '%s\n' "$PAYLOAD" > "$TMP_FILE" 2>/dev/null; then
+  if mv "$TMP_FILE" "$TARGET_FILE" 2>/dev/null; then
+    log "Heartbeat written: ${TARGET_FILE} (job=${SAFE_JOB_NAME}, interval=${INTERVAL_SECS}s)"
+  else
+    warn "mv failed for '${TMP_FILE}' → '${TARGET_FILE}' — heartbeat not committed."
+    rm -f "$TMP_FILE" 2>/dev/null || true
+  fi
+else
+  warn "Write to temp file '${TMP_FILE}' failed — heartbeat not written."
+ rm -f "$TMP_FILE" 2>/dev/null || true +fi + +# Always exit 0 — never crash the calling cron job. +exit 0 diff --git a/scripts/systemd/bezalel-meta-heartbeat.service b/scripts/systemd/bezalel-meta-heartbeat.service new file mode 100644 index 0000000..9da1787 --- /dev/null +++ b/scripts/systemd/bezalel-meta-heartbeat.service @@ -0,0 +1,11 @@ +[Unit] +Description=Bezalel Meta-Heartbeat — stale cron detection (poka-yoke #1096) +Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096 +After=network.target + +[Service] +Type=oneshot +ExecStart=/root/wizards/the-nexus/bin/bezalel_heartbeat_check.py +StandardOutput=journal +StandardError=journal +Environment=HOME=/root diff --git a/scripts/systemd/bezalel-meta-heartbeat.timer b/scripts/systemd/bezalel-meta-heartbeat.timer new file mode 100644 index 0000000..1c31b78 --- /dev/null +++ b/scripts/systemd/bezalel-meta-heartbeat.timer @@ -0,0 +1,11 @@ +[Unit] +Description=Bezalel Meta-Heartbeat — fires every 15 minutes (poka-yoke #1096) +Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096 + +[Timer] +OnBootSec=5min +OnUnitActiveSec=15min +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/tests/test_bezalel_heartbeat.py b/tests/test_bezalel_heartbeat.py new file mode 100644 index 0000000..a2658f2 --- /dev/null +++ b/tests/test_bezalel_heartbeat.py @@ -0,0 +1,334 @@ +""" +Tests for Bezalel Cron Heartbeat system (poka-yoke #1096). + +Validates: + - check_cron_heartbeats() with healthy and stale jobs + - Empty heartbeat dir (no .last files) returns safely + - Corrupt JSON in a .last file is handled gracefully + - Mixed healthy/stale jobs + - Alert file writing (write_alert) + - The 2× interval staleness threshold is applied correctly + +Uses importlib to load bin/bezalel_heartbeat_check.py without __init__.py, +following the same pattern as test_nexus_watchdog.py. 
+ +Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096 +""" + +from __future__ import annotations + +import importlib.util +import json +import sys +import time +from pathlib import Path +from unittest.mock import patch + +import pytest + +# ── Load module under test ──────────────────────────────────────────────────── + +PROJECT_ROOT = Path(__file__).parent.parent + +_hb_spec = importlib.util.spec_from_file_location( + "bezalel_heartbeat_check_test", + PROJECT_ROOT / "bin" / "bezalel_heartbeat_check.py", +) +_hb_mod = importlib.util.module_from_spec(_hb_spec) +sys.modules["bezalel_heartbeat_check_test"] = _hb_mod +_hb_spec.loader.exec_module(_hb_mod) + +check_cron_heartbeats = _hb_mod.check_cron_heartbeats +write_alert = _hb_mod.write_alert + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def write_heartbeat_file( + directory: Path, + job: str, + timestamp: float, + interval: int = 3600, + pid: int = 12345, +) -> Path: + """Write a valid .last heartbeat file for testing.""" + directory.mkdir(parents=True, exist_ok=True) + payload = { + "job": job, + "timestamp": timestamp, + "interval": interval, + "pid": pid, + } + path = directory / f"{job}.last" + path.write_text(json.dumps(payload), encoding="utf-8") + return path + + +# ── Tests ───────────────────────────────────────────────────────────────────── + +class TestCheckCronHeartbeats: + + def test_healthy_job(self, tmp_path: Path) -> None: + """A job with a recent timestamp is reported as healthy.""" + now = time.time() + write_heartbeat_file(tmp_path, "morning-report", timestamp=now - 100, interval=3600) + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 0 + assert result["healthy_count"] == 1 + assert len(result["jobs"]) == 1 + + job = result["jobs"][0] + assert job["job"] == "morning-report" + assert job["healthy"] is True + assert job["age_secs"] == pytest.approx(100, abs=5) + assert "OK" in 
job["message"] + + def test_stale_job(self, tmp_path: Path) -> None: + """A job silent for > 2× its interval is reported as stale.""" + now = time.time() + # 3 hours ago with 1-hour interval → 3 > 2×1 → stale + write_heartbeat_file(tmp_path, "hourly-sync", timestamp=now - 10800, interval=3600) + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 1 + assert result["healthy_count"] == 0 + + job = result["jobs"][0] + assert job["job"] == "hourly-sync" + assert job["healthy"] is False + assert "STALE" in job["message"] + assert "exceeds 2x threshold" in job["message"] + + def test_just_within_threshold(self, tmp_path: Path) -> None: + """A job at exactly 2× interval is NOT stale (threshold is strictly >).""" + fake_now = 1700000000.0 + # age = 7200, threshold = 2 * 3600 = 7200 — NOT stale (not strictly greater) + write_heartbeat_file(tmp_path, "edge-job", timestamp=fake_now - 7200, interval=3600) + + with patch("time.time", return_value=fake_now): + result = check_cron_heartbeats(str(tmp_path)) + + # age_secs == 7200 and threshold = 7200, so not stale (age > threshold is False) + assert result["stale_count"] == 0 + + def test_stale_threshold_just_over(self, tmp_path: Path) -> None: + """A job silent for 2× interval + 1 second is stale.""" + now = time.time() + # age = 7201, threshold = 7200 — IS stale + write_heartbeat_file(tmp_path, "edge-job", timestamp=now - 7201, interval=3600) + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 1 + + def test_empty_dir_returns_safely(self, tmp_path: Path) -> None: + """Empty heartbeat directory returns zero jobs without error.""" + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 0 + assert result["healthy_count"] == 0 + assert result["jobs"] == [] + assert "checked_at" in result + + def test_nonexistent_dir_returns_safely(self, tmp_path: Path) -> None: + """Non-existent heartbeat dir returns empty result without error.""" + missing 
= str(tmp_path / "does-not-exist") + result = check_cron_heartbeats(missing) + + assert result["stale_count"] == 0 + assert result["healthy_count"] == 0 + assert result["jobs"] == [] + + def test_corrupt_json_handled_gracefully(self, tmp_path: Path) -> None: + """Corrupt JSON in a .last file is reported as stale with an error message.""" + bad_file = tmp_path / "broken-job.last" + bad_file.write_text("{this is not valid json!}", encoding="utf-8") + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 1 + assert result["healthy_count"] == 0 + + job = result["jobs"][0] + assert job["job"] == "broken-job" + assert job["healthy"] is False + assert "CORRUPT" in job["message"] + assert job["last_seen"] is None + + def test_multiple_jobs_mixed(self, tmp_path: Path) -> None: + """Mixed healthy and stale jobs are correctly counted.""" + now = time.time() + + # 3 healthy jobs (recent) + write_heartbeat_file(tmp_path, "job-a", timestamp=now - 60, interval=3600) + write_heartbeat_file(tmp_path, "job-b", timestamp=now - 1800, interval=3600) + write_heartbeat_file(tmp_path, "job-c", timestamp=now - 3599, interval=3600) + + # 2 stale jobs + write_heartbeat_file(tmp_path, "job-d", timestamp=now - 10000, interval=3600) + write_heartbeat_file(tmp_path, "job-e", timestamp=now - 86400, interval=3600) + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 2 + assert result["healthy_count"] == 3 + assert len(result["jobs"]) == 5 + + stale_jobs = {j["job"] for j in result["jobs"] if not j["healthy"]} + healthy_jobs = {j["job"] for j in result["jobs"] if j["healthy"]} + assert stale_jobs == {"job-d", "job-e"} + assert healthy_jobs == {"job-a", "job-b", "job-c"} + + def test_result_contains_required_keys(self, tmp_path: Path) -> None: + """Result dict contains all required keys.""" + now = time.time() + write_heartbeat_file(tmp_path, "test-job", timestamp=now - 100, interval=3600) + + result = 
check_cron_heartbeats(str(tmp_path)) + + assert "checked_at" in result + assert "jobs" in result + assert "stale_count" in result + assert "healthy_count" in result + + job = result["jobs"][0] + assert "job" in job + assert "healthy" in job + assert "age_secs" in job + assert "interval" in job + assert "last_seen" in job + assert "message" in job + + def test_job_last_seen_is_iso_timestamp(self, tmp_path: Path) -> None: + """last_seen field is a valid ISO 8601 timestamp string.""" + from datetime import datetime + now = time.time() + write_heartbeat_file(tmp_path, "ts-job", timestamp=now - 100, interval=3600) + + result = check_cron_heartbeats(str(tmp_path)) + job = result["jobs"][0] + + # Should be parseable as an ISO timestamp + assert job["last_seen"] is not None + dt = datetime.fromisoformat(job["last_seen"]) + assert dt is not None + + def test_checked_at_is_iso_timestamp(self, tmp_path: Path) -> None: + """checked_at is a valid ISO 8601 timestamp string.""" + from datetime import datetime + result = check_cron_heartbeats(str(tmp_path)) + + dt = datetime.fromisoformat(result["checked_at"]) + assert dt is not None + + def test_custom_interval_applied(self, tmp_path: Path) -> None: + """Custom interval (e.g. 
daily) is respected for stale detection.""" + now = time.time() + # 25 hours ago with 12-hour interval → 25 > 2×12 = 24 → stale + write_heartbeat_file(tmp_path, "daily-job", timestamp=now - 90000, interval=43200) + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 1 + job = result["jobs"][0] + assert job["interval"] == 43200 + assert not job["healthy"] + + def test_custom_interval_healthy(self, tmp_path: Path) -> None: + """Job within 2× custom interval is healthy.""" + now = time.time() + # 23 hours ago with 12-hour interval → 23 < 2×12 = 24 → healthy + write_heartbeat_file(tmp_path, "daily-job", timestamp=now - 82800, interval=43200) + + result = check_cron_heartbeats(str(tmp_path)) + + assert result["stale_count"] == 0 + assert result["healthy_count"] == 1 + + def test_deterministic_with_mocked_time(self, tmp_path: Path) -> None: + """Test with mocked time.time() for fully deterministic assertion.""" + fake_now = 1700000000.0 + + write_heartbeat_file(tmp_path, "frozen-job", timestamp=fake_now - 500, interval=3600) + + with patch("time.time", return_value=fake_now): + result = check_cron_heartbeats(str(tmp_path)) + + job = result["jobs"][0] + # age should be exactly 500s + assert job["age_secs"] == pytest.approx(500.0, abs=0.01) + assert job["healthy"] is True # 500 < 7200 + + def test_stale_with_mocked_time(self, tmp_path: Path) -> None: + """Stale detection with mocked time is exact.""" + fake_now = 1700000000.0 + + # 8000s ago with 3600s interval → 8000 > 7200 → stale + write_heartbeat_file(tmp_path, "frozen-stale", timestamp=fake_now - 8000, interval=3600) + + with patch("time.time", return_value=fake_now): + result = check_cron_heartbeats(str(tmp_path)) + + job = result["jobs"][0] + assert job["age_secs"] == pytest.approx(8000.0, abs=0.01) + assert job["healthy"] is False + + +class TestWriteAlert: + + def test_alert_file_created(self, tmp_path: Path) -> None: + """write_alert creates an alert file in the alerts 
subdirectory.""" + job_info = { + "job": "test-job", + "healthy": False, + "age_secs": 8000.0, + "interval": 3600, + "last_seen": "2024-01-01T00:00:00+00:00", + "message": "STALE (last 8000s ago, interval 3600s — exceeds 2x threshold of 7200s)", + } + write_alert(str(tmp_path), job_info) + + alert_file = tmp_path / "alerts" / "test-job.alert" + assert alert_file.exists() + + def test_alert_file_content(self, tmp_path: Path) -> None: + """Alert file contains correct JSON fields.""" + job_info = { + "job": "my-job", + "healthy": False, + "age_secs": 9000.0, + "interval": 3600, + "last_seen": "2024-06-01T12:00:00+00:00", + "message": "STALE", + } + write_alert(str(tmp_path), job_info) + + alert_file = tmp_path / "alerts" / "my-job.alert" + data = json.loads(alert_file.read_text()) + + assert data["alert_level"] == "P1" + assert data["job"] == "my-job" + assert data["age_secs"] == 9000.0 + assert data["interval"] == 3600 + assert "detected_at" in data + + def test_alert_no_partial_files_left(self, tmp_path: Path) -> None: + """No temp files remain after a successful write.""" + job_info = { + "job": "clean-job", + "healthy": False, + "age_secs": 8000.0, + "interval": 3600, + "last_seen": None, + "message": "STALE", + } + write_alert(str(tmp_path), job_info) + + alerts_dir = tmp_path / "alerts" + # Only the .alert file should exist — no .tmp files + files = list(alerts_dir.iterdir()) + assert len(files) == 1 + assert files[0].suffix == ".alert" -- 2.43.0