Files
timmy-config/scripts/cron-audit-662.py
STEP35 FREE BURN d913be508b
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 22s
Smoke Test / smoke (pull_request) Failing after 21s
Validate Config / YAML Lint (pull_request) Failing after 15s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 57s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Shell Script Lint (pull_request) Failing after 53s
Validate Config / Playbook Schema Validation (pull_request) Successful in 25s
Architecture Lint / Lint Repository (pull_request) Failing after 20s
PR Checklist / pr-checklist (pull_request) Failing after 3m32s
fix(#442): deprecate ad-hoc recovery scripts; add ansible structure tests
- Add DEPRECATION headers to overlapping ad-hoc recovery scripts:
  - config_drift.py, config_drift_detector.py → golden_state role
  - self_healing.py → deadman_switch role
  - reset_pipeline_state.py → agent_startup playbook
  - provision_wizard.py → site.yml + wizard_base role
  - cron_audit.py, cron-audit-662.py → cron_manager role

- Add tests/test_ansible_structure.py (19 pytest checks) validating
  the complete canonical ansible/ structure required by #442

- Update docs/CANONICAL_SERVICES.md with explicit supersession table
  mapping each ad-hoc mechanism to its Ansible replacement

- ansible/README.md: document webhook setup as required step

This establishes ansible/ as the single canonical source of truth
for fleet lifecycle management and formally retires the overlapping
ad-hoc recovery mechanisms.

Closes #442
2026-04-29 01:41:34 -04:00

448 lines
15 KiB
Python

#!/usr/bin/env python3
"""
cron-audit-662.py — DEPRECATED

Cron job auditing is superseded by the source-controlled Ansible cron_manager role.
Superseded by: ansible/playbooks/cron_schedule.yml + ansible/roles/cron_manager

DO NOT USE. Use the canonical Ansible playbook instead:

    ansible-playbook -i ansible/inventory/hosts.yml ansible/playbooks/cron_schedule.yml

This script remains only for historical reference. It will be removed after
fleet-wide migration is verified (issue timmy-config #442).

Issue: timmy-config #442 — [P2] Ansible IaC — Canonical Playbook for Fleet Management
Deprecated: 2026-04-29 (STEP35 FREE BURN)

Original description — Cron Fleet Audit Script (#662)

Reads hermes cron job state and categorizes all jobs into:
  - healthy:   last_status=ok or never-run-and-enabled
  - transient: recent errors (likely network/timeout)
  - systemic:  repeated errors over 48+ hours

Outputs a JSON report and optionally:
  --disable  Disable systemic jobs erroring 48+ hours
  --issues   File Gitea issues for systemic failures
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
# --- Config ---
# Age (in hours) of a failing job's last run at which the failure is treated
# as systemic rather than transient.
ERROR_THRESHOLD_HOURS = 48

# Candidate state files, in probe order: the current user's hermes cron
# directory first, then root's; within each directory jobs.json precedes
# state.json.
CRON_STATE_PATHS = [
    cron_dir / filename
    for cron_dir in (Path.home() / ".hermes" / "cron", Path("/root/.hermes/cron"))
    for filename in ("jobs.json", "state.json")
]
def _jobs_from_payload(data: Any) -> Any:
    """Return the job list held by a parsed JSON payload, or None if the shape is unrecognized."""
    if isinstance(data, dict) and "jobs" in data:
        return data["jobs"]
    if isinstance(data, list):
        return data
    return None


def load_cron_state() -> List[Dict[str, Any]]:
    """Load cron job state from known locations.

    Every path in ``CRON_STATE_PATHS`` is tried in order. The first file that
    can be read, parses as JSON, and is either a list or a dict with a
    ``"jobs"`` key supplies the result. If no file qualifies, the
    ``hermes cron list --json`` CLI is queried as a fallback.

    Returns:
        The job list from the first usable source, or an empty list when
        neither the state files nor the CLI yield one.
    """
    import subprocess

    for path in CRON_STATE_PATHS:
        # EAFP: simply try to open the file. A separate exists() probe adds
        # nothing and can itself fail -- Path.exists() may raise
        # PermissionError (e.g. for /root/... when not running as root) on
        # some Python versions -- whereas open() failures are all OSError.
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file, malformed JSON (JSONDecodeError) or
            # undecodable bytes (UnicodeDecodeError): try the next candidate.
            continue
        jobs = _jobs_from_payload(data)
        if jobs is not None:
            return jobs

    # Fallback: try hermes cron list CLI
    try:
        result = subprocess.run(
            ["hermes", "cron", "list", "--json"],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        # CLI not installed, not executable, or hung.
        return []
    if result.returncode != 0:
        return []
    try:
        data = json.loads(result.stdout)
    except ValueError:
        return []
    jobs = _jobs_from_payload(data)
    return jobs if jobs is not None else []
def parse_timestamp(ts: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    Args:
        ts: Timestamp string, e.g. ``2024-01-02T03:04:05+00:00`` or with a
            trailing ``Z``. ``None``, empty, or non-string values are accepted.

    Returns:
        The parsed datetime, with UTC attached when the input carried no
        offset; ``None`` when the input is missing or cannot be parsed.
    """
    if not ts or not isinstance(ts, str):
        return None
    ts = ts.strip()
    # Normalize the "Z" (Zulu/UTC) suffix to an explicit offset:
    # datetime.fromisoformat() only accepts "Z" from Python 3.11 onwards.
    if ts[-1:] in ("Z", "z"):
        ts = ts[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(ts)
    except ValueError:
        return None
    if dt.tzinfo is None:
        # Naive timestamps are taken to be UTC so they can be compared with
        # timezone-aware "now" values.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Categorize a single job as healthy, transient or systemic.

    Rules are applied in order; the first match wins:

    1. ``last_status`` is ``None`` and there is no error -> healthy (never run).
    2. ``state == "paused"`` -> healthy (intentionally paused).
    3. ``state == "completed"`` -> healthy (one-shot job).
    4. ``last_status == "error"`` -> systemic when the last run is at least
       ``ERROR_THRESHOLD_HOURS`` old, otherwise transient.
    5. ``last_status == "ok"`` -> healthy.
    6. ``state == "scheduled"`` and enabled -> healthy.
    7. Anything else -> transient, flagged for investigation.

    Args:
        job: Raw job record. All keys are optional; those read are ``id``,
            ``name``, ``schedule``, ``schedule_display``, ``state``,
            ``enabled``, ``last_status``, ``last_error``, ``last_run_at`` and
            ``paused_reason``.
        now: Timezone-aware reference time used to age the last run
            (``parse_timestamp`` returns aware datetimes).

    Returns:
        A report entry echoing the job's fields plus ``category``,
        ``reason`` and ``action``.
    """
    status = job.get("last_status", "")
    last_error = job.get("last_error", "")
    last_run = parse_timestamp(job.get("last_run_at"))
    enabled = job.get("enabled", False)
    state = job.get("state", "unknown")
    name = job.get("name", job.get("id", "unknown"))

    entry = {
        "id": job.get("id", ""),
        "name": name,
        "schedule": job.get("schedule_display", str(job.get("schedule", ""))),
        "state": state,
        "enabled": enabled,
        "last_status": status,
        "last_error": last_error,
        "last_run_at": job.get("last_run_at"),
        "category": "healthy",
        "reason": "",
        "action": "",
    }

    # Never run / no error
    if status is None and not last_error:
        entry["category"] = "healthy"
        entry["reason"] = "Never run, no errors"
        return entry

    # Explicitly paused with reason
    if state == "paused":
        entry["category"] = "healthy"
        entry["reason"] = job.get("paused_reason", "Manually paused")
        entry["action"] = "none — paused intentionally"
        return entry

    # Completed jobs
    if state == "completed":
        entry["category"] = "healthy"
        entry["reason"] = "Completed (one-shot)"
        return entry

    # Error status. The error message is deliberately NOT required: a job
    # whose last run failed without recording details must still be treated
    # as failing instead of falling through to the "scheduled and enabled"
    # rule below and being reported as healthy.
    if status == "error":
        age_hours = None
        if last_run is not None:
            age_hours = (now - last_run).total_seconds() / 3600
        if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
            entry["category"] = "systemic"
            entry["reason"] = f"Erroring for {age_hours:.1f}h (>{ERROR_THRESHOLD_HOURS}h threshold)"
            entry["action"] = "disable"
        else:
            entry["category"] = "transient"
            age_str = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age"
            entry["reason"] = f"Recent error ({age_str}), may be transient"
            entry["action"] = "monitor"
        return entry

    # OK status
    if status == "ok":
        entry["category"] = "healthy"
        entry["reason"] = "Last run succeeded"
        return entry

    # Scheduled but never errored
    if state == "scheduled" and enabled:
        entry["category"] = "healthy"
        entry["reason"] = "Scheduled and running"
        return entry

    # Unknown state
    entry["category"] = "transient"
    entry["reason"] = f"Unknown state: {state}, status: {status}"
    entry["action"] = "investigate"
    return entry
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Audit a list of hermes cron jobs and build the report dict.

    Each job is categorized against a single shared "now" (UTC). The report
    contains the audit time, per-category counts, condensed listings of the
    systemic and transient jobs, and every categorized entry under
    ``all_jobs``.
    """
    now = datetime.now(timezone.utc)

    # Single pass: categorize each job and file it under its category.
    buckets = {"healthy": [], "transient": [], "systemic": []}
    categorized = []
    for job in jobs:
        entry = categorize_job(job, now)
        categorized.append(entry)
        bucket = buckets.get(entry["category"])
        if bucket is not None:
            bucket.append(entry)

    systemic_listing = []
    for entry in buckets["systemic"]:
        systemic_listing.append({
            "id": entry["id"],
            "name": entry["name"],
            "reason": entry["reason"],
            "last_error": entry["last_error"],
        })

    transient_listing = []
    for entry in buckets["transient"]:
        transient_listing.append({
            "id": entry["id"],
            "name": entry["name"],
            "reason": entry["reason"],
        })

    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(jobs),
        "summary": {
            "healthy": len(buckets["healthy"]),
            "transient_errors": len(buckets["transient"]),
            "systemic_failures": len(buckets["systemic"]),
        },
        "systemic_jobs": systemic_listing,
        "transient_jobs": transient_listing,
        "all_jobs": categorized,
    }
def generate_issue_body(job: Dict[str, Any]) -> str:
    """Generate a Gitea issue body for a systemic cron failure.

    Args:
        job: Either a full categorized entry or the condensed ``systemic_jobs``
            record (which only carries ``id``, ``name``, ``reason`` and
            ``last_error``). Missing fields are rendered with placeholders
            instead of raising ``KeyError``.

    Returns:
        The Markdown issue body, terminated by a newline.
    """
    name = job.get("name", "unknown")
    job_id = job.get("id", "")
    # The condensed report records have no schedule/state keys.
    schedule = job.get("schedule", "unknown")
    state = job.get("state", "unknown")
    last_error = job.get("last_error") or "No error details available"
    reason = job.get("reason", "")

    lines = [
        "## Systemic Cron Failure — Auto-Filed by Audit #662",
        f"**Job:** {name} (`{job_id}`)",
        f"**Schedule:** {schedule}",
        f"**State:** {state}",
        "**Last Error:**",
        "```",
        f"{last_error}",
        "```",
        f"**Audit Finding:** {reason}",
        "### Action Required",
        "- [ ] Diagnose root cause of repeated failure",
        "- [ ] Fix configuration or remove broken job",
        "- [ ] Verify job resumes healthy after fix",
        "*Auto-generated by cron-audit-662.py*",
    ]
    return "\n".join(lines) + "\n"
# --- Crontab Parsing ---
def parse_crontab(text: str, source: str = "unknown") -> list:
    """Parse crontab text into job-like dicts for audit.

    A line is recognized when it starts with either an ``@keyword`` schedule
    or five schedule fields made of digits and ``* , / -``, followed by a
    command. Blank lines, comment lines and anything else (such as
    environment assignments) are skipped.

    Args:
        text: Contents of a crontab file.
        source: Label for where the crontab came from; embedded in each job's
            ``id`` and ``_source``.

    Returns:
        One dict per recognized entry, shaped like a hermes job record with
        ``state="scheduled"``, ``enabled=True`` and no run history. ``name``
        is the trailing ``#`` comment when present, otherwise the basename of
        the last token before any ``>>`` redirection.
    """
    import re
    import zlib

    field = r"[\d*,/\-]+"
    # Group 1: the schedule (@keyword or five fields); group 2: the command.
    line_pattern = re.compile(
        r"^(@\w+|" + r"\s+".join([field] * 5) + r")\s+(.*)"
    )

    jobs = []
    for line in text.split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        match = line_pattern.match(line)
        if match is None:
            continue
        schedule_raw = match.group(1).strip()
        command = match.group(2).strip()

        name_part = command.split("#")[-1].strip() if "#" in command else ""
        if not name_part:
            cmd_base = command.split(">>")[0].strip().split()
            name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed"
        clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200]

        # Use a CRC of the command rather than the built-in hash(): str
        # hashes are salted per process, which made the ids differ between
        # runs for the same crontab entry.
        digest = zlib.crc32(command.encode("utf-8")) % 10000
        jobs.append({
            "id": f"crontab-{source}-{digest:04x}",
            "name": name_part,
            "schedule_display": schedule_raw,
            "schedule": schedule_raw,
            "state": "scheduled",
            "enabled": True,
            "last_status": None,
            "last_error": None,
            "last_run_at": None,
            "_source": f"crontab:{source}",
            "_command": clean_cmd,
        })
    return jobs
def load_crontab_backups(backup_dir) -> list:
    """Load cron jobs from VPS crontab backup files.

    Every ``*-crontab-backup.txt`` file directly inside ``backup_dir`` is
    parsed with ``parse_crontab``, in sorted filename order; the part of the
    filename before that suffix is used as the job source label.

    Args:
        backup_dir: Directory holding the backup files (str or Path).

    Returns:
        All parsed jobs, or an empty list when ``backup_dir`` is not an
        existing directory. A file that cannot be read is skipped with a
        warning on stderr so one bad backup does not abort the whole audit.
    """
    backup_dir = Path(backup_dir)
    if not backup_dir.is_dir():
        return []

    all_jobs = []
    for fpath in sorted(backup_dir.glob("*-crontab-backup.txt")):
        source = fpath.name.replace("-crontab-backup.txt", "")
        try:
            # Decode leniently: a stray non-UTF-8 byte in a comment should
            # not prevent the rest of the crontab from being audited.
            text = fpath.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            print(f"WARNING: skipping unreadable crontab backup {fpath}: {exc}",
                  file=sys.stderr)
            continue
        all_jobs.extend(parse_crontab(text, source=source))
    return all_jobs
def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict:
    """Build one audit report covering hermes cron jobs and VPS crontab entries.

    Hermes jobs are categorized by ``categorize_job``. Crontab entries carry
    no run history, so each is listed as healthy with a "verify-logs" action.
    """
    now = datetime.now(timezone.utc)

    hermes_entries = [categorize_job(job, now) for job in hermes_jobs]
    crontab_entries = [
        {
            "id": job["id"],
            "name": job["name"],
            "schedule": job.get("schedule_display", ""),
            "state": "scheduled",
            "enabled": True,
            "last_status": None,
            "last_error": None,
            "last_run_at": None,
            "category": "healthy",
            "reason": f"Crontab ({job.get('_source', '?')}) — verify logs manually",
            "action": "verify-logs",
        }
        for job in crontab_jobs
    ]
    combined = hermes_entries + crontab_entries

    def in_category(label):
        # Entries of the combined list whose category equals `label`.
        return [entry for entry in combined if entry["category"] == label]

    healthy = in_category("healthy")
    transient = in_category("transient")
    systemic = in_category("systemic")

    systemic_listing = []
    for entry in systemic:
        systemic_listing.append({
            "id": entry["id"],
            "name": entry["name"],
            "reason": entry["reason"],
            "last_error": entry.get("last_error", ""),
        })

    transient_listing = []
    for entry in transient:
        transient_listing.append({
            "id": entry["id"],
            "name": entry["name"],
            "reason": entry["reason"],
        })

    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(combined),
        "hermes_jobs": len(hermes_entries),
        "crontab_jobs": len(crontab_entries),
        "summary": {
            "healthy": len(healthy),
            "transient_errors": len(transient),
            "systemic_failures": len(systemic),
        },
        "systemic_jobs": systemic_listing,
        "transient_jobs": transient_listing,
        "all_jobs": combined,
    }
def _print_report(report: Dict[str, Any]) -> None:
    """Print the human-readable audit summary to stdout."""
    rule = "=" * 60
    summary = report["summary"]
    print(f"\n{rule}")
    print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
    print(rule)
    print(f" Healthy: {summary['healthy']}")
    print(f" Transient errors: {summary['transient_errors']}")
    print(f" Systemic failures: {summary['systemic_failures']}")
    print(rule)
    if report["systemic_jobs"]:
        print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
        for j in report["systemic_jobs"]:
            print(f" - {j['name']} ({j['id']}): {j['reason']}")
            if j["last_error"]:
                print(f" Error: {j['last_error'][:100]}")
    if report["transient_jobs"]:
        print("\n TRANSIENT ERRORS:")
        for j in report["transient_jobs"]:
            print(f" - {j['name']} ({j['id']}): {j['reason']}")
    print()


def _disable_systemic_jobs(systemic_jobs: List[Dict[str, Any]]) -> None:
    """Pause each systemic job via the hermes CLI and report the actual outcome."""
    import subprocess

    for j in systemic_jobs:
        print(f"Disabling: {j['name']} ({j['id']})")
        try:
            result = subprocess.run(
                ["hermes", "cron", "pause", j["id"]],
                capture_output=True, text=True, timeout=10,
            )
        except (OSError, subprocess.SubprocessError) as e:
            # CLI missing / not executable / timed out.
            print(f" → Failed: {e}")
            continue
        # subprocess.run() does not raise on a non-zero exit status, so the
        # return code has to be checked before claiming success.
        if result.returncode == 0:
            print(" → Disabled")
        else:
            detail = (result.stderr or result.stdout or "").strip()
            print(f" → Failed: {detail or f'exit code {result.returncode}'}")


def _load_gitea_token() -> str:
    """Return the Gitea token from GITEA_TOKEN or ~/.config/gitea/token ('' if neither is set)."""
    token = os.environ.get("GITEA_TOKEN") or ""
    if token:
        return token
    token_path = Path.home() / ".config" / "gitea" / "token"
    try:
        return token_path.read_text().strip()
    except OSError:
        return ""


def _full_entry(report: Dict[str, Any], condensed: Dict[str, Any]) -> Dict[str, Any]:
    """Return the complete all_jobs entry matching a condensed systemic record."""
    for entry in report["all_jobs"]:
        if entry.get("id") == condensed["id"] and entry.get("name") == condensed["name"]:
            return entry
    return condensed


def _file_issues(report: Dict[str, Any], gitea_token: str) -> None:
    """File one Gitea issue per systemic job; failures are reported and skipped."""
    import urllib.request

    base = "https://forge.alexanderwhitestone.com/api/v1"
    headers = {
        "Authorization": f"token {gitea_token}",
        "Content-Type": "application/json",
    }
    for j in report["systemic_jobs"]:
        title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
        try:
            # The condensed systemic record lacks the schedule/state fields
            # that the issue template renders, so use the full entry.
            body = generate_issue_body(_full_entry(report, j))
            data = json.dumps({"title": title, "body": body}).encode()
            req = urllib.request.Request(
                f"{base}/repos/Timmy_Foundation/timmy-config/issues",
                data=data, headers=headers, method="POST",
            )
            # Bounded wait and guaranteed close of the HTTP response.
            with urllib.request.urlopen(req, timeout=30) as resp:
                result = json.loads(resp.read())
            print(f"Issued #{result['number']}: {title}")
        except Exception as e:
            # Best effort at the CLI boundary: report and move on to the
            # next job rather than aborting the remaining filings.
            print(f"Failed to file issue for {j['name']}: {e}")


def main():
    """CLI entry point: load jobs, audit them, report, and optionally act.

    Exits with status 1 when no jobs are found, when ``--issues`` is given
    without a Gitea token, or when any systemic failure is present;
    otherwise exits with status 0.
    """
    parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
    parser.add_argument("--jobs-file", help="Path to jobs.json override")
    parser.add_argument("--disable", action="store_true",
                        help="Disable systemic jobs (requires hermes CLI)")
    parser.add_argument("--issues", action="store_true",
                        help="File Gitea issues for systemic failures")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--json", action="store_true", help="JSON output only")
    args = parser.parse_args()

    # Load jobs
    if args.jobs_file:
        with open(args.jobs_file, encoding="utf-8") as f:
            data = json.load(f)
        jobs = data.get("jobs", data) if isinstance(data, dict) else data
    else:
        jobs = load_cron_state()

    # Also load VPS crontab backups
    crontab_dir = Path(__file__).parent.parent / "cron" / "vps"
    crontab_jobs = load_crontab_backups(crontab_dir)

    if not jobs:
        print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
        sys.exit(1)

    # Run audit
    if crontab_jobs:
        report = audit_fleet(jobs, crontab_jobs)
    else:
        report = audit_jobs(jobs)

    # Output
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        _print_report(report)

    # Write report file
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"Report written to {args.output}")

    # Disable systemic jobs
    if args.disable and report["systemic_jobs"]:
        _disable_systemic_jobs(report["systemic_jobs"])

    # File issues for systemic failures
    if args.issues and report["systemic_jobs"]:
        gitea_token = _load_gitea_token()
        if not gitea_token:
            print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token")
            sys.exit(1)
        _file_issues(report, gitea_token)

    # Exit code: non-zero if systemic failures found
    sys.exit(1 if report["systemic_jobs"] else 0)


if __name__ == "__main__":
    main()