Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 33s
PR Checklist / pr-checklist (pull_request) Failing after 4m27s
Smoke Test / smoke (pull_request) Failing after 20s
Validate Config / YAML Lint (pull_request) Failing after 17s
Validate Config / JSON Validate (pull_request) Successful in 16s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m33s
Validate Config / Shell Script Lint (pull_request) Failing after 42s
Validate Config / Cron Syntax Check (pull_request) Successful in 9s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 22s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
- scripts/cron-audit-662.py: Full audit tool that categorizes cron jobs into healthy/transient/systemic based on error age (48h threshold).
  Flags: --disable (pause systemic), --issues (file Gitea issues), --output (JSON report), --jobs-file (override path).
  Reads from ~/.hermes/cron/ or falls back to 'hermes cron list'.
- scripts/cron_audit_662.py: Symlink for importability.
- tests/test_cron_audit.py: 9 tests covering categorization logic: healthy (ok, never-run, paused, completed), transient (recent error), systemic (old error, boundary cases), mixed audit report.
- cron/audit-report.json: Initial audit of local jobs.json.
  Result: 7/7 healthy, 0 transient, 0 systemic.
334 lines
11 KiB
Python
334 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Cron Fleet Audit Script — #662
|
|
|
|
Reads hermes cron job state, categorizes all jobs into:
|
|
- healthy: last_status=ok or never-run-and-enabled
|
|
- transient: recent errors (likely network/timeout)
|
|
- systemic: repeated errors over 48+ hours
|
|
|
|
Outputs a JSON report and optionally:
|
|
--disable Disable systemic jobs erroring 48+ hours
|
|
--issues File Gitea issues for systemic failures
|
|
"""
|
|
|
|
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
|
|
# --- Config ---
# Age in hours of a job's last failed run at or beyond which the failure is
# reported as systemic instead of transient.
ERROR_THRESHOLD_HOURS = 48

# Directories that may hold the hermes cron state, and the file names looked
# for inside each of them.
_CRON_DIRS = (
    Path.home() / ".hermes" / "cron",
    Path("/root/.hermes/cron"),
)
_STATE_FILENAMES = ("jobs.json", "state.json")

# Candidate state files, in the order they are tried: every file name in the
# current user's directory first, then the same names under /root.
CRON_STATE_PATHS = [
    cron_dir / filename
    for cron_dir in _CRON_DIRS
    for filename in _STATE_FILENAMES
]
|
|
|
|
def _jobs_from_payload(data: Any) -> Optional[List[Dict[str, Any]]]:
    """Return the job list carried by a decoded JSON payload, or None.

    Two shapes are accepted: a bare list of jobs, or a dict whose ``"jobs"``
    key holds that list. Anything else is reported as unusable (None).
    """
    if isinstance(data, dict):
        data = data.get("jobs")
    return data if isinstance(data, list) else None


def load_cron_state() -> List[Dict[str, Any]]:
    """Load cron job state from known locations.

    Each path in ``CRON_STATE_PATHS`` is tried in order and the first file
    that decodes to a usable job list wins. If none does, the output of
    ``hermes cron list --json`` is used instead.

    Returns:
        The list of job dicts, or an empty list when neither a state file nor
        the CLI yields one.
    """
    for path in CRON_STATE_PATHS:
        # EAFP instead of path.exists(): exists() can propagate
        # PermissionError (e.g. probing /root as another user), which would
        # abort the whole lookup before the CLI fallback is ever tried.
        try:
            with open(path, encoding="utf-8") as f:
                jobs = _jobs_from_payload(json.load(f))
        except (OSError, ValueError):
            # Missing, unreadable, undecodable or malformed file: try the next.
            continue
        if jobs is not None:
            return jobs

    # Fallback: try hermes cron list CLI
    import subprocess

    try:
        result = subprocess.run(
            ["hermes", "cron", "list", "--json"],
            capture_output=True, text=True, timeout=30
        )
    except (subprocess.TimeoutExpired, OSError):
        # CLI not installed, not executable, or hung.
        return []

    if result.returncode != 0:
        return []

    try:
        jobs = _jobs_from_payload(json.loads(result.stdout))
    except ValueError:
        return []
    return jobs if jobs is not None else []
|
|
|
|
|
|
def parse_timestamp(ts: str) -> Optional[datetime]:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime.

    Args:
        ts: Timestamp text such as ``2024-01-02T03:04:05+00:00`` or
            ``2024-01-02T03:04:05Z``. Empty, ``None`` and non-string values
            are tolerated.

    Returns:
        An aware ``datetime`` (naive inputs are taken to be UTC), or ``None``
        when ``ts`` is empty, not a string, or not parseable.
    """
    if not ts or not isinstance(ts, str):
        return None

    text = ts.strip()
    # Normalize the "Z" (Zulu/UTC) suffix: datetime.fromisoformat() only
    # accepts it from Python 3.11 on, while "+00:00" works everywhere.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"

    try:
        dt = datetime.fromisoformat(text)
    except ValueError:
        return None

    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
|
|
|
|
|
|
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Categorize a single job as healthy, transient or systemic.

    Args:
        job: Raw job record. The keys read are ``id``, ``name``, ``schedule``,
            ``schedule_display``, ``state``, ``enabled``, ``last_status``,
            ``last_error``, ``last_run_at`` and ``paused_reason``; all are
            optional.
        now: Reference time for computing the age of the last run. A naive
            value is interpreted as UTC.

    Returns:
        A report entry echoing the job's identifying fields plus
        ``category`` ("healthy" / "transient" / "systemic"), a human-readable
        ``reason`` and a suggested ``action``.
    """
    status = job.get("last_status", "")
    last_error = job.get("last_error", "")
    last_run = parse_timestamp(job.get("last_run_at"))
    enabled = job.get("enabled", False)
    state = job.get("state", "unknown")
    name = job.get("name", job.get("id", "unknown"))

    entry = {
        "id": job.get("id", ""),
        "name": name,
        "schedule": job.get("schedule_display", str(job.get("schedule", ""))),
        "state": state,
        "enabled": enabled,
        "last_status": status,
        "last_error": last_error,
        "last_run_at": job.get("last_run_at"),
        "category": "healthy",
        "reason": "",
        "action": "",
    }

    # Never run / no error
    # NOTE(review): only an explicit null status matches here; a record that
    # omits "last_status" defaults to "" and is judged by the checks below.
    # Confirm which of the two a never-run job actually carries.
    if status is None and not last_error:
        entry["category"] = "healthy"
        entry["reason"] = "Never run, no errors"
        return entry

    # Explicitly paused with reason
    if state == "paused":
        entry["category"] = "healthy"
        entry["reason"] = job.get("paused_reason", "Manually paused")
        entry["action"] = "none — paused intentionally"
        return entry

    # Completed jobs
    if state == "completed":
        entry["category"] = "healthy"
        entry["reason"] = "Completed (one-shot)"
        return entry

    # Error status. The error message is deliberately NOT required: a failed
    # run that recorded no message is still a failure, and would otherwise
    # fall through to the "Scheduled and running" branch and be reported
    # healthy.
    if status == "error":
        age_hours = None
        if last_run:
            if now.tzinfo is None:
                # last_run is always aware; aware - naive raises TypeError.
                now = now.replace(tzinfo=timezone.utc)
            age_hours = (now - last_run).total_seconds() / 3600

        if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
            entry["category"] = "systemic"
            entry["reason"] = f"Erroring for {age_hours:.1f}h (>{ERROR_THRESHOLD_HOURS}h threshold)"
            entry["action"] = "disable"
        else:
            # Recent failure, or a failure whose time is unknown/unparseable.
            entry["category"] = "transient"
            age_str = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age"
            entry["reason"] = f"Recent error ({age_str}), may be transient"
            entry["action"] = "monitor"
        return entry

    # OK status
    if status == "ok":
        entry["category"] = "healthy"
        entry["reason"] = "Last run succeeded"
        return entry

    # Scheduled but never errored
    if state == "scheduled" and enabled:
        entry["category"] = "healthy"
        entry["reason"] = "Scheduled and running"
        return entry

    # Unknown state
    entry["category"] = "transient"
    entry["reason"] = f"Unknown state: {state}, status: {status}"
    entry["action"] = "investigate"
    return entry
|
|
|
|
|
|
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Categorize every job and assemble the audit report.

    All jobs are judged against one shared "now" (current UTC time) so their
    ages are comparable.

    Returns:
        A dict with the audit time, the total job count, per-category counts,
        trimmed listings of the systemic and transient jobs, and the full
        categorized entry of every job under ``all_jobs``.
    """
    audit_time = datetime.now(timezone.utc)

    # One pass: keep every entry in input order and also bucket it by category.
    buckets = {"healthy": [], "transient": [], "systemic": []}
    entries = []
    for job in jobs:
        entry = categorize_job(job, audit_time)
        entries.append(entry)
        bucket = buckets.get(entry["category"])
        if bucket is not None:
            bucket.append(entry)

    systemic_listing = []
    for item in buckets["systemic"]:
        systemic_listing.append({
            "id": item["id"],
            "name": item["name"],
            "reason": item["reason"],
            "last_error": item["last_error"],
        })

    transient_listing = []
    for item in buckets["transient"]:
        transient_listing.append({
            "id": item["id"],
            "name": item["name"],
            "reason": item["reason"],
        })

    counts = {
        "healthy": len(buckets["healthy"]),
        "transient_errors": len(buckets["transient"]),
        "systemic_failures": len(buckets["systemic"]),
    }

    return {
        "audit_time": audit_time.isoformat(),
        "total_jobs": len(jobs),
        "summary": counts,
        "systemic_jobs": systemic_listing,
        "transient_jobs": transient_listing,
        "all_jobs": entries,
    }
|
|
|
|
|
|
def generate_issue_body(job: Dict[str, Any]) -> str:
    """Generate a Gitea issue body (Markdown) for a systemic cron failure.

    Args:
        job: Report entry for the failing job. ``name``, ``id``, ``schedule``,
            ``state``, ``last_error`` and ``reason`` are used when present.
            Missing keys are rendered with a placeholder instead of raising,
            because the trimmed ``systemic_jobs`` report entries carry no
            ``schedule`` or ``state``.

    Returns:
        The Markdown text of the issue body.
    """
    name = job.get("name", "unknown")
    job_id = job.get("id", "")
    schedule = job.get("schedule", "unknown")
    state = job.get("state", "unknown")
    reason = job.get("reason", "")
    error_text = str(job.get("last_error") or "No error details available")

    # Use a fence longer than any backtick run inside the error text so the
    # error cannot terminate the code block early.
    fence = "```"
    while fence in error_text:
        fence += "`"

    return f"""## Systemic Cron Failure — Auto-Filed by Audit #662

**Job:** {name} (`{job_id}`)
**Schedule:** {schedule}
**State:** {state}
**Last Error:**
{fence}
{error_text}
{fence}

**Audit Finding:** {reason}

### Action Required
- [ ] Diagnose root cause of repeated failure
- [ ] Fix configuration or remove broken job
- [ ] Verify job resumes healthy after fix

*Auto-generated by cron-audit-662.py*
"""
|
|
|
|
|
|
def _read_jobs_file(path: str) -> List[Dict[str, Any]]:
    """Read the job list from an explicit jobs file (bare list or {"jobs": [...]})."""
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, dict):
        data = data.get("jobs")
    # Any other shape holds no job list; the caller reports "no jobs found".
    return data if isinstance(data, list) else []


def _print_summary(report: Dict[str, Any]) -> None:
    """Print the human-readable audit summary to stdout."""
    print(f"\n{'='*60}")
    print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
    print(f"{'='*60}")
    print(f" Healthy: {report['summary']['healthy']}")
    print(f" Transient errors: {report['summary']['transient_errors']}")
    print(f" Systemic failures: {report['summary']['systemic_failures']}")
    print(f"{'='*60}")

    if report["systemic_jobs"]:
        print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
        for j in report["systemic_jobs"]:
            print(f" - {j['name']} ({j['id']}): {j['reason']}")
            if j["last_error"]:
                print(f" Error: {str(j['last_error'])[:100]}")

    if report["transient_jobs"]:
        print("\n TRANSIENT ERRORS:")
        for j in report["transient_jobs"]:
            print(f" - {j['name']} ({j['id']}): {j['reason']}")

    print()


def _pause_jobs(jobs: List[Dict[str, Any]], out: Any) -> None:
    """Pause each job through the hermes CLI, reporting the outcome per job to *out*."""
    import subprocess

    for j in jobs:
        print(f"Disabling: {j['name']} ({j['id']})", file=out)
        try:
            result = subprocess.run(
                ["hermes", "cron", "pause", str(j["id"])],
                capture_output=True, text=True, timeout=10
            )
        except (OSError, subprocess.SubprocessError) as e:
            # CLI missing / not executable / timed out.
            print(f" → Failed: {e}", file=out)
            continue

        # A non-zero exit status means the job was NOT paused; do not claim
        # success just because the process could be started.
        if result.returncode == 0:
            print(" → Disabled", file=out)
        else:
            detail = (result.stderr or result.stdout or "").strip()
            suffix = f": {detail}" if detail else ""
            print(f" → Failed: exit code {result.returncode}{suffix}", file=out)


def _file_issues(jobs: List[Dict[str, Any]], gitea_token: str, out: Any) -> None:
    """File one Gitea issue per job, reporting the outcome per job to *out*."""
    import urllib.request

    base = "https://forge.alexanderwhitestone.com/api/v1"
    headers = {
        "Authorization": f"token {gitea_token}",
        "Content-Type": "application/json",
    }

    for j in jobs:
        title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
        # Best effort per job: one failing request (or a malformed entry)
        # must not prevent the remaining issues from being filed.
        try:
            body = generate_issue_body(j)
            data = json.dumps({"title": title, "body": body}).encode()
            req = urllib.request.Request(
                f"{base}/repos/Timmy_Foundation/timmy-config/issues",
                data=data, headers=headers, method="POST"
            )
            # Bounded wait, and the response is closed deterministically.
            with urllib.request.urlopen(req, timeout=30) as resp:
                result = json.loads(resp.read())
            print(f"Issued #{result['number']}: {title}", file=out)
        except Exception as e:
            print(f"Failed to file issue for {j['name']}: {e}", file=out)


def main():
    """Command-line entry point: load jobs, audit them, report, and act.

    Exits with status 1 when no jobs could be loaded, when ``--issues`` is
    requested without a Gitea token, or when systemic failures were found;
    otherwise exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
    parser.add_argument("--jobs-file", help="Path to jobs.json override")
    parser.add_argument("--disable", action="store_true",
                        help="Disable systemic jobs (requires hermes CLI)")
    parser.add_argument("--issues", action="store_true",
                        help="File Gitea issues for systemic failures")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--json", action="store_true", help="JSON output only")
    args = parser.parse_args()

    # With --json, stdout must carry nothing but the JSON document, so
    # progress/status lines are diverted to stderr.
    info = sys.stderr if args.json else sys.stdout

    # Load jobs
    if args.jobs_file:
        try:
            jobs = _read_jobs_file(args.jobs_file)
        except (OSError, ValueError) as e:
            print(f"ERROR: Cannot read jobs file {args.jobs_file}: {e}", file=sys.stderr)
            sys.exit(1)
    else:
        jobs = load_cron_state()

    if not jobs:
        print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.",
              file=sys.stderr)
        sys.exit(1)

    # Run audit
    report = audit_jobs(jobs)

    # Output
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        _print_summary(report)

    # Write report file
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"Report written to {args.output}", file=info)

    # Disable systemic jobs
    if args.disable and report["systemic_jobs"]:
        _pause_jobs(report["systemic_jobs"], info)

    # File issues for systemic failures
    if args.issues and report["systemic_jobs"]:
        gitea_token = os.environ.get("GITEA_TOKEN") or ""
        if not gitea_token:
            token_path = Path.home() / ".config" / "gitea" / "token"
            if token_path.exists():
                gitea_token = token_path.read_text().strip()

        if not gitea_token:
            print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token",
                  file=sys.stderr)
            sys.exit(1)

        # The issue body needs schedule/state, which the trimmed
        # "systemic_jobs" listing omits; use the full entries instead.
        systemic_entries = [
            entry for entry in report["all_jobs"]
            if entry["category"] == "systemic"
        ]
        _file_issues(systemic_entries, gitea_token, info)

    # Exit code: non-zero if systemic failures found
    sys.exit(1 if report["systemic_jobs"] else 0)


if __name__ == "__main__":
    main()
|