[claude] Poka-yoke cron heartbeats — make silent failures impossible (#1096) #1102
449
bin/check_cron_heartbeats.py
Normal file
449
bin/check_cron_heartbeats.py
Normal file
@@ -0,0 +1,449 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Meta-heartbeat checker — makes silent cron failures impossible.
|
||||
|
||||
Reads every ``*.last`` file in the heartbeat directory and verifies that no
|
||||
job has been silent for longer than **2× its declared interval**. If any job
|
||||
is stale, a Gitea alert issue is created (or an existing one is updated).
|
||||
When all jobs recover, the issue is closed automatically.
|
||||
|
||||
This script itself should be run as a cron job every 15 minutes so the
|
||||
meta-level is also covered:
|
||||
|
||||
*/15 * * * * cd /path/to/the-nexus && \\
|
||||
python bin/check_cron_heartbeats.py >> /var/log/bezalel/heartbeat-check.log 2>&1
|
||||
|
||||
USAGE
|
||||
-----
|
||||
# Check all jobs; create/update Gitea alert if any stale:
|
||||
python bin/check_cron_heartbeats.py
|
||||
|
||||
# Dry-run (no Gitea writes):
|
||||
python bin/check_cron_heartbeats.py --dry-run
|
||||
|
||||
# Output Night Watch heartbeat panel markdown:
|
||||
python bin/check_cron_heartbeats.py --panel
|
||||
|
||||
# Output JSON (for integration with other tools):
|
||||
python bin/check_cron_heartbeats.py --json
|
||||
|
||||
# Use a custom heartbeat directory:
|
||||
python bin/check_cron_heartbeats.py --dir /tmp/test-heartbeats
|
||||
|
||||
HEARTBEAT DIRECTORY
|
||||
-------------------
|
||||
Primary: /var/run/bezalel/heartbeats/ (set by ops, writable by cron user)
|
||||
Fallback: ~/.bezalel/heartbeats/ (dev machines)
|
||||
Override: BEZALEL_HEARTBEAT_DIR env var
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
-----------------
|
||||
Pure stdlib. No pip installs required.
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.heartbeat_checker")
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────
|
||||
|
||||
PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats")
|
||||
FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats"
|
||||
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
|
||||
ALERT_TITLE_PREFIX = "[heartbeat-checker]"
|
||||
|
||||
# A job is stale when its age exceeds this multiple of its declared interval
|
||||
STALE_RATIO = 2.0
|
||||
# Never flag a job as stale if it completed less than this many seconds ago
|
||||
# (prevents noise immediately after deployment)
|
||||
MIN_STALE_AGE = 60
|
||||
|
||||
|
||||
def _resolve_heartbeat_dir() -> Path:
|
||||
"""Return the active heartbeat directory."""
|
||||
env = os.environ.get("BEZALEL_HEARTBEAT_DIR")
|
||||
if env:
|
||||
return Path(env)
|
||||
if PRIMARY_HEARTBEAT_DIR.exists():
|
||||
return PRIMARY_HEARTBEAT_DIR
|
||||
# Try to create it; fall back to home dir if not permitted
|
||||
try:
|
||||
PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
probe = PRIMARY_HEARTBEAT_DIR / ".write_probe"
|
||||
probe.touch()
|
||||
probe.unlink()
|
||||
return PRIMARY_HEARTBEAT_DIR
|
||||
except (PermissionError, OSError):
|
||||
return FALLBACK_HEARTBEAT_DIR
|
||||
|
||||
|
||||
# ── Data model ────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class JobStatus:
|
||||
"""Health status for a single cron job's heartbeat."""
|
||||
job: str
|
||||
path: Path
|
||||
healthy: bool
|
||||
age_seconds: float # -1 if unknown (missing/corrupt)
|
||||
interval_seconds: int # 0 if unknown
|
||||
staleness_ratio: float # age / interval; -1 if unknown; >STALE_RATIO = stale
|
||||
last_timestamp: Optional[float]
|
||||
pid: Optional[int]
|
||||
raw_status: str # value from the .last file: "ok" / "warn" / "error"
|
||||
message: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class HeartbeatReport:
|
||||
"""Aggregate report for all cron job heartbeats in a directory."""
|
||||
timestamp: float
|
||||
heartbeat_dir: Path
|
||||
jobs: List[JobStatus] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def stale_jobs(self) -> List[JobStatus]:
|
||||
return [j for j in self.jobs if not j.healthy]
|
||||
|
||||
@property
|
||||
def overall_healthy(self) -> bool:
|
||||
return len(self.stale_jobs) == 0
|
||||
|
||||
# ── Rendering ─────────────────────────────────────────────────────
|
||||
|
||||
def to_panel_markdown(self) -> str:
|
||||
"""Night Watch heartbeat panel — a table of all jobs with their status."""
|
||||
ts = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(self.timestamp))
|
||||
overall = "OK" if self.overall_healthy else "ALERT"
|
||||
|
||||
lines = [
|
||||
f"## Heartbeat Panel — {ts}",
|
||||
"",
|
||||
f"**Overall:** {overall}",
|
||||
"",
|
||||
"| Job | Status | Age | Interval | Ratio |",
|
||||
"|-----|--------|-----|----------|-------|",
|
||||
]
|
||||
|
||||
if not self.jobs:
|
||||
lines.append("| *(no heartbeat files found)* | — | — | — | — |")
|
||||
else:
|
||||
for j in self.jobs:
|
||||
icon = "OK" if j.healthy else "STALE"
|
||||
age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A"
|
||||
interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A"
|
||||
ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A"
|
||||
lines.append(
|
||||
f"| `{j.job}` | {icon} | {age_str} | {interval_str} | {ratio_str} |"
|
||||
)
|
||||
|
||||
if self.stale_jobs:
|
||||
lines += ["", "**Stale jobs:**"]
|
||||
for j in self.stale_jobs:
|
||||
lines.append(f"- `{j.job}`: {j.message}")
|
||||
|
||||
lines += [
|
||||
"",
|
||||
f"*Heartbeat dir: `{self.heartbeat_dir}`*",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_alert_body(self) -> str:
|
||||
"""Gitea issue body when stale jobs are detected."""
|
||||
ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
|
||||
stale = self.stale_jobs
|
||||
|
||||
lines = [
|
||||
f"## Cron Heartbeat Alert — {ts}",
|
||||
"",
|
||||
f"**{len(stale)} job(s) have gone silent** (stale > {STALE_RATIO}x interval).",
|
||||
"",
|
||||
"| Job | Age | Interval | Ratio | Detail |",
|
||||
"|-----|-----|----------|-------|--------|",
|
||||
]
|
||||
|
||||
for j in stale:
|
||||
age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A"
|
||||
interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A"
|
||||
ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A"
|
||||
lines.append(
|
||||
f"| `{j.job}` | {age_str} | {interval_str} | {ratio_str} | {j.message} |"
|
||||
)
|
||||
|
||||
lines += [
|
||||
"",
|
||||
"### What to do",
|
||||
"1. `crontab -l` — confirm the job is still scheduled",
|
||||
"2. Check the job's log for errors",
|
||||
"3. Restart the job if needed",
|
||||
"4. Close this issue once fresh heartbeats appear",
|
||||
"",
|
||||
f"*Generated by `check_cron_heartbeats.py` — dir: `{self.heartbeat_dir}`*",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"healthy": self.overall_healthy,
|
||||
"timestamp": self.timestamp,
|
||||
"heartbeat_dir": str(self.heartbeat_dir),
|
||||
"jobs": [
|
||||
{
|
||||
"job": j.job,
|
||||
"healthy": j.healthy,
|
||||
"age_seconds": j.age_seconds,
|
||||
"interval_seconds": j.interval_seconds,
|
||||
"staleness_ratio": j.staleness_ratio,
|
||||
"raw_status": j.raw_status,
|
||||
"message": j.message,
|
||||
}
|
||||
for j in self.jobs
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def _fmt_duration(seconds: float) -> str:
|
||||
"""Format a duration in seconds as a human-readable string."""
|
||||
s = int(seconds)
|
||||
if s < 60:
|
||||
return f"{s}s"
|
||||
if s < 3600:
|
||||
return f"{s // 60}m {s % 60}s"
|
||||
return f"{s // 3600}h {(s % 3600) // 60}m"
|
||||
|
||||
|
||||
# ── Job scanning ──────────────────────────────────────────────────────
|
||||
|
||||
def scan_heartbeats(directory: Path) -> List[JobStatus]:
|
||||
"""Read every ``*.last`` file in *directory* and return their statuses."""
|
||||
if not directory.exists():
|
||||
return []
|
||||
return [_read_job_status(p.stem, p) for p in sorted(directory.glob("*.last"))]
|
||||
|
||||
|
||||
def _read_job_status(job: str, path: Path) -> JobStatus:
|
||||
"""Parse one ``.last`` file and produce a ``JobStatus``."""
|
||||
now = time.time()
|
||||
|
||||
if not path.exists():
|
||||
return JobStatus(
|
||||
job=job, path=path,
|
||||
healthy=False,
|
||||
age_seconds=-1,
|
||||
interval_seconds=0,
|
||||
staleness_ratio=-1,
|
||||
last_timestamp=None,
|
||||
pid=None,
|
||||
raw_status="missing",
|
||||
message=f"Heartbeat file missing: {path}",
|
||||
)
|
||||
|
||||
try:
|
||||
data = json.loads(path.read_text())
|
||||
except (json.JSONDecodeError, OSError) as exc:
|
||||
return JobStatus(
|
||||
job=job, path=path,
|
||||
healthy=False,
|
||||
age_seconds=-1,
|
||||
interval_seconds=0,
|
||||
staleness_ratio=-1,
|
||||
last_timestamp=None,
|
||||
pid=None,
|
||||
raw_status="corrupt",
|
||||
message=f"Corrupt heartbeat: {exc}",
|
||||
)
|
||||
|
||||
timestamp = float(data.get("timestamp", 0))
|
||||
interval = int(data.get("interval_seconds", 0))
|
||||
pid = data.get("pid")
|
||||
raw_status = data.get("status", "ok")
|
||||
|
||||
age = now - timestamp
|
||||
ratio = age / interval if interval > 0 else float("inf")
|
||||
stale = ratio > STALE_RATIO and age > MIN_STALE_AGE
|
||||
|
||||
if stale:
|
||||
message = (
|
||||
f"Silent for {_fmt_duration(age)} "
|
||||
f"({ratio:.1f}x interval of {_fmt_duration(interval)})"
|
||||
)
|
||||
else:
|
||||
message = f"Last beat {_fmt_duration(age)} ago (ratio {ratio:.1f}x)"
|
||||
|
||||
return JobStatus(
|
||||
job=job, path=path,
|
||||
healthy=not stale,
|
||||
age_seconds=age,
|
||||
interval_seconds=interval,
|
||||
staleness_ratio=ratio,
|
||||
last_timestamp=timestamp,
|
||||
pid=pid,
|
||||
raw_status=raw_status if not stale else "stale",
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
# ── Gitea alerting ────────────────────────────────────────────────────
|
||||
|
||||
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
|
||||
"""Make a Gitea API request; return parsed JSON or None on error."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(url, data=body, method=method)
|
||||
if GITEA_TOKEN:
|
||||
req.add_header("Authorization", f"token {GITEA_TOKEN}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
req.add_header("Accept", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
raw = resp.read().decode()
|
||||
return json.loads(raw) if raw.strip() else {}
|
||||
except urllib.error.HTTPError as exc:
|
||||
logger.warning("Gitea %d: %s", exc.code, exc.read().decode()[:200])
|
||||
return None
|
||||
except Exception as exc:
|
||||
logger.warning("Gitea request failed: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
def _find_open_alert_issue() -> Optional[dict]:
|
||||
issues = _gitea_request(
|
||||
"GET",
|
||||
f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
|
||||
)
|
||||
if not isinstance(issues, list):
|
||||
return None
|
||||
for issue in issues:
|
||||
if issue.get("title", "").startswith(ALERT_TITLE_PREFIX):
|
||||
return issue
|
||||
return None
|
||||
|
||||
|
||||
def alert_on_stale(report: HeartbeatReport, dry_run: bool = False) -> None:
|
||||
"""Create, update, or close a Gitea alert issue based on report health."""
|
||||
if dry_run:
|
||||
action = "close" if report.overall_healthy else "create/update"
|
||||
logger.info("DRY RUN — would %s Gitea issue", action)
|
||||
return
|
||||
|
||||
if not GITEA_TOKEN:
|
||||
logger.warning("GITEA_TOKEN not set — skipping Gitea alert")
|
||||
return
|
||||
|
||||
existing = _find_open_alert_issue()
|
||||
|
||||
if report.overall_healthy:
|
||||
if existing:
|
||||
logger.info("All heartbeats healthy — closing issue #%d", existing["number"])
|
||||
_gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments",
|
||||
data={"body": "All cron heartbeats are now fresh. Closing."},
|
||||
)
|
||||
_gitea_request(
|
||||
"PATCH",
|
||||
f"/repos/{GITEA_REPO}/issues/{existing['number']}",
|
||||
data={"state": "closed"},
|
||||
)
|
||||
return
|
||||
|
||||
stale_names = ", ".join(j.job for j in report.stale_jobs)
|
||||
title = f"{ALERT_TITLE_PREFIX} Stale cron heartbeats: {stale_names}"
|
||||
body = report.to_alert_body()
|
||||
|
||||
if existing:
|
||||
logger.info("Still stale — updating issue #%d", existing["number"])
|
||||
_gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments",
|
||||
data={"body": body},
|
||||
)
|
||||
else:
|
||||
result = _gitea_request(
|
||||
"POST",
|
||||
f"/repos/{GITEA_REPO}/issues",
|
||||
data={"title": title, "body": body, "assignees": ["Timmy"]},
|
||||
)
|
||||
if result and result.get("number"):
|
||||
logger.info("Created alert issue #%d", result["number"])
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────
|
||||
|
||||
def build_report(directory: Optional[Path] = None) -> HeartbeatReport:
|
||||
"""Scan heartbeats and return a report. Exposed for Night Watch import."""
|
||||
hb_dir = directory if directory is not None else _resolve_heartbeat_dir()
|
||||
jobs = scan_heartbeats(hb_dir)
|
||||
return HeartbeatReport(timestamp=time.time(), heartbeat_dir=hb_dir, jobs=jobs)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Meta-heartbeat checker — detects silent cron failures",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dir", default=None,
|
||||
help="Heartbeat directory (default: auto-detect)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--panel", action="store_true",
|
||||
help="Output Night Watch heartbeat panel markdown and exit",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json", action="store_true", dest="output_json",
|
||||
help="Output results as JSON and exit",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run", action="store_true",
|
||||
help="Log results without writing Gitea issues",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
report = build_report(Path(args.dir) if args.dir else None)
|
||||
|
||||
if args.panel:
|
||||
print(report.to_panel_markdown())
|
||||
return
|
||||
|
||||
if args.output_json:
|
||||
print(json.dumps(report.to_json(), indent=2))
|
||||
sys.exit(0 if report.overall_healthy else 1)
|
||||
|
||||
# Default: log + alert
|
||||
if not report.jobs:
|
||||
logger.info("No heartbeat files found in %s", report.heartbeat_dir)
|
||||
else:
|
||||
for j in report.jobs:
|
||||
level = logging.INFO if j.healthy else logging.ERROR
|
||||
icon = "OK " if j.healthy else "STALE"
|
||||
logger.log(level, "[%s] %s: %s", icon, j.job, j.message)
|
||||
|
||||
alert_on_stale(report, dry_run=args.dry_run)
|
||||
sys.exit(0 if report.overall_healthy else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -80,6 +80,15 @@ from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# Poka-yoke: write a cron heartbeat so check_cron_heartbeats.py can detect
|
||||
# if *this* watchdog stops running. Import lazily to stay zero-dep if the
|
||||
# nexus package is unavailable (e.g. very minimal test environments).
|
||||
try:
|
||||
from nexus.cron_heartbeat import write_cron_heartbeat as _write_cron_heartbeat
|
||||
_HAS_CRON_HEARTBEAT = True
|
||||
except ImportError:
|
||||
_HAS_CRON_HEARTBEAT = False
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
@@ -488,6 +497,15 @@ def run_once(args: argparse.Namespace) -> bool:
|
||||
elif not args.dry_run:
|
||||
alert_on_failure(report, dry_run=args.dry_run)
|
||||
|
||||
# Poka-yoke: stamp our own heartbeat so the meta-checker can detect
|
||||
# if this watchdog cron job itself goes silent. Runs every 5 minutes
|
||||
# by convention (*/5 * * * *).
|
||||
if _HAS_CRON_HEARTBEAT:
|
||||
try:
|
||||
_write_cron_heartbeat("nexus_watchdog", interval_seconds=300)
|
||||
except Exception:
|
||||
pass # never crash the watchdog over its own heartbeat
|
||||
|
||||
return report.overall_healthy
|
||||
|
||||
|
||||
|
||||
247
bin/night_watch.py
Normal file
247
bin/night_watch.py
Normal file
@@ -0,0 +1,247 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Night Watch — Bezalel nightly report generator.
|
||||
|
||||
Runs once per night (typically at 03:00 local time via cron) and writes a
|
||||
markdown report to ``reports/bezalel/nightly/<YYYY-MM-DD>.md``.
|
||||
|
||||
The report always includes a **Heartbeat Panel** (acceptance criterion #3 of
|
||||
issue #1096) so silent cron failures are visible in the morning brief.
|
||||
|
||||
USAGE
|
||||
-----
|
||||
python bin/night_watch.py # write today's report
|
||||
python bin/night_watch.py --dry-run # print to stdout, don't write file
|
||||
python bin/night_watch.py --date 2026-04-08 # specific date
|
||||
|
||||
CRONTAB
|
||||
-------
|
||||
0 3 * * * cd /path/to/the-nexus && python bin/night_watch.py \\
|
||||
>> /var/log/bezalel/night-watch.log 2>&1
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
-----------------
|
||||
Pure stdlib, plus ``check_cron_heartbeats`` from this repo (also stdlib).
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import importlib.util
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.night_watch")
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
REPORTS_DIR = PROJECT_ROOT / "reports" / "bezalel" / "nightly"
|
||||
|
||||
# ── Load check_cron_heartbeats without relying on sys.path hacks ──────
|
||||
|
||||
def _load_checker():
|
||||
"""Import bin/check_cron_heartbeats.py as a module."""
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"_check_cron_heartbeats",
|
||||
PROJECT_ROOT / "bin" / "check_cron_heartbeats.py",
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
# ── System checks ─────────────────────────────────────────────────────
|
||||
|
||||
def _check_service(service_name: str) -> tuple[str, str]:
|
||||
"""Return (status, detail) for a systemd service."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["systemctl", "is-active", service_name],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
active = result.stdout.strip()
|
||||
if active == "active":
|
||||
return "OK", f"{service_name} is active"
|
||||
return "WARN", f"{service_name} is {active}"
|
||||
except FileNotFoundError:
|
||||
return "OK", f"{service_name} status unknown (systemctl not available)"
|
||||
except Exception as exc:
|
||||
return "WARN", f"systemctl error: {exc}"
|
||||
|
||||
|
||||
def _check_disk(threshold_pct: int = 90) -> tuple[str, str]:
|
||||
"""Return (status, detail) for disk usage on /."""
|
||||
try:
|
||||
usage = shutil.disk_usage("/")
|
||||
pct = int(usage.used / usage.total * 100)
|
||||
status = "OK" if pct < threshold_pct else "WARN"
|
||||
return status, f"disk usage {pct}%"
|
||||
except Exception as exc:
|
||||
return "WARN", f"disk check failed: {exc}"
|
||||
|
||||
|
||||
def _check_memory(threshold_pct: int = 90) -> tuple[str, str]:
|
||||
"""Return (status, detail) for memory usage."""
|
||||
try:
|
||||
meminfo = Path("/proc/meminfo").read_text()
|
||||
data = {}
|
||||
for line in meminfo.splitlines():
|
||||
parts = line.split()
|
||||
if len(parts) >= 2:
|
||||
data[parts[0].rstrip(":")] = int(parts[1])
|
||||
total = data.get("MemTotal", 0)
|
||||
available = data.get("MemAvailable", 0)
|
||||
if total == 0:
|
||||
return "OK", "memory info unavailable"
|
||||
pct = int((total - available) / total * 100)
|
||||
status = "OK" if pct < threshold_pct else "WARN"
|
||||
return status, f"memory usage {pct}%"
|
||||
except FileNotFoundError:
|
||||
# Not Linux (e.g. macOS dev machine)
|
||||
return "OK", "memory check skipped (not Linux)"
|
||||
except Exception as exc:
|
||||
return "WARN", f"memory check failed: {exc}"
|
||||
|
||||
|
||||
def _check_gitea_reachability(gitea_url: str = "https://forge.alexanderwhitestone.com") -> tuple[str, str]:
|
||||
"""Return (status, detail) for Gitea HTTPS reachability."""
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
try:
|
||||
with urllib.request.urlopen(gitea_url, timeout=10) as resp:
|
||||
code = resp.status
|
||||
if code == 200:
|
||||
return "OK", f"Alpha SSH not configured from Beta, but Gitea HTTPS is responding ({code})"
|
||||
return "WARN", f"Gitea returned HTTP {code}"
|
||||
except Exception as exc:
|
||||
return "WARN", f"Gitea unreachable: {exc}"
|
||||
|
||||
|
||||
def _check_world_readable_secrets() -> tuple[str, str]:
|
||||
"""Return (status, detail) for world-readable sensitive files."""
|
||||
sensitive_patterns = ["*.key", "*.pem", "*.secret", ".env", "*.token"]
|
||||
found = []
|
||||
try:
|
||||
for pattern in sensitive_patterns:
|
||||
for path in PROJECT_ROOT.rglob(pattern):
|
||||
try:
|
||||
mode = path.stat().st_mode
|
||||
if mode & 0o004: # world-readable
|
||||
found.append(str(path.relative_to(PROJECT_ROOT)))
|
||||
except OSError:
|
||||
pass
|
||||
if found:
|
||||
return "WARN", f"world-readable sensitive files: {', '.join(found[:3])}"
|
||||
return "OK", "no sensitive recently-modified world-readable files found"
|
||||
except Exception as exc:
|
||||
return "WARN", f"security check failed: {exc}"
|
||||
|
||||
|
||||
# ── Report generation ─────────────────────────────────────────────────
|
||||
|
||||
def generate_report(date_str: str, checker_mod) -> str:
|
||||
"""Build the full nightly report markdown string."""
|
||||
now_utc = datetime.now(timezone.utc)
|
||||
ts = now_utc.strftime("%Y-%m-%d %02H:%M UTC")
|
||||
|
||||
rows: list[tuple[str, str, str]] = []
|
||||
|
||||
service_status, service_detail = _check_service("hermes-bezalel")
|
||||
rows.append(("Service", service_status, service_detail))
|
||||
|
||||
disk_status, disk_detail = _check_disk()
|
||||
rows.append(("Disk", disk_status, disk_detail))
|
||||
|
||||
mem_status, mem_detail = _check_memory()
|
||||
rows.append(("Memory", mem_status, mem_detail))
|
||||
|
||||
gitea_status, gitea_detail = _check_gitea_reachability()
|
||||
rows.append(("Alpha VPS", gitea_status, gitea_detail))
|
||||
|
||||
sec_status, sec_detail = _check_world_readable_secrets()
|
||||
rows.append(("Security", sec_status, sec_detail))
|
||||
|
||||
overall = "OK" if all(r[1] == "OK" for r in rows) else "WARN"
|
||||
|
||||
lines = [
|
||||
f"# Bezalel Night Watch — {ts}",
|
||||
"",
|
||||
f"**Overall:** {overall}",
|
||||
"",
|
||||
"| Check | Status | Detail |",
|
||||
"|-------|--------|--------|",
|
||||
]
|
||||
for check, status, detail in rows:
|
||||
lines.append(f"| {check} | {status} | {detail} |")
|
||||
|
||||
lines.append("")
|
||||
lines.append("---")
|
||||
lines.append("")
|
||||
|
||||
# ── Heartbeat Panel (acceptance criterion #1096) ──────────────────
|
||||
try:
|
||||
hb_report = checker_mod.build_report()
|
||||
lines.append(hb_report.to_panel_markdown())
|
||||
except Exception as exc:
|
||||
lines += [
|
||||
"## Heartbeat Panel",
|
||||
"",
|
||||
f"*(heartbeat check failed: {exc})*",
|
||||
]
|
||||
|
||||
lines += [
|
||||
"",
|
||||
"---",
|
||||
"",
|
||||
"*Automated by Bezalel Night Watch*",
|
||||
"",
|
||||
]
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Bezalel Night Watch — nightly report generator",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--date", default=None,
|
||||
help="Report date as YYYY-MM-DD (default: today UTC)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run", action="store_true",
|
||||
help="Print report to stdout instead of writing to disk",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
date_str = args.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
|
||||
checker = _load_checker()
|
||||
report_text = generate_report(date_str, checker)
|
||||
|
||||
if args.dry_run:
|
||||
print(report_text)
|
||||
return
|
||||
|
||||
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
report_path = REPORTS_DIR / f"{date_str}.md"
|
||||
report_path.write_text(report_text)
|
||||
logger.info("Night Watch report written to %s", report_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
136
nexus/cron_heartbeat.py
Normal file
136
nexus/cron_heartbeat.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Poka-yoke heartbeat writer for cron jobs.
|
||||
|
||||
Every scheduled job calls write_cron_heartbeat() on successful completion so
|
||||
the meta-heartbeat checker (bin/check_cron_heartbeats.py) can verify that all
|
||||
jobs are still alive. Absence of a fresh heartbeat = silent failure.
|
||||
|
||||
Path convention
|
||||
---------------
|
||||
Primary: /var/run/bezalel/heartbeats/<job>.last
|
||||
Fallback: ~/.bezalel/heartbeats/<job>.last
|
||||
(used when /var/run/bezalel is not writable, e.g. dev machines)
|
||||
Override: BEZALEL_HEARTBEAT_DIR environment variable
|
||||
|
||||
Heartbeat file format (JSON)
|
||||
----------------------------
|
||||
{
|
||||
"job": "nexus_watchdog",
|
||||
"timestamp": 1744000000.0,
|
||||
"interval_seconds": 300,
|
||||
"pid": 12345,
|
||||
"status": "ok"
|
||||
}
|
||||
|
||||
Usage in a cron job
|
||||
-------------------
|
||||
from nexus.cron_heartbeat import write_cron_heartbeat
|
||||
|
||||
def main():
|
||||
# ... do the work ...
|
||||
write_cron_heartbeat("my_job_name", interval_seconds=300)
|
||||
|
||||
Zero-dependency shell one-liner (for scripts that can't import Python)
|
||||
-----------------------------------------------------------------------
|
||||
python -c "
|
||||
from nexus.cron_heartbeat import write_cron_heartbeat
|
||||
write_cron_heartbeat('my_job', interval_seconds=300)
|
||||
"
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats")
|
||||
FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats"
|
||||
|
||||
|
||||
def _resolve_heartbeat_dir() -> Path:
|
||||
"""Return the heartbeat directory, trying primary then fallback.
|
||||
|
||||
If BEZALEL_HEARTBEAT_DIR is set in the environment that wins outright
|
||||
(useful for tests and non-standard deployments).
|
||||
"""
|
||||
env = os.environ.get("BEZALEL_HEARTBEAT_DIR")
|
||||
if env:
|
||||
return Path(env)
|
||||
|
||||
# Try to create and write-test the primary path
|
||||
try:
|
||||
PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
probe = PRIMARY_HEARTBEAT_DIR / ".write_probe"
|
||||
probe.touch()
|
||||
probe.unlink()
|
||||
return PRIMARY_HEARTBEAT_DIR
|
||||
except (PermissionError, OSError):
|
||||
pass
|
||||
|
||||
FALLBACK_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
return FALLBACK_HEARTBEAT_DIR
|
||||
|
||||
|
||||
def heartbeat_path(job: str, directory: Path | None = None) -> Path:
|
||||
"""Return the Path where *job*'s heartbeat file lives.
|
||||
|
||||
Useful for readers (e.g. the Night Watch report) that just need the
|
||||
location without writing anything.
|
||||
"""
|
||||
d = directory if directory is not None else _resolve_heartbeat_dir()
|
||||
return d / f"{job}.last"
|
||||
|
||||
|
||||
def write_cron_heartbeat(
|
||||
job: str,
|
||||
interval_seconds: int,
|
||||
status: str = "ok",
|
||||
directory: Path | None = None,
|
||||
) -> Path:
|
||||
"""Write a poka-yoke heartbeat file for a cron job.
|
||||
|
||||
Call this at the end of your job's main function. The file is written
|
||||
atomically (write-to-temp + rename) so the checker never reads a partial
|
||||
file.
|
||||
|
||||
Args:
|
||||
job: Unique job name, e.g. ``"nexus_watchdog"``.
|
||||
interval_seconds: Expected run cadence, e.g. ``300`` for every 5 min.
|
||||
status: Completion status: ``"ok"``, ``"warn"``, or
|
||||
``"error"``. Only ``"ok"`` resets the stale clock.
|
||||
directory: Override the heartbeat directory (mainly for tests).
|
||||
|
||||
Returns:
|
||||
Path to the written heartbeat file.
|
||||
"""
|
||||
d = directory if directory is not None else _resolve_heartbeat_dir()
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
path = d / f"{job}.last"
|
||||
|
||||
data = {
|
||||
"job": job,
|
||||
"timestamp": time.time(),
|
||||
"interval_seconds": interval_seconds,
|
||||
"pid": os.getpid(),
|
||||
"status": status,
|
||||
}
|
||||
|
||||
# Atomic write: temp file in same directory + rename.
|
||||
# Guarantees the checker never sees a half-written file.
|
||||
fd, tmp = tempfile.mkstemp(dir=str(d), prefix=f".{job}-", suffix=".tmp")
|
||||
try:
|
||||
with os.fdopen(fd, "w") as f:
|
||||
json.dump(data, f)
|
||||
os.replace(tmp, str(path))
|
||||
except Exception:
|
||||
# Best-effort — never crash the job over a heartbeat failure
|
||||
try:
|
||||
os.unlink(tmp)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
return path
|
||||
341
tests/test_cron_heartbeats.py
Normal file
341
tests/test_cron_heartbeats.py
Normal file
@@ -0,0 +1,341 @@
|
||||
"""Tests for the poka-yoke cron heartbeat system.
|
||||
|
||||
Covers:
|
||||
- nexus/cron_heartbeat.py (write utility)
|
||||
- bin/check_cron_heartbeats.py (meta-heartbeat checker)
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Load modules under test ───────────────────────────────────────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"_cron_heartbeat",
|
||||
PROJECT_ROOT / "nexus" / "cron_heartbeat.py",
|
||||
)
|
||||
_hb = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules["_cron_heartbeat"] = _hb
|
||||
_hb_spec.loader.exec_module(_hb)
|
||||
|
||||
_chk_spec = importlib.util.spec_from_file_location(
|
||||
"_check_cron_heartbeats",
|
||||
PROJECT_ROOT / "bin" / "check_cron_heartbeats.py",
|
||||
)
|
||||
_chk = importlib.util.module_from_spec(_chk_spec)
|
||||
sys.modules["_check_cron_heartbeats"] = _chk
|
||||
_chk_spec.loader.exec_module(_chk)
|
||||
|
||||
write_cron_heartbeat = _hb.write_cron_heartbeat
|
||||
heartbeat_path = _hb.heartbeat_path
|
||||
|
||||
scan_heartbeats = _chk.scan_heartbeats
|
||||
build_report = _chk.build_report
|
||||
HeartbeatReport = _chk.HeartbeatReport
|
||||
JobStatus = _chk.JobStatus
|
||||
_read_job_status = _chk._read_job_status
|
||||
_fmt_duration = _chk._fmt_duration
|
||||
|
||||
|
||||
# ── nexus/cron_heartbeat.py ───────────────────────────────────────────
|
||||
|
||||
class TestWriteCronHeartbeat:
|
||||
def test_creates_file(self, tmp_path):
|
||||
"""write_cron_heartbeat creates <job>.last in the given directory."""
|
||||
path = write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path)
|
||||
assert path == tmp_path / "my_job.last"
|
||||
assert path.exists()
|
||||
|
||||
def test_file_content(self, tmp_path):
|
||||
"""Written file has all required fields."""
|
||||
write_cron_heartbeat("my_job", interval_seconds=600, status="ok", directory=tmp_path)
|
||||
data = json.loads((tmp_path / "my_job.last").read_text())
|
||||
assert data["job"] == "my_job"
|
||||
assert data["interval_seconds"] == 600
|
||||
assert data["status"] == "ok"
|
||||
assert data["pid"] == os.getpid()
|
||||
assert abs(data["timestamp"] - time.time()) < 2
|
||||
|
||||
def test_atomic_write_no_temp_files(self, tmp_path):
|
||||
"""No temporary files remain after a successful write."""
|
||||
write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path)
|
||||
files = list(tmp_path.iterdir())
|
||||
assert len(files) == 1
|
||||
assert files[0].name == "my_job.last"
|
||||
|
||||
def test_overwrites_cleanly(self, tmp_path):
|
||||
"""Successive writes update, not append."""
|
||||
write_cron_heartbeat("j", interval_seconds=60, status="ok", directory=tmp_path)
|
||||
write_cron_heartbeat("j", interval_seconds=60, status="warn", directory=tmp_path)
|
||||
data = json.loads((tmp_path / "j.last").read_text())
|
||||
assert data["status"] == "warn"
|
||||
|
||||
def test_creates_parent_dirs(self, tmp_path):
|
||||
"""Parent directories are created as needed."""
|
||||
deep_dir = tmp_path / "a" / "b" / "c"
|
||||
write_cron_heartbeat("j", interval_seconds=60, directory=deep_dir)
|
||||
assert (deep_dir / "j.last").exists()
|
||||
|
||||
def test_heartbeat_path_helper(self, tmp_path):
|
||||
"""heartbeat_path() returns the correct path without writing."""
|
||||
p = heartbeat_path("myjob", directory=tmp_path)
|
||||
assert p == tmp_path / "myjob.last"
|
||||
assert not p.exists()
|
||||
|
||||
def test_env_var_override(self, tmp_path, monkeypatch):
|
||||
"""BEZALEL_HEARTBEAT_DIR env var overrides the directory."""
|
||||
monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path))
|
||||
# Call without directory= so it uses the env var
|
||||
path = write_cron_heartbeat("env_job", interval_seconds=120)
|
||||
assert path.parent == tmp_path
|
||||
|
||||
|
||||
# ── bin/check_cron_heartbeats.py ─────────────────────────────────────
|
||||
|
||||
class TestScanHeartbeats:
|
||||
def test_empty_dir(self, tmp_path):
|
||||
"""No .last files → empty list."""
|
||||
assert scan_heartbeats(tmp_path) == []
|
||||
|
||||
def test_nonexistent_dir(self, tmp_path):
|
||||
"""Missing directory → empty list (no exception)."""
|
||||
assert scan_heartbeats(tmp_path / "nope") == []
|
||||
|
||||
def test_healthy_job(self, tmp_path):
|
||||
"""Fresh heartbeat → healthy."""
|
||||
(tmp_path / "myjob.last").write_text(json.dumps({
|
||||
"job": "myjob",
|
||||
"timestamp": time.time(),
|
||||
"interval_seconds": 300,
|
||||
"pid": 1,
|
||||
"status": "ok",
|
||||
}))
|
||||
jobs = scan_heartbeats(tmp_path)
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0].healthy is True
|
||||
assert jobs[0].job == "myjob"
|
||||
|
||||
def test_stale_job(self, tmp_path):
|
||||
"""Heartbeat older than 2× interval → stale."""
|
||||
(tmp_path / "slow.last").write_text(json.dumps({
|
||||
"job": "slow",
|
||||
"timestamp": time.time() - 700, # 11.7 minutes
|
||||
"interval_seconds": 300, # 5 min interval → ratio 2.33
|
||||
"pid": 1,
|
||||
"status": "ok",
|
||||
}))
|
||||
jobs = scan_heartbeats(tmp_path)
|
||||
assert jobs[0].healthy is False
|
||||
assert jobs[0].staleness_ratio > 2.0
|
||||
|
||||
def test_missing_heartbeat_file(self, tmp_path):
|
||||
"""_read_job_status handles a file that disappears mid-scan."""
|
||||
ghost_path = tmp_path / "ghost.last"
|
||||
status = _read_job_status("ghost", ghost_path)
|
||||
assert status.healthy is False
|
||||
assert "missing" in status.raw_status
|
||||
|
||||
def test_corrupt_heartbeat(self, tmp_path):
|
||||
"""Corrupt JSON → unhealthy with 'corrupt' status."""
|
||||
p = tmp_path / "bad.last"
|
||||
p.write_text("{not valid json")
|
||||
status = _read_job_status("bad", p)
|
||||
assert status.healthy is False
|
||||
assert "corrupt" in status.raw_status
|
||||
|
||||
def test_multiple_jobs(self, tmp_path):
|
||||
"""Multiple .last files are all reported."""
|
||||
for i, name in enumerate(["alpha", "beta", "gamma"]):
|
||||
(tmp_path / f"{name}.last").write_text(json.dumps({
|
||||
"job": name,
|
||||
"timestamp": time.time() - i * 10,
|
||||
"interval_seconds": 300,
|
||||
"pid": i + 1,
|
||||
"status": "ok",
|
||||
}))
|
||||
jobs = scan_heartbeats(tmp_path)
|
||||
assert len(jobs) == 3
|
||||
job_names = {j.job for j in jobs}
|
||||
assert job_names == {"alpha", "beta", "gamma"}
|
||||
|
||||
def test_non_last_files_ignored(self, tmp_path):
|
||||
""".json and other extensions are ignored."""
|
||||
(tmp_path / "other.json").write_text("{}")
|
||||
(tmp_path / "notes.txt").write_text("hello")
|
||||
assert scan_heartbeats(tmp_path) == []
|
||||
|
||||
|
||||
class TestHeartbeatReport:
|
||||
def _fresh_job(self, name="j"):
|
||||
return JobStatus(
|
||||
job=name, path=Path(f"/tmp/{name}.last"),
|
||||
healthy=True, age_seconds=30, interval_seconds=300,
|
||||
staleness_ratio=0.1, last_timestamp=time.time() - 30,
|
||||
pid=1, raw_status="ok",
|
||||
message="Last beat 30s ago (ratio 0.1x)",
|
||||
)
|
||||
|
||||
def _stale_job(self, name="s"):
|
||||
return JobStatus(
|
||||
job=name, path=Path(f"/tmp/{name}.last"),
|
||||
healthy=False, age_seconds=700, interval_seconds=300,
|
||||
staleness_ratio=2.33, last_timestamp=time.time() - 700,
|
||||
pid=1, raw_status="stale",
|
||||
message="Silent for 11m 40s (2.3x interval of 5m 0s)",
|
||||
)
|
||||
|
||||
def test_overall_healthy(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[self._fresh_job()],
|
||||
)
|
||||
assert report.overall_healthy is True
|
||||
|
||||
def test_overall_unhealthy(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[self._fresh_job(), self._stale_job()],
|
||||
)
|
||||
assert report.overall_healthy is False
|
||||
assert len(report.stale_jobs) == 1
|
||||
|
||||
def test_panel_markdown_contains_table(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[self._fresh_job("alpha"), self._stale_job("beta")],
|
||||
)
|
||||
panel = report.to_panel_markdown()
|
||||
assert "## Heartbeat Panel" in panel
|
||||
assert "| `alpha` |" in panel
|
||||
assert "| `beta` |" in panel
|
||||
assert "STALE" in panel
|
||||
assert "OK" in panel
|
||||
assert "**Overall:** ALERT" in panel
|
||||
|
||||
def test_panel_markdown_no_jobs(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[],
|
||||
)
|
||||
panel = report.to_panel_markdown()
|
||||
assert "no heartbeat files found" in panel
|
||||
|
||||
def test_panel_overall_ok(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[self._fresh_job()],
|
||||
)
|
||||
panel = report.to_panel_markdown()
|
||||
assert "**Overall:** OK" in panel
|
||||
|
||||
def test_alert_body_lists_stale_jobs(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[self._stale_job("slow")],
|
||||
)
|
||||
body = report.to_alert_body()
|
||||
assert "slow" in body
|
||||
assert "STALE" in body.upper() or "stale" in body.lower() or "silent" in body.lower()
|
||||
assert "crontab" in body.lower()
|
||||
|
||||
def test_to_json(self):
|
||||
report = HeartbeatReport(
|
||||
timestamp=time.time(),
|
||||
heartbeat_dir=Path("/tmp"),
|
||||
jobs=[self._fresh_job()],
|
||||
)
|
||||
data = report.to_json()
|
||||
assert data["healthy"] is True
|
||||
assert len(data["jobs"]) == 1
|
||||
assert data["jobs"][0]["job"] == "j"
|
||||
|
||||
|
||||
class TestFmtDuration:
|
||||
def test_seconds(self):
|
||||
assert _fmt_duration(45) == "45s"
|
||||
|
||||
def test_minutes(self):
|
||||
assert _fmt_duration(90) == "1m 30s"
|
||||
|
||||
def test_hours(self):
|
||||
assert _fmt_duration(3661) == "1h 1m"
|
||||
|
||||
|
||||
class TestBuildReport:
|
||||
def test_build_report_with_dir(self, tmp_path):
|
||||
"""build_report() uses the given directory."""
|
||||
(tmp_path / "myjob.last").write_text(json.dumps({
|
||||
"job": "myjob",
|
||||
"timestamp": time.time(),
|
||||
"interval_seconds": 300,
|
||||
"pid": 1,
|
||||
"status": "ok",
|
||||
}))
|
||||
report = build_report(directory=tmp_path)
|
||||
assert len(report.jobs) == 1
|
||||
assert report.overall_healthy is True
|
||||
|
||||
def test_build_report_empty_dir(self, tmp_path):
|
||||
report = build_report(directory=tmp_path)
|
||||
assert report.jobs == []
|
||||
assert report.overall_healthy is True # nothing stale = healthy
|
||||
|
||||
|
||||
# ── Integration: nexus_watchdog writes its own heartbeat ─────────────
|
||||
|
||||
class TestWatchdogHeartbeatIntegration:
|
||||
"""Verify nexus_watchdog.py writes a cron heartbeat after run_once()."""
|
||||
|
||||
def test_watchdog_writes_cron_heartbeat(self, tmp_path, monkeypatch):
|
||||
"""After run_once, nexus_watchdog writes nexus_watchdog.last."""
|
||||
monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path))
|
||||
|
||||
# Load watchdog module
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"_watchdog_hb_test",
|
||||
PROJECT_ROOT / "bin" / "nexus_watchdog.py",
|
||||
)
|
||||
wd = importlib.util.module_from_spec(spec)
|
||||
sys.modules["_watchdog_hb_test"] = wd
|
||||
|
||||
# Patch out network calls
|
||||
with patch("socket.socket") as mock_sock, \
|
||||
patch("subprocess.run") as mock_run:
|
||||
mock_sock.return_value.connect_ex.return_value = 111 # port closed
|
||||
mock_run.return_value = MagicMock(returncode=1, stdout="")
|
||||
|
||||
spec.loader.exec_module(wd)
|
||||
|
||||
args = MagicMock()
|
||||
args.ws_host = "localhost"
|
||||
args.ws_port = 8765
|
||||
args.heartbeat_path = str(tmp_path / "nexus_heartbeat.json")
|
||||
args.stale_threshold = 300
|
||||
args.dry_run = True # don't touch Gitea
|
||||
|
||||
wd.run_once(args)
|
||||
|
||||
hb_file = tmp_path / "nexus_watchdog.last"
|
||||
assert hb_file.exists(), "nexus_watchdog.last was not written"
|
||||
data = json.loads(hb_file.read_text())
|
||||
assert data["job"] == "nexus_watchdog"
|
||||
assert data["interval_seconds"] == 300
|
||||
Reference in New Issue
Block a user