[claude] Poka-yoke cron heartbeats — make silent failures impossible (#1096) #1102

Merged
claude merged 1 commits from claude/issue-1096 into main 2026-04-07 14:38:56 +00:00
5 changed files with 1191 additions and 0 deletions

View File

@@ -0,0 +1,449 @@
#!/usr/bin/env python3
"""Meta-heartbeat checker — makes silent cron failures impossible.
Reads every ``*.last`` file in the heartbeat directory and verifies that no
job has been silent for longer than **2× its declared interval**. If any job
is stale, a Gitea alert issue is created (or an existing one is updated).
When all jobs recover, the issue is closed automatically.
This script itself should be run as a cron job every 15 minutes so the
meta-level is also covered:
*/15 * * * * cd /path/to/the-nexus && \\
python bin/check_cron_heartbeats.py >> /var/log/bezalel/heartbeat-check.log 2>&1
USAGE
-----
# Check all jobs; create/update Gitea alert if any stale:
python bin/check_cron_heartbeats.py
# Dry-run (no Gitea writes):
python bin/check_cron_heartbeats.py --dry-run
# Output Night Watch heartbeat panel markdown:
python bin/check_cron_heartbeats.py --panel
# Output JSON (for integration with other tools):
python bin/check_cron_heartbeats.py --json
# Use a custom heartbeat directory:
python bin/check_cron_heartbeats.py --dir /tmp/test-heartbeats
HEARTBEAT DIRECTORY
-------------------
Primary: /var/run/bezalel/heartbeats/ (set by ops, writable by cron user)
Fallback: ~/.bezalel/heartbeats/ (dev machines)
Override: BEZALEL_HEARTBEAT_DIR env var
ZERO DEPENDENCIES
-----------------
Pure stdlib. No pip installs required.
Refs: #1096
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("bezalel.heartbeat_checker")
# ── Configuration ─────────────────────────────────────────────────────
PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats")
FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats"
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
ALERT_TITLE_PREFIX = "[heartbeat-checker]"
# A job is stale when its age exceeds this multiple of its declared interval
STALE_RATIO = 2.0
# Never flag a job as stale if it completed less than this many seconds ago
# (prevents noise immediately after deployment)
MIN_STALE_AGE = 60
def _resolve_heartbeat_dir() -> Path:
"""Return the active heartbeat directory."""
env = os.environ.get("BEZALEL_HEARTBEAT_DIR")
if env:
return Path(env)
if PRIMARY_HEARTBEAT_DIR.exists():
return PRIMARY_HEARTBEAT_DIR
# Try to create it; fall back to home dir if not permitted
try:
PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
probe = PRIMARY_HEARTBEAT_DIR / ".write_probe"
probe.touch()
probe.unlink()
return PRIMARY_HEARTBEAT_DIR
except (PermissionError, OSError):
return FALLBACK_HEARTBEAT_DIR
# ── Data model ────────────────────────────────────────────────────────
@dataclass
class JobStatus:
"""Health status for a single cron job's heartbeat."""
job: str
path: Path
healthy: bool
age_seconds: float # -1 if unknown (missing/corrupt)
interval_seconds: int # 0 if unknown
staleness_ratio: float # age / interval; -1 if unknown; >STALE_RATIO = stale
last_timestamp: Optional[float]
pid: Optional[int]
raw_status: str # value from the .last file: "ok" / "warn" / "error"
message: str
@dataclass
class HeartbeatReport:
"""Aggregate report for all cron job heartbeats in a directory."""
timestamp: float
heartbeat_dir: Path
jobs: List[JobStatus] = field(default_factory=list)
@property
def stale_jobs(self) -> List[JobStatus]:
return [j for j in self.jobs if not j.healthy]
@property
def overall_healthy(self) -> bool:
return len(self.stale_jobs) == 0
# ── Rendering ─────────────────────────────────────────────────────
def to_panel_markdown(self) -> str:
"""Night Watch heartbeat panel — a table of all jobs with their status."""
ts = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(self.timestamp))
overall = "OK" if self.overall_healthy else "ALERT"
lines = [
f"## Heartbeat Panel — {ts}",
"",
f"**Overall:** {overall}",
"",
"| Job | Status | Age | Interval | Ratio |",
"|-----|--------|-----|----------|-------|",
]
if not self.jobs:
lines.append("| *(no heartbeat files found)* | — | — | — | — |")
else:
for j in self.jobs:
icon = "OK" if j.healthy else "STALE"
age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A"
interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A"
ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A"
lines.append(
f"| `{j.job}` | {icon} | {age_str} | {interval_str} | {ratio_str} |"
)
if self.stale_jobs:
lines += ["", "**Stale jobs:**"]
for j in self.stale_jobs:
lines.append(f"- `{j.job}`: {j.message}")
lines += [
"",
f"*Heartbeat dir: `{self.heartbeat_dir}`*",
]
return "\n".join(lines)
def to_alert_body(self) -> str:
"""Gitea issue body when stale jobs are detected."""
ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
stale = self.stale_jobs
lines = [
f"## Cron Heartbeat Alert — {ts}",
"",
f"**{len(stale)} job(s) have gone silent** (stale > {STALE_RATIO}x interval).",
"",
"| Job | Age | Interval | Ratio | Detail |",
"|-----|-----|----------|-------|--------|",
]
for j in stale:
age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A"
interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A"
ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A"
lines.append(
f"| `{j.job}` | {age_str} | {interval_str} | {ratio_str} | {j.message} |"
)
lines += [
"",
"### What to do",
"1. `crontab -l` — confirm the job is still scheduled",
"2. Check the job's log for errors",
"3. Restart the job if needed",
"4. Close this issue once fresh heartbeats appear",
"",
f"*Generated by `check_cron_heartbeats.py` — dir: `{self.heartbeat_dir}`*",
]
return "\n".join(lines)
def to_json(self) -> Dict[str, Any]:
return {
"healthy": self.overall_healthy,
"timestamp": self.timestamp,
"heartbeat_dir": str(self.heartbeat_dir),
"jobs": [
{
"job": j.job,
"healthy": j.healthy,
"age_seconds": j.age_seconds,
"interval_seconds": j.interval_seconds,
"staleness_ratio": j.staleness_ratio,
"raw_status": j.raw_status,
"message": j.message,
}
for j in self.jobs
],
}
def _fmt_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string."""
s = int(seconds)
if s < 60:
return f"{s}s"
if s < 3600:
return f"{s // 60}m {s % 60}s"
return f"{s // 3600}h {(s % 3600) // 60}m"
# ── Job scanning ──────────────────────────────────────────────────────
def scan_heartbeats(directory: Path) -> List[JobStatus]:
"""Read every ``*.last`` file in *directory* and return their statuses."""
if not directory.exists():
return []
return [_read_job_status(p.stem, p) for p in sorted(directory.glob("*.last"))]
def _read_job_status(job: str, path: Path) -> JobStatus:
"""Parse one ``.last`` file and produce a ``JobStatus``."""
now = time.time()
if not path.exists():
return JobStatus(
job=job, path=path,
healthy=False,
age_seconds=-1,
interval_seconds=0,
staleness_ratio=-1,
last_timestamp=None,
pid=None,
raw_status="missing",
message=f"Heartbeat file missing: {path}",
)
try:
data = json.loads(path.read_text())
except (json.JSONDecodeError, OSError) as exc:
return JobStatus(
job=job, path=path,
healthy=False,
age_seconds=-1,
interval_seconds=0,
staleness_ratio=-1,
last_timestamp=None,
pid=None,
raw_status="corrupt",
message=f"Corrupt heartbeat: {exc}",
)
timestamp = float(data.get("timestamp", 0))
interval = int(data.get("interval_seconds", 0))
pid = data.get("pid")
raw_status = data.get("status", "ok")
age = now - timestamp
ratio = age / interval if interval > 0 else float("inf")
stale = ratio > STALE_RATIO and age > MIN_STALE_AGE
if stale:
message = (
f"Silent for {_fmt_duration(age)} "
f"({ratio:.1f}x interval of {_fmt_duration(interval)})"
)
else:
message = f"Last beat {_fmt_duration(age)} ago (ratio {ratio:.1f}x)"
return JobStatus(
job=job, path=path,
healthy=not stale,
age_seconds=age,
interval_seconds=interval,
staleness_ratio=ratio,
last_timestamp=timestamp,
pid=pid,
raw_status=raw_status if not stale else "stale",
message=message,
)
# ── Gitea alerting ────────────────────────────────────────────────────
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
"""Make a Gitea API request; return parsed JSON or None on error."""
import urllib.request
import urllib.error
url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, method=method)
if GITEA_TOKEN:
req.add_header("Authorization", f"token {GITEA_TOKEN}")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
raw = resp.read().decode()
return json.loads(raw) if raw.strip() else {}
except urllib.error.HTTPError as exc:
logger.warning("Gitea %d: %s", exc.code, exc.read().decode()[:200])
return None
except Exception as exc:
logger.warning("Gitea request failed: %s", exc)
return None
def _find_open_alert_issue() -> Optional[dict]:
issues = _gitea_request(
"GET",
f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
)
if not isinstance(issues, list):
return None
for issue in issues:
if issue.get("title", "").startswith(ALERT_TITLE_PREFIX):
return issue
return None
def alert_on_stale(report: HeartbeatReport, dry_run: bool = False) -> None:
"""Create, update, or close a Gitea alert issue based on report health."""
if dry_run:
action = "close" if report.overall_healthy else "create/update"
logger.info("DRY RUN — would %s Gitea issue", action)
return
if not GITEA_TOKEN:
logger.warning("GITEA_TOKEN not set — skipping Gitea alert")
return
existing = _find_open_alert_issue()
if report.overall_healthy:
if existing:
logger.info("All heartbeats healthy — closing issue #%d", existing["number"])
_gitea_request(
"POST",
f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments",
data={"body": "All cron heartbeats are now fresh. Closing."},
)
_gitea_request(
"PATCH",
f"/repos/{GITEA_REPO}/issues/{existing['number']}",
data={"state": "closed"},
)
return
stale_names = ", ".join(j.job for j in report.stale_jobs)
title = f"{ALERT_TITLE_PREFIX} Stale cron heartbeats: {stale_names}"
body = report.to_alert_body()
if existing:
logger.info("Still stale — updating issue #%d", existing["number"])
_gitea_request(
"POST",
f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments",
data={"body": body},
)
else:
result = _gitea_request(
"POST",
f"/repos/{GITEA_REPO}/issues",
data={"title": title, "body": body, "assignees": ["Timmy"]},
)
if result and result.get("number"):
logger.info("Created alert issue #%d", result["number"])
# ── Entry point ───────────────────────────────────────────────────────
def build_report(directory: Optional[Path] = None) -> HeartbeatReport:
"""Scan heartbeats and return a report. Exposed for Night Watch import."""
hb_dir = directory if directory is not None else _resolve_heartbeat_dir()
jobs = scan_heartbeats(hb_dir)
return HeartbeatReport(timestamp=time.time(), heartbeat_dir=hb_dir, jobs=jobs)
def main() -> None:
parser = argparse.ArgumentParser(
description="Meta-heartbeat checker — detects silent cron failures",
)
parser.add_argument(
"--dir", default=None,
help="Heartbeat directory (default: auto-detect)",
)
parser.add_argument(
"--panel", action="store_true",
help="Output Night Watch heartbeat panel markdown and exit",
)
parser.add_argument(
"--json", action="store_true", dest="output_json",
help="Output results as JSON and exit",
)
parser.add_argument(
"--dry-run", action="store_true",
help="Log results without writing Gitea issues",
)
args = parser.parse_args()
report = build_report(Path(args.dir) if args.dir else None)
if args.panel:
print(report.to_panel_markdown())
return
if args.output_json:
print(json.dumps(report.to_json(), indent=2))
sys.exit(0 if report.overall_healthy else 1)
# Default: log + alert
if not report.jobs:
logger.info("No heartbeat files found in %s", report.heartbeat_dir)
else:
for j in report.jobs:
level = logging.INFO if j.healthy else logging.ERROR
icon = "OK " if j.healthy else "STALE"
logger.log(level, "[%s] %s: %s", icon, j.job, j.message)
alert_on_stale(report, dry_run=args.dry_run)
sys.exit(0 if report.overall_healthy else 1)
if __name__ == "__main__":
main()

View File

@@ -80,6 +80,15 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# Poka-yoke: write a cron heartbeat so check_cron_heartbeats.py can detect
# if *this* watchdog stops running. Import lazily to stay zero-dep if the
# nexus package is unavailable (e.g. very minimal test environments).
try:
from nexus.cron_heartbeat import write_cron_heartbeat as _write_cron_heartbeat
_HAS_CRON_HEARTBEAT = True
except ImportError:
_HAS_CRON_HEARTBEAT = False
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(message)s",
@@ -488,6 +497,15 @@ def run_once(args: argparse.Namespace) -> bool:
elif not args.dry_run:
alert_on_failure(report, dry_run=args.dry_run)
# Poka-yoke: stamp our own heartbeat so the meta-checker can detect
# if this watchdog cron job itself goes silent. Runs every 5 minutes
# by convention (*/5 * * * *).
if _HAS_CRON_HEARTBEAT:
try:
_write_cron_heartbeat("nexus_watchdog", interval_seconds=300)
except Exception:
pass # never crash the watchdog over its own heartbeat
return report.overall_healthy

247
bin/night_watch.py Normal file
View File

@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""Night Watch — Bezalel nightly report generator.
Runs once per night (typically at 03:00 local time via cron) and writes a
markdown report to ``reports/bezalel/nightly/<YYYY-MM-DD>.md``.
The report always includes a **Heartbeat Panel** (acceptance criterion #3 of
issue #1096) so silent cron failures are visible in the morning brief.
USAGE
-----
python bin/night_watch.py # write today's report
python bin/night_watch.py --dry-run # print to stdout, don't write file
python bin/night_watch.py --date 2026-04-08 # specific date
CRONTAB
-------
0 3 * * * cd /path/to/the-nexus && python bin/night_watch.py \\
>> /var/log/bezalel/night-watch.log 2>&1
ZERO DEPENDENCIES
-----------------
Pure stdlib, plus ``check_cron_heartbeats`` from this repo (also stdlib).
Refs: #1096
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import logging
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-7s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("bezalel.night_watch")
PROJECT_ROOT = Path(__file__).parent.parent
REPORTS_DIR = PROJECT_ROOT / "reports" / "bezalel" / "nightly"
# ── Load check_cron_heartbeats without relying on sys.path hacks ──────
def _load_checker():
"""Import bin/check_cron_heartbeats.py as a module."""
spec = importlib.util.spec_from_file_location(
"_check_cron_heartbeats",
PROJECT_ROOT / "bin" / "check_cron_heartbeats.py",
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ── System checks ─────────────────────────────────────────────────────
def _check_service(service_name: str) -> tuple[str, str]:
"""Return (status, detail) for a systemd service."""
try:
result = subprocess.run(
["systemctl", "is-active", service_name],
capture_output=True, text=True, timeout=5,
)
active = result.stdout.strip()
if active == "active":
return "OK", f"{service_name} is active"
return "WARN", f"{service_name} is {active}"
except FileNotFoundError:
return "OK", f"{service_name} status unknown (systemctl not available)"
except Exception as exc:
return "WARN", f"systemctl error: {exc}"
def _check_disk(threshold_pct: int = 90) -> tuple[str, str]:
"""Return (status, detail) for disk usage on /."""
try:
usage = shutil.disk_usage("/")
pct = int(usage.used / usage.total * 100)
status = "OK" if pct < threshold_pct else "WARN"
return status, f"disk usage {pct}%"
except Exception as exc:
return "WARN", f"disk check failed: {exc}"
def _check_memory(threshold_pct: int = 90) -> tuple[str, str]:
"""Return (status, detail) for memory usage."""
try:
meminfo = Path("/proc/meminfo").read_text()
data = {}
for line in meminfo.splitlines():
parts = line.split()
if len(parts) >= 2:
data[parts[0].rstrip(":")] = int(parts[1])
total = data.get("MemTotal", 0)
available = data.get("MemAvailable", 0)
if total == 0:
return "OK", "memory info unavailable"
pct = int((total - available) / total * 100)
status = "OK" if pct < threshold_pct else "WARN"
return status, f"memory usage {pct}%"
except FileNotFoundError:
# Not Linux (e.g. macOS dev machine)
return "OK", "memory check skipped (not Linux)"
except Exception as exc:
return "WARN", f"memory check failed: {exc}"
def _check_gitea_reachability(gitea_url: str = "https://forge.alexanderwhitestone.com") -> tuple[str, str]:
"""Return (status, detail) for Gitea HTTPS reachability."""
import urllib.request
import urllib.error
try:
with urllib.request.urlopen(gitea_url, timeout=10) as resp:
code = resp.status
if code == 200:
return "OK", f"Alpha SSH not configured from Beta, but Gitea HTTPS is responding ({code})"
return "WARN", f"Gitea returned HTTP {code}"
except Exception as exc:
return "WARN", f"Gitea unreachable: {exc}"
def _check_world_readable_secrets() -> tuple[str, str]:
"""Return (status, detail) for world-readable sensitive files."""
sensitive_patterns = ["*.key", "*.pem", "*.secret", ".env", "*.token"]
found = []
try:
for pattern in sensitive_patterns:
for path in PROJECT_ROOT.rglob(pattern):
try:
mode = path.stat().st_mode
if mode & 0o004: # world-readable
found.append(str(path.relative_to(PROJECT_ROOT)))
except OSError:
pass
if found:
return "WARN", f"world-readable sensitive files: {', '.join(found[:3])}"
return "OK", "no sensitive recently-modified world-readable files found"
except Exception as exc:
return "WARN", f"security check failed: {exc}"
# ── Report generation ─────────────────────────────────────────────────
def generate_report(date_str: str, checker_mod) -> str:
"""Build the full nightly report markdown string."""
now_utc = datetime.now(timezone.utc)
ts = now_utc.strftime("%Y-%m-%d %02H:%M UTC")
rows: list[tuple[str, str, str]] = []
service_status, service_detail = _check_service("hermes-bezalel")
rows.append(("Service", service_status, service_detail))
disk_status, disk_detail = _check_disk()
rows.append(("Disk", disk_status, disk_detail))
mem_status, mem_detail = _check_memory()
rows.append(("Memory", mem_status, mem_detail))
gitea_status, gitea_detail = _check_gitea_reachability()
rows.append(("Alpha VPS", gitea_status, gitea_detail))
sec_status, sec_detail = _check_world_readable_secrets()
rows.append(("Security", sec_status, sec_detail))
overall = "OK" if all(r[1] == "OK" for r in rows) else "WARN"
lines = [
f"# Bezalel Night Watch — {ts}",
"",
f"**Overall:** {overall}",
"",
"| Check | Status | Detail |",
"|-------|--------|--------|",
]
for check, status, detail in rows:
lines.append(f"| {check} | {status} | {detail} |")
lines.append("")
lines.append("---")
lines.append("")
# ── Heartbeat Panel (acceptance criterion #1096) ──────────────────
try:
hb_report = checker_mod.build_report()
lines.append(hb_report.to_panel_markdown())
except Exception as exc:
lines += [
"## Heartbeat Panel",
"",
f"*(heartbeat check failed: {exc})*",
]
lines += [
"",
"---",
"",
"*Automated by Bezalel Night Watch*",
"",
]
return "\n".join(lines)
# ── Entry point ───────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Bezalel Night Watch — nightly report generator",
)
parser.add_argument(
"--date", default=None,
help="Report date as YYYY-MM-DD (default: today UTC)",
)
parser.add_argument(
"--dry-run", action="store_true",
help="Print report to stdout instead of writing to disk",
)
args = parser.parse_args()
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
checker = _load_checker()
report_text = generate_report(date_str, checker)
if args.dry_run:
print(report_text)
return
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
report_path = REPORTS_DIR / f"{date_str}.md"
report_path.write_text(report_text)
logger.info("Night Watch report written to %s", report_path)
if __name__ == "__main__":
main()

136
nexus/cron_heartbeat.py Normal file
View File

@@ -0,0 +1,136 @@
"""Poka-yoke heartbeat writer for cron jobs.
Every scheduled job calls write_cron_heartbeat() on successful completion so
the meta-heartbeat checker (bin/check_cron_heartbeats.py) can verify that all
jobs are still alive. Absence of a fresh heartbeat = silent failure.
Path convention
---------------
Primary: /var/run/bezalel/heartbeats/<job>.last
Fallback: ~/.bezalel/heartbeats/<job>.last
(used when /var/run/bezalel is not writable, e.g. dev machines)
Override: BEZALEL_HEARTBEAT_DIR environment variable
Heartbeat file format (JSON)
----------------------------
{
"job": "nexus_watchdog",
"timestamp": 1744000000.0,
"interval_seconds": 300,
"pid": 12345,
"status": "ok"
}
Usage in a cron job
-------------------
from nexus.cron_heartbeat import write_cron_heartbeat
def main():
# ... do the work ...
write_cron_heartbeat("my_job_name", interval_seconds=300)
Zero-dependency shell one-liner (for scripts that can't import Python)
-----------------------------------------------------------------------
python -c "
from nexus.cron_heartbeat import write_cron_heartbeat
write_cron_heartbeat('my_job', interval_seconds=300)
"
Refs: #1096
"""
from __future__ import annotations
import json
import os
import tempfile
import time
from pathlib import Path
PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats")
FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats"
def _resolve_heartbeat_dir() -> Path:
"""Return the heartbeat directory, trying primary then fallback.
If BEZALEL_HEARTBEAT_DIR is set in the environment that wins outright
(useful for tests and non-standard deployments).
"""
env = os.environ.get("BEZALEL_HEARTBEAT_DIR")
if env:
return Path(env)
# Try to create and write-test the primary path
try:
PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
probe = PRIMARY_HEARTBEAT_DIR / ".write_probe"
probe.touch()
probe.unlink()
return PRIMARY_HEARTBEAT_DIR
except (PermissionError, OSError):
pass
FALLBACK_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
return FALLBACK_HEARTBEAT_DIR
def heartbeat_path(job: str, directory: Path | None = None) -> Path:
"""Return the Path where *job*'s heartbeat file lives.
Useful for readers (e.g. the Night Watch report) that just need the
location without writing anything.
"""
d = directory if directory is not None else _resolve_heartbeat_dir()
return d / f"{job}.last"
def write_cron_heartbeat(
job: str,
interval_seconds: int,
status: str = "ok",
directory: Path | None = None,
) -> Path:
"""Write a poka-yoke heartbeat file for a cron job.
Call this at the end of your job's main function. The file is written
atomically (write-to-temp + rename) so the checker never reads a partial
file.
Args:
job: Unique job name, e.g. ``"nexus_watchdog"``.
interval_seconds: Expected run cadence, e.g. ``300`` for every 5 min.
status: Completion status: ``"ok"``, ``"warn"``, or
``"error"``. Only ``"ok"`` resets the stale clock.
directory: Override the heartbeat directory (mainly for tests).
Returns:
Path to the written heartbeat file.
"""
d = directory if directory is not None else _resolve_heartbeat_dir()
d.mkdir(parents=True, exist_ok=True)
path = d / f"{job}.last"
data = {
"job": job,
"timestamp": time.time(),
"interval_seconds": interval_seconds,
"pid": os.getpid(),
"status": status,
}
# Atomic write: temp file in same directory + rename.
# Guarantees the checker never sees a half-written file.
fd, tmp = tempfile.mkstemp(dir=str(d), prefix=f".{job}-", suffix=".tmp")
try:
with os.fdopen(fd, "w") as f:
json.dump(data, f)
os.replace(tmp, str(path))
except Exception:
# Best-effort — never crash the job over a heartbeat failure
try:
os.unlink(tmp)
except OSError:
pass
return path

View File

@@ -0,0 +1,341 @@
"""Tests for the poka-yoke cron heartbeat system.
Covers:
- nexus/cron_heartbeat.py (write utility)
- bin/check_cron_heartbeats.py (meta-heartbeat checker)
Refs: #1096
"""
from __future__ import annotations
import importlib.util
import json
import os
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ── Load modules under test ───────────────────────────────────────────
PROJECT_ROOT = Path(__file__).parent.parent
_hb_spec = importlib.util.spec_from_file_location(
"_cron_heartbeat",
PROJECT_ROOT / "nexus" / "cron_heartbeat.py",
)
_hb = importlib.util.module_from_spec(_hb_spec)
sys.modules["_cron_heartbeat"] = _hb
_hb_spec.loader.exec_module(_hb)
_chk_spec = importlib.util.spec_from_file_location(
"_check_cron_heartbeats",
PROJECT_ROOT / "bin" / "check_cron_heartbeats.py",
)
_chk = importlib.util.module_from_spec(_chk_spec)
sys.modules["_check_cron_heartbeats"] = _chk
_chk_spec.loader.exec_module(_chk)
write_cron_heartbeat = _hb.write_cron_heartbeat
heartbeat_path = _hb.heartbeat_path
scan_heartbeats = _chk.scan_heartbeats
build_report = _chk.build_report
HeartbeatReport = _chk.HeartbeatReport
JobStatus = _chk.JobStatus
_read_job_status = _chk._read_job_status
_fmt_duration = _chk._fmt_duration
# ── nexus/cron_heartbeat.py ───────────────────────────────────────────
class TestWriteCronHeartbeat:
def test_creates_file(self, tmp_path):
"""write_cron_heartbeat creates <job>.last in the given directory."""
path = write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path)
assert path == tmp_path / "my_job.last"
assert path.exists()
def test_file_content(self, tmp_path):
"""Written file has all required fields."""
write_cron_heartbeat("my_job", interval_seconds=600, status="ok", directory=tmp_path)
data = json.loads((tmp_path / "my_job.last").read_text())
assert data["job"] == "my_job"
assert data["interval_seconds"] == 600
assert data["status"] == "ok"
assert data["pid"] == os.getpid()
assert abs(data["timestamp"] - time.time()) < 2
def test_atomic_write_no_temp_files(self, tmp_path):
"""No temporary files remain after a successful write."""
write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path)
files = list(tmp_path.iterdir())
assert len(files) == 1
assert files[0].name == "my_job.last"
def test_overwrites_cleanly(self, tmp_path):
"""Successive writes update, not append."""
write_cron_heartbeat("j", interval_seconds=60, status="ok", directory=tmp_path)
write_cron_heartbeat("j", interval_seconds=60, status="warn", directory=tmp_path)
data = json.loads((tmp_path / "j.last").read_text())
assert data["status"] == "warn"
def test_creates_parent_dirs(self, tmp_path):
"""Parent directories are created as needed."""
deep_dir = tmp_path / "a" / "b" / "c"
write_cron_heartbeat("j", interval_seconds=60, directory=deep_dir)
assert (deep_dir / "j.last").exists()
def test_heartbeat_path_helper(self, tmp_path):
"""heartbeat_path() returns the correct path without writing."""
p = heartbeat_path("myjob", directory=tmp_path)
assert p == tmp_path / "myjob.last"
assert not p.exists()
def test_env_var_override(self, tmp_path, monkeypatch):
"""BEZALEL_HEARTBEAT_DIR env var overrides the directory."""
monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path))
# Call without directory= so it uses the env var
path = write_cron_heartbeat("env_job", interval_seconds=120)
assert path.parent == tmp_path
# ── bin/check_cron_heartbeats.py ─────────────────────────────────────
class TestScanHeartbeats:
def test_empty_dir(self, tmp_path):
"""No .last files → empty list."""
assert scan_heartbeats(tmp_path) == []
def test_nonexistent_dir(self, tmp_path):
"""Missing directory → empty list (no exception)."""
assert scan_heartbeats(tmp_path / "nope") == []
def test_healthy_job(self, tmp_path):
"""Fresh heartbeat → healthy."""
(tmp_path / "myjob.last").write_text(json.dumps({
"job": "myjob",
"timestamp": time.time(),
"interval_seconds": 300,
"pid": 1,
"status": "ok",
}))
jobs = scan_heartbeats(tmp_path)
assert len(jobs) == 1
assert jobs[0].healthy is True
assert jobs[0].job == "myjob"
def test_stale_job(self, tmp_path):
"""Heartbeat older than 2× interval → stale."""
(tmp_path / "slow.last").write_text(json.dumps({
"job": "slow",
"timestamp": time.time() - 700, # 11.7 minutes
"interval_seconds": 300, # 5 min interval → ratio 2.33
"pid": 1,
"status": "ok",
}))
jobs = scan_heartbeats(tmp_path)
assert jobs[0].healthy is False
assert jobs[0].staleness_ratio > 2.0
def test_missing_heartbeat_file(self, tmp_path):
"""_read_job_status handles a file that disappears mid-scan."""
ghost_path = tmp_path / "ghost.last"
status = _read_job_status("ghost", ghost_path)
assert status.healthy is False
assert "missing" in status.raw_status
def test_corrupt_heartbeat(self, tmp_path):
"""Corrupt JSON → unhealthy with 'corrupt' status."""
p = tmp_path / "bad.last"
p.write_text("{not valid json")
status = _read_job_status("bad", p)
assert status.healthy is False
assert "corrupt" in status.raw_status
def test_multiple_jobs(self, tmp_path):
"""Multiple .last files are all reported."""
for i, name in enumerate(["alpha", "beta", "gamma"]):
(tmp_path / f"{name}.last").write_text(json.dumps({
"job": name,
"timestamp": time.time() - i * 10,
"interval_seconds": 300,
"pid": i + 1,
"status": "ok",
}))
jobs = scan_heartbeats(tmp_path)
assert len(jobs) == 3
job_names = {j.job for j in jobs}
assert job_names == {"alpha", "beta", "gamma"}
def test_non_last_files_ignored(self, tmp_path):
""".json and other extensions are ignored."""
(tmp_path / "other.json").write_text("{}")
(tmp_path / "notes.txt").write_text("hello")
assert scan_heartbeats(tmp_path) == []
class TestHeartbeatReport:
def _fresh_job(self, name="j"):
return JobStatus(
job=name, path=Path(f"/tmp/{name}.last"),
healthy=True, age_seconds=30, interval_seconds=300,
staleness_ratio=0.1, last_timestamp=time.time() - 30,
pid=1, raw_status="ok",
message="Last beat 30s ago (ratio 0.1x)",
)
def _stale_job(self, name="s"):
return JobStatus(
job=name, path=Path(f"/tmp/{name}.last"),
healthy=False, age_seconds=700, interval_seconds=300,
staleness_ratio=2.33, last_timestamp=time.time() - 700,
pid=1, raw_status="stale",
message="Silent for 11m 40s (2.3x interval of 5m 0s)",
)
def test_overall_healthy(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[self._fresh_job()],
)
assert report.overall_healthy is True
def test_overall_unhealthy(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[self._fresh_job(), self._stale_job()],
)
assert report.overall_healthy is False
assert len(report.stale_jobs) == 1
def test_panel_markdown_contains_table(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[self._fresh_job("alpha"), self._stale_job("beta")],
)
panel = report.to_panel_markdown()
assert "## Heartbeat Panel" in panel
assert "| `alpha` |" in panel
assert "| `beta` |" in panel
assert "STALE" in panel
assert "OK" in panel
assert "**Overall:** ALERT" in panel
def test_panel_markdown_no_jobs(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[],
)
panel = report.to_panel_markdown()
assert "no heartbeat files found" in panel
def test_panel_overall_ok(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[self._fresh_job()],
)
panel = report.to_panel_markdown()
assert "**Overall:** OK" in panel
def test_alert_body_lists_stale_jobs(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[self._stale_job("slow")],
)
body = report.to_alert_body()
assert "slow" in body
assert "STALE" in body.upper() or "stale" in body.lower() or "silent" in body.lower()
assert "crontab" in body.lower()
def test_to_json(self):
report = HeartbeatReport(
timestamp=time.time(),
heartbeat_dir=Path("/tmp"),
jobs=[self._fresh_job()],
)
data = report.to_json()
assert data["healthy"] is True
assert len(data["jobs"]) == 1
assert data["jobs"][0]["job"] == "j"
class TestFmtDuration:
def test_seconds(self):
assert _fmt_duration(45) == "45s"
def test_minutes(self):
assert _fmt_duration(90) == "1m 30s"
def test_hours(self):
assert _fmt_duration(3661) == "1h 1m"
class TestBuildReport:
def test_build_report_with_dir(self, tmp_path):
"""build_report() uses the given directory."""
(tmp_path / "myjob.last").write_text(json.dumps({
"job": "myjob",
"timestamp": time.time(),
"interval_seconds": 300,
"pid": 1,
"status": "ok",
}))
report = build_report(directory=tmp_path)
assert len(report.jobs) == 1
assert report.overall_healthy is True
def test_build_report_empty_dir(self, tmp_path):
report = build_report(directory=tmp_path)
assert report.jobs == []
assert report.overall_healthy is True # nothing stale = healthy
# ── Integration: nexus_watchdog writes its own heartbeat ─────────────
class TestWatchdogHeartbeatIntegration:
"""Verify nexus_watchdog.py writes a cron heartbeat after run_once()."""
def test_watchdog_writes_cron_heartbeat(self, tmp_path, monkeypatch):
"""After run_once, nexus_watchdog writes nexus_watchdog.last."""
monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path))
# Load watchdog module
spec = importlib.util.spec_from_file_location(
"_watchdog_hb_test",
PROJECT_ROOT / "bin" / "nexus_watchdog.py",
)
wd = importlib.util.module_from_spec(spec)
sys.modules["_watchdog_hb_test"] = wd
# Patch out network calls
with patch("socket.socket") as mock_sock, \
patch("subprocess.run") as mock_run:
mock_sock.return_value.connect_ex.return_value = 111 # port closed
mock_run.return_value = MagicMock(returncode=1, stdout="")
spec.loader.exec_module(wd)
args = MagicMock()
args.ws_host = "localhost"
args.ws_port = 8765
args.heartbeat_path = str(tmp_path / "nexus_heartbeat.json")
args.stale_threshold = 300
args.dry_run = True # don't touch Gitea
wd.run_once(args)
hb_file = tmp_path / "nexus_watchdog.last"
assert hb_file.exists(), "nexus_watchdog.last was not written"
data = json.loads(hb_file.read_text())
assert data["job"] == "nexus_watchdog"
assert data["interval_seconds"] == 300