[claude] Poka-yoke cron heartbeats: write, check, and report (#1096) #1107
326
bin/bezalel_heartbeat_check.py
Executable file
326
bin/bezalel_heartbeat_check.py
Executable file
@@ -0,0 +1,326 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Bezalel Meta-Heartbeat Checker — stale cron detection (poka-yoke #1096)
|
||||
|
||||
Monitors all cron job heartbeat files and alerts P1 when any job has been
|
||||
silent for more than 2× its declared interval.
|
||||
|
||||
POKA-YOKE design:
|
||||
Prevention — cron-heartbeat-write.sh writes a .last file atomically after
|
||||
every successful cron job completion, stamping its interval.
|
||||
Detection — this script runs every 15 minutes (via systemd timer) and
|
||||
raises P1 on stderr + writes an alert file for any stale job.
|
||||
Correction — alerts are loud enough (P1 stderr + alert files) for
|
||||
monitoring/humans to intervene before the next run window.
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs.
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot check (default dir)
|
||||
python bin/bezalel_heartbeat_check.py
|
||||
|
||||
# Override heartbeat dir
|
||||
python bin/bezalel_heartbeat_check.py --heartbeat-dir /tmp/test-beats
|
||||
|
||||
# Dry-run (check + report, don't write alert files)
|
||||
python bin/bezalel_heartbeat_check.py --dry-run
|
||||
|
||||
# JSON output (for piping into other tools)
|
||||
python bin/bezalel_heartbeat_check.py --json
|
||||
|
||||
EXIT CODES
|
||||
==========
|
||||
0 — all jobs healthy (or no .last files found yet)
|
||||
1 — one or more stale beats detected
|
||||
2 — heartbeat dir unreadable
|
||||
|
||||
IMPORTABLE API
|
||||
==============
|
||||
from bin.bezalel_heartbeat_check import check_cron_heartbeats
|
||||
|
||||
result = check_cron_heartbeats("/var/run/bezalel/heartbeats")
|
||||
# Returns dict with keys: checked_at, jobs, stale_count, healthy_count
|
||||
|
||||
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.heartbeat")
|
||||
|
||||
# ── Configuration ────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_HEARTBEAT_DIR = "/var/run/bezalel/heartbeats"


# ── Core checker ─────────────────────────────────────────────────────

def _corrupt_job(job_name: str, reason: object) -> Dict[str, Any]:
    """Build the record reported for a heartbeat file that cannot be trusted.

    The job is always unhealthy. Its age is infinite and its interval is the
    3600s default because the file supplied no usable values.
    """
    return {
        "job": job_name,
        "healthy": False,
        "age_secs": float("inf"),
        "interval": 3600,
        "last_seen": None,
        "message": f"CORRUPT: cannot read/parse heartbeat file: {reason}",
    }


def check_cron_heartbeats(heartbeat_dir: str = DEFAULT_HEARTBEAT_DIR) -> Dict[str, Any]:
    """
    Scan all .last files in heartbeat_dir and determine which jobs are stale.

    Args:
        heartbeat_dir: Directory holding one ``<job>.last`` JSON file per job.

    Returns a dict:
        {
            "checked_at": "<ISO 8601 timestamp>",
            "jobs": [
                {
                    "job": str,
                    "healthy": bool,
                    "age_secs": float,       # float("inf") for corrupt files
                    "interval": int,
                    "last_seen": str or None,  # ISO timestamp of last heartbeat
                    "message": str,
                },
                ...
            ],
            "stale_count": int,
            "healthy_count": int,
        }

    Jobs are listed in sorted filename order. A job is stale when its age is
    strictly greater than 2× its declared interval.

    On a missing or empty dir (no .last files), returns jobs=[] with
    stale_count=0.

    A .last file is reported as stale with a "CORRUPT" message (never raised
    as an exception) when it is unreadable, is not valid UTF-8 JSON, does not
    contain a JSON object, or carries a timestamp/interval that is not a
    finite number.

    Refs: #1096
    """
    import math  # function-scope import: math is not in this module's import block

    now_ts = time.time()
    checked_at = datetime.fromtimestamp(now_ts, tz=timezone.utc).isoformat()

    hb_path = Path(heartbeat_dir)
    jobs: List[Dict[str, Any]] = []

    last_files = sorted(hb_path.glob("*.last")) if hb_path.exists() else []

    for last_file in last_files:
        job_name = last_file.stem  # filename without .last extension

        # Read and parse the heartbeat file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (bad UTF-8 bytes).
        try:
            data = json.loads(last_file.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            jobs.append(_corrupt_job(job_name, exc))
            continue

        # Valid JSON that is not an object (list, number, null...) has no fields.
        if not isinstance(data, dict):
            jobs.append(
                _corrupt_job(job_name, f"expected a JSON object, got {type(data).__name__}")
            )
            continue

        # Extract fields with safe defaults; reject values that are not numbers.
        try:
            beat_timestamp = float(data.get("timestamp", 0))
            interval = int(data.get("interval", 3600))
        except (TypeError, ValueError, OverflowError) as exc:
            jobs.append(_corrupt_job(job_name, f"invalid timestamp/interval: {exc}"))
            continue

        # NaN compares False against every threshold, which would silently
        # report the job as healthy; infinities are equally meaningless.
        if not math.isfinite(beat_timestamp):
            jobs.append(_corrupt_job(job_name, f"non-finite timestamp: {beat_timestamp}"))
            continue

        age_secs = now_ts - beat_timestamp

        # Convert beat_timestamp to a readable ISO string
        try:
            last_seen = datetime.fromtimestamp(beat_timestamp, tz=timezone.utc).isoformat()
        except (OSError, OverflowError, ValueError):
            last_seen = None

        # Stale = silent for more than 2× the declared interval
        threshold = 2 * interval
        is_stale = age_secs > threshold

        if is_stale:
            message = (
                f"STALE (last {age_secs:.0f}s ago, interval {interval}s"
                f" — exceeds 2x threshold of {threshold}s)"
            )
        else:
            message = f"OK (last {age_secs:.0f}s ago, interval {interval}s)"

        jobs.append({
            "job": job_name,
            "healthy": not is_stale,
            "age_secs": age_secs,
            "interval": interval,
            "last_seen": last_seen,
            "message": message,
        })

    stale_count = sum(1 for j in jobs if not j["healthy"])
    healthy_count = len(jobs) - stale_count

    return {
        "checked_at": checked_at,
        "jobs": jobs,
        "stale_count": stale_count,
        "healthy_count": healthy_count,
    }
|
||||
|
||||
|
||||
# ── Alert file writer ────────────────────────────────────────────────
|
||||
|
||||
def write_alert(heartbeat_dir: str, job_info: Dict[str, Any]) -> None:
    """
    Write an alert file for a stale job to <heartbeat_dir>/alerts/<job>.alert

    The file is a JSON object with the keys alert_level ("P1"), job, message,
    age_secs, interval, last_seen and detected_at (UTC ISO 8601). A non-finite
    age_secs (corrupt heartbeats are reported with an infinite age) is written
    as null so the file stays strict JSON for external monitoring.

    Args:
        heartbeat_dir: Heartbeat directory; the ``alerts`` subdirectory is
            created on demand.
        job_info: One job record as produced by check_cron_heartbeats().

    This function only writes; it never removes an existing alert file.
    Failures to create the directory or write the file are logged as
    warnings and do not raise.

    Refs: #1096
    """
    import math  # function-scope import: math is not in this module's import block

    alerts_dir = Path(heartbeat_dir) / "alerts"
    try:
        alerts_dir.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        logger.warning("Cannot create alerts dir %s: %s", alerts_dir, exc)
        return

    alert_file = alerts_dir / f"{job_info['job']}.alert"
    now_str = datetime.now(tz=timezone.utc).isoformat()

    # json.dumps would emit the non-standard token "Infinity"/"NaN" here.
    age_secs = job_info["age_secs"]
    if isinstance(age_secs, float) and not math.isfinite(age_secs):
        age_secs = None

    content = {
        "alert_level": "P1",
        "job": job_info["job"],
        "message": job_info["message"],
        "age_secs": age_secs,
        "interval": job_info["interval"],
        "last_seen": job_info["last_seen"],
        "detected_at": now_str,
    }

    # Atomic write via temp + rename (same poka-yoke pattern as the writer).
    # The temp name is built by appending to the final name so job names that
    # contain dots cannot confuse suffix handling.
    tmp_file = alerts_dir / f"{alert_file.name}.tmp.{os.getpid()}"
    try:
        tmp_file.write_text(json.dumps(content, indent=2), encoding="utf-8")
        # replace() overwrites an existing alert on every platform.
        tmp_file.replace(alert_file)
    except OSError as exc:
        logger.warning("Failed to write alert file %s: %s", alert_file, exc)
        try:
            tmp_file.unlink(missing_ok=True)
        except OSError:
            # Best effort only: a leftover temp file must not crash the check.
            pass
|
||||
|
||||
|
||||
# ── Main runner ──────────────────────────────────────────────────────
|
||||
|
||||
def _json_safe(value: Any) -> Any:
    """Return a copy of *value* with non-finite floats replaced by None.

    json.dumps() renders inf/nan as the tokens Infinity/NaN, which strict
    JSON parsers reject; corrupt heartbeats are reported with an infinite age.
    """
    if isinstance(value, float) and (value != value or value in (float("inf"), float("-inf"))):
        return None
    if isinstance(value, dict):
        return {key: _json_safe(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_json_safe(item) for item in value]
    return value


def _clear_alert(heartbeat_dir: str, job_name: str) -> None:
    """Remove <heartbeat_dir>/alerts/<job_name>.alert if it exists (best effort)."""
    alert_file = Path(heartbeat_dir) / "alerts" / f"{job_name}.alert"
    try:
        alert_file.unlink()
    except FileNotFoundError:
        return  # nothing to clear — the common case
    except OSError as exc:
        logger.warning("Failed to clear alert file %s: %s", alert_file, exc)
    else:
        logger.info("Cleared alert for recovered job: %s", job_name)


def run_check(heartbeat_dir: str, dry_run: bool = False, output_json: bool = False) -> int:
    """
    Run a full heartbeat check cycle. Returns exit code (0/1/2).

    Args:
        heartbeat_dir: Directory containing the .last heartbeat files.
        dry_run: Report only; do not write or clear alert files.
        output_json: Print the raw result as JSON on stdout and return
            immediately. In this mode no alert files are written or cleared
            and no P1 lines are printed; non-finite numbers are emitted as
            null so the output is strict JSON.

    In the default (human-readable) mode every job is logged, each stale job
    gets a "[P1-ALERT]" line on stderr plus an alert file, and the alert file
    of every job that is healthy again is removed so alerts do not outlive
    the problem.

    Exit codes:
        0 — all healthy (or no .last files found yet)
        1 — stale beats detected
        2 — heartbeat dir unreadable (permissions, etc.)

    Refs: #1096
    """
    hb_path = Path(heartbeat_dir)

    # Check if dir exists but is unreadable (permissions)
    if hb_path.exists() and not os.access(heartbeat_dir, os.R_OK):
        logger.error("Heartbeat dir unreadable: %s", heartbeat_dir)
        return 2

    result = check_cron_heartbeats(heartbeat_dir)

    if output_json:
        print(json.dumps(_json_safe(result), indent=2))
        return 1 if result["stale_count"] > 0 else 0

    # Human-readable output
    if not result["jobs"]:
        logger.warning(
            "No .last files found in %s — bezalel not yet provisioned or no jobs registered.",
            heartbeat_dir,
        )
        return 0

    for job in result["jobs"]:
        if job["healthy"]:
            logger.info(" + %s: %s", job["job"], job["message"])
            if not dry_run:
                # The job recovered: drop any alert left from an earlier cycle.
                _clear_alert(heartbeat_dir, job["job"])
        else:
            logger.error(" - %s: %s", job["job"], job["message"])

    if result["stale_count"] > 0:
        for job in result["jobs"]:
            if job["healthy"]:
                continue
            # P1 alert to stderr
            print(
                f"[P1-ALERT] STALE CRON JOB: {job['job']} — {job['message']}",
                file=sys.stderr,
            )
            if not dry_run:
                write_alert(heartbeat_dir, job)
            else:
                logger.info("DRY RUN — would write alert for stale job: %s", job["job"])

        logger.error(
            "Heartbeat check FAILED: %d stale, %d healthy",
            result["stale_count"],
            result["healthy_count"],
        )
        return 1

    logger.info(
        "Heartbeat check PASSED: %d healthy, %d stale",
        result["healthy_count"],
        result["stale_count"],
    )
    return 0
|
||||
|
||||
|
||||
# ── CLI entrypoint ───────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
    """Command-line entry point.

    Parses --heartbeat-dir, --dry-run and --json, runs one check cycle with
    run_check(), and terminates the process with the code it returns.
    """
    cli = argparse.ArgumentParser(
        description="Bezalel Meta-Heartbeat Checker — detect silent cron failures (poka-yoke #1096)",
    )

    dir_help = f"Directory containing .last heartbeat files (default: {DEFAULT_HEARTBEAT_DIR})"
    cli.add_argument("--heartbeat-dir", default=DEFAULT_HEARTBEAT_DIR, help=dir_help)
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Check and report but do not write alert files",
    )
    cli.add_argument(
        "--json",
        dest="output_json",
        action="store_true",
        help="Output results as JSON (for integration with other tools)",
    )

    options = cli.parse_args()
    status = run_check(
        heartbeat_dir=options.heartbeat_dir,
        dry_run=options.dry_run,
        output_json=options.output_json,
    )
    sys.exit(status)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -2,11 +2,17 @@
|
||||
Morning Report Generator — runs at 0600 to compile overnight activity.
|
||||
Gathers: cycles executed, issues closed, PRs merged, commits pushed.
|
||||
Outputs a structured report for delivery to the main channel.
|
||||
|
||||
Includes a HEARTBEAT PANEL that checks all cron job heartbeats via
|
||||
bezalel_heartbeat_check.py (poka-yoke #1096). Any stale jobs surface
|
||||
as blockers in the report.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
@@ -118,7 +124,46 @@ def generate_morning_report():
|
||||
if not report["highlights"] and not report["blockers"]:
|
||||
print("No significant activity or blockers detected.")
|
||||
print("")
|
||||
|
||||
|
||||
# ── Heartbeat panel (poka-yoke #1096) ────────────────────────────────────
|
||||
# Import bezalel_heartbeat_check via importlib so we don't need __init__.py
|
||||
# or a sys.path hack. If the module is missing or the dir doesn't exist,
|
||||
# we print a "not provisioned" notice and continue — never crash the report.
|
||||
_hb_result = None
|
||||
try:
|
||||
_project_root = Path(__file__).parent.parent
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"bezalel_heartbeat_check",
|
||||
_project_root / "bin" / "bezalel_heartbeat_check.py",
|
||||
)
|
||||
if _hb_spec is not None:
|
||||
_hb_mod = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules.setdefault("bezalel_heartbeat_check", _hb_mod)
|
||||
_hb_spec.loader.exec_module(_hb_mod) # type: ignore[union-attr]
|
||||
_hb_result = _hb_mod.check_cron_heartbeats()
|
||||
except Exception:
|
||||
_hb_result = None
|
||||
|
||||
print("HEARTBEAT PANEL:")
|
||||
if _hb_result is None or not _hb_result.get("jobs"):
|
||||
print(" HEARTBEAT PANEL: no data (bezalel not provisioned)")
|
||||
report["heartbeat_panel"] = {"status": "not_provisioned"}
|
||||
else:
|
||||
for _job in _hb_result["jobs"]:
|
||||
_prefix = "+" if _job["healthy"] else "-"
|
||||
print(f" {_prefix} {_job['job']}: {_job['message']}")
|
||||
if not _job["healthy"]:
|
||||
report["blockers"].append(
|
||||
f"Stale heartbeat: {_job['job']} — {_job['message']}"
|
||||
)
|
||||
print("")
|
||||
report["heartbeat_panel"] = {
|
||||
"checked_at": _hb_result.get("checked_at"),
|
||||
"healthy_count": _hb_result.get("healthy_count", 0),
|
||||
"stale_count": _hb_result.get("stale_count", 0),
|
||||
"jobs": _hb_result.get("jobs", []),
|
||||
}
|
||||
|
||||
# Save report
|
||||
report_dir = Path(os.path.expanduser("~/.local/timmy/reports"))
|
||||
report_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
115
scripts/cron-heartbeat-write.sh
Executable file
115
scripts/cron-heartbeat-write.sh
Executable file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env bash
# cron-heartbeat-write.sh — Bezalel Cron Heartbeat Writer (poka-yoke #1096)
# Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
#
# POKA-YOKE design:
#   Prevention — Cron jobs declare their identity + expected interval up front.
#   Detection  — bezalel_heartbeat_check.py reads these files every 15 min and
#                alerts P1 if any job is silent for > 2× its interval.
#   Correction — Alerts fire fast enough for manual intervention or auto-restart
#                before the next scheduled run window expires.
#
# Usage:
#   cron-heartbeat-write.sh <job-name> [interval-seconds]
#
#   <job-name>          Unique identifier for this cron job (e.g. "morning-report")
#   [interval-seconds]  Expected run interval in seconds (default: 3600)
#
# The heartbeat file is written to:
#   /var/run/bezalel/heartbeats/<job-name>.last
# (override the directory with the BEZALEL_HEARTBEAT_DIR environment variable)
#
# File format (JSON):
#   {"job":"<name>","timestamp":<epoch_float>,"interval":<secs>,"pid":<pid>}
#
# This script ALWAYS exits 0 — it must never crash the calling cron job.
#
# Typical crontab usage:
#   0 * * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh hourly-job 3600
#   0 6 * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh morning-report 86400

set -uo pipefail

# ── Configuration ─────────────────────────────────────────────────────────────
HEARTBEAT_DIR="${BEZALEL_HEARTBEAT_DIR:-/var/run/bezalel/heartbeats}"

# ── Helpers ───────────────────────────────────────────────────────────────────
log()  { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT: $*"; }
warn() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT WARNING: $*" >&2; }

# ── Input validation ──────────────────────────────────────────────────────────
if [[ $# -lt 1 ]]; then
    warn "Usage: $0 <job-name> [interval-seconds]"
    warn "No job name provided — heartbeat not written."
    exit 0
fi

JOB_NAME="$1"
INTERVAL_SECS="${2:-3600}"

# Sanitize job name to prevent path traversal / weird filenames
# Allow alphanumeric, dash, underscore, dot only
SAFE_JOB_NAME="${JOB_NAME//[^a-zA-Z0-9_.-]/}"
if [[ -z "$SAFE_JOB_NAME" ]]; then
    warn "Job name '${JOB_NAME}' contains only unsafe characters — heartbeat not written."
    exit 0
fi

if [[ "$SAFE_JOB_NAME" != "$JOB_NAME" ]]; then
    warn "Job name sanitized: '${JOB_NAME}' → '${SAFE_JOB_NAME}'"
fi

# Validate interval is a positive integer.
# The 10# prefix forces base 10: without it bash treats a zero-padded value
# such as "0900" as octal (arithmetic error for 08/09, wrong value otherwise),
# and the padded text would also be invalid JSON in the fallback payload below.
if [[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]] && (( 10#$INTERVAL_SECS >= 1 )); then
    INTERVAL_SECS=$(( 10#$INTERVAL_SECS ))
else
    warn "Invalid interval '${INTERVAL_SECS}' — using default 3600."
    INTERVAL_SECS=3600
fi

# ── Create heartbeat directory ────────────────────────────────────────────────
if ! mkdir -p "$HEARTBEAT_DIR" 2>/dev/null; then
    warn "Cannot create heartbeat dir '${HEARTBEAT_DIR}' — heartbeat not written."
    exit 0
fi

# ── Build JSON payload ────────────────────────────────────────────────────────
# Use python3 for reliable epoch float and JSON encoding.
# Falls back to date-based approach if python3 unavailable.
TIMESTAMP=$(python3 -c "import time; print(time.time())" 2>/dev/null \
    || date +%s)

CURRENT_PID=$$

PAYLOAD=$(python3 -c "
import json, sys
print(json.dumps({
    'job': sys.argv[1],
    'timestamp': float(sys.argv[2]),
    'interval': int(sys.argv[3]),
    'pid': int(sys.argv[4]),
}))
" "$SAFE_JOB_NAME" "$TIMESTAMP" "$INTERVAL_SECS" "$CURRENT_PID" 2>/dev/null)

if [[ -z "$PAYLOAD" ]]; then
    # Minimal fallback if python3 fails. Safe to interpolate: the job name is
    # restricted to [a-zA-Z0-9_.-] and the numbers were validated above.
    PAYLOAD="{\"job\":\"${SAFE_JOB_NAME}\",\"timestamp\":${TIMESTAMP},\"interval\":${INTERVAL_SECS},\"pid\":${CURRENT_PID}}"
fi

# ── Atomic write via temp + rename ────────────────────────────────────────────
# Writes to a temp file first then renames, so bezalel_heartbeat_check.py
# never sees a partial file mid-write. This is the poka-yoke atomic guarantee.
TARGET_FILE="${HEARTBEAT_DIR}/${SAFE_JOB_NAME}.last"
TMP_FILE="${HEARTBEAT_DIR}/.${SAFE_JOB_NAME}.last.tmp.$$"

if printf '%s\n' "$PAYLOAD" > "$TMP_FILE" 2>/dev/null; then
    if mv "$TMP_FILE" "$TARGET_FILE" 2>/dev/null; then
        log "Heartbeat written: ${TARGET_FILE} (job=${SAFE_JOB_NAME}, interval=${INTERVAL_SECS}s)"
    else
        warn "mv failed for '${TMP_FILE}' → '${TARGET_FILE}' — heartbeat not committed."
        rm -f "$TMP_FILE" 2>/dev/null || true
    fi
else
    warn "Write to temp file '${TMP_FILE}' failed — heartbeat not written."
    rm -f "$TMP_FILE" 2>/dev/null || true
fi

# Always exit 0 — never crash the calling cron job.
exit 0
|
||||
11
scripts/systemd/bezalel-meta-heartbeat.service
Normal file
11
scripts/systemd/bezalel-meta-heartbeat.service
Normal file
@@ -0,0 +1,11 @@
|
||||
[Unit]
|
||||
Description=Bezalel Meta-Heartbeat — stale cron detection (poka-yoke #1096)
|
||||
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
ExecStart=/root/wizards/the-nexus/bin/bezalel_heartbeat_check.py
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
Environment=HOME=/root
|
||||
11
scripts/systemd/bezalel-meta-heartbeat.timer
Normal file
11
scripts/systemd/bezalel-meta-heartbeat.timer
Normal file
@@ -0,0 +1,11 @@
|
||||
[Unit]
Description=Bezalel Meta-Heartbeat — fires every 15 minutes (poka-yoke #1096)
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096

[Timer]
# First run 5 minutes after boot, then 15 minutes after each activation of
# the service unit.
OnBootSec=5min
OnUnitActiveSec=15min
# Persistent=true was removed: systemd only honours Persistent= on timers
# configured with OnCalendar=, so it had no effect on these monotonic timers.

[Install]
WantedBy=timers.target
|
||||
334
tests/test_bezalel_heartbeat.py
Normal file
334
tests/test_bezalel_heartbeat.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""
|
||||
Tests for Bezalel Cron Heartbeat system (poka-yoke #1096).
|
||||
|
||||
Validates:
|
||||
- check_cron_heartbeats() with healthy and stale jobs
|
||||
- Empty heartbeat dir (no .last files) returns safely
|
||||
- Corrupt JSON in a .last file is handled gracefully
|
||||
- Mixed healthy/stale jobs
|
||||
- Alert file writing (write_alert)
|
||||
- The 2× interval staleness threshold is applied correctly
|
||||
|
||||
Uses importlib to load bin/bezalel_heartbeat_check.py without __init__.py,
|
||||
following the same pattern as test_nexus_watchdog.py.
|
||||
|
||||
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Load module under test ────────────────────────────────────────────────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"bezalel_heartbeat_check_test",
|
||||
PROJECT_ROOT / "bin" / "bezalel_heartbeat_check.py",
|
||||
)
|
||||
_hb_mod = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules["bezalel_heartbeat_check_test"] = _hb_mod
|
||||
_hb_spec.loader.exec_module(_hb_mod)
|
||||
|
||||
check_cron_heartbeats = _hb_mod.check_cron_heartbeats
|
||||
write_alert = _hb_mod.write_alert
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def write_heartbeat_file(
    directory: Path,
    job: str,
    timestamp: float,
    interval: int = 3600,
    pid: int = 12345,
) -> Path:
    """Create ``<directory>/<job>.last`` with a well-formed heartbeat payload.

    The directory (and any missing parents) is created first. The payload is
    the JSON object {job, timestamp, interval, pid}; the path of the written
    file is returned.
    """
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{job}.last"
    record = dict(job=job, timestamp=timestamp, interval=interval, pid=pid)
    target.write_text(json.dumps(record), encoding="utf-8")
    return target
|
||||
|
||||
|
||||
# ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCheckCronHeartbeats:
    """Behavioural checks for check_cron_heartbeats() using real files under tmp_path."""

    def test_healthy_job(self, tmp_path: Path) -> None:
        """A heartbeat 100s old on an hourly job counts as healthy."""
        beat_time = time.time() - 100
        write_heartbeat_file(tmp_path, "morning-report", timestamp=beat_time, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 1
        assert len(report["jobs"]) == 1

        entry = report["jobs"][0]
        assert entry["job"] == "morning-report"
        assert entry["healthy"] is True
        assert entry["age_secs"] == pytest.approx(100, abs=5)
        assert "OK" in entry["message"]

    def test_stale_job(self, tmp_path: Path) -> None:
        """Three hours of silence on an hourly job (3 > 2×1) is stale."""
        three_hours_ago = time.time() - 10800
        write_heartbeat_file(tmp_path, "hourly-sync", timestamp=three_hours_ago, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        assert report["healthy_count"] == 0

        entry = report["jobs"][0]
        assert entry["job"] == "hourly-sync"
        assert entry["healthy"] is False
        assert "STALE" in entry["message"]
        assert "exceeds 2x threshold" in entry["message"]

    def test_just_within_threshold(self, tmp_path: Path) -> None:
        """An age of exactly 2× the interval is not stale: the comparison is strict."""
        frozen = 1700000000.0
        # age 7200 == threshold 2 * 3600, and 7200 > 7200 is False
        write_heartbeat_file(tmp_path, "edge-job", timestamp=frozen - 7200, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0

    def test_stale_threshold_just_over(self, tmp_path: Path) -> None:
        """One second past 2× the interval (7201 > 7200) is stale."""
        beat_time = time.time() - 7201
        write_heartbeat_file(tmp_path, "edge-job", timestamp=beat_time, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1

    def test_empty_dir_returns_safely(self, tmp_path: Path) -> None:
        """A directory without .last files gives an empty, error-free result."""
        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 0
        assert report["jobs"] == []
        assert "checked_at" in report

    def test_nonexistent_dir_returns_safely(self, tmp_path: Path) -> None:
        """A directory that does not exist gives an empty, error-free result."""
        report = check_cron_heartbeats(str(tmp_path / "does-not-exist"))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 0
        assert report["jobs"] == []

    def test_corrupt_json_handled_gracefully(self, tmp_path: Path) -> None:
        """Unparseable JSON is reported as a stale job carrying a CORRUPT message."""
        (tmp_path / "broken-job.last").write_text("{this is not valid json!}", encoding="utf-8")

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        assert report["healthy_count"] == 0

        entry = report["jobs"][0]
        assert entry["job"] == "broken-job"
        assert entry["healthy"] is False
        assert "CORRUPT" in entry["message"]
        assert entry["last_seen"] is None

    def test_multiple_jobs_mixed(self, tmp_path: Path) -> None:
        """Healthy and stale jobs in one directory are counted separately."""
        now = time.time()
        # job-a..job-c are recent (healthy); job-d and job-e exceed 7200s (stale)
        ages = {"job-a": 60, "job-b": 1800, "job-c": 3599, "job-d": 10000, "job-e": 86400}
        for name, age in ages.items():
            write_heartbeat_file(tmp_path, name, timestamp=now - age, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 2
        assert report["healthy_count"] == 3
        assert len(report["jobs"]) == 5

        stale_names = set()
        healthy_names = set()
        for entry in report["jobs"]:
            bucket = healthy_names if entry["healthy"] else stale_names
            bucket.add(entry["job"])
        assert stale_names == {"job-d", "job-e"}
        assert healthy_names == {"job-a", "job-b", "job-c"}

    def test_result_contains_required_keys(self, tmp_path: Path) -> None:
        """The result and each job record expose every documented key."""
        write_heartbeat_file(tmp_path, "test-job", timestamp=time.time() - 100, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        for key in ("checked_at", "jobs", "stale_count", "healthy_count"):
            assert key in report

        entry = report["jobs"][0]
        for key in ("job", "healthy", "age_secs", "interval", "last_seen", "message"):
            assert key in entry

    def test_job_last_seen_is_iso_timestamp(self, tmp_path: Path) -> None:
        """last_seen parses with datetime.fromisoformat()."""
        from datetime import datetime

        write_heartbeat_file(tmp_path, "ts-job", timestamp=time.time() - 100, interval=3600)

        entry = check_cron_heartbeats(str(tmp_path))["jobs"][0]

        assert entry["last_seen"] is not None
        parsed = datetime.fromisoformat(entry["last_seen"])
        assert parsed is not None

    def test_checked_at_is_iso_timestamp(self, tmp_path: Path) -> None:
        """checked_at parses with datetime.fromisoformat()."""
        from datetime import datetime

        report = check_cron_heartbeats(str(tmp_path))

        parsed = datetime.fromisoformat(report["checked_at"])
        assert parsed is not None

    def test_custom_interval_applied(self, tmp_path: Path) -> None:
        """A 12-hour interval is honoured: 25h of silence (> 24h) is stale."""
        beat_time = time.time() - 90000
        write_heartbeat_file(tmp_path, "daily-job", timestamp=beat_time, interval=43200)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        entry = report["jobs"][0]
        assert entry["interval"] == 43200
        assert not entry["healthy"]

    def test_custom_interval_healthy(self, tmp_path: Path) -> None:
        """A 12-hour interval is honoured: 23h of silence (< 24h) is healthy."""
        beat_time = time.time() - 82800
        write_heartbeat_file(tmp_path, "daily-job", timestamp=beat_time, interval=43200)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 1

    def test_deterministic_with_mocked_time(self, tmp_path: Path) -> None:
        """With time.time() frozen, the reported age is exactly the configured 500s."""
        frozen = 1700000000.0
        write_heartbeat_file(tmp_path, "frozen-job", timestamp=frozen - 500, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["age_secs"] == pytest.approx(500.0, abs=0.01)
        assert entry["healthy"] is True  # 500 < 7200

    def test_stale_with_mocked_time(self, tmp_path: Path) -> None:
        """With time.time() frozen, 8000s on an hourly job (> 7200) is stale."""
        frozen = 1700000000.0
        write_heartbeat_file(tmp_path, "frozen-stale", timestamp=frozen - 8000, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["age_secs"] == pytest.approx(8000.0, abs=0.01)
        assert entry["healthy"] is False
|
||||
|
||||
|
||||
class TestWriteAlert:
    """Checks for write_alert(): file location, JSON content, and no leftovers."""

    def test_alert_file_created(self, tmp_path: Path) -> None:
        """The alert lands at <dir>/alerts/<job>.alert."""
        stale_record = dict(
            job="test-job",
            healthy=False,
            age_secs=8000.0,
            interval=3600,
            last_seen="2024-01-01T00:00:00+00:00",
            message="STALE (last 8000s ago, interval 3600s — exceeds 2x threshold of 7200s)",
        )

        write_alert(str(tmp_path), stale_record)

        expected_path = tmp_path / "alerts" / "test-job.alert"
        assert expected_path.exists()

    def test_alert_file_content(self, tmp_path: Path) -> None:
        """The alert is JSON carrying the P1 level and the job's fields."""
        stale_record = dict(
            job="my-job",
            healthy=False,
            age_secs=9000.0,
            interval=3600,
            last_seen="2024-06-01T12:00:00+00:00",
            message="STALE",
        )

        write_alert(str(tmp_path), stale_record)

        raw = (tmp_path / "alerts" / "my-job.alert").read_text()
        written = json.loads(raw)

        assert written["alert_level"] == "P1"
        assert written["job"] == "my-job"
        assert written["age_secs"] == 9000.0
        assert written["interval"] == 3600
        assert "detected_at" in written

    def test_alert_no_partial_files_left(self, tmp_path: Path) -> None:
        """After a successful write the alerts dir holds only the .alert file."""
        stale_record = dict(
            job="clean-job",
            healthy=False,
            age_secs=8000.0,
            interval=3600,
            last_seen=None,
            message="STALE",
        )

        write_alert(str(tmp_path), stale_record)

        # Exactly one entry, and it is the final file rather than a temp file
        leftovers = list((tmp_path / "alerts").iterdir())
        assert len(leftovers) == 1
        assert leftovers[0].suffix == ".alert"
|
||||
Reference in New Issue
Block a user