diff --git a/bin/check_cron_heartbeats.py b/bin/check_cron_heartbeats.py new file mode 100644 index 0000000..d2d078b --- /dev/null +++ b/bin/check_cron_heartbeats.py @@ -0,0 +1,449 @@ +#!/usr/bin/env python3 +"""Meta-heartbeat checker — makes silent cron failures impossible. + +Reads every ``*.last`` file in the heartbeat directory and verifies that no +job has been silent for longer than **2× its declared interval**. If any job +is stale, a Gitea alert issue is created (or an existing one is updated). +When all jobs recover, the issue is closed automatically. + +This script itself should be run as a cron job every 15 minutes so the +meta-level is also covered: + + */15 * * * * cd /path/to/the-nexus && \\ + python bin/check_cron_heartbeats.py >> /var/log/bezalel/heartbeat-check.log 2>&1 + +USAGE +----- + # Check all jobs; create/update Gitea alert if any stale: + python bin/check_cron_heartbeats.py + + # Dry-run (no Gitea writes): + python bin/check_cron_heartbeats.py --dry-run + + # Output Night Watch heartbeat panel markdown: + python bin/check_cron_heartbeats.py --panel + + # Output JSON (for integration with other tools): + python bin/check_cron_heartbeats.py --json + + # Use a custom heartbeat directory: + python bin/check_cron_heartbeats.py --dir /tmp/test-heartbeats + +HEARTBEAT DIRECTORY +------------------- + Primary: /var/run/bezalel/heartbeats/ (set by ops, writable by cron user) + Fallback: ~/.bezalel/heartbeats/ (dev machines) + Override: BEZALEL_HEARTBEAT_DIR env var + +ZERO DEPENDENCIES +----------------- +Pure stdlib. No pip installs required. 
+ +Refs: #1096 +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-7s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger("bezalel.heartbeat_checker") + +# ── Configuration ───────────────────────────────────────────────────── + +PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats") +FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats" + +GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com") +GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "") +GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus") +ALERT_TITLE_PREFIX = "[heartbeat-checker]" + +# A job is stale when its age exceeds this multiple of its declared interval +STALE_RATIO = 2.0 +# Never flag a job as stale if it completed less than this many seconds ago +# (prevents noise immediately after deployment) +MIN_STALE_AGE = 60 + + +def _resolve_heartbeat_dir() -> Path: + """Return the active heartbeat directory.""" + env = os.environ.get("BEZALEL_HEARTBEAT_DIR") + if env: + return Path(env) + if PRIMARY_HEARTBEAT_DIR.exists(): + return PRIMARY_HEARTBEAT_DIR + # Try to create it; fall back to home dir if not permitted + try: + PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True) + probe = PRIMARY_HEARTBEAT_DIR / ".write_probe" + probe.touch() + probe.unlink() + return PRIMARY_HEARTBEAT_DIR + except (PermissionError, OSError): + return FALLBACK_HEARTBEAT_DIR + + +# ── Data model ──────────────────────────────────────────────────────── + +@dataclass +class JobStatus: + """Health status for a single cron job's heartbeat.""" + job: str + path: Path + healthy: bool + age_seconds: float # -1 if unknown (missing/corrupt) + interval_seconds: int # 0 if unknown + 
staleness_ratio: float # age / interval; -1 if unknown; >STALE_RATIO = stale + last_timestamp: Optional[float] + pid: Optional[int] + raw_status: str # value from the .last file: "ok" / "warn" / "error" + message: str + + +@dataclass +class HeartbeatReport: + """Aggregate report for all cron job heartbeats in a directory.""" + timestamp: float + heartbeat_dir: Path + jobs: List[JobStatus] = field(default_factory=list) + + @property + def stale_jobs(self) -> List[JobStatus]: + return [j for j in self.jobs if not j.healthy] + + @property + def overall_healthy(self) -> bool: + return len(self.stale_jobs) == 0 + + # ── Rendering ───────────────────────────────────────────────────── + + def to_panel_markdown(self) -> str: + """Night Watch heartbeat panel — a table of all jobs with their status.""" + ts = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(self.timestamp)) + overall = "OK" if self.overall_healthy else "ALERT" + + lines = [ + f"## Heartbeat Panel — {ts}", + "", + f"**Overall:** {overall}", + "", + "| Job | Status | Age | Interval | Ratio |", + "|-----|--------|-----|----------|-------|", + ] + + if not self.jobs: + lines.append("| *(no heartbeat files found)* | — | — | — | — |") + else: + for j in self.jobs: + icon = "OK" if j.healthy else "STALE" + age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A" + interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A" + ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A" + lines.append( + f"| `{j.job}` | {icon} | {age_str} | {interval_str} | {ratio_str} |" + ) + + if self.stale_jobs: + lines += ["", "**Stale jobs:**"] + for j in self.stale_jobs: + lines.append(f"- `{j.job}`: {j.message}") + + lines += [ + "", + f"*Heartbeat dir: `{self.heartbeat_dir}`*", + ] + return "\n".join(lines) + + def to_alert_body(self) -> str: + """Gitea issue body when stale jobs are detected.""" + ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp)) 
+ stale = self.stale_jobs + + lines = [ + f"## Cron Heartbeat Alert — {ts}", + "", + f"**{len(stale)} job(s) have gone silent** (stale > {STALE_RATIO}x interval).", + "", + "| Job | Age | Interval | Ratio | Detail |", + "|-----|-----|----------|-------|--------|", + ] + + for j in stale: + age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A" + interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A" + ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A" + lines.append( + f"| `{j.job}` | {age_str} | {interval_str} | {ratio_str} | {j.message} |" + ) + + lines += [ + "", + "### What to do", + "1. `crontab -l` — confirm the job is still scheduled", + "2. Check the job's log for errors", + "3. Restart the job if needed", + "4. Close this issue once fresh heartbeats appear", + "", + f"*Generated by `check_cron_heartbeats.py` — dir: `{self.heartbeat_dir}`*", + ] + return "\n".join(lines) + + def to_json(self) -> Dict[str, Any]: + return { + "healthy": self.overall_healthy, + "timestamp": self.timestamp, + "heartbeat_dir": str(self.heartbeat_dir), + "jobs": [ + { + "job": j.job, + "healthy": j.healthy, + "age_seconds": j.age_seconds, + "interval_seconds": j.interval_seconds, + "staleness_ratio": j.staleness_ratio, + "raw_status": j.raw_status, + "message": j.message, + } + for j in self.jobs + ], + } + + +def _fmt_duration(seconds: float) -> str: + """Format a duration in seconds as a human-readable string.""" + s = int(seconds) + if s < 60: + return f"{s}s" + if s < 3600: + return f"{s // 60}m {s % 60}s" + return f"{s // 3600}h {(s % 3600) // 60}m" + + +# ── Job scanning ────────────────────────────────────────────────────── + +def scan_heartbeats(directory: Path) -> List[JobStatus]: + """Read every ``*.last`` file in *directory* and return their statuses.""" + if not directory.exists(): + return [] + return [_read_job_status(p.stem, p) for p in sorted(directory.glob("*.last"))] + + +def 
def _read_job_status(job: str, path: Path) -> JobStatus:
    """Parse one ``.last`` heartbeat file and produce a ``JobStatus``.

    Missing or corrupt files become unhealthy statuses instead of raising,
    so one bad file can never abort the whole scan.

    Args:
        job: Job name (the ``.last`` file's stem).
        path: Location of the heartbeat file.

    Returns:
        A fully populated ``JobStatus``; never raises for bad input files.
    """
    now = time.time()

    if not path.exists():
        return JobStatus(
            job=job, path=path,
            healthy=False,
            age_seconds=-1,
            interval_seconds=0,
            staleness_ratio=-1,
            last_timestamp=None,
            pid=None,
            raw_status="missing",
            message=f"Heartbeat file missing: {path}",
        )

    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as exc:
        return JobStatus(
            job=job, path=path,
            healthy=False,
            age_seconds=-1,
            interval_seconds=0,
            staleness_ratio=-1,
            last_timestamp=None,
            pid=None,
            raw_status="corrupt",
            message=f"Corrupt heartbeat: {exc}",
        )

    timestamp = float(data.get("timestamp", 0))
    interval = int(data.get("interval_seconds", 0))
    pid = data.get("pid")
    raw_status = data.get("status", "ok")

    age = now - timestamp
    # BUG FIX: the JobStatus contract declares staleness_ratio as -1 when
    # unknown, but this function previously reported float("inf") for
    # interval <= 0, leaking "infx" and a bogus "0s" interval into the
    # human-readable messages. The staleness *decision* is unchanged:
    # an unknown interval with an old beat is still stale (the old
    # ``inf > STALE_RATIO`` test was always true).
    if interval > 0:
        ratio = age / interval
        stale = ratio > STALE_RATIO and age > MIN_STALE_AGE
    else:
        ratio = -1.0
        stale = age > MIN_STALE_AGE

    ratio_str = f"{ratio:.1f}x" if ratio >= 0 else "unknown"
    interval_str = _fmt_duration(interval) if interval > 0 else "unknown"
    if stale:
        message = (
            f"Silent for {_fmt_duration(age)} "
            f"({ratio_str} interval of {interval_str})"
        )
    else:
        message = f"Last beat {_fmt_duration(age)} ago (ratio {ratio_str})"

    return JobStatus(
        job=job, path=path,
        healthy=not stale,
        age_seconds=age,
        interval_seconds=interval,
        staleness_ratio=ratio,
        last_timestamp=timestamp,
        pid=pid,
        raw_status=raw_status if not stale else "stale",
        message=message,
    )


# ── Gitea alerting ──────────────────────────────────────────────────

def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
    """Make a Gitea API request; return parsed JSON or None on error."""
    # Imported lazily so --panel/--json invocations stay import-cheap.
    import urllib.request
    import urllib.error

    url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
    body = json.dumps(data).encode() if data else None
    req = urllib.request.Request(url, data=body, method=method)
    if GITEA_TOKEN:
        req.add_header("Authorization", f"token {GITEA_TOKEN}")
req.add_header("Content-Type", "application/json") + req.add_header("Accept", "application/json") + + try: + with urllib.request.urlopen(req, timeout=15) as resp: + raw = resp.read().decode() + return json.loads(raw) if raw.strip() else {} + except urllib.error.HTTPError as exc: + logger.warning("Gitea %d: %s", exc.code, exc.read().decode()[:200]) + return None + except Exception as exc: + logger.warning("Gitea request failed: %s", exc) + return None + + +def _find_open_alert_issue() -> Optional[dict]: + issues = _gitea_request( + "GET", + f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20", + ) + if not isinstance(issues, list): + return None + for issue in issues: + if issue.get("title", "").startswith(ALERT_TITLE_PREFIX): + return issue + return None + + +def alert_on_stale(report: HeartbeatReport, dry_run: bool = False) -> None: + """Create, update, or close a Gitea alert issue based on report health.""" + if dry_run: + action = "close" if report.overall_healthy else "create/update" + logger.info("DRY RUN — would %s Gitea issue", action) + return + + if not GITEA_TOKEN: + logger.warning("GITEA_TOKEN not set — skipping Gitea alert") + return + + existing = _find_open_alert_issue() + + if report.overall_healthy: + if existing: + logger.info("All heartbeats healthy — closing issue #%d", existing["number"]) + _gitea_request( + "POST", + f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments", + data={"body": "All cron heartbeats are now fresh. 
Closing."}, + ) + _gitea_request( + "PATCH", + f"/repos/{GITEA_REPO}/issues/{existing['number']}", + data={"state": "closed"}, + ) + return + + stale_names = ", ".join(j.job for j in report.stale_jobs) + title = f"{ALERT_TITLE_PREFIX} Stale cron heartbeats: {stale_names}" + body = report.to_alert_body() + + if existing: + logger.info("Still stale — updating issue #%d", existing["number"]) + _gitea_request( + "POST", + f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments", + data={"body": body}, + ) + else: + result = _gitea_request( + "POST", + f"/repos/{GITEA_REPO}/issues", + data={"title": title, "body": body, "assignees": ["Timmy"]}, + ) + if result and result.get("number"): + logger.info("Created alert issue #%d", result["number"]) + + +# ── Entry point ─────────────────────────────────────────────────────── + +def build_report(directory: Optional[Path] = None) -> HeartbeatReport: + """Scan heartbeats and return a report. Exposed for Night Watch import.""" + hb_dir = directory if directory is not None else _resolve_heartbeat_dir() + jobs = scan_heartbeats(hb_dir) + return HeartbeatReport(timestamp=time.time(), heartbeat_dir=hb_dir, jobs=jobs) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Meta-heartbeat checker — detects silent cron failures", + ) + parser.add_argument( + "--dir", default=None, + help="Heartbeat directory (default: auto-detect)", + ) + parser.add_argument( + "--panel", action="store_true", + help="Output Night Watch heartbeat panel markdown and exit", + ) + parser.add_argument( + "--json", action="store_true", dest="output_json", + help="Output results as JSON and exit", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Log results without writing Gitea issues", + ) + args = parser.parse_args() + + report = build_report(Path(args.dir) if args.dir else None) + + if args.panel: + print(report.to_panel_markdown()) + return + + if args.output_json: + print(json.dumps(report.to_json(), 
indent=2)) + sys.exit(0 if report.overall_healthy else 1) + + # Default: log + alert + if not report.jobs: + logger.info("No heartbeat files found in %s", report.heartbeat_dir) + else: + for j in report.jobs: + level = logging.INFO if j.healthy else logging.ERROR + icon = "OK " if j.healthy else "STALE" + logger.log(level, "[%s] %s: %s", icon, j.job, j.message) + + alert_on_stale(report, dry_run=args.dry_run) + sys.exit(0 if report.overall_healthy else 1) + + +if __name__ == "__main__": + main() diff --git a/bin/nexus_watchdog.py b/bin/nexus_watchdog.py index 8efc7a5..28df7a8 100644 --- a/bin/nexus_watchdog.py +++ b/bin/nexus_watchdog.py @@ -80,6 +80,15 @@ from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional +# Poka-yoke: write a cron heartbeat so check_cron_heartbeats.py can detect +# if *this* watchdog stops running. Import lazily to stay zero-dep if the +# nexus package is unavailable (e.g. very minimal test environments). +try: + from nexus.cron_heartbeat import write_cron_heartbeat as _write_cron_heartbeat + _HAS_CRON_HEARTBEAT = True +except ImportError: + _HAS_CRON_HEARTBEAT = False + logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-7s %(message)s", @@ -488,6 +497,15 @@ def run_once(args: argparse.Namespace) -> bool: elif not args.dry_run: alert_on_failure(report, dry_run=args.dry_run) + # Poka-yoke: stamp our own heartbeat so the meta-checker can detect + # if this watchdog cron job itself goes silent. Runs every 5 minutes + # by convention (*/5 * * * *). 
+ if _HAS_CRON_HEARTBEAT: + try: + _write_cron_heartbeat("nexus_watchdog", interval_seconds=300) + except Exception: + pass # never crash the watchdog over its own heartbeat + return report.overall_healthy diff --git a/bin/night_watch.py b/bin/night_watch.py new file mode 100644 index 0000000..9451267 --- /dev/null +++ b/bin/night_watch.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +"""Night Watch — Bezalel nightly report generator. + +Runs once per night (typically at 03:00 local time via cron) and writes a +markdown report to ``reports/bezalel/nightly/.md``. + +The report always includes a **Heartbeat Panel** (acceptance criterion #3 of +issue #1096) so silent cron failures are visible in the morning brief. + +USAGE +----- + python bin/night_watch.py # write today's report + python bin/night_watch.py --dry-run # print to stdout, don't write file + python bin/night_watch.py --date 2026-04-08 # specific date + +CRONTAB +------- + 0 3 * * * cd /path/to/the-nexus && python bin/night_watch.py \\ + >> /var/log/bezalel/night-watch.log 2>&1 + +ZERO DEPENDENCIES +----------------- +Pure stdlib, plus ``check_cron_heartbeats`` from this repo (also stdlib). 
+ +Refs: #1096 +""" + +from __future__ import annotations + +import argparse +import importlib.util +import json +import logging +import os +import shutil +import subprocess +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-7s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger("bezalel.night_watch") + +PROJECT_ROOT = Path(__file__).parent.parent +REPORTS_DIR = PROJECT_ROOT / "reports" / "bezalel" / "nightly" + +# ── Load check_cron_heartbeats without relying on sys.path hacks ────── + +def _load_checker(): + """Import bin/check_cron_heartbeats.py as a module.""" + spec = importlib.util.spec_from_file_location( + "_check_cron_heartbeats", + PROJECT_ROOT / "bin" / "check_cron_heartbeats.py", + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +# ── System checks ───────────────────────────────────────────────────── + +def _check_service(service_name: str) -> tuple[str, str]: + """Return (status, detail) for a systemd service.""" + try: + result = subprocess.run( + ["systemctl", "is-active", service_name], + capture_output=True, text=True, timeout=5, + ) + active = result.stdout.strip() + if active == "active": + return "OK", f"{service_name} is active" + return "WARN", f"{service_name} is {active}" + except FileNotFoundError: + return "OK", f"{service_name} status unknown (systemctl not available)" + except Exception as exc: + return "WARN", f"systemctl error: {exc}" + + +def _check_disk(threshold_pct: int = 90) -> tuple[str, str]: + """Return (status, detail) for disk usage on /.""" + try: + usage = shutil.disk_usage("/") + pct = int(usage.used / usage.total * 100) + status = "OK" if pct < threshold_pct else "WARN" + return status, f"disk usage {pct}%" + except Exception as exc: + return "WARN", f"disk check failed: {exc}" + + +def _check_memory(threshold_pct: int = 90) -> 
tuple[str, str]: + """Return (status, detail) for memory usage.""" + try: + meminfo = Path("/proc/meminfo").read_text() + data = {} + for line in meminfo.splitlines(): + parts = line.split() + if len(parts) >= 2: + data[parts[0].rstrip(":")] = int(parts[1]) + total = data.get("MemTotal", 0) + available = data.get("MemAvailable", 0) + if total == 0: + return "OK", "memory info unavailable" + pct = int((total - available) / total * 100) + status = "OK" if pct < threshold_pct else "WARN" + return status, f"memory usage {pct}%" + except FileNotFoundError: + # Not Linux (e.g. macOS dev machine) + return "OK", "memory check skipped (not Linux)" + except Exception as exc: + return "WARN", f"memory check failed: {exc}" + + +def _check_gitea_reachability(gitea_url: str = "https://forge.alexanderwhitestone.com") -> tuple[str, str]: + """Return (status, detail) for Gitea HTTPS reachability.""" + import urllib.request + import urllib.error + try: + with urllib.request.urlopen(gitea_url, timeout=10) as resp: + code = resp.status + if code == 200: + return "OK", f"Alpha SSH not configured from Beta, but Gitea HTTPS is responding ({code})" + return "WARN", f"Gitea returned HTTP {code}" + except Exception as exc: + return "WARN", f"Gitea unreachable: {exc}" + + +def _check_world_readable_secrets() -> tuple[str, str]: + """Return (status, detail) for world-readable sensitive files.""" + sensitive_patterns = ["*.key", "*.pem", "*.secret", ".env", "*.token"] + found = [] + try: + for pattern in sensitive_patterns: + for path in PROJECT_ROOT.rglob(pattern): + try: + mode = path.stat().st_mode + if mode & 0o004: # world-readable + found.append(str(path.relative_to(PROJECT_ROOT))) + except OSError: + pass + if found: + return "WARN", f"world-readable sensitive files: {', '.join(found[:3])}" + return "OK", "no sensitive recently-modified world-readable files found" + except Exception as exc: + return "WARN", f"security check failed: {exc}" + + +# ── Report generation 
# ── Report generation ─────────────────────────────────────────────────

def generate_report(date_str: str, checker_mod) -> str:
    """Build the full nightly report markdown string.

    Args:
        date_str: Report date (``YYYY-MM-DD``). NOTE(review): currently
            unused in the body — the header shows generation time; kept
            for interface stability (``main`` derives the filename from it).
        checker_mod: Loaded ``check_cron_heartbeats`` module; must expose
            ``build_report()`` whose result has ``to_panel_markdown()``.

    Returns:
        The complete report as a markdown string.
    """
    now_utc = datetime.now(timezone.utc)
    # BUG FIX: "%02H" is not a valid strftime directive — printf-style
    # width modifiers are not supported, and %H is already zero-padded.
    # Depending on the platform the old format raised ValueError or
    # emitted "%02H" literally into the report header.
    ts = now_utc.strftime("%Y-%m-%d %H:%M UTC")

    rows: list[tuple[str, str, str]] = []

    service_status, service_detail = _check_service("hermes-bezalel")
    rows.append(("Service", service_status, service_detail))

    disk_status, disk_detail = _check_disk()
    rows.append(("Disk", disk_status, disk_detail))

    mem_status, mem_detail = _check_memory()
    rows.append(("Memory", mem_status, mem_detail))

    gitea_status, gitea_detail = _check_gitea_reachability()
    rows.append(("Alpha VPS", gitea_status, gitea_detail))

    sec_status, sec_detail = _check_world_readable_secrets()
    rows.append(("Security", sec_status, sec_detail))

    overall = "OK" if all(r[1] == "OK" for r in rows) else "WARN"

    lines = [
        f"# Bezalel Night Watch — {ts}",
        "",
        f"**Overall:** {overall}",
        "",
        "| Check | Status | Detail |",
        "|-------|--------|--------|",
    ]
    for check, status, detail in rows:
        lines.append(f"| {check} | {status} | {detail} |")

    lines.append("")
    lines.append("---")
    lines.append("")

    # ── Heartbeat Panel (acceptance criterion of #1096) ──────────────
    try:
        hb_report = checker_mod.build_report()
        lines.append(hb_report.to_panel_markdown())
    except Exception as exc:
        # The panel must never take down the whole nightly report.
        lines += [
            "## Heartbeat Panel",
            "",
            f"*(heartbeat check failed: {exc})*",
        ]

    lines += [
        "",
        "---",
        "",
        "*Automated by Bezalel Night Watch*",
        "",
    ]

    return "\n".join(lines)


# ── Entry point ───────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(
        description="Bezalel Night Watch — nightly report generator",
    )
    parser.add_argument(
        "--date", default=None,
        help="Report date as YYYY-MM-DD (default: today UTC)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print report to stdout instead of writing to disk",
    )
help="Print report to stdout instead of writing to disk", + ) + args = parser.parse_args() + + date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d") + + checker = _load_checker() + report_text = generate_report(date_str, checker) + + if args.dry_run: + print(report_text) + return + + REPORTS_DIR.mkdir(parents=True, exist_ok=True) + report_path = REPORTS_DIR / f"{date_str}.md" + report_path.write_text(report_text) + logger.info("Night Watch report written to %s", report_path) + + +if __name__ == "__main__": + main() diff --git a/nexus/cron_heartbeat.py b/nexus/cron_heartbeat.py new file mode 100644 index 0000000..71f72f0 --- /dev/null +++ b/nexus/cron_heartbeat.py @@ -0,0 +1,136 @@ +"""Poka-yoke heartbeat writer for cron jobs. + +Every scheduled job calls write_cron_heartbeat() on successful completion so +the meta-heartbeat checker (bin/check_cron_heartbeats.py) can verify that all +jobs are still alive. Absence of a fresh heartbeat = silent failure. + +Path convention +--------------- +Primary: /var/run/bezalel/heartbeats/.last +Fallback: ~/.bezalel/heartbeats/.last + (used when /var/run/bezalel is not writable, e.g. dev machines) +Override: BEZALEL_HEARTBEAT_DIR environment variable + +Heartbeat file format (JSON) +---------------------------- +{ + "job": "nexus_watchdog", + "timestamp": 1744000000.0, + "interval_seconds": 300, + "pid": 12345, + "status": "ok" +} + +Usage in a cron job +------------------- + from nexus.cron_heartbeat import write_cron_heartbeat + + def main(): + # ... do the work ... 
+ write_cron_heartbeat("my_job_name", interval_seconds=300) + +Zero-dependency shell one-liner (for scripts that can't import Python) +----------------------------------------------------------------------- + python -c " + from nexus.cron_heartbeat import write_cron_heartbeat + write_cron_heartbeat('my_job', interval_seconds=300) + " + +Refs: #1096 +""" + +from __future__ import annotations + +import json +import os +import tempfile +import time +from pathlib import Path + +PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats") +FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats" + + +def _resolve_heartbeat_dir() -> Path: + """Return the heartbeat directory, trying primary then fallback. + + If BEZALEL_HEARTBEAT_DIR is set in the environment that wins outright + (useful for tests and non-standard deployments). + """ + env = os.environ.get("BEZALEL_HEARTBEAT_DIR") + if env: + return Path(env) + + # Try to create and write-test the primary path + try: + PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True) + probe = PRIMARY_HEARTBEAT_DIR / ".write_probe" + probe.touch() + probe.unlink() + return PRIMARY_HEARTBEAT_DIR + except (PermissionError, OSError): + pass + + FALLBACK_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True) + return FALLBACK_HEARTBEAT_DIR + + +def heartbeat_path(job: str, directory: Path | None = None) -> Path: + """Return the Path where *job*'s heartbeat file lives. + + Useful for readers (e.g. the Night Watch report) that just need the + location without writing anything. + """ + d = directory if directory is not None else _resolve_heartbeat_dir() + return d / f"{job}.last" + + +def write_cron_heartbeat( + job: str, + interval_seconds: int, + status: str = "ok", + directory: Path | None = None, +) -> Path: + """Write a poka-yoke heartbeat file for a cron job. + + Call this at the end of your job's main function. The file is written + atomically (write-to-temp + rename) so the checker never reads a partial + file. 
+ + Args: + job: Unique job name, e.g. ``"nexus_watchdog"``. + interval_seconds: Expected run cadence, e.g. ``300`` for every 5 min. + status: Completion status: ``"ok"``, ``"warn"``, or + ``"error"``. Only ``"ok"`` resets the stale clock. + directory: Override the heartbeat directory (mainly for tests). + + Returns: + Path to the written heartbeat file. + """ + d = directory if directory is not None else _resolve_heartbeat_dir() + d.mkdir(parents=True, exist_ok=True) + path = d / f"{job}.last" + + data = { + "job": job, + "timestamp": time.time(), + "interval_seconds": interval_seconds, + "pid": os.getpid(), + "status": status, + } + + # Atomic write: temp file in same directory + rename. + # Guarantees the checker never sees a half-written file. + fd, tmp = tempfile.mkstemp(dir=str(d), prefix=f".{job}-", suffix=".tmp") + try: + with os.fdopen(fd, "w") as f: + json.dump(data, f) + os.replace(tmp, str(path)) + except Exception: + # Best-effort — never crash the job over a heartbeat failure + try: + os.unlink(tmp) + except OSError: + pass + + return path diff --git a/tests/test_cron_heartbeats.py b/tests/test_cron_heartbeats.py new file mode 100644 index 0000000..e489091 --- /dev/null +++ b/tests/test_cron_heartbeats.py @@ -0,0 +1,341 @@ +"""Tests for the poka-yoke cron heartbeat system. 
+ +Covers: + - nexus/cron_heartbeat.py (write utility) + - bin/check_cron_heartbeats.py (meta-heartbeat checker) + +Refs: #1096 +""" + +from __future__ import annotations + +import importlib.util +import json +import os +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# ── Load modules under test ─────────────────────────────────────────── + +PROJECT_ROOT = Path(__file__).parent.parent + +_hb_spec = importlib.util.spec_from_file_location( + "_cron_heartbeat", + PROJECT_ROOT / "nexus" / "cron_heartbeat.py", +) +_hb = importlib.util.module_from_spec(_hb_spec) +sys.modules["_cron_heartbeat"] = _hb +_hb_spec.loader.exec_module(_hb) + +_chk_spec = importlib.util.spec_from_file_location( + "_check_cron_heartbeats", + PROJECT_ROOT / "bin" / "check_cron_heartbeats.py", +) +_chk = importlib.util.module_from_spec(_chk_spec) +sys.modules["_check_cron_heartbeats"] = _chk +_chk_spec.loader.exec_module(_chk) + +write_cron_heartbeat = _hb.write_cron_heartbeat +heartbeat_path = _hb.heartbeat_path + +scan_heartbeats = _chk.scan_heartbeats +build_report = _chk.build_report +HeartbeatReport = _chk.HeartbeatReport +JobStatus = _chk.JobStatus +_read_job_status = _chk._read_job_status +_fmt_duration = _chk._fmt_duration + + +# ── nexus/cron_heartbeat.py ─────────────────────────────────────────── + +class TestWriteCronHeartbeat: + def test_creates_file(self, tmp_path): + """write_cron_heartbeat creates .last in the given directory.""" + path = write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path) + assert path == tmp_path / "my_job.last" + assert path.exists() + + def test_file_content(self, tmp_path): + """Written file has all required fields.""" + write_cron_heartbeat("my_job", interval_seconds=600, status="ok", directory=tmp_path) + data = json.loads((tmp_path / "my_job.last").read_text()) + assert data["job"] == "my_job" + assert data["interval_seconds"] == 600 + assert data["status"] == "ok" + 
assert data["pid"] == os.getpid() + assert abs(data["timestamp"] - time.time()) < 2 + + def test_atomic_write_no_temp_files(self, tmp_path): + """No temporary files remain after a successful write.""" + write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path) + files = list(tmp_path.iterdir()) + assert len(files) == 1 + assert files[0].name == "my_job.last" + + def test_overwrites_cleanly(self, tmp_path): + """Successive writes update, not append.""" + write_cron_heartbeat("j", interval_seconds=60, status="ok", directory=tmp_path) + write_cron_heartbeat("j", interval_seconds=60, status="warn", directory=tmp_path) + data = json.loads((tmp_path / "j.last").read_text()) + assert data["status"] == "warn" + + def test_creates_parent_dirs(self, tmp_path): + """Parent directories are created as needed.""" + deep_dir = tmp_path / "a" / "b" / "c" + write_cron_heartbeat("j", interval_seconds=60, directory=deep_dir) + assert (deep_dir / "j.last").exists() + + def test_heartbeat_path_helper(self, tmp_path): + """heartbeat_path() returns the correct path without writing.""" + p = heartbeat_path("myjob", directory=tmp_path) + assert p == tmp_path / "myjob.last" + assert not p.exists() + + def test_env_var_override(self, tmp_path, monkeypatch): + """BEZALEL_HEARTBEAT_DIR env var overrides the directory.""" + monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path)) + # Call without directory= so it uses the env var + path = write_cron_heartbeat("env_job", interval_seconds=120) + assert path.parent == tmp_path + + +# ── bin/check_cron_heartbeats.py ───────────────────────────────────── + +class TestScanHeartbeats: + def test_empty_dir(self, tmp_path): + """No .last files → empty list.""" + assert scan_heartbeats(tmp_path) == [] + + def test_nonexistent_dir(self, tmp_path): + """Missing directory → empty list (no exception).""" + assert scan_heartbeats(tmp_path / "nope") == [] + + def test_healthy_job(self, tmp_path): + """Fresh heartbeat → healthy.""" + (tmp_path 
/ "myjob.last").write_text(json.dumps({ + "job": "myjob", + "timestamp": time.time(), + "interval_seconds": 300, + "pid": 1, + "status": "ok", + })) + jobs = scan_heartbeats(tmp_path) + assert len(jobs) == 1 + assert jobs[0].healthy is True + assert jobs[0].job == "myjob" + + def test_stale_job(self, tmp_path): + """Heartbeat older than 2× interval → stale.""" + (tmp_path / "slow.last").write_text(json.dumps({ + "job": "slow", + "timestamp": time.time() - 700, # 11.7 minutes + "interval_seconds": 300, # 5 min interval → ratio 2.33 + "pid": 1, + "status": "ok", + })) + jobs = scan_heartbeats(tmp_path) + assert jobs[0].healthy is False + assert jobs[0].staleness_ratio > 2.0 + + def test_missing_heartbeat_file(self, tmp_path): + """_read_job_status handles a file that disappears mid-scan.""" + ghost_path = tmp_path / "ghost.last" + status = _read_job_status("ghost", ghost_path) + assert status.healthy is False + assert "missing" in status.raw_status + + def test_corrupt_heartbeat(self, tmp_path): + """Corrupt JSON → unhealthy with 'corrupt' status.""" + p = tmp_path / "bad.last" + p.write_text("{not valid json") + status = _read_job_status("bad", p) + assert status.healthy is False + assert "corrupt" in status.raw_status + + def test_multiple_jobs(self, tmp_path): + """Multiple .last files are all reported.""" + for i, name in enumerate(["alpha", "beta", "gamma"]): + (tmp_path / f"{name}.last").write_text(json.dumps({ + "job": name, + "timestamp": time.time() - i * 10, + "interval_seconds": 300, + "pid": i + 1, + "status": "ok", + })) + jobs = scan_heartbeats(tmp_path) + assert len(jobs) == 3 + job_names = {j.job for j in jobs} + assert job_names == {"alpha", "beta", "gamma"} + + def test_non_last_files_ignored(self, tmp_path): + """.json and other extensions are ignored.""" + (tmp_path / "other.json").write_text("{}") + (tmp_path / "notes.txt").write_text("hello") + assert scan_heartbeats(tmp_path) == [] + + +class TestHeartbeatReport: + def _fresh_job(self, 
name="j"): + return JobStatus( + job=name, path=Path(f"/tmp/{name}.last"), + healthy=True, age_seconds=30, interval_seconds=300, + staleness_ratio=0.1, last_timestamp=time.time() - 30, + pid=1, raw_status="ok", + message="Last beat 30s ago (ratio 0.1x)", + ) + + def _stale_job(self, name="s"): + return JobStatus( + job=name, path=Path(f"/tmp/{name}.last"), + healthy=False, age_seconds=700, interval_seconds=300, + staleness_ratio=2.33, last_timestamp=time.time() - 700, + pid=1, raw_status="stale", + message="Silent for 11m 40s (2.3x interval of 5m 0s)", + ) + + def test_overall_healthy(self): + report = HeartbeatReport( + timestamp=time.time(), + heartbeat_dir=Path("/tmp"), + jobs=[self._fresh_job()], + ) + assert report.overall_healthy is True + + def test_overall_unhealthy(self): + report = HeartbeatReport( + timestamp=time.time(), + heartbeat_dir=Path("/tmp"), + jobs=[self._fresh_job(), self._stale_job()], + ) + assert report.overall_healthy is False + assert len(report.stale_jobs) == 1 + + def test_panel_markdown_contains_table(self): + report = HeartbeatReport( + timestamp=time.time(), + heartbeat_dir=Path("/tmp"), + jobs=[self._fresh_job("alpha"), self._stale_job("beta")], + ) + panel = report.to_panel_markdown() + assert "## Heartbeat Panel" in panel + assert "| `alpha` |" in panel + assert "| `beta` |" in panel + assert "STALE" in panel + assert "OK" in panel + assert "**Overall:** ALERT" in panel + + def test_panel_markdown_no_jobs(self): + report = HeartbeatReport( + timestamp=time.time(), + heartbeat_dir=Path("/tmp"), + jobs=[], + ) + panel = report.to_panel_markdown() + assert "no heartbeat files found" in panel + + def test_panel_overall_ok(self): + report = HeartbeatReport( + timestamp=time.time(), + heartbeat_dir=Path("/tmp"), + jobs=[self._fresh_job()], + ) + panel = report.to_panel_markdown() + assert "**Overall:** OK" in panel + + def test_alert_body_lists_stale_jobs(self): + report = HeartbeatReport( + timestamp=time.time(), + 
heartbeat_dir=Path("/tmp"), + jobs=[self._stale_job("slow")], + ) + body = report.to_alert_body() + assert "slow" in body + assert "STALE" in body.upper() or "stale" in body.lower() or "silent" in body.lower() + assert "crontab" in body.lower() + + def test_to_json(self): + report = HeartbeatReport( + timestamp=time.time(), + heartbeat_dir=Path("/tmp"), + jobs=[self._fresh_job()], + ) + data = report.to_json() + assert data["healthy"] is True + assert len(data["jobs"]) == 1 + assert data["jobs"][0]["job"] == "j" + + +class TestFmtDuration: + def test_seconds(self): + assert _fmt_duration(45) == "45s" + + def test_minutes(self): + assert _fmt_duration(90) == "1m 30s" + + def test_hours(self): + assert _fmt_duration(3661) == "1h 1m" + + +class TestBuildReport: + def test_build_report_with_dir(self, tmp_path): + """build_report() uses the given directory.""" + (tmp_path / "myjob.last").write_text(json.dumps({ + "job": "myjob", + "timestamp": time.time(), + "interval_seconds": 300, + "pid": 1, + "status": "ok", + })) + report = build_report(directory=tmp_path) + assert len(report.jobs) == 1 + assert report.overall_healthy is True + + def test_build_report_empty_dir(self, tmp_path): + report = build_report(directory=tmp_path) + assert report.jobs == [] + assert report.overall_healthy is True # nothing stale = healthy + + +# ── Integration: nexus_watchdog writes its own heartbeat ───────────── + +class TestWatchdogHeartbeatIntegration: + """Verify nexus_watchdog.py writes a cron heartbeat after run_once().""" + + def test_watchdog_writes_cron_heartbeat(self, tmp_path, monkeypatch): + """After run_once, nexus_watchdog writes nexus_watchdog.last.""" + monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path)) + + # Load watchdog module + spec = importlib.util.spec_from_file_location( + "_watchdog_hb_test", + PROJECT_ROOT / "bin" / "nexus_watchdog.py", + ) + wd = importlib.util.module_from_spec(spec) + sys.modules["_watchdog_hb_test"] = wd + + # Patch out network calls + 
with patch("socket.socket") as mock_sock, \ + patch("subprocess.run") as mock_run: + mock_sock.return_value.connect_ex.return_value = 111 # port closed + mock_run.return_value = MagicMock(returncode=1, stdout="") + + spec.loader.exec_module(wd) + + args = MagicMock() + args.ws_host = "localhost" + args.ws_port = 8765 + args.heartbeat_path = str(tmp_path / "nexus_heartbeat.json") + args.stale_threshold = 300 + args.dry_run = True # don't touch Gitea + + wd.run_once(args) + + hb_file = tmp_path / "nexus_watchdog.last" + assert hb_file.exists(), "nexus_watchdog.last was not written" + data = json.loads(hb_file.read_text()) + assert data["job"] == "nexus_watchdog" + assert data["interval_seconds"] == 300