248 lines
8.2 KiB
Python
248 lines
8.2 KiB
Python
#!/usr/bin/env python3
"""Night Watch — Bezalel nightly report generator.

Runs once per night (typically at 03:00 local time via cron) and writes a
markdown report to ``reports/bezalel/nightly/<YYYY-MM-DD>.md``.

The report always includes a **Heartbeat Panel** (acceptance criterion #3 of
issue #1096) so silent cron failures are visible in the morning brief.

USAGE
-----
    python bin/night_watch.py                     # write today's (UTC) report
    python bin/night_watch.py --dry-run           # print to stdout, don't write file
    python bin/night_watch.py --date 2026-04-08   # specific date

CRONTAB
-------
    0 3 * * * cd /path/to/the-nexus && python bin/night_watch.py \\
        >> /var/log/bezalel/night-watch.log 2>&1

ZERO DEPENDENCIES
-----------------
Pure stdlib, plus ``check_cron_heartbeats`` from this repo (also stdlib).

Refs: #1096
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import importlib.util
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Root logging is configured as soon as this module is imported: INFO level,
# with a timestamp and a left-aligned, 7-character level name on every line.
logging.basicConfig(
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("bezalel.night_watch")

# PROJECT_ROOT is the grandparent of this file's path (the usage notes invoke
# the script as ``bin/night_watch.py``); reports live below it.
_SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = _SCRIPT_DIR.parent
REPORTS_DIR = PROJECT_ROOT.joinpath("reports", "bezalel", "nightly")
|
|
|
|
# ── Load check_cron_heartbeats without relying on sys.path hacks ──────
|
|
|
|
def _load_checker(path: str | Path | None = None):
    """Import the heartbeat checker script as a module and return it.

    The file is loaded by path under the private module name
    ``_check_cron_heartbeats``.  Following the importlib recipe for importing
    a source file directly, the module is registered in ``sys.modules``
    before it is executed, so code inside it that looks itself up by module
    name can find it.

    Args:
        path: Optional location of the checker source file.  Defaults to
            ``bin/check_cron_heartbeats.py`` under ``PROJECT_ROOT``.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be built for ``path``.

    Any exception raised while the checker file is read or executed
    propagates unchanged; the ``sys.modules`` entry is removed first.
    """
    module_name = "_check_cron_heartbeats"
    if path is None:
        path = PROJECT_ROOT / "bin" / "check_cron_heartbeats.py"

    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load heartbeat checker from {path}")

    mod = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module behind.
        sys.modules.pop(module_name, None)
        raise
    return mod
|
|
|
|
|
|
# ── System checks ─────────────────────────────────────────────────────
|
|
|
|
def _check_service(service_name: str) -> tuple[str, str]:
    """Return ``(status, detail)`` for a systemd service.

    Runs ``systemctl is-active <service_name>`` with a 5 second timeout.

    Args:
        service_name: Unit name passed to ``systemctl is-active``.

    Returns:
        ``("OK", ...)`` when the reported state is ``active`` or when the
        ``systemctl`` binary does not exist; ``("WARN", ...)`` for any other
        state, for an empty answer, or when running the command fails.
    """
    try:
        result = subprocess.run(
            ["systemctl", "is-active", service_name],
            capture_output=True, text=True, timeout=5,
        )
    except FileNotFoundError:
        # No systemctl binary on this machine: treated as "nothing to check".
        return "OK", f"{service_name} status unknown (systemctl not available)"
    except Exception as exc:
        return "WARN", f"systemctl error: {exc}"

    state = result.stdout.strip()
    if state == "active":
        return "OK", f"{service_name} is active"

    if not state:
        # systemctl printed no state on stdout; report why instead of
        # emitting a dangling "<name> is ".
        stderr_lines = result.stderr.strip().splitlines()
        reason = stderr_lines[0] if stderr_lines else f"exit code {result.returncode}"
        return "WARN", f"{service_name} state unknown ({reason})"

    return "WARN", f"{service_name} is {state}"
|
|
|
|
|
|
def _check_disk(threshold_pct: int = 90, path: str = "/") -> tuple[str, str]:
    """Return ``(status, detail)`` for disk usage of the filesystem at *path*.

    Args:
        threshold_pct: Usage percentage at or above which the status is
            ``"WARN"``.
        path: Any path on the filesystem to inspect.  Defaults to ``"/"``,
            which reproduces the previous hard-coded behaviour.

    Returns:
        ``("OK" | "WARN", "disk usage N%")``; the path is appended to the
        detail when it is not the default.  Any failure to read the usage is
        reported as ``("WARN", "disk check failed: ...")``.
    """
    try:
        usage = shutil.disk_usage(path)
        pct = int(usage.used / usage.total * 100)
    except Exception as exc:
        return "WARN", f"disk check failed: {exc}"

    status = "OK" if pct < threshold_pct else "WARN"
    detail = f"disk usage {pct}%"
    if path != "/":
        detail += f" ({path})"
    return status, detail
|
|
|
|
|
|
def _check_memory(
    threshold_pct: int = 90,
    meminfo_path: str = "/proc/meminfo",
) -> tuple[str, str]:
    """Return ``(status, detail)`` for memory usage.

    Usage is computed as ``(MemTotal - MemAvailable) / MemTotal`` from the
    ``Key: value`` lines of *meminfo_path*.

    Args:
        threshold_pct: Usage percentage at or above which the status is
            ``"WARN"``.
        meminfo_path: File to read.  Defaults to ``/proc/meminfo``.

    Returns:
        * ``("OK", "memory check skipped (not Linux)")`` when the file does
          not exist (e.g. a macOS dev machine).
        * ``("OK", "memory info unavailable")`` when ``MemTotal`` is missing
          or zero, or when ``MemAvailable`` is missing.  A missing
          ``MemAvailable`` used to be counted as 0 and reported as 100 %.
        * ``("OK" | "WARN", "memory usage N%")`` otherwise.
        * ``("WARN", "memory check failed: ...")`` on any other error,
          including a non-integer value for one of the two fields.
    """
    wanted = ("MemTotal", "MemAvailable")
    try:
        meminfo = Path(meminfo_path).read_text()
        data: dict[str, int] = {}
        for line in meminfo.splitlines():
            parts = line.split()
            if len(parts) < 2:
                continue
            key = parts[0].rstrip(":")
            # Only the two fields we use are parsed, so an odd line elsewhere
            # in the file cannot fail the whole check.
            if key in wanted:
                data[key] = int(parts[1])
    except FileNotFoundError:
        # Not Linux (e.g. macOS dev machine)
        return "OK", "memory check skipped (not Linux)"
    except Exception as exc:
        return "WARN", f"memory check failed: {exc}"

    total = data.get("MemTotal", 0)
    if total <= 0 or "MemAvailable" not in data:
        return "OK", "memory info unavailable"

    pct = int((total - data["MemAvailable"]) / total * 100)
    status = "OK" if pct < threshold_pct else "WARN"
    return status, f"memory usage {pct}%"
|
|
|
|
|
|
def _check_gitea_reachability(
    gitea_url: str = "https://forge.alexanderwhitestone.com",
) -> tuple[str, str]:
    """Probe *gitea_url* over HTTP(S) and return ``(status, detail)``.

    The URL is opened with a 10 second timeout.  A response with status 200
    yields ``"OK"``; any other response status, or any exception raised while
    opening or closing the connection, yields ``"WARN"``.
    """
    import urllib.error
    import urllib.request

    try:
        with urllib.request.urlopen(gitea_url, timeout=10) as response:
            http_status = response.status
    except Exception as err:
        return "WARN", f"Gitea unreachable: {err}"

    if http_status != 200:
        return "WARN", f"Gitea returned HTTP {http_status}"
    return "OK", f"Alpha SSH not configured from Beta, but Gitea HTTPS is responding ({http_status})"
|
|
|
|
|
|
def _check_world_readable_secrets(root: Path | None = None) -> tuple[str, str]:
    """Return ``(status, detail)`` for world-readable sensitive files.

    Recursively searches *root* for names matching ``*.key``, ``*.pem``,
    ``*.secret``, ``.env`` and ``*.token`` and flags every regular file whose
    permission bits include "others: read" (``0o004``).

    Args:
        root: Directory to scan.  Defaults to ``PROJECT_ROOT``.

    Returns:
        ``("OK", ...)`` when nothing is flagged.  Otherwise ``("WARN", ...)``
        listing up to three offending paths (relative to *root*, sorted) and
        a ``(+N more)`` suffix when the list was shortened.  A failure of
        the scan itself is reported as ``("WARN", "security check failed: ...")``.
    """
    sensitive_patterns = ("*.key", "*.pem", "*.secret", ".env", "*.token")
    scan_root = PROJECT_ROOT if root is None else Path(root)
    found: list[str] = []

    try:
        for pattern in sensitive_patterns:
            for path in scan_root.rglob(pattern):
                try:
                    # Directories can match too (``.env`` is a common
                    # virtualenv directory name); only regular files count.
                    if not path.is_file():
                        continue
                    if path.stat().st_mode & 0o004:  # world-readable
                        found.append(str(path.relative_to(scan_root)))
                except OSError:
                    # Entry vanished or is unreadable: skip it, keep scanning.
                    continue
    except Exception as exc:
        return "WARN", f"security check failed: {exc}"

    if not found:
        return "OK", "no sensitive recently-modified world-readable files found"

    found.sort()  # stable output regardless of directory iteration order
    shown = ", ".join(found[:3])
    hidden = len(found) - 3
    suffix = f" (+{hidden} more)" if hidden > 0 else ""
    return "WARN", f"world-readable sensitive files: {shown}{suffix}"
|
|
|
|
|
|
# ── Report generation ─────────────────────────────────────────────────
|
|
|
|
def _md_cell(text: str) -> str:
    """Make *text* safe for one markdown table cell (no raw ``|`` or newlines)."""
    return " ".join(str(text).splitlines()).replace("|", "\\|")


def generate_report(date_str: str, checker_mod) -> str:
    """Build the full nightly report as a markdown string.

    Runs the service, disk, memory, Gitea and secrets checks in that order,
    renders them as a table, then appends the Heartbeat Panel produced by
    *checker_mod*.

    Args:
        date_str: Report date.  Not used in the report text itself (the
            heading carries the current UTC timestamp); kept for interface
            compatibility.
        checker_mod: Object exposing ``build_report()`` whose result offers
            ``to_panel_markdown()``.  Any exception from either call is
            rendered inside the panel instead of aborting the report.

    Returns:
        The markdown document, lines joined with ``"\\n"``.
    """
    now_utc = datetime.now(timezone.utc)
    # "%H" is already zero-padded; the previous "%02H" relied on a
    # platform-specific strftime extension.
    ts = now_utc.strftime("%Y-%m-%d %H:%M UTC")

    # (row label, zero-argument callable returning (status, detail))
    checks = (
        ("Service", lambda: _check_service("hermes-bezalel")),
        ("Disk", _check_disk),
        ("Memory", _check_memory),
        ("Alpha VPS", _check_gitea_reachability),
        ("Security", _check_world_readable_secrets),
    )
    rows: list[tuple[str, str, str]] = [(label, *run()) for label, run in checks]

    overall = "OK" if all(status == "OK" for _, status, _ in rows) else "WARN"

    lines = [
        f"# Bezalel Night Watch — {ts}",
        "",
        f"**Overall:** {overall}",
        "",
        "| Check | Status | Detail |",
        "|-------|--------|--------|",
    ]
    # Details may contain exception text; escape it so the table stays intact.
    lines.extend(
        f"| {check} | {status} | {_md_cell(detail)} |"
        for check, status, detail in rows
    )
    lines += ["", "---", ""]

    # ── Heartbeat Panel (acceptance criterion #3 of issue #1096) ──────
    try:
        hb_report = checker_mod.build_report()
        lines.append(hb_report.to_panel_markdown())
    except Exception as exc:
        lines += [
            "## Heartbeat Panel",
            "",
            f"*(heartbeat check failed: {exc})*",
        ]

    lines += [
        "",
        "---",
        "",
        "*Automated by Bezalel Night Watch*",
        "",
    ]

    return "\n".join(lines)
|
|
|
|
|
|
# ── Entry point ───────────────────────────────────────────────────────
|
|
|
|
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: build the report and print or write it.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Behaviour:
        * ``--date`` must be a real ``YYYY-MM-DD`` date.  It becomes part of
          the output file name, so anything else (for example a value with
          path separators) is rejected with a usage error (exit status 2).
          Without ``--date`` today's UTC date is used.
        * ``--dry-run`` prints the report to stdout and writes nothing.
        * Otherwise the report is written as UTF-8 to
          ``REPORTS_DIR/<date>.md``; the directory is created if needed.
    """
    parser = argparse.ArgumentParser(
        description="Bezalel Night Watch — nightly report generator",
    )
    parser.add_argument(
        "--date", default=None,
        help="Report date as YYYY-MM-DD (default: today UTC)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print report to stdout instead of writing to disk",
    )
    args = parser.parse_args(argv)

    if args.date:
        try:
            # Re-format after parsing so the file name is always zero-padded.
            date_str = datetime.strptime(args.date, "%Y-%m-%d").date().isoformat()
        except ValueError:
            parser.error(f"--date must be a valid YYYY-MM-DD date, got {args.date!r}")
    else:
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    checker = _load_checker()
    report_text = generate_report(date_str, checker)

    if args.dry_run:
        print(report_text)
        return

    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    report_path = REPORTS_DIR / f"{date_str}.md"
    # The report contains non-ASCII characters; do not depend on the locale
    # the cron job happens to run under.
    report_path.write_text(report_text, encoding="utf-8")
    logger.info("Night Watch report written to %s", report_path)


if __name__ == "__main__":
    main()
|