Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Whitestone
3d9b8af136 fix: harden cron audit workflow for #662
Some checks failed
Smoke Test / smoke (pull_request) Failing after 23s
Architecture Lint / Linter Tests (pull_request) Successful in 26s
Validate Config / YAML Lint (pull_request) Failing after 17s
Validate Config / JSON Validate (pull_request) Successful in 20s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 55s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m0s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Architecture Lint / Lint Repository (pull_request) Failing after 19s
PR Checklist / pr-checklist (pull_request) Successful in 3m0s
Split the audit into an importable cron_audit_662 module plus a CLI wrapper,
classify recent systemic failures by error signature instead of age alone,
and include enough metadata for issue filing and delivery-failure reporting.

Add regression tests for import-path loading, systemic vs transient
classification, and issue body generation.
2026-04-22 14:48:42 -04:00
ae8c1d46ae Merge pull request 'feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers' (#857) from fix/407 into main
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 28s
Smoke Test / smoke (push) Failing after 21s
Validate Config / YAML Lint (push) Failing after 9s
Validate Config / JSON Validate (push) Successful in 12s
Validate Config / Python Syntax & Import Check (push) Failing after 35s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 38s
Validate Config / Cron Syntax Check (push) Successful in 7s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 16s
Architecture Lint / Lint Repository (push) Failing after 20s
2026-04-22 07:36:26 +00:00
Alexander Whitestone
508441acb4 feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Smoke Test / smoke (pull_request) Failing after 23s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 19s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m2s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m6s
Validate Config / Cron Syntax Check (pull_request) Successful in 14s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Playbook Schema Validation (pull_request) Successful in 28s
Architecture Lint / Lint Repository (pull_request) Failing after 27s
PR Checklist / pr-checklist (pull_request) Failing after 11m41s
2026-04-22 03:34:36 -04:00
5 changed files with 1015 additions and 491 deletions

0
scripts/ci-cron-validate.py Normal file → Executable file
View File

425
scripts/cron-audit-662.py Normal file → Executable file
View File

@@ -1,428 +1,11 @@
#!/usr/bin/env python3
"""
Cron Fleet Audit Script — #662
"""CLI wrapper for the importable cron_audit_662 module."""
Reads hermes cron job state, categorizes all jobs into:
- healthy: last_status=ok or never-run-and-enabled
- transient: recent errors (likely network/timeout)
- systemic: repeated errors over 48+ hours
Outputs a JSON report and optionally:
--disable Disable systemic jobs erroring 48+ hours
--issues File Gitea issues for systemic failures
"""
import json
import sys
import os
import argparse
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Dict, Any
import sys
# --- Config ---
# Jobs whose last error is at least this many hours old are treated as systemic.
ERROR_THRESHOLD_HOURS = 48
# Known locations of the hermes cron job state file, probed in order by
# load_cron_state(); both user-home and /root variants are checked.
CRON_STATE_PATHS = [
    Path.home() / ".hermes" / "cron" / "jobs.json",
    Path.home() / ".hermes" / "cron" / "state.json",
    Path("/root/.hermes/cron/jobs.json"),
    Path("/root/.hermes/cron/state.json"),
]
def load_cron_state() -> List[Dict[str, Any]]:
    """Load cron job state from known locations, falling back to the hermes CLI.

    Tries each path in CRON_STATE_PATHS; unreadable/invalid files are skipped.
    Returns [] when no source yields jobs.
    """
    for candidate in CRON_STATE_PATHS:
        if not candidate.exists():
            continue
        try:
            payload = json.loads(candidate.read_text())
        except (json.JSONDecodeError, IOError):
            continue
        if isinstance(payload, dict) and "jobs" in payload:
            return payload["jobs"]
        if isinstance(payload, list):
            return payload
    # Fallback: ask the hermes CLI for a JSON job listing.
    try:
        import subprocess
        proc = subprocess.run(
            ["hermes", "cron", "list", "--json"],
            capture_output=True, text=True, timeout=30
        )
        if proc.returncode == 0:
            payload = json.loads(proc.stdout)
            if isinstance(payload, dict) and "jobs" in payload:
                return payload["jobs"]
            if isinstance(payload, list):
                return payload
    except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError):
        pass
    return []
def parse_timestamp(ts: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string.

    Returns a timezone-aware datetime (naive values are assumed UTC), or
    None when *ts* is empty or unparseable.
    """
    if not ts:
        return None
    # Normalize a trailing "Z" (Zulu/UTC) suffix, which fromisoformat()
    # rejects before Python 3.11. The previous code replaced "+00:00"
    # with itself — a no-op that left "Z" timestamps unparseable.
    ts = ts.replace("Z", "+00:00")
    try:
        dt = datetime.fromisoformat(ts)
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Categorize a single job.

    Builds a normalized report entry and assigns a category
    ("healthy" | "transient" | "systemic"), a reason, and a suggested action.
    Guard order is load-bearing: paused/completed states win over error
    status, and error status wins over ok/scheduled checks.

    Args:
        job: Raw job record — hermes state dict or synthesized crontab entry.
        now: Current UTC time, used to compute the error age.
    """
    status = job.get("last_status", "")
    last_error = job.get("last_error", "")
    last_run = parse_timestamp(job.get("last_run_at"))
    enabled = job.get("enabled", False)
    state = job.get("state", "unknown")
    name = job.get("name", job.get("id", "unknown"))
    entry = {
        "id": job.get("id", ""),
        "name": name,
        "schedule": job.get("schedule_display", str(job.get("schedule", ""))),
        "state": state,
        "enabled": enabled,
        "last_status": status,
        "last_error": last_error,
        "last_run_at": job.get("last_run_at"),
        "category": "healthy",
        "reason": "",
        "action": "",
    }
    # Never run / no error — status None distinguishes "never ran" from "" defaults.
    if status is None and not last_error:
        entry["category"] = "healthy"
        entry["reason"] = "Never run, no errors"
        return entry
    # Explicitly paused with reason
    if state == "paused":
        entry["category"] = "healthy"
        entry["reason"] = job.get("paused_reason", "Manually paused")
        entry["action"] = "none — paused intentionally"
        return entry
    # Completed jobs
    if state == "completed":
        entry["category"] = "healthy"
        entry["reason"] = "Completed (one-shot)"
        return entry
    # Error status: split systemic vs transient purely on the age of the last run.
    if status == "error" and last_error:
        age_hours = None
        if last_run:
            age_hours = (now - last_run).total_seconds() / 3600
        if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
            entry["category"] = "systemic"
            entry["reason"] = f"Erroring for {age_hours:.1f}h (>{ERROR_THRESHOLD_HOURS}h threshold)"
            entry["action"] = "disable"
        else:
            entry["category"] = "transient"
            age_str = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age"
            entry["reason"] = f"Recent error ({age_str}), may be transient"
            entry["action"] = "monitor"
        return entry
    # OK status
    if status == "ok":
        entry["category"] = "healthy"
        entry["reason"] = "Last run succeeded"
        return entry
    # Scheduled but never errored
    if state == "scheduled" and enabled:
        entry["category"] = "healthy"
        entry["reason"] = "Scheduled and running"
        return entry
    # Unknown state — surfaced as transient so it is visible but not auto-disabled.
    entry["category"] = "transient"
    entry["reason"] = f"Unknown state: {state}, status: {status}"
    entry["action"] = "investigate"
    return entry
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run a full audit over *jobs* and build the JSON-serializable report."""
    now = datetime.now(timezone.utc)
    categorized = [categorize_job(job, now) for job in jobs]
    buckets = {"healthy": [], "transient": [], "systemic": []}
    for entry in categorized:
        buckets.setdefault(entry["category"], []).append(entry)
    systemic = buckets["systemic"]
    transient = buckets["transient"]
    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(jobs),
        "summary": {
            "healthy": len(buckets["healthy"]),
            "transient_errors": len(transient),
            "systemic_failures": len(systemic),
        },
        "systemic_jobs": [
            {
                "id": entry["id"],
                "name": entry["name"],
                "reason": entry["reason"],
                "last_error": entry["last_error"],
            }
            for entry in systemic
        ],
        "transient_jobs": [
            {
                "id": entry["id"],
                "name": entry["name"],
                "reason": entry["reason"],
            }
            for entry in transient
        ],
        "all_jobs": categorized,
    }
def generate_issue_body(job: Dict[str, Any]) -> str:
    """Generate a Gitea issue body for a systemic cron failure."""
    error_text = job['last_error'] or 'No error details available'
    sections = [
        "## Systemic Cron Failure — Auto-Filed by Audit #662",
        f"**Job:** {job['name']} (`{job['id']}`)",
        f"**Schedule:** {job['schedule']}",
        f"**State:** {job['state']}",
        "**Last Error:**",
        "```",
        error_text,
        "```",
        f"**Audit Finding:** {job['reason']}",
        "### Action Required",
        "- [ ] Diagnose root cause of repeated failure",
        "- [ ] Fix configuration or remove broken job",
        "- [ ] Verify job resumes healthy after fix",
        "*Auto-generated by cron-audit-662.py*",
        "",
    ]
    return "\n".join(sections)
# --- Crontab Parsing ---
def parse_crontab(text: str, source: str = "unknown") -> list:
    """Parse a crontab file into job-like dicts for audit.

    Each non-comment line with a recognizable schedule (five-field or
    @keyword) becomes a synthetic "scheduled" job entry. IDs are derived
    from a CRC32 of the command so they are stable across processes —
    the builtin hash() is salted per process (PYTHONHASHSEED), which made
    report IDs non-reproducible between runs and broke issue de-duplication.
    """
    import re
    import zlib
    jobs = []
    # Full match: schedule prefix + command; group(1) is the command text.
    cron_pattern = re.compile(
        r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)'
    )
    # Schedule-only match, used to capture the raw schedule string.
    schedule_pattern = re.compile(
        r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)'
    )
    for line in text.split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        sm = schedule_pattern.match(line)
        cm = cron_pattern.match(line)
        if not sm or not cm:
            continue
        schedule_raw = sm.group(0).strip()
        command = cm.group(1).strip()
        # Prefer a trailing "# name" comment as the job name; otherwise fall
        # back to the basename of the last token before any ">>" redirect.
        name_part = command.split("#")[-1].strip() if "#" in command else ""
        if not name_part:
            cmd_base = command.split(">>")[0].strip().split()
            name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed"
        clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200]
        # Deterministic 4-hex-digit tag (was: hash(command) % 10000, salted).
        stable_tag = zlib.crc32(command.encode("utf-8")) % 10000
        jobs.append({
            "id": f"crontab-{source}-{stable_tag:04x}",
            "name": name_part,
            "schedule_display": schedule_raw,
            "schedule": schedule_raw,
            "state": "scheduled",
            "enabled": True,
            "last_status": None,
            "last_error": None,
            "last_run_at": None,
            "_source": f"crontab:{source}",
            "_command": clean_cmd,
        })
    return jobs
def load_crontab_backups(backup_dir) -> list:
    """Load cron jobs from VPS crontab backup files (*-crontab-backup.txt)."""
    from pathlib import Path
    root = Path(backup_dir)
    if not root.exists():
        return []
    collected = []
    for backup_file in sorted(root.glob("*-crontab-backup.txt")):
        # Host name is encoded in the file name prefix.
        host = backup_file.name.replace("-crontab-backup.txt", "")
        collected.extend(parse_crontab(backup_file.read_text(), source=host))
    return collected
def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict:
    """Run full fleet audit combining hermes cron + VPS crontabs."""
    now = datetime.now(timezone.utc)
    hermes_entries = [categorize_job(job, now) for job in hermes_jobs]
    # Crontab entries carry no run history, so they are reported healthy
    # with an explicit prompt to verify logs manually.
    crontab_entries = [
        {
            "id": job["id"],
            "name": job["name"],
            "schedule": job.get("schedule_display", ""),
            "state": "scheduled",
            "enabled": True,
            "last_status": None,
            "last_error": None,
            "last_run_at": None,
            "category": "healthy",
            "reason": f"Crontab ({job.get('_source', '?')}) — verify logs manually",
            "action": "verify-logs",
        }
        for job in crontab_jobs
    ]
    combined = hermes_entries + crontab_entries
    systemic = [e for e in combined if e["category"] == "systemic"]
    transient = [e for e in combined if e["category"] == "transient"]
    healthy = [e for e in combined if e["category"] == "healthy"]
    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(combined),
        "hermes_jobs": len(hermes_entries),
        "crontab_jobs": len(crontab_entries),
        "summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)},
        "systemic_jobs": [{"id": e["id"], "name": e["name"], "reason": e["reason"], "last_error": e.get("last_error", "")} for e in systemic],
        "transient_jobs": [{"id": e["id"], "name": e["name"], "reason": e["reason"]} for e in transient],
        "all_jobs": combined,
    }
def main():
    """CLI entry point: load jobs, run the audit, emit the report, optionally act.

    Exit code is 1 when systemic failures are found (or no jobs load), else 0.
    Side effects behind flags: --output writes the JSON report, --disable
    pauses systemic jobs via the hermes CLI, --issues files Gitea issues.
    """
    parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
    parser.add_argument("--jobs-file", help="Path to jobs.json override")
    parser.add_argument("--disable", action="store_true",
                        help="Disable systemic jobs (requires hermes CLI)")
    parser.add_argument("--issues", action="store_true",
                        help="File Gitea issues for systemic failures")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--json", action="store_true", help="JSON output only")
    args = parser.parse_args()
    # Load jobs
    jobs = []
    if args.jobs_file:
        with open(args.jobs_file) as f:
            data = json.load(f)
            jobs = data.get("jobs", data) if isinstance(data, dict) else data
    else:
        jobs = load_cron_state()
    # Also load VPS crontab backups (repo-relative cron/vps directory)
    crontab_dir = Path(__file__).parent.parent / "cron" / "vps"
    crontab_jobs = load_crontab_backups(crontab_dir)
    if not jobs:
        print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
        sys.exit(1)
    # Run audit — fleet-wide only when crontab backups are present
    if crontab_jobs:
        report = audit_fleet(jobs, crontab_jobs)
    else:
        report = audit_jobs(jobs)
    # Output
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(f"\n{'='*60}")
        print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
        print(f"{'='*60}")
        print(f" Healthy: {report['summary']['healthy']}")
        print(f" Transient errors: {report['summary']['transient_errors']}")
        print(f" Systemic failures: {report['summary']['systemic_failures']}")
        print(f"{'='*60}")
        if report["systemic_jobs"]:
            print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
            for j in report["systemic_jobs"]:
                print(f" - {j['name']} ({j['id']}): {j['reason']}")
                if j["last_error"]:
                    print(f" Error: {j['last_error'][:100]}")
        if report["transient_jobs"]:
            print(f"\n TRANSIENT ERRORS:")
            for j in report["transient_jobs"]:
                print(f" - {j['name']} ({j['id']}): {j['reason']}")
        print()
    # Write report file
    if args.output:
        with open(args.output, "w") as f:
            json.dump(report, f, indent=2)
        print(f"Report written to {args.output}")
    # Disable systemic jobs (best-effort: hermes failures are reported, not fatal)
    if args.disable and report["systemic_jobs"]:
        import subprocess
        for j in report["systemic_jobs"]:
            print(f"Disabling: {j['name']} ({j['id']})")
            try:
                subprocess.run(
                    ["hermes", "cron", "pause", j["id"]],
                    capture_output=True, text=True, timeout=10
                )
                print(f" → Disabled")
            except Exception as e:
                print(f" → Failed: {e}")
    # File issues for systemic failures (token from env or ~/.config/gitea/token)
    if args.issues and report["systemic_jobs"]:
        gitea_token = os.environ.get("GITEA_TOKEN") or ""
        if not gitea_token:
            token_path = Path.home() / ".config" / "gitea" / "token"
            if token_path.exists():
                gitea_token = token_path.read_text().strip()
        if not gitea_token:
            print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token")
            sys.exit(1)
        import urllib.request
        base = "https://forge.alexanderwhitestone.com/api/v1"
        headers = {
            "Authorization": f"token {gitea_token}",
            "Content-Type": "application/json",
        }
        for j in report["systemic_jobs"]:
            title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
            body = generate_issue_body(j)
            data = json.dumps({"title": title, "body": body}).encode()
            req = urllib.request.Request(
                f"{base}/repos/Timmy_Foundation/timmy-config/issues",
                data=data, headers=headers, method="POST"
            )
            try:
                resp = urllib.request.urlopen(req)
                result = json.loads(resp.read())
                print(f"Issued #{result['number']}: {title}")
            except Exception as e:
                print(f"Failed to file issue for {j['name']}: {e}")
    # Exit code: non-zero if systemic failures found
    sys.exit(1 if report["systemic_jobs"] else 0)
# Make the sibling cron_audit_662 module importable regardless of the
# working directory the wrapper is launched from.
sys.path.insert(0, str(Path(__file__).resolve().parent))
from cron_audit_662 import main

if __name__ == "__main__":
    # The guard previously had an empty body, so running the wrapper did
    # nothing; the imported entry point must actually be invoked.
    main()
630
scripts/cron_audit_662.py Executable file
View File

@@ -0,0 +1,630 @@
#!/usr/bin/env python3
"""
Cron Fleet Audit Script — #662
Reads hermes cron job state, categorizes all jobs into:
- healthy: last_status=ok or never-run-and-enabled
- transient: recent errors (likely network/timeout)
- systemic: repeated errors over 48+ hours
Outputs a JSON report and optionally:
--disable Disable systemic jobs erroring 48+ hours
--issues File Gitea issues for systemic failures
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# --- Config ---
# Errors older than this are classified systemic regardless of their signature.
ERROR_THRESHOLD_HOURS = 48
# Lowercase substrings suggesting a transient failure (network/timeouts/rate
# limits/delivery hiccups). Matched by classify_error() against the lowercased
# combination of last_error and last_delivery_error.
TRANSIENT_ERROR_MARKERS = (
    "timeout",
    "timed out",
    "rate limit",
    "429",
    "503",
    "temporary",
    "temporarily",
    "auth store lock",
    "connection reset",
    "connection refused",
    "connection aborted",
    "delivery error",
    "telegram send failed",
    "matrix send failed",
    "nodename nor servname provided",
    "name or service not known",
    "empty response",
)
# Lowercase substrings indicating a code/config-level failure that will not
# resolve on its own (import errors, syntax errors, missing files, perms).
SYSTEMIC_ERROR_MARKERS = (
    "cannot import",
    "modulenotfounderror",
    "importerror",
    "attributeerror",
    "syntaxerror",
    "permission denied",
    "no such file",
    "file not found",
    "dict' object has no attribute",
    'dict" object has no attribute',
)
# Known locations of the hermes cron job state file, probed in order.
CRON_STATE_PATHS = [
    Path.home() / ".hermes" / "cron" / "jobs.json",
    Path.home() / ".hermes" / "cron" / "state.json",
    Path("/root/.hermes/cron/jobs.json"),
    Path("/root/.hermes/cron/state.json"),
]
def load_jobs_file(path: Path) -> List[Dict[str, Any]]:
    """Load a cron jobs/state JSON file, returning its job list (possibly empty)."""
    payload = json.loads(path.read_text())
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and "jobs" in payload:
        return payload["jobs"]
    # Unrecognized top-level shape — treat as no jobs.
    return []
def parse_cron_list_output(text: str) -> List[Dict[str, Any]]:
    """Parse `hermes cron list --all` output as a last-resort fallback.

    Job headers look like "  <id> [<state>]"; indented detail lines carry
    Name/Schedule/Last run. Returns job dicts shaped like the JSON state.
    """
    jobs: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    job_header = re.compile(r"^\s{2}(?P<id>\S+) \[(?P<state>[^\]]+)\]\s*$")
    for raw_line in text.splitlines():
        line = raw_line.rstrip()
        match = job_header.match(line)
        if match:
            if current:
                jobs.append(current)
            state = match.group("state").strip().lower()
            current = {
                "id": match.group("id"),
                "name": match.group("id"),
                "schedule_display": "",
                "schedule": "",
                "state": "paused" if state == "paused" else "scheduled",
                "enabled": state == "active",
                "last_status": None,
                "last_error": None,
                "last_delivery_error": None,
                "last_run_at": None,
            }
            continue
        if not current:
            continue
        stripped = line.strip()
        if stripped.startswith("Name:"):
            current["name"] = stripped.split("Name:", 1)[1].strip()
        elif stripped.startswith("Schedule:"):
            schedule = stripped.split("Schedule:", 1)[1].strip()
            current["schedule_display"] = schedule
            current["schedule"] = schedule
        elif stripped.startswith("Last run:"):
            payload = stripped.split("Last run:", 1)[1].strip()
            if payload in {"-", ""}:
                continue
            if " error: " in payload:
                ts, error = payload.split(" error: ", 1)
                current["last_run_at"] = ts.strip()
                current["last_status"] = "error"
                current["last_error"] = error.strip()
            elif payload.endswith(" ok"):
                # Strip exactly the " ok" suffix. The previous payload[:-4]
                # removed one extra character, truncating any timestamp not
                # ending in a one-char marker (e.g. "+00:00" offsets).
                current["last_run_at"] = payload.removesuffix(" ok").strip()
                current["last_status"] = "ok"
            else:
                current["last_run_at"] = payload
    if current:
        jobs.append(current)
    return jobs
def load_cron_state() -> List[Dict[str, Any]]:
    """Load cron job state from known locations, falling back to the hermes CLI."""
    for candidate in CRON_STATE_PATHS:
        if not candidate.exists():
            continue
        try:
            found = load_jobs_file(candidate)
        except (json.JSONDecodeError, IOError):
            continue
        if found:
            return found
    # Last resort: scrape the human-readable CLI listing.
    try:
        proc = subprocess.run(
            ["hermes", "cron", "list", "--all"],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return []
    return parse_cron_list_output(proc.stdout) or []
def parse_timestamp(ts: Optional[str]) -> Optional[datetime]:
    """Parse an ISO timestamp; naive values become UTC, failures become None."""
    if not ts:
        return None
    # fromisoformat() rejects a trailing "Z" before 3.11, so normalize it.
    normalized = str(ts).strip().replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(normalized)
    except ValueError:
        return None
    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=timezone.utc)
def format_schedule(job: Dict[str, Any]) -> str:
    """Normalize mixed schedule formats into a readable string."""
    shown = job.get("schedule_display")
    if shown:
        return str(shown)
    raw = job.get("schedule")
    if not isinstance(raw, dict):
        return str(raw or "")
    # Structured schedules: first truthy of display/expr/value, else raw JSON.
    for key in ("display", "expr", "value"):
        if raw.get(key):
            return raw[key]
    return json.dumps(raw, sort_keys=True)
def find_error_marker(text: str, markers: tuple[str, ...]) -> Optional[str]:
    """Return the first marker that occurs as a substring of *text*, else None."""
    return next((marker for marker in markers if marker in text), None)
def classify_error(last_error: str, last_delivery_error: str, age_hours: Optional[float]) -> tuple[str, str, str]:
    """Classify an error as transient vs systemic using both age and error text.

    Returns (category, reason, action). Precedence: age past the threshold
    beats any signature; then systemic signatures beat transient ones; any
    unrecognized recent error defaults to transient/investigate.
    """
    haystack = "\n".join(part for part in (last_error, last_delivery_error) if part).lower()
    age_text = "unknown age" if age_hours is None else f"{age_hours:.1f}h ago"
    if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
        reason = f"Error persisted for {age_hours:.1f}h (>= {ERROR_THRESHOLD_HOURS}h threshold)"
        return ("systemic", reason, "disable")
    marker = find_error_marker(haystack, SYSTEMIC_ERROR_MARKERS)
    if marker:
        return ("systemic", f"Systemic error signature: {marker} ({age_text})", "disable")
    marker = find_error_marker(haystack, TRANSIENT_ERROR_MARKERS)
    if marker:
        return ("transient", f"Transient error signature: {marker} ({age_text})", "monitor")
    return ("transient", f"Unclassified recent error ({age_text})", "investigate")
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Categorize a single job.

    Builds a normalized report entry and assigns a category
    ("healthy" | "transient" | "systemic"), a reason, and a suggested action.
    Guard order is load-bearing: paused/completed states win over error
    status; error status delegates to classify_error(); a successful run
    with a delivery error is still flagged transient.

    Args:
        job: Raw job record — hermes state dict or synthesized crontab entry.
        now: Current UTC time, used to compute the error age.
    """
    status = job.get("last_status", "")
    last_error = job.get("last_error", "")
    last_delivery_error = job.get("last_delivery_error", "")
    last_run = parse_timestamp(job.get("last_run_at"))
    enabled = job.get("enabled", False)
    state = job.get("state", "unknown")
    name = job.get("name", job.get("id", "unknown"))
    entry = {
        "id": job.get("id", ""),
        "name": name,
        "schedule": format_schedule(job),
        "state": state,
        "enabled": enabled,
        "last_status": status,
        "last_error": last_error,
        "last_delivery_error": last_delivery_error,
        "last_run_at": job.get("last_run_at"),
        "category": "healthy",
        "reason": "",
        "action": "",
    }
    # Never run / no error — status None distinguishes "never ran" from "" defaults.
    if status is None and not last_error:
        entry["category"] = "healthy"
        entry["reason"] = "Never run, no errors"
        return entry
    # Explicitly paused with reason
    if state == "paused":
        entry["category"] = "healthy"
        entry["reason"] = job.get("paused_reason", "Manually paused")
        entry["action"] = "none — paused intentionally"
        return entry
    # Completed jobs
    if state == "completed":
        entry["category"] = "healthy"
        entry["reason"] = "Completed (one-shot)"
        return entry
    # Error status — classified by age AND error-text signature.
    if status == "error":
        age_hours = None
        if last_run:
            age_hours = (now - last_run).total_seconds() / 3600
        entry["category"], entry["reason"], entry["action"] = classify_error(
            str(last_error or ""),
            str(last_delivery_error or ""),
            age_hours,
        )
        return entry
    # Ran fine, but the result never reached its destination.
    if status == "ok" and last_delivery_error:
        entry["category"] = "transient"
        entry["reason"] = f"Job completed but delivery failed: {last_delivery_error}"
        entry["action"] = "monitor"
        return entry
    # OK status
    if status == "ok":
        entry["category"] = "healthy"
        entry["reason"] = "Last run succeeded"
        return entry
    # Scheduled but never errored
    if state == "scheduled" and enabled:
        entry["category"] = "healthy"
        entry["reason"] = "Scheduled and running"
        return entry
    # Unknown state — surfaced as transient so it is visible but not auto-disabled.
    entry["category"] = "transient"
    entry["reason"] = f"Unknown state: {state}, status: {status}"
    entry["action"] = "investigate"
    return entry
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run full audit on job list.

    Returns a JSON-serializable report with audit_time, total_jobs, summary
    counts, systemic/transient row projections, and the full categorized list.
    """
    def summarize(entry: Dict[str, Any]) -> Dict[str, Any]:
        # Project a categorized entry onto the fields exposed in report rows
        # (this dict was previously duplicated verbatim for both categories).
        return {
            "id": entry["id"],
            "name": entry["name"],
            "schedule": entry["schedule"],
            "state": entry["state"],
            "reason": entry["reason"],
            "last_error": entry["last_error"],
            "last_delivery_error": entry["last_delivery_error"],
            "last_run_at": entry["last_run_at"],
            "action": entry["action"],
        }

    now = datetime.now(timezone.utc)
    categorized = [categorize_job(j, now) for j in jobs]
    healthy = [c for c in categorized if c["category"] == "healthy"]
    transient = [c for c in categorized if c["category"] == "transient"]
    systemic = [c for c in categorized if c["category"] == "systemic"]
    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(jobs),
        "summary": {
            "healthy": len(healthy),
            "transient_errors": len(transient),
            "systemic_failures": len(systemic),
        },
        "systemic_jobs": [summarize(j) for j in systemic],
        "transient_jobs": [summarize(j) for j in transient],
        "all_jobs": categorized,
    }
def generate_issue_body(job: Dict[str, Any]) -> str:
    """Generate a Gitea issue body for a systemic cron failure.

    Uses .get() throughout so partially-populated report rows cannot raise.
    A "Last Delivery Error" section is inserted only when one was recorded.
    """
    delivery_error = job.get("last_delivery_error") or ""
    delivery_block = ""
    if delivery_error:
        delivery_block = f"\n**Last Delivery Error:**\n```\n{delivery_error}\n```\n"
    # The f-string below is the exact issue markdown emitted to Gitea.
    return f"""## Systemic Cron Failure — Auto-Filed by Audit #662
**Job:** {job.get('name', 'unknown')} (`{job.get('id', 'unknown')}`)
**Schedule:** {job.get('schedule', 'unknown')}
**State:** {job.get('state', 'unknown')}
**Last Error:**
```
{job.get('last_error') or 'No error details available'}
```
{delivery_block}
**Audit Finding:** {job.get('reason', 'No audit reason captured')}
### Action Required
- [ ] Diagnose root cause of repeated failure
- [ ] Fix configuration or remove broken job
- [ ] Verify job resumes healthy after fix
*Auto-generated by cron-audit-662.py*
"""
# --- Crontab Parsing ---
def parse_crontab(text: str, source: str = "unknown") -> list:
    """Parse a crontab file into job-like dicts for audit.

    Each non-comment line with a recognizable schedule (five-field or
    @keyword) becomes a synthetic "scheduled" job entry. IDs are derived
    from a CRC32 of the command so they are stable across processes —
    the builtin hash() is salted per process (PYTHONHASHSEED), which made
    report IDs non-reproducible between runs and broke issue de-duplication.
    """
    import re
    import zlib
    jobs = []
    # Full match: schedule prefix + command; group(1) is the command text.
    cron_pattern = re.compile(
        r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)'
    )
    # Schedule-only match, used to capture the raw schedule string.
    schedule_pattern = re.compile(
        r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)'
    )
    for line in text.split("\n"):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        sm = schedule_pattern.match(line)
        cm = cron_pattern.match(line)
        if not sm or not cm:
            continue
        schedule_raw = sm.group(0).strip()
        command = cm.group(1).strip()
        # Prefer a trailing "# name" comment as the job name; otherwise fall
        # back to the basename of the last token before any ">>" redirect.
        name_part = command.split("#")[-1].strip() if "#" in command else ""
        if not name_part:
            cmd_base = command.split(">>")[0].strip().split()
            name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed"
        clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200]
        # Deterministic 4-hex-digit tag (was: hash(command) % 10000, salted).
        stable_tag = zlib.crc32(command.encode("utf-8")) % 10000
        jobs.append({
            "id": f"crontab-{source}-{stable_tag:04x}",
            "name": name_part,
            "schedule_display": schedule_raw,
            "schedule": schedule_raw,
            "state": "scheduled",
            "enabled": True,
            "last_status": None,
            "last_error": None,
            "last_run_at": None,
            "_source": f"crontab:{source}",
            "_command": clean_cmd,
        })
    return jobs
def load_crontab_backups(backup_dir) -> list:
    """Load cron jobs from VPS crontab backup files (*-crontab-backup.txt)."""
    from pathlib import Path
    root = Path(backup_dir)
    if not root.exists():
        return []
    collected = []
    for backup_file in sorted(root.glob("*-crontab-backup.txt")):
        # Host name is encoded in the file name prefix.
        host = backup_file.name.replace("-crontab-backup.txt", "")
        collected.extend(parse_crontab(backup_file.read_text(), source=host))
    return collected
def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict:
    """Run full fleet audit combining hermes cron + VPS crontabs.

    Crontab entries carry no run history, so they are reported healthy with
    an explicit prompt to verify logs manually.
    """
    def summarize(entry: dict) -> dict:
        # Project a categorized entry onto the fields exposed in report rows
        # (this dict was previously duplicated verbatim for both categories).
        return {
            "id": entry["id"],
            "name": entry["name"],
            "schedule": entry["schedule"],
            "state": entry["state"],
            "reason": entry["reason"],
            "last_error": entry.get("last_error", ""),
            "last_delivery_error": entry.get("last_delivery_error", ""),
            "last_run_at": entry.get("last_run_at"),
            "action": entry.get("action", ""),
        }

    now = datetime.now(timezone.utc)
    hermes_categorized = [categorize_job(j, now) for j in hermes_jobs]
    crontab_categorized = [
        {
            "id": j["id"], "name": j["name"],
            "schedule": j.get("schedule_display", ""),
            "state": "scheduled", "enabled": True,
            "last_status": None, "last_error": None, "last_delivery_error": None, "last_run_at": None,
            "category": "healthy",
            "reason": f"Crontab ({j.get('_source', '?')}) — verify logs manually",
            "action": "verify-logs",
        }
        for j in crontab_jobs
    ]
    all_cat = hermes_categorized + crontab_categorized
    healthy = [c for c in all_cat if c["category"] == "healthy"]
    transient = [c for c in all_cat if c["category"] == "transient"]
    systemic = [c for c in all_cat if c["category"] == "systemic"]
    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(all_cat),
        "hermes_jobs": len(hermes_categorized),
        "crontab_jobs": len(crontab_categorized),
        "summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)},
        "systemic_jobs": [summarize(j) for j in systemic],
        "transient_jobs": [summarize(j) for j in transient],
        "all_jobs": all_cat,
    }
def main():
    """CLI entry point: load jobs, run the audit, emit the report, optionally act.

    Exit code is 1 when systemic failures are found (or no jobs load), else 0.
    Side effects behind flags: --output writes the JSON report, --disable
    pauses systemic jobs via the hermes CLI, --issues files Gitea issues.
    """
    parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
    parser.add_argument("--jobs-file", help="Path to jobs.json override")
    parser.add_argument("--disable", action="store_true",
                        help="Disable systemic jobs (requires hermes CLI)")
    parser.add_argument("--issues", action="store_true",
                        help="File Gitea issues for systemic failures")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--json", action="store_true", help="JSON output only")
    args = parser.parse_args()
    # Load jobs
    jobs = []
    if args.jobs_file:
        with open(args.jobs_file) as f:
            data = json.load(f)
            jobs = data.get("jobs", data) if isinstance(data, dict) else data
    else:
        jobs = load_cron_state()
    # Also load VPS crontab backups (repo-relative cron/vps directory)
    crontab_dir = Path(__file__).parent.parent / "cron" / "vps"
    crontab_jobs = load_crontab_backups(crontab_dir)
    if not jobs:
        print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
        sys.exit(1)
    # Run audit — fleet-wide only when crontab backups are present
    if crontab_jobs:
        report = audit_fleet(jobs, crontab_jobs)
    else:
        report = audit_jobs(jobs)
    # Output
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(f"\n{'='*60}")
        print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
        print(f"{'='*60}")
        print(f" Healthy: {report['summary']['healthy']}")
        print(f" Transient errors: {report['summary']['transient_errors']}")
        print(f" Systemic failures: {report['summary']['systemic_failures']}")
        print(f"{'='*60}")
        if report["systemic_jobs"]:
            print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
            for j in report["systemic_jobs"]:
                print(f" - {j['name']} ({j['id']}): {j['reason']}")
                if j["last_error"]:
                    print(f" Error: {j['last_error'][:100]}")
        if report["transient_jobs"]:
            print(f"\n TRANSIENT ERRORS:")
            for j in report["transient_jobs"]:
                print(f" - {j['name']} ({j['id']}): {j['reason']}")
        print()
    # Write report file
    if args.output:
        with open(args.output, "w") as f:
            json.dump(report, f, indent=2)
        print(f"Report written to {args.output}")
    # Disable systemic jobs (best-effort: hermes failures are reported, not fatal)
    if args.disable and report["systemic_jobs"]:
        import subprocess
        for j in report["systemic_jobs"]:
            print(f"Disabling: {j['name']} ({j['id']})")
            try:
                subprocess.run(
                    ["hermes", "cron", "pause", j["id"]],
                    capture_output=True, text=True, timeout=10
                )
                print(f" → Disabled")
            except Exception as e:
                print(f" → Failed: {e}")
    # File issues for systemic failures (token from env or ~/.config/gitea/token)
    if args.issues and report["systemic_jobs"]:
        gitea_token = os.environ.get("GITEA_TOKEN") or ""
        if not gitea_token:
            token_path = Path.home() / ".config" / "gitea" / "token"
            if token_path.exists():
                gitea_token = token_path.read_text().strip()
        if not gitea_token:
            print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token")
            sys.exit(1)
        import urllib.request
        base = "https://forge.alexanderwhitestone.com/api/v1"
        headers = {
            "Authorization": f"token {gitea_token}",
            "Content-Type": "application/json",
        }
        for j in report["systemic_jobs"]:
            title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
            body = generate_issue_body(j)
            data = json.dumps({"title": title, "body": body}).encode()
            req = urllib.request.Request(
                f"{base}/repos/Timmy_Foundation/timmy-config/issues",
                data=data, headers=headers, method="POST"
            )
            try:
                resp = urllib.request.urlopen(req)
                result = json.loads(resp.read())
                print(f"Issued #{result['number']}: {title}")
            except Exception as e:
                print(f"Failed to file issue for {j['name']}: {e}")
    # Exit code: non-zero if systemic failures found
    sys.exit(1 if report["systemic_jobs"] else 0)
# Run the CLI when executed as a script (the wrapper imports main() instead).
if __name__ == "__main__":
    main()

View File

@@ -4,111 +4,365 @@
Part of the Gemini Sovereign Infrastructure Suite.
Tracks the fleet's progress through the Paperclips-inspired evolution arc.
Usage:
python3 scripts/phase_tracker.py status # Show current state
python3 scripts/phase_tracker.py evaluate # Auto-evaluate checkable milestones
python3 scripts/phase_tracker.py complete M4 # Mark milestone complete
python3 scripts/phase_tracker.py telegram # Post daily update to Telegram
python3 scripts/phase_tracker.py daily # evaluate + telegram
"""
import os
import sys
import json
import re
import argparse
import urllib.request
import subprocess
from pathlib import Path
from datetime import datetime, timezone, timedelta
# Paths (relative to the repo root) of the milestone definitions and the
# persisted list of completed milestone IDs.
MILESTONES_FILE = "fleet/milestones.md"
COMPLETED_FILE = "fleet/completed_milestones.json"
# Local state directory used by the fleet-health tooling; uptime.json feeds
# the uptime-based milestone checks.
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
UPTIME_FILE = LOG_DIR / "uptime.json"
# Candidate token files, checked in order by _read_token().
TELEGRAM_TOKEN_PATHS = [
    Path.home() / ".config" / "timmy" / "telegram_bot_token",
    Path.home() / ".hermes" / "telegram_bot_token",
    Path.home() / ".hermes" / "telegram_token",
]
# Target chat for daily updates; overridable via TELEGRAM_HOME_CHANNEL.
TELEGRAM_CHAT = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
# Fleet hosts. NOTE(review): only the host *names* are used (as ssh targets)
# by the checks below; the IPs appear to be informational — confirm.
HOSTS = {
    "ezra": {"ip": "143.198.27.163"},
    "allegro": {"ip": "167.99.126.228"},
    "bezalel": {"ip": "159.203.146.185"},
}
def _find_repo_root() -> Path:
script_dir = Path(__file__).resolve().parent
return script_dir.parent
def _read_token() -> str | None:
    """Return the Telegram bot token from the first existing token file,
    falling back to the TELEGRAM_BOT_TOKEN env var, else None."""
    token_file = next((p for p in TELEGRAM_TOKEN_PATHS if p.exists()), None)
    if token_file is not None:
        return token_file.read_text().strip()
    return os.environ.get("TELEGRAM_BOT_TOKEN") or None
def telegram_send(text: str) -> bool:
    """Post *text* to the configured Telegram chat.

    Returns True only on an HTTP 200 response; missing token or any send
    failure is reported on stderr and returns False.
    """
    bot_token = _read_token()
    if not bot_token:
        print("[WARN] No Telegram token found.", file=sys.stderr)
        return False
    payload = {"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "HTML"}
    request = urllib.request.Request(
        f"https://api.telegram.org/bot{bot_token}/sendMessage",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            return resp.status == 200
    except Exception as e:
        print(f"[WARN] Telegram send failed: {e}", file=sys.stderr)
        return False
class Milestone:
    """A single checkable milestone parsed from the milestones markdown."""

    def __init__(self, m_id: str, title: str, trigger: str, message: str):
        # Identity plus the trigger condition and announcement text attached
        # by the markdown parser.
        self.id, self.title = m_id, title
        self.trigger, self.message = trigger, message
class Phase:
def __init__(self, name: str, number: int, unlock_condition: str | None):
self.name = name
self.number = number
self.unlock_condition = unlock_condition
self.milestones: list[Milestone] = []
class PhaseTracker:
def __init__(self):
# Find files relative to repo root
script_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(script_dir)
self.milestones_path = os.path.join(repo_root, MILESTONES_FILE)
self.completed_path = os.path.join(repo_root, COMPLETED_FILE)
self.milestones = self.parse_milestones()
self.completed = self.load_completed()
self.repo_root = _find_repo_root()
self.milestones_path = self.repo_root / MILESTONES_FILE
self.completed_path = self.repo_root / COMPLETED_FILE
self.phases: list[Phase] = self._parse_milestones()
self.completed: set[str] = self._load_completed()
def _parse_milestones(self) -> list[Phase]:
if not self.milestones_path.exists():
return []
content = self.milestones_path.read_text()
phases: list[Phase] = []
current_phase: Phase | None = None
for line in content.splitlines():
phase_match = re.match(r"##\s*Phase\s*(\d+):\s*(.+?)\s*(?:\(([^)]+)\))?\s*$", line)
if phase_match:
num = int(phase_match.group(1))
name = phase_match.group(2).strip()
unlock = phase_match.group(3)
current_phase = Phase(name, num, unlock)
phases.append(current_phase)
continue
m_match = re.match(r"###\s*(M\d+):\s*(.+)$", line)
if m_match and current_phase is not None:
m_id = m_match.group(1)
title = m_match.group(2).strip()
current_phase.milestones.append(Milestone(m_id, title, "", ""))
continue
if line.startswith("**Trigger:**") and current_phase and current_phase.milestones:
current_phase.milestones[-1].trigger = line.replace("**Trigger:**", "").strip()
continue
if line.startswith("**Message:**") and current_phase and current_phase.milestones:
current_phase.milestones[-1].message = line.replace("**Message:**", "").strip().strip('"')
continue
def parse_milestones(self):
if not os.path.exists(self.milestones_path):
return {}
with open(self.milestones_path, "r") as f:
content = f.read()
phases = {}
current_phase = None
for line in content.split("\n"):
if line.startswith("## Phase"):
current_phase = line.replace("## ", "").strip()
phases[current_phase] = []
elif line.startswith("### M"):
m_id = line.split(":")[0].replace("### ", "").strip()
title = line.split(":")[1].strip()
phases[current_phase].append({"id": m_id, "title": title})
return phases
def load_completed(self):
if os.path.exists(self.completed_path):
with open(self.completed_path, "r") as f:
try:
return json.load(f)
except:
return []
return []
def _load_completed(self) -> set[str]:
if self.completed_path.exists():
try:
data = json.loads(self.completed_path.read_text())
if isinstance(data, list):
return set(data)
except Exception:
pass
return set()
def save_completed(self):
with open(self.completed_path, "w") as f:
json.dump(self.completed, f, indent=2)
self.completed_path.write_text(json.dumps(sorted(self.completed), indent=2))
def show_progress(self):
print("--- Fleet Phase Progression Tracker ---")
total_milestones = 0
total_completed = 0
if not self.milestones:
print("[ERROR] No milestones found in fleet/milestones.md")
return
for phase, ms in self.milestones.items():
print(f"\n{phase}")
for m in ms:
total_milestones += 1
done = m["id"] in self.completed
if done:
total_completed += 1
status = "" if done else ""
print(f" {status} {m['id']}: {m['title']}")
percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0
print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)")
def mark_complete(self, m_id: str):
def mark_complete(self, m_id: str) -> bool:
m_id = m_id.upper()
exists = any(m.id == m_id for p in self.phases for m in p.milestones)
if not exists:
print(f"[ERROR] Unknown milestone: {m_id}")
return False
if m_id not in self.completed:
self.completed.append(m_id)
self.completed.add(m_id)
self.save_completed()
print(f"[SUCCESS] Marked {m_id} as complete.")
return True
print(f"[INFO] {m_id} is already complete.")
return True
def _get_phase_state(self) -> tuple[int, float, list[str], list[str]]:
"""Returns (current_phase_number, decimal_progress, blockers, next_milestones)."""
blockers = []
next_milestones = []
for phase in self.phases:
phase_completed = sum(1 for m in phase.milestones if m.id in self.completed)
phase_total = len(phase.milestones)
if phase_total == 0:
continue
if phase_completed < phase_total:
progress = phase_completed / phase_total
decimal = phase.number + progress
# Find next incomplete milestone
for m in phase.milestones:
if m.id not in self.completed:
next_milestones.append(f"{m.id}: {m.title}")
if m.trigger:
blockers.append(f"{m.id}: {m.trigger}")
break
# Phase unlock condition as blocker if near end
if phase_completed == phase_total - 1 and phase.unlock_condition:
blockers.append(f"Unlock Phase {phase.number + 1}: {phase.unlock_condition}")
return phase.number, decimal, blockers, next_milestones
# All done
last = self.phases[-1] if self.phases else None
if last:
return last.number, float(last.number) + 1.0, ["All phases complete."], []
return 0, 0.0, ["No milestones defined."], []
def show_progress(self):
phase_num, decimal, blockers, next_ms = self._get_phase_state()
total_ms = sum(len(p.milestones) for p in self.phases)
total_completed = len(self.completed)
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
print("=" * 50)
print(" Fleet Phase Progression Tracker")
print("=" * 50)
print(f"\nCurrent Phase: Phase {phase_num}{self.phases[phase_num - 1].name if phase_num <= len(self.phases) else 'Complete'}")
print(f"Decimal Progress: Phase {decimal:.1f}")
print(f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)")
print("\n--- Milestones ---")
for phase in self.phases:
done = sum(1 for m in phase.milestones if m.id in self.completed)
total = len(phase.milestones)
status = "" if done == total else ""
print(f"\n{status} Phase {phase.number}: {phase.name} ({done}/{total})")
for m in phase.milestones:
mark = "" if m.id in self.completed else ""
print(f" {mark} {m.id}: {m.title}")
print("\n--- Next Up ---")
for nm in next_ms[:3]:
print(f"{nm}")
print("\n--- Blockers ---")
for b in blockers[:5]:
print(f" ⚠️ {b}")
if not blockers:
print(" 🚀 Nothing blocking.")
print()
def summary_text(self) -> str:
phase_num, decimal, blockers, next_ms = self._get_phase_state()
total_ms = sum(len(p.milestones) for p in self.phases)
total_completed = len(self.completed)
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
phase_name = self.phases[phase_num - 1].name if phase_num <= len(self.phases) else "Complete"
next_phase = phase_num + 1 if phase_num < len(self.phases) else phase_num
progress_to_next = (decimal - phase_num) * 100
lines = [
f"Fleet: Phase {decimal:.1f} ({progress_to_next:.0f}% to Phase {next_phase})",
f"Phase: {phase_num}{phase_name}",
f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)",
]
if next_ms:
lines.append(f"Next: {next_ms[0]}")
if blockers and blockers[0] != "All phases complete.":
lines.append(f"Blocker: {blockers[0]}")
return "\n".join(lines)
# === Auto-evaluation heuristics ===
def _eval_file_exists(self, path: str) -> bool:
return (self.repo_root / path).exists()
def _eval_command(self, cmd: str) -> bool:
try:
result = subprocess.run(cmd, shell=True, capture_output=True, timeout=10)
return result.returncode == 0
except Exception:
return False
def _eval_uptime(self, target: float) -> bool:
if not UPTIME_FILE.exists():
return False
try:
data = json.loads(UPTIME_FILE.read_text())
uptime = data.get("uptime_30d_percent", 0.0)
return uptime >= target
except Exception:
return False
def _eval_local_model_multi(self) -> bool:
count = 0
for host in HOSTS:
if self._eval_command(f"ssh -o ConnectTimeout=5 {host} 'pgrep -f ollama >/dev/null 2>&1'"):
count += 1
return count >= 2
def _eval_zero_manual_restarts(self, days: int = 7) -> bool:
log = LOG_DIR / "auto_restart.log"
if not log.exists():
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
try:
with open(log) as f:
for line in f:
if "manual restart" in line.lower():
# crude timestamp parse
try:
ts = datetime.fromisoformat(line[:19])
if ts > cutoff:
return False
except Exception:
continue
return True
except Exception:
return False
def evaluate(self):
"""Auto-check milestones where we have heuristics."""
print("[EVAL] Running automatic milestone checks...\n")
checks = [
("M1", self._eval_command, "python3 fleet/health_check.py --dry-run 2>/dev/null || python3 fleet/health_check.py 2>&1 | head -1 >/dev/null"),
("M2", self._eval_command, "test -f ~/.local/timmy/fleet-health/auto_restart.log && grep -q 'restarted' ~/.local/timmy/fleet-health/auto_restart.log"),
("M3", self._eval_command, "test -d ~/.local/timmy/backups && ls ~/.local/timmy/backups | grep -q ."),
("M4", self._eval_uptime, 95.0),
("M5", self._eval_uptime, 97.0),
("M6", self._eval_zero_manual_restarts, 7),
("M9", self._eval_uptime, 98.0),
("M11", self._eval_local_model_multi, None),
]
newly_found = []
for m_id, check_fn, arg in checks:
if m_id in self.completed:
continue
result = check_fn(arg) if arg is not None else check_fn()
if result:
print(f"{m_id} appears satisfied — marking complete.")
self.completed.add(m_id)
newly_found.append(m_id)
else:
print(f"{m_id} not yet satisfied.")
if newly_found:
self.save_completed()
print(f"\n[SUCCESS] Auto-completed {len(newly_found)} milestone(s): {', '.join(newly_found)}")
else:
print(f"[INFO] {m_id} is already complete.")
print("\n[INFO] No new milestones auto-detected.")
def daily(self):
self.evaluate()
text = self.summary_text()
print(text)
ok = telegram_send(text)
if ok:
print("\n[TELEGRAM] Daily update sent.")
else:
print("\n[TELEGRAM] Failed to send update.")
def main():
    """CLI entry point: status / evaluate / telegram / daily / complete <id>.

    Exit code is propagated for the 'telegram' and 'complete' subcommands
    (non-zero on failure) so cron jobs can detect problems.
    """
    parser = argparse.ArgumentParser(description="Fleet Phase Progression Tracker")
    subparsers = parser.add_subparsers(dest="command")
    subparsers.add_parser("status", help="Show current progress")
    subparsers.add_parser("evaluate", help="Auto-evaluate checkable milestones")
    subparsers.add_parser("telegram", help="Post summary to Telegram")
    subparsers.add_parser("daily", help="Evaluate then post to Telegram")
    complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete")
    complete_parser.add_argument("id", help="Milestone ID (e.g. M1)")
    args = parser.parse_args()
    tracker = PhaseTracker()
    if args.command == "status":
        tracker.show_progress()
    elif args.command == "evaluate":
        tracker.evaluate()
    elif args.command == "telegram":
        ok = telegram_send(tracker.summary_text())
        sys.exit(0 if ok else 1)
    elif args.command == "daily":
        tracker.daily()
    elif args.command == "complete":
        ok = tracker.mark_complete(args.id)
        sys.exit(0 if ok else 1)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()

View File

@@ -71,6 +71,43 @@ class TestCategorizeJob:
r = categorize_job({"name": "t", "state": "paused", "enabled": False}, datetime.now(timezone.utc))
assert r["category"] == "healthy"
def test_import_error_is_systemic_even_when_recent(self):
from cron_audit_662 import categorize_job
now = datetime.now(timezone.utc)
r = categorize_job({
"name": "t",
"last_status": "error",
"last_error": "cannot import name 'AIAgent' from 'run_agent'",
"last_run_at": (now - timedelta(hours=1)).isoformat(),
}, now)
assert r["category"] == "systemic"
assert r["action"] == "disable"
def test_empty_response_stays_transient(self):
from cron_audit_662 import categorize_job
now = datetime.now(timezone.utc)
r = categorize_job({
"name": "t",
"last_status": "error",
"last_error": "Agent completed but produced empty response (model error, timeout, or misconfiguration)",
"last_run_at": (now - timedelta(hours=1)).isoformat(),
}, now)
assert r["category"] == "transient"
def test_delivery_failure_after_success_is_transient(self):
from cron_audit_662 import categorize_job
now = datetime.now(timezone.utc)
r = categorize_job({
"name": "t",
"last_status": "ok",
"last_delivery_error": "delivery error: Telegram send failed: Timed out",
"last_run_at": now.isoformat(),
"enabled": True,
"state": "scheduled",
}, now)
assert r["category"] == "transient"
assert "delivery failed" in r["reason"]
class TestAuditFleet:
def test_empty(self):
@@ -116,3 +153,23 @@ class TestTimestampParsing:
from cron_audit_662 import parse_timestamp
assert parse_timestamp("") is None
assert parse_timestamp(None) is None
class TestIssueBody:
    # Verifies that generate_issue_body() renders the metadata a triager
    # needs: job name, schedule, scheduler state, and the delivery-failure
    # section when a delivery error is present.
    def test_includes_schedule_state_and_delivery_error(self):
        from cron_audit_662 import generate_issue_body
        body = generate_issue_body({
            "id": "job-1",
            "name": "Health Monitor",
            "schedule": "every 5m",
            "state": "scheduled",
            "last_error": "cannot import name 'tool' from 'tools.registry'",
            "last_delivery_error": "delivery error: Telegram send failed: Timed out",
            "reason": "Systemic error signature: cannot import (1.0h ago)",
        })
        assert "Health Monitor" in body
        assert "every 5m" in body
        assert "scheduled" in body
        assert "Last Delivery Error" in body