diff --git a/scripts/ci-cron-validate.py b/scripts/ci-cron-validate.py old mode 100644 new mode 100755 diff --git a/scripts/cron-audit-662.py b/scripts/cron-audit-662.py old mode 100644 new mode 100755 index 2778224c..219299d0 --- a/scripts/cron-audit-662.py +++ b/scripts/cron-audit-662.py @@ -1,428 +1,11 @@ #!/usr/bin/env python3 -""" -Cron Fleet Audit Script — #662 +"""CLI wrapper for the importable cron_audit_662 module.""" -Reads hermes cron job state, categorizes all jobs into: - - healthy: last_status=ok or never-run-and-enabled - - transient: recent errors (likely network/timeout) - - systemic: repeated errors over 48+ hours - -Outputs a JSON report and optionally: - --disable Disable systemic jobs erroring 48+ hours - --issues File Gitea issues for systemic failures -""" - -import json -import sys -import os -import argparse -from datetime import datetime, timezone, timedelta from pathlib import Path -from typing import List, Dict, Any +import sys -# --- Config --- -ERROR_THRESHOLD_HOURS = 48 -CRON_STATE_PATHS = [ - Path.home() / ".hermes" / "cron" / "jobs.json", - Path.home() / ".hermes" / "cron" / "state.json", - Path("/root/.hermes/cron/jobs.json"), - Path("/root/.hermes/cron/state.json"), -] - -def load_cron_state() -> List[Dict[str, Any]]: - """Load cron job state from known locations.""" - for path in CRON_STATE_PATHS: - if path.exists(): - try: - with open(path) as f: - data = json.load(f) - if isinstance(data, dict) and "jobs" in data: - return data["jobs"] - if isinstance(data, list): - return data - except (json.JSONDecodeError, IOError): - continue - - # Fallback: try hermes cron list CLI - try: - import subprocess - result = subprocess.run( - ["hermes", "cron", "list", "--json"], - capture_output=True, text=True, timeout=30 - ) - if result.returncode == 0: - data = json.loads(result.stdout) - if isinstance(data, dict) and "jobs" in data: - return data["jobs"] - if isinstance(data, list): - return data - except (subprocess.TimeoutExpired, 
FileNotFoundError, json.JSONDecodeError): - pass - - return [] - - -def parse_timestamp(ts: str) -> datetime: - """Parse ISO timestamp, handle various formats.""" - if not ts: - return None - # Normalize timezone - ts = ts.replace("+00:00", "+00:00") - try: - dt = datetime.fromisoformat(ts) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - return dt - except ValueError: - return None - - -def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]: - """Categorize a single job.""" - status = job.get("last_status", "") - last_error = job.get("last_error", "") - last_run = parse_timestamp(job.get("last_run_at")) - enabled = job.get("enabled", False) - state = job.get("state", "unknown") - name = job.get("name", job.get("id", "unknown")) - - entry = { - "id": job.get("id", ""), - "name": name, - "schedule": job.get("schedule_display", str(job.get("schedule", ""))), - "state": state, - "enabled": enabled, - "last_status": status, - "last_error": last_error, - "last_run_at": job.get("last_run_at"), - "category": "healthy", - "reason": "", - "action": "", - } - - # Never run / no error - if status is None and not last_error: - entry["category"] = "healthy" - entry["reason"] = "Never run, no errors" - return entry - - # Explicitly paused with reason - if state == "paused": - entry["category"] = "healthy" - entry["reason"] = job.get("paused_reason", "Manually paused") - entry["action"] = "none — paused intentionally" - return entry - - # Completed jobs - if state == "completed": - entry["category"] = "healthy" - entry["reason"] = "Completed (one-shot)" - return entry - - # Error status - if status == "error" and last_error: - age_hours = None - if last_run: - age_hours = (now - last_run).total_seconds() / 3600 - - if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS: - entry["category"] = "systemic" - entry["reason"] = f"Erroring for {age_hours:.1f}h (>{ERROR_THRESHOLD_HOURS}h threshold)" - entry["action"] = "disable" - else: - 
entry["category"] = "transient" - age_str = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age" - entry["reason"] = f"Recent error ({age_str}), may be transient" - entry["action"] = "monitor" - return entry - - # OK status - if status == "ok": - entry["category"] = "healthy" - entry["reason"] = "Last run succeeded" - return entry - - # Scheduled but never errored - if state == "scheduled" and enabled: - entry["category"] = "healthy" - entry["reason"] = "Scheduled and running" - return entry - - # Unknown state - entry["category"] = "transient" - entry["reason"] = f"Unknown state: {state}, status: {status}" - entry["action"] = "investigate" - return entry - - -def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]: - """Run full audit on job list.""" - now = datetime.now(timezone.utc) - categorized = [categorize_job(j, now) for j in jobs] - - healthy = [c for c in categorized if c["category"] == "healthy"] - transient = [c for c in categorized if c["category"] == "transient"] - systemic = [c for c in categorized if c["category"] == "systemic"] - - report = { - "audit_time": now.isoformat(), - "total_jobs": len(jobs), - "summary": { - "healthy": len(healthy), - "transient_errors": len(transient), - "systemic_failures": len(systemic), - }, - "systemic_jobs": [ - { - "id": j["id"], - "name": j["name"], - "reason": j["reason"], - "last_error": j["last_error"], - } - for j in systemic - ], - "transient_jobs": [ - { - "id": j["id"], - "name": j["name"], - "reason": j["reason"], - } - for j in transient - ], - "all_jobs": categorized, - } - - return report - - -def generate_issue_body(job: Dict[str, Any]) -> str: - """Generate a Gitea issue body for a systemic cron failure.""" - return f"""## Systemic Cron Failure — Auto-Filed by Audit #662 - -**Job:** {job['name']} (`{job['id']}`) -**Schedule:** {job['schedule']} -**State:** {job['state']} -**Last Error:** -``` -{job['last_error'] or 'No error details available'} -``` - -**Audit Finding:** 
{job['reason']} - -### Action Required -- [ ] Diagnose root cause of repeated failure -- [ ] Fix configuration or remove broken job -- [ ] Verify job resumes healthy after fix - -*Auto-generated by cron-audit-662.py* -""" - - -# --- Crontab Parsing --- - -def parse_crontab(text: str, source: str = "unknown") -> list: - """Parse a crontab file into job-like dicts for audit.""" - import re - jobs = [] - cron_pattern = re.compile( - r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)' - ) - schedule_pattern = re.compile( - r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)' - ) - - for line in text.split("\n"): - line = line.strip() - if not line or line.startswith("#"): - continue - sm = schedule_pattern.match(line) - cm = cron_pattern.match(line) - if not sm or not cm: - continue - schedule_raw = sm.group(0).strip() - command = cm.group(1).strip() - name_part = command.split("#")[-1].strip() if "#" in command else "" - if not name_part: - cmd_base = command.split(">>")[0].strip().split() - name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed" - clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200] - jobs.append({ - "id": f"crontab-{source}-{hash(command) % 10000:04x}", - "name": name_part, - "schedule_display": schedule_raw, - "schedule": schedule_raw, - "state": "scheduled", - "enabled": True, - "last_status": None, - "last_error": None, - "last_run_at": None, - "_source": f"crontab:{source}", - "_command": clean_cmd, - }) - return jobs - - -def load_crontab_backups(backup_dir) -> list: - """Load cron jobs from VPS crontab backup files.""" - from pathlib import Path - backup_dir = Path(backup_dir) - all_jobs = [] - if not backup_dir.exists(): - return all_jobs - for fpath in sorted(backup_dir.glob("*-crontab-backup.txt")): - source = fpath.name.replace("-crontab-backup.txt", "") - text = fpath.read_text() - all_jobs.extend(parse_crontab(text, source=source)) - return all_jobs - - -def 
audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict: - """Run full fleet audit combining hermes cron + VPS crontabs.""" - now = datetime.now(timezone.utc) - hermes_categorized = [categorize_job(j, now) for j in hermes_jobs] - crontab_categorized = [] - for j in crontab_jobs: - crontab_categorized.append({ - "id": j["id"], "name": j["name"], - "schedule": j.get("schedule_display", ""), - "state": "scheduled", "enabled": True, - "last_status": None, "last_error": None, "last_run_at": None, - "category": "healthy", - "reason": f"Crontab ({j.get('_source', '?')}) — verify logs manually", - "action": "verify-logs", - }) - all_cat = hermes_categorized + crontab_categorized - healthy = [c for c in all_cat if c["category"] == "healthy"] - transient = [c for c in all_cat if c["category"] == "transient"] - systemic = [c for c in all_cat if c["category"] == "systemic"] - return { - "audit_time": now.isoformat(), - "total_jobs": len(all_cat), - "hermes_jobs": len(hermes_categorized), - "crontab_jobs": len(crontab_categorized), - "summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)}, - "systemic_jobs": [{"id": j["id"], "name": j["name"], "reason": j["reason"], "last_error": j.get("last_error", "")} for j in systemic], - "transient_jobs": [{"id": j["id"], "name": j["name"], "reason": j["reason"]} for j in transient], - "all_jobs": all_cat, - } - - -def main(): - parser = argparse.ArgumentParser(description="Cron fleet audit (#662)") - parser.add_argument("--jobs-file", help="Path to jobs.json override") - parser.add_argument("--disable", action="store_true", - help="Disable systemic jobs (requires hermes CLI)") - parser.add_argument("--issues", action="store_true", - help="File Gitea issues for systemic failures") - parser.add_argument("--output", help="Write report to file") - parser.add_argument("--json", action="store_true", help="JSON output only") - args = parser.parse_args() - - # Load jobs - jobs = [] - if 
args.jobs_file: - with open(args.jobs_file) as f: - data = json.load(f) - jobs = data.get("jobs", data) if isinstance(data, dict) else data - else: - jobs = load_cron_state() - - # Also load VPS crontab backups - crontab_dir = Path(__file__).parent.parent / "cron" / "vps" - crontab_jobs = load_crontab_backups(crontab_dir) - - if not jobs: - print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.") - sys.exit(1) - - # Run audit - if crontab_jobs: - report = audit_fleet(jobs, crontab_jobs) - else: - report = audit_jobs(jobs) - - # Output - if args.json: - print(json.dumps(report, indent=2)) - else: - print(f"\n{'='*60}") - print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs") - print(f"{'='*60}") - print(f" Healthy: {report['summary']['healthy']}") - print(f" Transient errors: {report['summary']['transient_errors']}") - print(f" Systemic failures: {report['summary']['systemic_failures']}") - print(f"{'='*60}") - - if report["systemic_jobs"]: - print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):") - for j in report["systemic_jobs"]: - print(f" - {j['name']} ({j['id']}): {j['reason']}") - if j["last_error"]: - print(f" Error: {j['last_error'][:100]}") - - if report["transient_jobs"]: - print(f"\n TRANSIENT ERRORS:") - for j in report["transient_jobs"]: - print(f" - {j['name']} ({j['id']}): {j['reason']}") - - print() - - # Write report file - if args.output: - with open(args.output, "w") as f: - json.dump(report, f, indent=2) - print(f"Report written to {args.output}") - - # Disable systemic jobs - if args.disable and report["systemic_jobs"]: - import subprocess - for j in report["systemic_jobs"]: - print(f"Disabling: {j['name']} ({j['id']})") - try: - subprocess.run( - ["hermes", "cron", "pause", j["id"]], - capture_output=True, text=True, timeout=10 - ) - print(f" → Disabled") - except Exception as e: - print(f" → Failed: {e}") - - # File issues for systemic failures - if args.issues and report["systemic_jobs"]: - gitea_token = 
os.environ.get("GITEA_TOKEN") or "" - if not gitea_token: - token_path = Path.home() / ".config" / "gitea" / "token" - if token_path.exists(): - gitea_token = token_path.read_text().strip() - - if not gitea_token: - print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token") - sys.exit(1) - - import urllib.request - base = "https://forge.alexanderwhitestone.com/api/v1" - headers = { - "Authorization": f"token {gitea_token}", - "Content-Type": "application/json", - } - - for j in report["systemic_jobs"]: - title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})" - body = generate_issue_body(j) - data = json.dumps({"title": title, "body": body}).encode() - req = urllib.request.Request( - f"{base}/repos/Timmy_Foundation/timmy-config/issues", - data=data, headers=headers, method="POST" - ) - try: - resp = urllib.request.urlopen(req) - result = json.loads(resp.read()) - print(f"Issued #{result['number']}: {title}") - except Exception as e: - print(f"Failed to file issue for {j['name']}: {e}") - - # Exit code: non-zero if systemic failures found - sys.exit(1 if report["systemic_jobs"] else 0) +sys.path.insert(0, str(Path(__file__).resolve().parent)) +from cron_audit_662 import main if __name__ == "__main__": diff --git a/scripts/cron_audit_662.py b/scripts/cron_audit_662.py new file mode 100755 index 00000000..4a5cf8e1 --- /dev/null +++ b/scripts/cron_audit_662.py @@ -0,0 +1,630 @@ +#!/usr/bin/env python3 +""" +Cron Fleet Audit Script — #662 + +Reads hermes cron job state, categorizes all jobs into: + - healthy: last_status=ok or never-run-and-enabled + - transient: recent errors (likely network/timeout) + - systemic: repeated errors over 48+ hours + +Outputs a JSON report and optionally: + --disable Disable systemic jobs erroring 48+ hours + --issues File Gitea issues for systemic failures +""" + +import argparse +import json +import os +import re +import subprocess +import sys +from datetime import datetime, timezone +from pathlib import Path 
+from typing import Any, Dict, List, Optional + +# --- Config --- +ERROR_THRESHOLD_HOURS = 48 +TRANSIENT_ERROR_MARKERS = ( + "timeout", + "timed out", + "rate limit", + "429", + "503", + "temporary", + "temporarily", + "auth store lock", + "connection reset", + "connection refused", + "connection aborted", + "delivery error", + "telegram send failed", + "matrix send failed", + "nodename nor servname provided", + "name or service not known", + "empty response", +) +SYSTEMIC_ERROR_MARKERS = ( + "cannot import", + "modulenotfounderror", + "importerror", + "attributeerror", + "syntaxerror", + "permission denied", + "no such file", + "file not found", + "dict' object has no attribute", + 'dict" object has no attribute', +) +CRON_STATE_PATHS = [ + Path.home() / ".hermes" / "cron" / "jobs.json", + Path.home() / ".hermes" / "cron" / "state.json", + Path("/root/.hermes/cron/jobs.json"), + Path("/root/.hermes/cron/state.json"), +] + +def load_jobs_file(path: Path) -> List[Dict[str, Any]]: + """Load a cron jobs/state JSON file.""" + with path.open() as f: + data = json.load(f) + if isinstance(data, dict) and "jobs" in data: + return data["jobs"] + if isinstance(data, list): + return data + return [] + + +def parse_cron_list_output(text: str) -> List[Dict[str, Any]]: + """Parse `hermes cron list --all` output as a last-resort fallback.""" + jobs: List[Dict[str, Any]] = [] + current: Optional[Dict[str, Any]] = None + job_header = re.compile(r"^\s{2}(?P<id>\S+) \[(?P<state>[^\]]+)\]\s*$") + + for raw_line in text.splitlines(): + line = raw_line.rstrip() + match = job_header.match(line) + if match: + if current: + jobs.append(current) + state = match.group("state").strip().lower() + current = { + "id": match.group("id"), + "name": match.group("id"), + "schedule_display": "", + "schedule": "", + "state": "paused" if state == "paused" else "scheduled", + "enabled": state == "active", + "last_status": None, + "last_error": None, + "last_delivery_error": None, + "last_run_at": None, + } + 
continue + + if not current: + continue + + stripped = line.strip() + if stripped.startswith("Name:"): + current["name"] = stripped.split("Name:", 1)[1].strip() + elif stripped.startswith("Schedule:"): + schedule = stripped.split("Schedule:", 1)[1].strip() + current["schedule_display"] = schedule + current["schedule"] = schedule + elif stripped.startswith("Last run:"): + payload = stripped.split("Last run:", 1)[1].strip() + if payload in {"-", ""}: + continue + if " error: " in payload: + ts, error = payload.split(" error: ", 1) + current["last_run_at"] = ts.strip() + current["last_status"] = "error" + current["last_error"] = error.strip() + elif payload.endswith(" ok"): + current["last_run_at"] = payload[:-3].strip() + current["last_status"] = "ok" + else: + current["last_run_at"] = payload + + if current: + jobs.append(current) + return jobs + + +def load_cron_state() -> List[Dict[str, Any]]: + """Load cron job state from known locations.""" + for path in CRON_STATE_PATHS: + if not path.exists(): + continue + try: + jobs = load_jobs_file(path) + if jobs: + return jobs + except (json.JSONDecodeError, IOError): + continue + + try: + result = subprocess.run( + ["hermes", "cron", "list", "--all"], + capture_output=True, + text=True, + timeout=30, + ) + jobs = parse_cron_list_output(result.stdout) + if jobs: + return jobs + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + return [] + + +def parse_timestamp(ts: Optional[str]) -> Optional[datetime]: + """Parse ISO timestamp, handle various formats.""" + if not ts: + return None + ts = str(ts).strip().replace("Z", "+00:00") + try: + dt = datetime.fromisoformat(ts) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + return None + + +def format_schedule(job: Dict[str, Any]) -> str: + """Normalize mixed schedule formats into a readable string.""" + display = job.get("schedule_display") + if display: + return str(display) + + schedule = job.get("schedule") + if 
isinstance(schedule, dict): + return ( + schedule.get("display") + or schedule.get("expr") + or schedule.get("value") + or json.dumps(schedule, sort_keys=True) + ) + return str(schedule or "") + + +def find_error_marker(text: str, markers: tuple[str, ...]) -> Optional[str]: + for marker in markers: + if marker in text: + return marker + return None + + +def classify_error(last_error: str, last_delivery_error: str, age_hours: Optional[float]) -> tuple[str, str, str]: + """Classify an error as transient vs systemic using both age and error text.""" + combined = "\n".join(part for part in [last_error, last_delivery_error] if part).lower() + age_text = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age" + + if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS: + return ( + "systemic", + f"Error persisted for {age_hours:.1f}h (>= {ERROR_THRESHOLD_HOURS}h threshold)", + "disable", + ) + + systemic_marker = find_error_marker(combined, SYSTEMIC_ERROR_MARKERS) + if systemic_marker: + return ( + "systemic", + f"Systemic error signature: {systemic_marker} ({age_text})", + "disable", + ) + + transient_marker = find_error_marker(combined, TRANSIENT_ERROR_MARKERS) + if transient_marker: + return ( + "transient", + f"Transient error signature: {transient_marker} ({age_text})", + "monitor", + ) + + return ( + "transient", + f"Unclassified recent error ({age_text})", + "investigate", + ) + + +def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]: + """Categorize a single job.""" + status = job.get("last_status", "") + last_error = job.get("last_error", "") + last_delivery_error = job.get("last_delivery_error", "") + last_run = parse_timestamp(job.get("last_run_at")) + enabled = job.get("enabled", False) + state = job.get("state", "unknown") + name = job.get("name", job.get("id", "unknown")) + + entry = { + "id": job.get("id", ""), + "name": name, + "schedule": format_schedule(job), + "state": state, + "enabled": enabled, + 
"last_status": status, + "last_error": last_error, + "last_delivery_error": last_delivery_error, + "last_run_at": job.get("last_run_at"), + "category": "healthy", + "reason": "", + "action": "", + } + + # Never run / no error + if status is None and not last_error: + entry["category"] = "healthy" + entry["reason"] = "Never run, no errors" + return entry + + # Explicitly paused with reason + if state == "paused": + entry["category"] = "healthy" + entry["reason"] = job.get("paused_reason", "Manually paused") + entry["action"] = "none — paused intentionally" + return entry + + # Completed jobs + if state == "completed": + entry["category"] = "healthy" + entry["reason"] = "Completed (one-shot)" + return entry + + # Error status + if status == "error": + age_hours = None + if last_run: + age_hours = (now - last_run).total_seconds() / 3600 + + entry["category"], entry["reason"], entry["action"] = classify_error( + str(last_error or ""), + str(last_delivery_error or ""), + age_hours, + ) + return entry + + if status == "ok" and last_delivery_error: + entry["category"] = "transient" + entry["reason"] = f"Job completed but delivery failed: {last_delivery_error}" + entry["action"] = "monitor" + return entry + + # OK status + if status == "ok": + entry["category"] = "healthy" + entry["reason"] = "Last run succeeded" + return entry + + # Scheduled but never errored + if state == "scheduled" and enabled: + entry["category"] = "healthy" + entry["reason"] = "Scheduled and running" + return entry + + # Unknown state + entry["category"] = "transient" + entry["reason"] = f"Unknown state: {state}, status: {status}" + entry["action"] = "investigate" + return entry + + +def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]: + """Run full audit on job list.""" + now = datetime.now(timezone.utc) + categorized = [categorize_job(j, now) for j in jobs] + + healthy = [c for c in categorized if c["category"] == "healthy"] + transient = [c for c in categorized if c["category"] == 
"transient"] + systemic = [c for c in categorized if c["category"] == "systemic"] + + report = { + "audit_time": now.isoformat(), + "total_jobs": len(jobs), + "summary": { + "healthy": len(healthy), + "transient_errors": len(transient), + "systemic_failures": len(systemic), + }, + "systemic_jobs": [ + { + "id": j["id"], + "name": j["name"], + "schedule": j["schedule"], + "state": j["state"], + "reason": j["reason"], + "last_error": j["last_error"], + "last_delivery_error": j["last_delivery_error"], + "last_run_at": j["last_run_at"], + "action": j["action"], + } + for j in systemic + ], + "transient_jobs": [ + { + "id": j["id"], + "name": j["name"], + "schedule": j["schedule"], + "state": j["state"], + "reason": j["reason"], + "last_error": j["last_error"], + "last_delivery_error": j["last_delivery_error"], + "last_run_at": j["last_run_at"], + "action": j["action"], + } + for j in transient + ], + "all_jobs": categorized, + } + + return report + + +def generate_issue_body(job: Dict[str, Any]) -> str: + """Generate a Gitea issue body for a systemic cron failure.""" + delivery_error = job.get("last_delivery_error") or "" + delivery_block = "" + if delivery_error: + delivery_block = f"\n**Last Delivery Error:**\n```\n{delivery_error}\n```\n" + + return f"""## Systemic Cron Failure — Auto-Filed by Audit #662 + +**Job:** {job.get('name', 'unknown')} (`{job.get('id', 'unknown')}`) +**Schedule:** {job.get('schedule', 'unknown')} +**State:** {job.get('state', 'unknown')} +**Last Error:** +``` +{job.get('last_error') or 'No error details available'} +``` +{delivery_block} + +**Audit Finding:** {job.get('reason', 'No audit reason captured')} + +### Action Required +- [ ] Diagnose root cause of repeated failure +- [ ] Fix configuration or remove broken job +- [ ] Verify job resumes healthy after fix + +*Auto-generated by cron-audit-662.py* +""" + + +# --- Crontab Parsing --- + +def parse_crontab(text: str, source: str = "unknown") -> list: + """Parse a crontab file into 
job-like dicts for audit.""" + import re + jobs = [] + cron_pattern = re.compile( + r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)' + ) + schedule_pattern = re.compile( + r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)' + ) + + for line in text.split("\n"): + line = line.strip() + if not line or line.startswith("#"): + continue + sm = schedule_pattern.match(line) + cm = cron_pattern.match(line) + if not sm or not cm: + continue + schedule_raw = sm.group(0).strip() + command = cm.group(1).strip() + name_part = command.split("#")[-1].strip() if "#" in command else "" + if not name_part: + cmd_base = command.split(">>")[0].strip().split() + name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed" + clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200] + jobs.append({ + "id": f"crontab-{source}-{hash(command) % 10000:04x}", + "name": name_part, + "schedule_display": schedule_raw, + "schedule": schedule_raw, + "state": "scheduled", + "enabled": True, + "last_status": None, + "last_error": None, + "last_run_at": None, + "_source": f"crontab:{source}", + "_command": clean_cmd, + }) + return jobs + + +def load_crontab_backups(backup_dir) -> list: + """Load cron jobs from VPS crontab backup files.""" + from pathlib import Path + backup_dir = Path(backup_dir) + all_jobs = [] + if not backup_dir.exists(): + return all_jobs + for fpath in sorted(backup_dir.glob("*-crontab-backup.txt")): + source = fpath.name.replace("-crontab-backup.txt", "") + text = fpath.read_text() + all_jobs.extend(parse_crontab(text, source=source)) + return all_jobs + + +def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict: + """Run full fleet audit combining hermes cron + VPS crontabs.""" + now = datetime.now(timezone.utc) + hermes_categorized = [categorize_job(j, now) for j in hermes_jobs] + crontab_categorized = [] + for j in crontab_jobs: + crontab_categorized.append({ + "id": j["id"], "name": j["name"], + 
"schedule": j.get("schedule_display", ""), + "state": "scheduled", "enabled": True, + "last_status": None, "last_error": None, "last_delivery_error": None, "last_run_at": None, + "category": "healthy", + "reason": f"Crontab ({j.get('_source', '?')}) — verify logs manually", + "action": "verify-logs", + }) + all_cat = hermes_categorized + crontab_categorized + healthy = [c for c in all_cat if c["category"] == "healthy"] + transient = [c for c in all_cat if c["category"] == "transient"] + systemic = [c for c in all_cat if c["category"] == "systemic"] + return { + "audit_time": now.isoformat(), + "total_jobs": len(all_cat), + "hermes_jobs": len(hermes_categorized), + "crontab_jobs": len(crontab_categorized), + "summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)}, + "systemic_jobs": [ + { + "id": j["id"], + "name": j["name"], + "schedule": j["schedule"], + "state": j["state"], + "reason": j["reason"], + "last_error": j.get("last_error", ""), + "last_delivery_error": j.get("last_delivery_error", ""), + "last_run_at": j.get("last_run_at"), + "action": j.get("action", ""), + } + for j in systemic + ], + "transient_jobs": [ + { + "id": j["id"], + "name": j["name"], + "schedule": j["schedule"], + "state": j["state"], + "reason": j["reason"], + "last_error": j.get("last_error", ""), + "last_delivery_error": j.get("last_delivery_error", ""), + "last_run_at": j.get("last_run_at"), + "action": j.get("action", ""), + } + for j in transient + ], + "all_jobs": all_cat, + } + + +def main(): + parser = argparse.ArgumentParser(description="Cron fleet audit (#662)") + parser.add_argument("--jobs-file", help="Path to jobs.json override") + parser.add_argument("--disable", action="store_true", + help="Disable systemic jobs (requires hermes CLI)") + parser.add_argument("--issues", action="store_true", + help="File Gitea issues for systemic failures") + parser.add_argument("--output", help="Write report to file") + 
parser.add_argument("--json", action="store_true", help="JSON output only") + args = parser.parse_args() + + # Load jobs + jobs = [] + if args.jobs_file: + with open(args.jobs_file) as f: + data = json.load(f) + jobs = data.get("jobs", data) if isinstance(data, dict) else data + else: + jobs = load_cron_state() + + # Also load VPS crontab backups + crontab_dir = Path(__file__).parent.parent / "cron" / "vps" + crontab_jobs = load_crontab_backups(crontab_dir) + + if not jobs: + print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.") + sys.exit(1) + + # Run audit + if crontab_jobs: + report = audit_fleet(jobs, crontab_jobs) + else: + report = audit_jobs(jobs) + + # Output + if args.json: + print(json.dumps(report, indent=2)) + else: + print(f"\n{'='*60}") + print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs") + print(f"{'='*60}") + print(f" Healthy: {report['summary']['healthy']}") + print(f" Transient errors: {report['summary']['transient_errors']}") + print(f" Systemic failures: {report['summary']['systemic_failures']}") + print(f"{'='*60}") + + if report["systemic_jobs"]: + print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):") + for j in report["systemic_jobs"]: + print(f" - {j['name']} ({j['id']}): {j['reason']}") + if j["last_error"]: + print(f" Error: {j['last_error'][:100]}") + + if report["transient_jobs"]: + print(f"\n TRANSIENT ERRORS:") + for j in report["transient_jobs"]: + print(f" - {j['name']} ({j['id']}): {j['reason']}") + + print() + + # Write report file + if args.output: + with open(args.output, "w") as f: + json.dump(report, f, indent=2) + print(f"Report written to {args.output}") + + # Disable systemic jobs + if args.disable and report["systemic_jobs"]: + import subprocess + for j in report["systemic_jobs"]: + print(f"Disabling: {j['name']} ({j['id']})") + try: + subprocess.run( + ["hermes", "cron", "pause", j["id"]], + capture_output=True, text=True, timeout=10 + ) + print(f" → Disabled") + except Exception 
as e: + print(f" → Failed: {e}") + + # File issues for systemic failures + if args.issues and report["systemic_jobs"]: + gitea_token = os.environ.get("GITEA_TOKEN") or "" + if not gitea_token: + token_path = Path.home() / ".config" / "gitea" / "token" + if token_path.exists(): + gitea_token = token_path.read_text().strip() + + if not gitea_token: + print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token") + sys.exit(1) + + import urllib.request + base = "https://forge.alexanderwhitestone.com/api/v1" + headers = { + "Authorization": f"token {gitea_token}", + "Content-Type": "application/json", + } + + for j in report["systemic_jobs"]: + title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})" + body = generate_issue_body(j) + data = json.dumps({"title": title, "body": body}).encode() + req = urllib.request.Request( + f"{base}/repos/Timmy_Foundation/timmy-config/issues", + data=data, headers=headers, method="POST" + ) + try: + resp = urllib.request.urlopen(req) + result = json.loads(resp.read()) + print(f"Issued #{result['number']}: {title}") + except Exception as e: + print(f"Failed to file issue for {j['name']}: {e}") + + # Exit code: non-zero if systemic failures found + sys.exit(1 if report["systemic_jobs"] else 0) + + +if __name__ == "__main__": + main() diff --git a/tests/test_cron_audit.py b/tests/test_cron_audit.py index 5cdaf4c2..96f62cfe 100644 --- a/tests/test_cron_audit.py +++ b/tests/test_cron_audit.py @@ -71,6 +71,43 @@ class TestCategorizeJob: r = categorize_job({"name": "t", "state": "paused", "enabled": False}, datetime.now(timezone.utc)) assert r["category"] == "healthy" + def test_import_error_is_systemic_even_when_recent(self): + from cron_audit_662 import categorize_job + now = datetime.now(timezone.utc) + r = categorize_job({ + "name": "t", + "last_status": "error", + "last_error": "cannot import name 'AIAgent' from 'run_agent'", + "last_run_at": (now - timedelta(hours=1)).isoformat(), + }, now) + assert r["category"] 
== "systemic" + assert r["action"] == "disable" + + def test_empty_response_stays_transient(self): + from cron_audit_662 import categorize_job + now = datetime.now(timezone.utc) + r = categorize_job({ + "name": "t", + "last_status": "error", + "last_error": "Agent completed but produced empty response (model error, timeout, or misconfiguration)", + "last_run_at": (now - timedelta(hours=1)).isoformat(), + }, now) + assert r["category"] == "transient" + + def test_delivery_failure_after_success_is_transient(self): + from cron_audit_662 import categorize_job + now = datetime.now(timezone.utc) + r = categorize_job({ + "name": "t", + "last_status": "ok", + "last_delivery_error": "delivery error: Telegram send failed: Timed out", + "last_run_at": now.isoformat(), + "enabled": True, + "state": "scheduled", + }, now) + assert r["category"] == "transient" + assert "delivery failed" in r["reason"] + class TestAuditFleet: def test_empty(self): @@ -116,3 +153,23 @@ class TestTimestampParsing: from cron_audit_662 import parse_timestamp assert parse_timestamp("") is None assert parse_timestamp(None) is None + + +class TestIssueBody: + def test_includes_schedule_state_and_delivery_error(self): + from cron_audit_662 import generate_issue_body + + body = generate_issue_body({ + "id": "job-1", + "name": "Health Monitor", + "schedule": "every 5m", + "state": "scheduled", + "last_error": "cannot import name 'tool' from 'tools.registry'", + "last_delivery_error": "delivery error: Telegram send failed: Timed out", + "reason": "Systemic error signature: cannot import (1.0h ago)", + }) + + assert "Health Monitor" in body + assert "every 5m" in body + assert "scheduled" in body + assert "Last Delivery Error" in body