Files
timmy-config/scripts/cron-audit.py
Alexander Whitestone b263c88259
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 21s
Smoke Test / smoke (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 13s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 41s
Validate Config / YAML Lint (pull_request) Failing after 12s
Validate Config / Shell Script Lint (pull_request) Failing after 25s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 11s
Validate Config / Playbook Schema Validation (pull_request) Successful in 20s
PR Checklist / pr-checklist (pull_request) Failing after 3m46s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
feat: cron job audit — categorize errors, disable stale (#662)
2026-04-17 05:29:46 +00:00

237 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""
cron-audit.py — Audit cron jobs across the fleet for error rates.

Categorizes jobs as healthy, transient errors, systemic failures, or
unknown (no status data). Can disable systemic jobs whose last run was
more than 48 hours ago.

Usage:
    python3 scripts/cron-audit.py --report          # Full audit report
    python3 scripts/cron-audit.py --disable-stale   # Disable 48h+ erroring jobs
    python3 scripts/cron-audit.py --json            # Machine-readable
"""
import argparse
import json
import os
import sys
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Base URL of the Gitea forge and the file its API token is read from.
GITEA_URL = "https://forge.alexanderwhitestone.com"
GITEA_TOKEN_PATH = Path.home().joinpath(".config", "gitea", "token")

# Cron config locations: the "cron" directory of each of these
# directories directly under the home directory.
CRON_DIRS = [
    Path.home() / repo_name / "cron"
    for repo_name in ("timmy-config", "fleet-ops", "burn-fleet")
]

# File suffixes that are scanned as cron config files.
CRON_EXTENSIONS = {".json", ".yaml", ".yml"}
def load_token() -> str:
    """Return the Gitea API token, or "" when none is configured.

    The token file at GITEA_TOKEN_PATH is tried first. When it is missing,
    unreadable, or contains only whitespace, the GITEA_TOKEN environment
    variable is used instead.

    Returns:
        The token string, or "" if neither source provides one.
    """
    try:
        file_token = GITEA_TOKEN_PATH.read_text().strip()
    except OSError:
        # Missing/unreadable file (or a directory): fall back to the env var
        # instead of crashing between an exists() check and the read.
        file_token = ""
    # A blank token file must not shadow a token supplied via the environment.
    return file_token or os.environ.get("GITEA_TOKEN", "")
def api_get(path: str, token: str) -> list | dict:
    """GET a Gitea API v1 endpoint and return the decoded JSON body.

    Args:
        path: Endpoint path appended verbatim to ``{GITEA_URL}/api/v1``,
            so it should start with "/".
        token: API token sent as ``Authorization: token <token>``.

    Returns:
        The parsed JSON payload, or an empty list when the request or the
        JSON decoding fails. This function is best-effort and never raises
        for such failures.
    """
    req = urllib.request.Request(
        f"{GITEA_URL}/api/v1{path}",
        headers={"Authorization": f"token {token}"},
    )
    try:
        # The context manager closes the HTTP response (and its socket)
        # even when read() or JSON parsing fails.
        with urllib.request.urlopen(req, timeout=20) as resp:
            return json.loads(resp.read())
    except Exception:
        # Deliberately best-effort: any failure is reported as "no data".
        return []
def find_cron_configs() -> list[dict]:
    """Find cron job config files under CRON_DIRS and extract name/schedule.

    Every regular file below a CRON_DIRS entry whose suffix is in
    CRON_EXTENSIONS is scanned line by line with a simple prefix match
    (this is not a real YAML/JSON parser). Missing directories, unreadable
    files, and files without a name line are skipped.

    Returns:
        One dict per matching file with the keys "name", "file" (path as
        str), "schedule" ("" when no schedule line was found) and "repo"
        (name of the directory containing the cron directory, or "local"
        when that directory is the home directory).
    """
    configs = []
    for cron_dir in CRON_DIRS:
        if not cron_dir.exists():
            continue
        # Identical for every file below this directory, so compute it once.
        repo = cron_dir.parent.name if cron_dir.parent != Path.home() else "local"
        for f in sorted(cron_dir.rglob("*")):
            if f.suffix not in CRON_EXTENSIONS or not f.is_file():
                continue
            try:
                content = f.read_text(errors="ignore")
            except OSError:
                # Unreadable file: skip it and keep auditing the rest.
                continue
            name, schedule = _scan_name_and_schedule(content)
            if name:
                configs.append({
                    "name": name,
                    "file": str(f),
                    "schedule": schedule,
                    "repo": repo,
                })
    return configs


def _scan_name_and_schedule(content: str) -> tuple[str, str]:
    """Return the last name and last schedule found by prefix-matching lines.

    NOTE(review): quoted JSON keys such as ``"name":`` do not match these
    prefixes, so .json files only yield a job when they contain an unquoted
    ``name:`` line. Confirm whether JSON configs are meant to be picked up.
    """
    name = ""
    schedule = ""
    for line in content.split("\n"):
        line = line.strip()
        if line.startswith(("name:", "- name:")):
            name = line.split(":", 1)[1].strip().strip("'\"")
        if line.startswith("schedule:"):
            schedule = line.split(":", 1)[1].strip().strip("'\"")
    return name, schedule
def check_hermes_cron(token: str) -> list[dict]:
    """Collect cron jobs from the hermes CLI plus local cron config files.

    ``hermes cron list --json`` is run first. If the CLI is missing, cannot
    be executed, times out, exits non-zero, or prints something that is not
    a JSON list, it contributes no jobs. Jobs found by find_cron_configs()
    are then appended with unknown status, unless a job with the same name
    is already present.

    Args:
        token: Currently unused by this function.

    Returns:
        A list of job dicts with the keys "name", "schedule", "last_status",
        "last_run", "error_count", "enabled" and "source" ("hermes" or
        "config"). Config-sourced jobs additionally carry "file".
    """
    # Function-scope import: subprocess is not among the file-level imports.
    import subprocess

    jobs = []
    try:
        result = subprocess.run(
            ["hermes", "cron", "list", "--json"],
            capture_output=True, text=True, timeout=15,
        )
        hermes_jobs = json.loads(result.stdout) if result.returncode == 0 else []
    except (OSError, subprocess.TimeoutExpired, json.JSONDecodeError):
        # OSError covers a missing binary as well as e.g. permission denied.
        hermes_jobs = []
    if not isinstance(hermes_jobs, list):
        # Valid JSON of an unexpected shape (object, string, ...) has no
        # job entries that can be read safely.
        hermes_jobs = []
    for job in hermes_jobs:
        if not isinstance(job, dict):
            continue
        jobs.append({
            "name": job.get("name", "unknown"),
            "schedule": job.get("schedule", "unknown"),
            "last_status": job.get("last_status", "unknown"),
            "last_run": job.get("last_run", None),
            "error_count": job.get("error_count", 0),
            "enabled": job.get("enabled", True),
            "source": "hermes",
        })

    # Also scan local cron configs, skipping names that are already listed.
    # Config names are always strings, so only string names can collide.
    known_names = {j["name"] for j in jobs if isinstance(j["name"], str)}
    for config in find_cron_configs():
        if config["name"] in known_names:
            continue
        known_names.add(config["name"])
        jobs.append({
            "name": config["name"],
            "schedule": config["schedule"],
            "last_status": "unknown",
            "last_run": None,
            "error_count": 0,
            "enabled": True,
            "source": "config",
            "file": config["file"],
        })
    return jobs
def categorize_jobs(jobs: list[dict]) -> dict:
    """Sort jobs into healthy, transient, systemic and unknown buckets.

    Rules, based on each job's "last_status" and "error_count":
      * "ok" / "success"            -> healthy
      * "error" with count <= 3     -> transient
      * "error" with count > 3      -> systemic
      * "unknown", missing or null  -> unknown
      * any other status            -> transient

    A missing, null or non-numeric "error_count" is treated as 0.

    Args:
        jobs: Job dicts; missing keys are tolerated.

    Returns:
        A dict with the keys "healthy", "transient", "systemic" and
        "unknown", each mapping to the list of matching job dicts in
        input order.
    """
    categories = {"healthy": [], "transient": [], "systemic": [], "unknown": []}
    for job in jobs:
        # A JSON null arrives as None; treat it like a missing field rather
        # than letting it fall through to the "transient" catch-all.
        status = job.get("last_status") or "unknown"
        try:
            error_count = int(job.get("error_count") or 0)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric count: comparing it to 3 would raise, so use 0.
            error_count = 0

        if status in ("ok", "success"):
            bucket = "healthy"
        elif status == "error":
            bucket = "systemic" if error_count > 3 else "transient"
        elif status == "unknown":
            bucket = "unknown"
        else:
            bucket = "transient"
        categories[bucket].append(job)
    return categories
def cmd_report(token: str, as_json: bool = False):
    """Print the audit report for all known cron jobs.

    Args:
        token: Passed through to check_hermes_cron().
        as_json: When true, print a single JSON document with the total,
            the per-category counts and the full job list, and nothing else.

    In text mode a summary of the four categories is printed, followed by
    all systemic jobs and at most the first 10 transient and unknown jobs.
    """
    all_jobs = check_hermes_cron(token)
    buckets = categorize_jobs(all_jobs)

    if as_json:
        counts = {label: len(members) for label, members in buckets.items()}
        payload = {"total": len(all_jobs), "categories": counts, "jobs": all_jobs}
        print(json.dumps(payload, indent=2))
        return

    # Summary header.
    print(f"CRON AUDIT — {len(all_jobs)} jobs total")
    print("=" * 50)
    print(f" Healthy: {len(buckets['healthy'])}")
    print(f" Transient: {len(buckets['transient'])} (retry likely)")
    print(f" Systemic: {len(buckets['systemic'])} (needs investigation)")
    print(f" Unknown: {len(buckets['unknown'])} (no status data)")

    # Per-category detail; empty categories print nothing.
    systemic = buckets["systemic"]
    if systemic:
        print("\nSYSTEMIC FAILURES:")
        for entry in systemic:
            print(f"{entry['name']}: errors={entry.get('error_count', '?')} last={entry.get('last_run', '?')}")

    transient = buckets["transient"]
    if transient:
        print("\nTRANSIENT ERRORS:")
        for entry in transient[:10]:
            print(f" ⚠️ {entry['name']}: status={entry.get('last_status', '?')}")

    unknown = buckets["unknown"]
    if unknown:
        print(f"\nUNKNOWN STATUS ({len(unknown)} jobs):")
        for entry in unknown[:10]:
            origin = entry.get("source", "?")
            print(f"{entry['name']} (source: {origin})")
def cmd_disable_stale(token: str):
    """Disable systemic-failure jobs whose last run is more than 48 hours old.

    For every job categorized as systemic that is not already disabled and
    has a parseable "last_run" older than 48 hours, ``hermes cron disable
    <name>`` is run. A job is only counted when that command exits with
    status 0. Failures are reported on stdout and do not abort the loop.

    Args:
        token: Passed through to check_hermes_cron().
    """
    # Function-scope import: subprocess is not among the file-level imports.
    import subprocess

    jobs = check_hermes_cron(token)
    categories = categorize_jobs(jobs)
    disabled = 0
    for job in categories["systemic"]:
        if job.get("enabled", True) is False:
            # Already disabled: nothing to do, and it must not be counted.
            continue
        age_hours = _hours_since(job.get("last_run"))
        if age_hours is None or age_hours <= 48:
            continue
        print(f"DISABLING {job['name']}: erroring for {age_hours:.0f}h")
        try:
            result = subprocess.run(
                ["hermes", "cron", "disable", job["name"]],
                capture_output=True, timeout=10,
            )
        except FileNotFoundError:
            print(f" (hermes CLI not available, skipping)")
            continue
        except (OSError, subprocess.TimeoutExpired) as exc:
            print(f" (hermes cron disable failed: {exc})")
            continue
        if result.returncode != 0:
            # The CLI ran but refused or failed; do not claim success.
            print(f" (hermes cron disable exited with {result.returncode}, not counted)")
            continue
        disabled += 1
    print(f"\nDisabled: {disabled} jobs")


def _hours_since(last_run) -> float | None:
    """Return hours elapsed since an ISO-8601 timestamp, or None if unusable."""
    if not isinstance(last_run, str) or not last_run:
        return None
    try:
        last_dt = datetime.fromisoformat(last_run.replace("Z", "+00:00"))
    except ValueError:
        return None
    if last_dt.tzinfo is None:
        # A naive value cannot be subtracted from an aware "now" (TypeError);
        # assume such timestamps are UTC.
        last_dt = last_dt.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - last_dt).total_seconds() / 3600
def main():
    """Parse the command line and run the selected audit command.

    ``--disable-stale`` runs cmd_disable_stale(); otherwise the report is
    printed, as JSON when ``--json`` is given. ``--report`` is accepted but
    the report is also the default action.
    """
    cli = argparse.ArgumentParser(description="Cron job audit")
    flag_specs = (
        ("--report", {}),
        ("--disable-stale", {}),
        ("--json", {"dest": "as_json"}),
    )
    for flag, extra in flag_specs:
        cli.add_argument(flag, action="store_true", **extra)
    opts = cli.parse_args()

    gitea_token = load_token()
    if not opts.disable_stale:
        cmd_report(gitea_token, opts.as_json)
        return
    cmd_disable_stale(gitea_token)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()