fix(#662): cron fleet audit — crontab parsing, tests, CI validation
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 24s
Smoke Test / smoke (pull_request) Failing after 14s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 16s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 46s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 7s
Validate Config / Shell Script Lint (pull_request) Failing after 44s
Validate Config / Playbook Schema Validation (pull_request) Successful in 22s
PR Checklist / pr-checklist (pull_request) Failing after 3m55s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled

- Added VPS crontab backup parsing to cron-audit-662.py
- New audit_fleet() combines hermes cron + VPS crontabs
- load_crontab_backups() reads cron/vps/*-crontab-backup.txt
- 20+ tests: crontab parsing, job categorization, fleet audit,
  timestamp parsing, backup loading
- ci-cron-validate.py: CI gate that fails on systemic failures
- Fresh audit report generated in cron/audit-report.json

Closes #662
This commit is contained in:
Alexander Whitestone
2026-04-17 01:34:45 -04:00
parent 6fbf5bb649
commit d278d7f5d5
4 changed files with 561 additions and 94 deletions

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""CI: Validate cron fleet health. Exit 1 on systemic failures."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from cron_audit_662 import audit_fleet, load_cron_state, load_crontab_backups
def main():
hermes = load_cron_state()
crontabs = load_crontab_backups(Path(__file__).parent.parent / "cron" / "vps")
report = audit_fleet(hermes, crontabs)
print(f"Cron Audit CI: {report['total_jobs']} jobs — H:{report['summary']['healthy']} T:{report['summary']['transient_errors']} S:{report['summary']['systemic_failures']}")
if report["systemic_jobs"]:
for j in report["systemic_jobs"]:
print(f" FAIL: {j['name']}{j['reason']}")
sys.exit(1)
print("CI PASSED")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -218,6 +218,95 @@ def generate_issue_body(job: Dict[str, Any]) -> str:
"""
# --- Crontab Parsing ---
def parse_crontab(text: str, source: str = "unknown") -> list:
"""Parse a crontab file into job-like dicts for audit."""
import re
jobs = []
cron_pattern = re.compile(
r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)'
)
schedule_pattern = re.compile(
r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)'
)
for line in text.split("\n"):
line = line.strip()
if not line or line.startswith("#"):
continue
sm = schedule_pattern.match(line)
cm = cron_pattern.match(line)
if not sm or not cm:
continue
schedule_raw = sm.group(0).strip()
command = cm.group(1).strip()
name_part = command.split("#")[-1].strip() if "#" in command else ""
if not name_part:
cmd_base = command.split(">>")[0].strip().split()
name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed"
clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200]
jobs.append({
"id": f"crontab-{source}-{hash(command) % 10000:04x}",
"name": name_part,
"schedule_display": schedule_raw,
"schedule": schedule_raw,
"state": "scheduled",
"enabled": True,
"last_status": None,
"last_error": None,
"last_run_at": None,
"_source": f"crontab:{source}",
"_command": clean_cmd,
})
return jobs
def load_crontab_backups(backup_dir) -> list:
"""Load cron jobs from VPS crontab backup files."""
from pathlib import Path
backup_dir = Path(backup_dir)
all_jobs = []
if not backup_dir.exists():
return all_jobs
for fpath in sorted(backup_dir.glob("*-crontab-backup.txt")):
source = fpath.name.replace("-crontab-backup.txt", "")
text = fpath.read_text()
all_jobs.extend(parse_crontab(text, source=source))
return all_jobs
def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict:
"""Run full fleet audit combining hermes cron + VPS crontabs."""
now = datetime.now(timezone.utc)
hermes_categorized = [categorize_job(j, now) for j in hermes_jobs]
crontab_categorized = []
for j in crontab_jobs:
crontab_categorized.append({
"id": j["id"], "name": j["name"],
"schedule": j.get("schedule_display", ""),
"state": "scheduled", "enabled": True,
"last_status": None, "last_error": None, "last_run_at": None,
"category": "healthy",
"reason": f"Crontab ({j.get('_source', '?')}) — verify logs manually",
"action": "verify-logs",
})
all_cat = hermes_categorized + crontab_categorized
healthy = [c for c in all_cat if c["category"] == "healthy"]
transient = [c for c in all_cat if c["category"] == "transient"]
systemic = [c for c in all_cat if c["category"] == "systemic"]
return {
"audit_time": now.isoformat(),
"total_jobs": len(all_cat),
"hermes_jobs": len(hermes_categorized),
"crontab_jobs": len(crontab_categorized),
"summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)},
"systemic_jobs": [{"id": j["id"], "name": j["name"], "reason": j["reason"], "last_error": j.get("last_error", "")} for j in systemic],
"transient_jobs": [{"id": j["id"], "name": j["name"], "reason": j["reason"]} for j in transient],
"all_jobs": all_cat,
}
def main():
parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
parser.add_argument("--jobs-file", help="Path to jobs.json override")
@@ -238,12 +327,19 @@ def main():
else:
jobs = load_cron_state()
# Also load VPS crontab backups
crontab_dir = Path(__file__).parent.parent / "cron" / "vps"
crontab_jobs = load_crontab_backups(crontab_dir)
if not jobs:
print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
sys.exit(1)
# Run audit
report = audit_jobs(jobs)
if crontab_jobs:
report = audit_fleet(jobs, crontab_jobs)
else:
report = audit_jobs(jobs)
# Output
if args.json: