Compare commits
3 Commits
burn/timmy
...
fix/662
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d9b8af136 | ||
| ae8c1d46ae | |||
|
|
508441acb4 |
0
scripts/ci-cron-validate.py
Normal file → Executable file
0
scripts/ci-cron-validate.py
Normal file → Executable file
425
scripts/cron-audit-662.py
Normal file → Executable file
425
scripts/cron-audit-662.py
Normal file → Executable file
@@ -1,428 +1,11 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Cron Fleet Audit Script — #662
|
||||
"""CLI wrapper for the importable cron_audit_662 module."""
|
||||
|
||||
Reads hermes cron job state, categorizes all jobs into:
|
||||
- healthy: last_status=ok or never-run-and-enabled
|
||||
- transient: recent errors (likely network/timeout)
|
||||
- systemic: repeated errors over 48+ hours
|
||||
|
||||
Outputs a JSON report and optionally:
|
||||
--disable Disable systemic jobs erroring 48+ hours
|
||||
--issues File Gitea issues for systemic failures
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
import sys
|
||||
|
||||
# --- Config ---
|
||||
ERROR_THRESHOLD_HOURS = 48
|
||||
CRON_STATE_PATHS = [
|
||||
Path.home() / ".hermes" / "cron" / "jobs.json",
|
||||
Path.home() / ".hermes" / "cron" / "state.json",
|
||||
Path("/root/.hermes/cron/jobs.json"),
|
||||
Path("/root/.hermes/cron/state.json"),
|
||||
]
|
||||
|
||||
def load_cron_state() -> List[Dict[str, Any]]:
|
||||
"""Load cron job state from known locations."""
|
||||
for path in CRON_STATE_PATHS:
|
||||
if path.exists():
|
||||
try:
|
||||
with open(path) as f:
|
||||
data = json.load(f)
|
||||
if isinstance(data, dict) and "jobs" in data:
|
||||
return data["jobs"]
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
except (json.JSONDecodeError, IOError):
|
||||
continue
|
||||
|
||||
# Fallback: try hermes cron list CLI
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
["hermes", "cron", "list", "--json"],
|
||||
capture_output=True, text=True, timeout=30
|
||||
)
|
||||
if result.returncode == 0:
|
||||
data = json.loads(result.stdout)
|
||||
if isinstance(data, dict) and "jobs" in data:
|
||||
return data["jobs"]
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError):
|
||||
pass
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def parse_timestamp(ts: str) -> datetime:
|
||||
"""Parse ISO timestamp, handle various formats."""
|
||||
if not ts:
|
||||
return None
|
||||
# Normalize timezone
|
||||
ts = ts.replace("+00:00", "+00:00")
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts)
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
|
||||
"""Categorize a single job."""
|
||||
status = job.get("last_status", "")
|
||||
last_error = job.get("last_error", "")
|
||||
last_run = parse_timestamp(job.get("last_run_at"))
|
||||
enabled = job.get("enabled", False)
|
||||
state = job.get("state", "unknown")
|
||||
name = job.get("name", job.get("id", "unknown"))
|
||||
|
||||
entry = {
|
||||
"id": job.get("id", ""),
|
||||
"name": name,
|
||||
"schedule": job.get("schedule_display", str(job.get("schedule", ""))),
|
||||
"state": state,
|
||||
"enabled": enabled,
|
||||
"last_status": status,
|
||||
"last_error": last_error,
|
||||
"last_run_at": job.get("last_run_at"),
|
||||
"category": "healthy",
|
||||
"reason": "",
|
||||
"action": "",
|
||||
}
|
||||
|
||||
# Never run / no error
|
||||
if status is None and not last_error:
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Never run, no errors"
|
||||
return entry
|
||||
|
||||
# Explicitly paused with reason
|
||||
if state == "paused":
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = job.get("paused_reason", "Manually paused")
|
||||
entry["action"] = "none — paused intentionally"
|
||||
return entry
|
||||
|
||||
# Completed jobs
|
||||
if state == "completed":
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Completed (one-shot)"
|
||||
return entry
|
||||
|
||||
# Error status
|
||||
if status == "error" and last_error:
|
||||
age_hours = None
|
||||
if last_run:
|
||||
age_hours = (now - last_run).total_seconds() / 3600
|
||||
|
||||
if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
|
||||
entry["category"] = "systemic"
|
||||
entry["reason"] = f"Erroring for {age_hours:.1f}h (>{ERROR_THRESHOLD_HOURS}h threshold)"
|
||||
entry["action"] = "disable"
|
||||
else:
|
||||
entry["category"] = "transient"
|
||||
age_str = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age"
|
||||
entry["reason"] = f"Recent error ({age_str}), may be transient"
|
||||
entry["action"] = "monitor"
|
||||
return entry
|
||||
|
||||
# OK status
|
||||
if status == "ok":
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Last run succeeded"
|
||||
return entry
|
||||
|
||||
# Scheduled but never errored
|
||||
if state == "scheduled" and enabled:
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Scheduled and running"
|
||||
return entry
|
||||
|
||||
# Unknown state
|
||||
entry["category"] = "transient"
|
||||
entry["reason"] = f"Unknown state: {state}, status: {status}"
|
||||
entry["action"] = "investigate"
|
||||
return entry
|
||||
|
||||
|
||||
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Run full audit on job list."""
|
||||
now = datetime.now(timezone.utc)
|
||||
categorized = [categorize_job(j, now) for j in jobs]
|
||||
|
||||
healthy = [c for c in categorized if c["category"] == "healthy"]
|
||||
transient = [c for c in categorized if c["category"] == "transient"]
|
||||
systemic = [c for c in categorized if c["category"] == "systemic"]
|
||||
|
||||
report = {
|
||||
"audit_time": now.isoformat(),
|
||||
"total_jobs": len(jobs),
|
||||
"summary": {
|
||||
"healthy": len(healthy),
|
||||
"transient_errors": len(transient),
|
||||
"systemic_failures": len(systemic),
|
||||
},
|
||||
"systemic_jobs": [
|
||||
{
|
||||
"id": j["id"],
|
||||
"name": j["name"],
|
||||
"reason": j["reason"],
|
||||
"last_error": j["last_error"],
|
||||
}
|
||||
for j in systemic
|
||||
],
|
||||
"transient_jobs": [
|
||||
{
|
||||
"id": j["id"],
|
||||
"name": j["name"],
|
||||
"reason": j["reason"],
|
||||
}
|
||||
for j in transient
|
||||
],
|
||||
"all_jobs": categorized,
|
||||
}
|
||||
|
||||
return report
|
||||
|
||||
|
||||
def generate_issue_body(job: Dict[str, Any]) -> str:
|
||||
"""Generate a Gitea issue body for a systemic cron failure."""
|
||||
return f"""## Systemic Cron Failure — Auto-Filed by Audit #662
|
||||
|
||||
**Job:** {job['name']} (`{job['id']}`)
|
||||
**Schedule:** {job['schedule']}
|
||||
**State:** {job['state']}
|
||||
**Last Error:**
|
||||
```
|
||||
{job['last_error'] or 'No error details available'}
|
||||
```
|
||||
|
||||
**Audit Finding:** {job['reason']}
|
||||
|
||||
### Action Required
|
||||
- [ ] Diagnose root cause of repeated failure
|
||||
- [ ] Fix configuration or remove broken job
|
||||
- [ ] Verify job resumes healthy after fix
|
||||
|
||||
*Auto-generated by cron-audit-662.py*
|
||||
"""
|
||||
|
||||
|
||||
# --- Crontab Parsing ---
|
||||
|
||||
def parse_crontab(text: str, source: str = "unknown") -> list:
|
||||
"""Parse a crontab file into job-like dicts for audit."""
|
||||
import re
|
||||
jobs = []
|
||||
cron_pattern = re.compile(
|
||||
r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)'
|
||||
)
|
||||
schedule_pattern = re.compile(
|
||||
r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)'
|
||||
)
|
||||
|
||||
for line in text.split("\n"):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
sm = schedule_pattern.match(line)
|
||||
cm = cron_pattern.match(line)
|
||||
if not sm or not cm:
|
||||
continue
|
||||
schedule_raw = sm.group(0).strip()
|
||||
command = cm.group(1).strip()
|
||||
name_part = command.split("#")[-1].strip() if "#" in command else ""
|
||||
if not name_part:
|
||||
cmd_base = command.split(">>")[0].strip().split()
|
||||
name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed"
|
||||
clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200]
|
||||
jobs.append({
|
||||
"id": f"crontab-{source}-{hash(command) % 10000:04x}",
|
||||
"name": name_part,
|
||||
"schedule_display": schedule_raw,
|
||||
"schedule": schedule_raw,
|
||||
"state": "scheduled",
|
||||
"enabled": True,
|
||||
"last_status": None,
|
||||
"last_error": None,
|
||||
"last_run_at": None,
|
||||
"_source": f"crontab:{source}",
|
||||
"_command": clean_cmd,
|
||||
})
|
||||
return jobs
|
||||
|
||||
|
||||
def load_crontab_backups(backup_dir) -> list:
|
||||
"""Load cron jobs from VPS crontab backup files."""
|
||||
from pathlib import Path
|
||||
backup_dir = Path(backup_dir)
|
||||
all_jobs = []
|
||||
if not backup_dir.exists():
|
||||
return all_jobs
|
||||
for fpath in sorted(backup_dir.glob("*-crontab-backup.txt")):
|
||||
source = fpath.name.replace("-crontab-backup.txt", "")
|
||||
text = fpath.read_text()
|
||||
all_jobs.extend(parse_crontab(text, source=source))
|
||||
return all_jobs
|
||||
|
||||
|
||||
def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict:
|
||||
"""Run full fleet audit combining hermes cron + VPS crontabs."""
|
||||
now = datetime.now(timezone.utc)
|
||||
hermes_categorized = [categorize_job(j, now) for j in hermes_jobs]
|
||||
crontab_categorized = []
|
||||
for j in crontab_jobs:
|
||||
crontab_categorized.append({
|
||||
"id": j["id"], "name": j["name"],
|
||||
"schedule": j.get("schedule_display", ""),
|
||||
"state": "scheduled", "enabled": True,
|
||||
"last_status": None, "last_error": None, "last_run_at": None,
|
||||
"category": "healthy",
|
||||
"reason": f"Crontab ({j.get('_source', '?')}) — verify logs manually",
|
||||
"action": "verify-logs",
|
||||
})
|
||||
all_cat = hermes_categorized + crontab_categorized
|
||||
healthy = [c for c in all_cat if c["category"] == "healthy"]
|
||||
transient = [c for c in all_cat if c["category"] == "transient"]
|
||||
systemic = [c for c in all_cat if c["category"] == "systemic"]
|
||||
return {
|
||||
"audit_time": now.isoformat(),
|
||||
"total_jobs": len(all_cat),
|
||||
"hermes_jobs": len(hermes_categorized),
|
||||
"crontab_jobs": len(crontab_categorized),
|
||||
"summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)},
|
||||
"systemic_jobs": [{"id": j["id"], "name": j["name"], "reason": j["reason"], "last_error": j.get("last_error", "")} for j in systemic],
|
||||
"transient_jobs": [{"id": j["id"], "name": j["name"], "reason": j["reason"]} for j in transient],
|
||||
"all_jobs": all_cat,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
|
||||
parser.add_argument("--jobs-file", help="Path to jobs.json override")
|
||||
parser.add_argument("--disable", action="store_true",
|
||||
help="Disable systemic jobs (requires hermes CLI)")
|
||||
parser.add_argument("--issues", action="store_true",
|
||||
help="File Gitea issues for systemic failures")
|
||||
parser.add_argument("--output", help="Write report to file")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output only")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load jobs
|
||||
jobs = []
|
||||
if args.jobs_file:
|
||||
with open(args.jobs_file) as f:
|
||||
data = json.load(f)
|
||||
jobs = data.get("jobs", data) if isinstance(data, dict) else data
|
||||
else:
|
||||
jobs = load_cron_state()
|
||||
|
||||
# Also load VPS crontab backups
|
||||
crontab_dir = Path(__file__).parent.parent / "cron" / "vps"
|
||||
crontab_jobs = load_crontab_backups(crontab_dir)
|
||||
|
||||
if not jobs:
|
||||
print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
|
||||
sys.exit(1)
|
||||
|
||||
# Run audit
|
||||
if crontab_jobs:
|
||||
report = audit_fleet(jobs, crontab_jobs)
|
||||
else:
|
||||
report = audit_jobs(jobs)
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
print(json.dumps(report, indent=2))
|
||||
else:
|
||||
print(f"\n{'='*60}")
|
||||
print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
|
||||
print(f"{'='*60}")
|
||||
print(f" Healthy: {report['summary']['healthy']}")
|
||||
print(f" Transient errors: {report['summary']['transient_errors']}")
|
||||
print(f" Systemic failures: {report['summary']['systemic_failures']}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
if report["systemic_jobs"]:
|
||||
print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
|
||||
for j in report["systemic_jobs"]:
|
||||
print(f" - {j['name']} ({j['id']}): {j['reason']}")
|
||||
if j["last_error"]:
|
||||
print(f" Error: {j['last_error'][:100]}")
|
||||
|
||||
if report["transient_jobs"]:
|
||||
print(f"\n TRANSIENT ERRORS:")
|
||||
for j in report["transient_jobs"]:
|
||||
print(f" - {j['name']} ({j['id']}): {j['reason']}")
|
||||
|
||||
print()
|
||||
|
||||
# Write report file
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"Report written to {args.output}")
|
||||
|
||||
# Disable systemic jobs
|
||||
if args.disable and report["systemic_jobs"]:
|
||||
import subprocess
|
||||
for j in report["systemic_jobs"]:
|
||||
print(f"Disabling: {j['name']} ({j['id']})")
|
||||
try:
|
||||
subprocess.run(
|
||||
["hermes", "cron", "pause", j["id"]],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
print(f" → Disabled")
|
||||
except Exception as e:
|
||||
print(f" → Failed: {e}")
|
||||
|
||||
# File issues for systemic failures
|
||||
if args.issues and report["systemic_jobs"]:
|
||||
gitea_token = os.environ.get("GITEA_TOKEN") or ""
|
||||
if not gitea_token:
|
||||
token_path = Path.home() / ".config" / "gitea" / "token"
|
||||
if token_path.exists():
|
||||
gitea_token = token_path.read_text().strip()
|
||||
|
||||
if not gitea_token:
|
||||
print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token")
|
||||
sys.exit(1)
|
||||
|
||||
import urllib.request
|
||||
base = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
headers = {
|
||||
"Authorization": f"token {gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
for j in report["systemic_jobs"]:
|
||||
title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
|
||||
body = generate_issue_body(j)
|
||||
data = json.dumps({"title": title, "body": body}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{base}/repos/Timmy_Foundation/timmy-config/issues",
|
||||
data=data, headers=headers, method="POST"
|
||||
)
|
||||
try:
|
||||
resp = urllib.request.urlopen(req)
|
||||
result = json.loads(resp.read())
|
||||
print(f"Issued #{result['number']}: {title}")
|
||||
except Exception as e:
|
||||
print(f"Failed to file issue for {j['name']}: {e}")
|
||||
|
||||
# Exit code: non-zero if systemic failures found
|
||||
sys.exit(1 if report["systemic_jobs"] else 0)
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||
from cron_audit_662 import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
630
scripts/cron_audit_662.py
Executable file
630
scripts/cron_audit_662.py
Executable file
@@ -0,0 +1,630 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Cron Fleet Audit Script — #662
|
||||
|
||||
Reads hermes cron job state, categorizes all jobs into:
|
||||
- healthy: last_status=ok or never-run-and-enabled
|
||||
- transient: recent errors (likely network/timeout)
|
||||
- systemic: repeated errors over 48+ hours
|
||||
|
||||
Outputs a JSON report and optionally:
|
||||
--disable Disable systemic jobs erroring 48+ hours
|
||||
--issues File Gitea issues for systemic failures
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# --- Config ---
|
||||
ERROR_THRESHOLD_HOURS = 48
|
||||
TRANSIENT_ERROR_MARKERS = (
|
||||
"timeout",
|
||||
"timed out",
|
||||
"rate limit",
|
||||
"429",
|
||||
"503",
|
||||
"temporary",
|
||||
"temporarily",
|
||||
"auth store lock",
|
||||
"connection reset",
|
||||
"connection refused",
|
||||
"connection aborted",
|
||||
"delivery error",
|
||||
"telegram send failed",
|
||||
"matrix send failed",
|
||||
"nodename nor servname provided",
|
||||
"name or service not known",
|
||||
"empty response",
|
||||
)
|
||||
SYSTEMIC_ERROR_MARKERS = (
|
||||
"cannot import",
|
||||
"modulenotfounderror",
|
||||
"importerror",
|
||||
"attributeerror",
|
||||
"syntaxerror",
|
||||
"permission denied",
|
||||
"no such file",
|
||||
"file not found",
|
||||
"dict' object has no attribute",
|
||||
'dict" object has no attribute',
|
||||
)
|
||||
CRON_STATE_PATHS = [
|
||||
Path.home() / ".hermes" / "cron" / "jobs.json",
|
||||
Path.home() / ".hermes" / "cron" / "state.json",
|
||||
Path("/root/.hermes/cron/jobs.json"),
|
||||
Path("/root/.hermes/cron/state.json"),
|
||||
]
|
||||
|
||||
def load_jobs_file(path: Path) -> List[Dict[str, Any]]:
|
||||
"""Load a cron jobs/state JSON file."""
|
||||
with path.open() as f:
|
||||
data = json.load(f)
|
||||
if isinstance(data, dict) and "jobs" in data:
|
||||
return data["jobs"]
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
return []
|
||||
|
||||
|
||||
def parse_cron_list_output(text: str) -> List[Dict[str, Any]]:
|
||||
"""Parse `hermes cron list --all` output as a last-resort fallback."""
|
||||
jobs: List[Dict[str, Any]] = []
|
||||
current: Optional[Dict[str, Any]] = None
|
||||
job_header = re.compile(r"^\s{2}(?P<id>\S+) \[(?P<state>[^\]]+)\]\s*$")
|
||||
|
||||
for raw_line in text.splitlines():
|
||||
line = raw_line.rstrip()
|
||||
match = job_header.match(line)
|
||||
if match:
|
||||
if current:
|
||||
jobs.append(current)
|
||||
state = match.group("state").strip().lower()
|
||||
current = {
|
||||
"id": match.group("id"),
|
||||
"name": match.group("id"),
|
||||
"schedule_display": "",
|
||||
"schedule": "",
|
||||
"state": "paused" if state == "paused" else "scheduled",
|
||||
"enabled": state == "active",
|
||||
"last_status": None,
|
||||
"last_error": None,
|
||||
"last_delivery_error": None,
|
||||
"last_run_at": None,
|
||||
}
|
||||
continue
|
||||
|
||||
if not current:
|
||||
continue
|
||||
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("Name:"):
|
||||
current["name"] = stripped.split("Name:", 1)[1].strip()
|
||||
elif stripped.startswith("Schedule:"):
|
||||
schedule = stripped.split("Schedule:", 1)[1].strip()
|
||||
current["schedule_display"] = schedule
|
||||
current["schedule"] = schedule
|
||||
elif stripped.startswith("Last run:"):
|
||||
payload = stripped.split("Last run:", 1)[1].strip()
|
||||
if payload in {"-", ""}:
|
||||
continue
|
||||
if " error: " in payload:
|
||||
ts, error = payload.split(" error: ", 1)
|
||||
current["last_run_at"] = ts.strip()
|
||||
current["last_status"] = "error"
|
||||
current["last_error"] = error.strip()
|
||||
elif payload.endswith(" ok"):
|
||||
current["last_run_at"] = payload[:-4].strip()
|
||||
current["last_status"] = "ok"
|
||||
else:
|
||||
current["last_run_at"] = payload
|
||||
|
||||
if current:
|
||||
jobs.append(current)
|
||||
return jobs
|
||||
|
||||
|
||||
def load_cron_state() -> List[Dict[str, Any]]:
|
||||
"""Load cron job state from known locations."""
|
||||
for path in CRON_STATE_PATHS:
|
||||
if not path.exists():
|
||||
continue
|
||||
try:
|
||||
jobs = load_jobs_file(path)
|
||||
if jobs:
|
||||
return jobs
|
||||
except (json.JSONDecodeError, IOError):
|
||||
continue
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["hermes", "cron", "list", "--all"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
jobs = parse_cron_list_output(result.stdout)
|
||||
if jobs:
|
||||
return jobs
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def parse_timestamp(ts: Optional[str]) -> Optional[datetime]:
|
||||
"""Parse ISO timestamp, handle various formats."""
|
||||
if not ts:
|
||||
return None
|
||||
ts = str(ts).strip().replace("Z", "+00:00")
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts)
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def format_schedule(job: Dict[str, Any]) -> str:
|
||||
"""Normalize mixed schedule formats into a readable string."""
|
||||
display = job.get("schedule_display")
|
||||
if display:
|
||||
return str(display)
|
||||
|
||||
schedule = job.get("schedule")
|
||||
if isinstance(schedule, dict):
|
||||
return (
|
||||
schedule.get("display")
|
||||
or schedule.get("expr")
|
||||
or schedule.get("value")
|
||||
or json.dumps(schedule, sort_keys=True)
|
||||
)
|
||||
return str(schedule or "")
|
||||
|
||||
|
||||
def find_error_marker(text: str, markers: tuple[str, ...]) -> Optional[str]:
|
||||
for marker in markers:
|
||||
if marker in text:
|
||||
return marker
|
||||
return None
|
||||
|
||||
|
||||
def classify_error(last_error: str, last_delivery_error: str, age_hours: Optional[float]) -> tuple[str, str, str]:
|
||||
"""Classify an error as transient vs systemic using both age and error text."""
|
||||
combined = "\n".join(part for part in [last_error, last_delivery_error] if part).lower()
|
||||
age_text = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age"
|
||||
|
||||
if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
|
||||
return (
|
||||
"systemic",
|
||||
f"Error persisted for {age_hours:.1f}h (>= {ERROR_THRESHOLD_HOURS}h threshold)",
|
||||
"disable",
|
||||
)
|
||||
|
||||
systemic_marker = find_error_marker(combined, SYSTEMIC_ERROR_MARKERS)
|
||||
if systemic_marker:
|
||||
return (
|
||||
"systemic",
|
||||
f"Systemic error signature: {systemic_marker} ({age_text})",
|
||||
"disable",
|
||||
)
|
||||
|
||||
transient_marker = find_error_marker(combined, TRANSIENT_ERROR_MARKERS)
|
||||
if transient_marker:
|
||||
return (
|
||||
"transient",
|
||||
f"Transient error signature: {transient_marker} ({age_text})",
|
||||
"monitor",
|
||||
)
|
||||
|
||||
return (
|
||||
"transient",
|
||||
f"Unclassified recent error ({age_text})",
|
||||
"investigate",
|
||||
)
|
||||
|
||||
|
||||
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
|
||||
"""Categorize a single job."""
|
||||
status = job.get("last_status", "")
|
||||
last_error = job.get("last_error", "")
|
||||
last_delivery_error = job.get("last_delivery_error", "")
|
||||
last_run = parse_timestamp(job.get("last_run_at"))
|
||||
enabled = job.get("enabled", False)
|
||||
state = job.get("state", "unknown")
|
||||
name = job.get("name", job.get("id", "unknown"))
|
||||
|
||||
entry = {
|
||||
"id": job.get("id", ""),
|
||||
"name": name,
|
||||
"schedule": format_schedule(job),
|
||||
"state": state,
|
||||
"enabled": enabled,
|
||||
"last_status": status,
|
||||
"last_error": last_error,
|
||||
"last_delivery_error": last_delivery_error,
|
||||
"last_run_at": job.get("last_run_at"),
|
||||
"category": "healthy",
|
||||
"reason": "",
|
||||
"action": "",
|
||||
}
|
||||
|
||||
# Never run / no error
|
||||
if status is None and not last_error:
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Never run, no errors"
|
||||
return entry
|
||||
|
||||
# Explicitly paused with reason
|
||||
if state == "paused":
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = job.get("paused_reason", "Manually paused")
|
||||
entry["action"] = "none — paused intentionally"
|
||||
return entry
|
||||
|
||||
# Completed jobs
|
||||
if state == "completed":
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Completed (one-shot)"
|
||||
return entry
|
||||
|
||||
# Error status
|
||||
if status == "error":
|
||||
age_hours = None
|
||||
if last_run:
|
||||
age_hours = (now - last_run).total_seconds() / 3600
|
||||
|
||||
entry["category"], entry["reason"], entry["action"] = classify_error(
|
||||
str(last_error or ""),
|
||||
str(last_delivery_error or ""),
|
||||
age_hours,
|
||||
)
|
||||
return entry
|
||||
|
||||
if status == "ok" and last_delivery_error:
|
||||
entry["category"] = "transient"
|
||||
entry["reason"] = f"Job completed but delivery failed: {last_delivery_error}"
|
||||
entry["action"] = "monitor"
|
||||
return entry
|
||||
|
||||
# OK status
|
||||
if status == "ok":
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Last run succeeded"
|
||||
return entry
|
||||
|
||||
# Scheduled but never errored
|
||||
if state == "scheduled" and enabled:
|
||||
entry["category"] = "healthy"
|
||||
entry["reason"] = "Scheduled and running"
|
||||
return entry
|
||||
|
||||
# Unknown state
|
||||
entry["category"] = "transient"
|
||||
entry["reason"] = f"Unknown state: {state}, status: {status}"
|
||||
entry["action"] = "investigate"
|
||||
return entry
|
||||
|
||||
|
||||
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Run full audit on job list."""
|
||||
now = datetime.now(timezone.utc)
|
||||
categorized = [categorize_job(j, now) for j in jobs]
|
||||
|
||||
healthy = [c for c in categorized if c["category"] == "healthy"]
|
||||
transient = [c for c in categorized if c["category"] == "transient"]
|
||||
systemic = [c for c in categorized if c["category"] == "systemic"]
|
||||
|
||||
report = {
|
||||
"audit_time": now.isoformat(),
|
||||
"total_jobs": len(jobs),
|
||||
"summary": {
|
||||
"healthy": len(healthy),
|
||||
"transient_errors": len(transient),
|
||||
"systemic_failures": len(systemic),
|
||||
},
|
||||
"systemic_jobs": [
|
||||
{
|
||||
"id": j["id"],
|
||||
"name": j["name"],
|
||||
"schedule": j["schedule"],
|
||||
"state": j["state"],
|
||||
"reason": j["reason"],
|
||||
"last_error": j["last_error"],
|
||||
"last_delivery_error": j["last_delivery_error"],
|
||||
"last_run_at": j["last_run_at"],
|
||||
"action": j["action"],
|
||||
}
|
||||
for j in systemic
|
||||
],
|
||||
"transient_jobs": [
|
||||
{
|
||||
"id": j["id"],
|
||||
"name": j["name"],
|
||||
"schedule": j["schedule"],
|
||||
"state": j["state"],
|
||||
"reason": j["reason"],
|
||||
"last_error": j["last_error"],
|
||||
"last_delivery_error": j["last_delivery_error"],
|
||||
"last_run_at": j["last_run_at"],
|
||||
"action": j["action"],
|
||||
}
|
||||
for j in transient
|
||||
],
|
||||
"all_jobs": categorized,
|
||||
}
|
||||
|
||||
return report
|
||||
|
||||
|
||||
def generate_issue_body(job: Dict[str, Any]) -> str:
|
||||
"""Generate a Gitea issue body for a systemic cron failure."""
|
||||
delivery_error = job.get("last_delivery_error") or ""
|
||||
delivery_block = ""
|
||||
if delivery_error:
|
||||
delivery_block = f"\n**Last Delivery Error:**\n```\n{delivery_error}\n```\n"
|
||||
|
||||
return f"""## Systemic Cron Failure — Auto-Filed by Audit #662
|
||||
|
||||
**Job:** {job.get('name', 'unknown')} (`{job.get('id', 'unknown')}`)
|
||||
**Schedule:** {job.get('schedule', 'unknown')}
|
||||
**State:** {job.get('state', 'unknown')}
|
||||
**Last Error:**
|
||||
```
|
||||
{job.get('last_error') or 'No error details available'}
|
||||
```
|
||||
{delivery_block}
|
||||
|
||||
**Audit Finding:** {job.get('reason', 'No audit reason captured')}
|
||||
|
||||
### Action Required
|
||||
- [ ] Diagnose root cause of repeated failure
|
||||
- [ ] Fix configuration or remove broken job
|
||||
- [ ] Verify job resumes healthy after fix
|
||||
|
||||
*Auto-generated by cron-audit-662.py*
|
||||
"""
|
||||
|
||||
|
||||
# --- Crontab Parsing ---
|
||||
|
||||
def parse_crontab(text: str, source: str = "unknown") -> list:
|
||||
"""Parse a crontab file into job-like dicts for audit."""
|
||||
import re
|
||||
jobs = []
|
||||
cron_pattern = re.compile(
|
||||
r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)\s+(.*)'
|
||||
)
|
||||
schedule_pattern = re.compile(
|
||||
r'^(?:@\w+|[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+\s+[\d*,/\-]+)'
|
||||
)
|
||||
|
||||
for line in text.split("\n"):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
sm = schedule_pattern.match(line)
|
||||
cm = cron_pattern.match(line)
|
||||
if not sm or not cm:
|
||||
continue
|
||||
schedule_raw = sm.group(0).strip()
|
||||
command = cm.group(1).strip()
|
||||
name_part = command.split("#")[-1].strip() if "#" in command else ""
|
||||
if not name_part:
|
||||
cmd_base = command.split(">>")[0].strip().split()
|
||||
name_part = os.path.basename(cmd_base[-1]) if cmd_base else "unnamed"
|
||||
clean_cmd = command.split(">>")[0].split("#")[0].strip()[:200]
|
||||
jobs.append({
|
||||
"id": f"crontab-{source}-{hash(command) % 10000:04x}",
|
||||
"name": name_part,
|
||||
"schedule_display": schedule_raw,
|
||||
"schedule": schedule_raw,
|
||||
"state": "scheduled",
|
||||
"enabled": True,
|
||||
"last_status": None,
|
||||
"last_error": None,
|
||||
"last_run_at": None,
|
||||
"_source": f"crontab:{source}",
|
||||
"_command": clean_cmd,
|
||||
})
|
||||
return jobs
|
||||
|
||||
|
||||
def load_crontab_backups(backup_dir) -> list:
|
||||
"""Load cron jobs from VPS crontab backup files."""
|
||||
from pathlib import Path
|
||||
backup_dir = Path(backup_dir)
|
||||
all_jobs = []
|
||||
if not backup_dir.exists():
|
||||
return all_jobs
|
||||
for fpath in sorted(backup_dir.glob("*-crontab-backup.txt")):
|
||||
source = fpath.name.replace("-crontab-backup.txt", "")
|
||||
text = fpath.read_text()
|
||||
all_jobs.extend(parse_crontab(text, source=source))
|
||||
return all_jobs
|
||||
|
||||
|
||||
def audit_fleet(hermes_jobs: list, crontab_jobs: list) -> dict:
|
||||
"""Run full fleet audit combining hermes cron + VPS crontabs."""
|
||||
now = datetime.now(timezone.utc)
|
||||
hermes_categorized = [categorize_job(j, now) for j in hermes_jobs]
|
||||
crontab_categorized = []
|
||||
for j in crontab_jobs:
|
||||
crontab_categorized.append({
|
||||
"id": j["id"], "name": j["name"],
|
||||
"schedule": j.get("schedule_display", ""),
|
||||
"state": "scheduled", "enabled": True,
|
||||
"last_status": None, "last_error": None, "last_delivery_error": None, "last_run_at": None,
|
||||
"category": "healthy",
|
||||
"reason": f"Crontab ({j.get('_source', '?')}) — verify logs manually",
|
||||
"action": "verify-logs",
|
||||
})
|
||||
all_cat = hermes_categorized + crontab_categorized
|
||||
healthy = [c for c in all_cat if c["category"] == "healthy"]
|
||||
transient = [c for c in all_cat if c["category"] == "transient"]
|
||||
systemic = [c for c in all_cat if c["category"] == "systemic"]
|
||||
return {
|
||||
"audit_time": now.isoformat(),
|
||||
"total_jobs": len(all_cat),
|
||||
"hermes_jobs": len(hermes_categorized),
|
||||
"crontab_jobs": len(crontab_categorized),
|
||||
"summary": {"healthy": len(healthy), "transient_errors": len(transient), "systemic_failures": len(systemic)},
|
||||
"systemic_jobs": [
|
||||
{
|
||||
"id": j["id"],
|
||||
"name": j["name"],
|
||||
"schedule": j["schedule"],
|
||||
"state": j["state"],
|
||||
"reason": j["reason"],
|
||||
"last_error": j.get("last_error", ""),
|
||||
"last_delivery_error": j.get("last_delivery_error", ""),
|
||||
"last_run_at": j.get("last_run_at"),
|
||||
"action": j.get("action", ""),
|
||||
}
|
||||
for j in systemic
|
||||
],
|
||||
"transient_jobs": [
|
||||
{
|
||||
"id": j["id"],
|
||||
"name": j["name"],
|
||||
"schedule": j["schedule"],
|
||||
"state": j["state"],
|
||||
"reason": j["reason"],
|
||||
"last_error": j.get("last_error", ""),
|
||||
"last_delivery_error": j.get("last_delivery_error", ""),
|
||||
"last_run_at": j.get("last_run_at"),
|
||||
"action": j.get("action", ""),
|
||||
}
|
||||
for j in transient
|
||||
],
|
||||
"all_jobs": all_cat,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
|
||||
parser.add_argument("--jobs-file", help="Path to jobs.json override")
|
||||
parser.add_argument("--disable", action="store_true",
|
||||
help="Disable systemic jobs (requires hermes CLI)")
|
||||
parser.add_argument("--issues", action="store_true",
|
||||
help="File Gitea issues for systemic failures")
|
||||
parser.add_argument("--output", help="Write report to file")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output only")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load jobs
|
||||
jobs = []
|
||||
if args.jobs_file:
|
||||
with open(args.jobs_file) as f:
|
||||
data = json.load(f)
|
||||
jobs = data.get("jobs", data) if isinstance(data, dict) else data
|
||||
else:
|
||||
jobs = load_cron_state()
|
||||
|
||||
# Also load VPS crontab backups
|
||||
crontab_dir = Path(__file__).parent.parent / "cron" / "vps"
|
||||
crontab_jobs = load_crontab_backups(crontab_dir)
|
||||
|
||||
if not jobs:
|
||||
print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
|
||||
sys.exit(1)
|
||||
|
||||
# Run audit
|
||||
if crontab_jobs:
|
||||
report = audit_fleet(jobs, crontab_jobs)
|
||||
else:
|
||||
report = audit_jobs(jobs)
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
print(json.dumps(report, indent=2))
|
||||
else:
|
||||
print(f"\n{'='*60}")
|
||||
print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
|
||||
print(f"{'='*60}")
|
||||
print(f" Healthy: {report['summary']['healthy']}")
|
||||
print(f" Transient errors: {report['summary']['transient_errors']}")
|
||||
print(f" Systemic failures: {report['summary']['systemic_failures']}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
if report["systemic_jobs"]:
|
||||
print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
|
||||
for j in report["systemic_jobs"]:
|
||||
print(f" - {j['name']} ({j['id']}): {j['reason']}")
|
||||
if j["last_error"]:
|
||||
print(f" Error: {j['last_error'][:100]}")
|
||||
|
||||
if report["transient_jobs"]:
|
||||
print(f"\n TRANSIENT ERRORS:")
|
||||
for j in report["transient_jobs"]:
|
||||
print(f" - {j['name']} ({j['id']}): {j['reason']}")
|
||||
|
||||
print()
|
||||
|
||||
# Write report file
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"Report written to {args.output}")
|
||||
|
||||
# Disable systemic jobs
|
||||
if args.disable and report["systemic_jobs"]:
|
||||
import subprocess
|
||||
for j in report["systemic_jobs"]:
|
||||
print(f"Disabling: {j['name']} ({j['id']})")
|
||||
try:
|
||||
subprocess.run(
|
||||
["hermes", "cron", "pause", j["id"]],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
print(f" → Disabled")
|
||||
except Exception as e:
|
||||
print(f" → Failed: {e}")
|
||||
|
||||
# File issues for systemic failures
|
||||
if args.issues and report["systemic_jobs"]:
|
||||
gitea_token = os.environ.get("GITEA_TOKEN") or ""
|
||||
if not gitea_token:
|
||||
token_path = Path.home() / ".config" / "gitea" / "token"
|
||||
if token_path.exists():
|
||||
gitea_token = token_path.read_text().strip()
|
||||
|
||||
if not gitea_token:
|
||||
print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token")
|
||||
sys.exit(1)
|
||||
|
||||
import urllib.request
|
||||
base = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
headers = {
|
||||
"Authorization": f"token {gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
for j in report["systemic_jobs"]:
|
||||
title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
|
||||
body = generate_issue_body(j)
|
||||
data = json.dumps({"title": title, "body": body}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{base}/repos/Timmy_Foundation/timmy-config/issues",
|
||||
data=data, headers=headers, method="POST"
|
||||
)
|
||||
try:
|
||||
resp = urllib.request.urlopen(req)
|
||||
result = json.loads(resp.read())
|
||||
print(f"Issued #{result['number']}: {title}")
|
||||
except Exception as e:
|
||||
print(f"Failed to file issue for {j['name']}: {e}")
|
||||
|
||||
# Exit code: non-zero if systemic failures found
|
||||
sys.exit(1 if report["systemic_jobs"] else 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -4,111 +4,365 @@
|
||||
Part of the Gemini Sovereign Infrastructure Suite.
|
||||
|
||||
Tracks the fleet's progress through the Paperclips-inspired evolution arc.
|
||||
|
||||
Usage:
|
||||
python3 scripts/phase_tracker.py status # Show current state
|
||||
python3 scripts/phase_tracker.py evaluate # Auto-evaluate checkable milestones
|
||||
python3 scripts/phase_tracker.py complete M4 # Mark milestone complete
|
||||
python3 scripts/phase_tracker.py telegram # Post daily update to Telegram
|
||||
python3 scripts/phase_tracker.py daily # evaluate + telegram
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
import argparse
|
||||
import urllib.request
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
MILESTONES_FILE = "fleet/milestones.md"
|
||||
COMPLETED_FILE = "fleet/completed_milestones.json"
|
||||
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
|
||||
UPTIME_FILE = LOG_DIR / "uptime.json"
|
||||
|
||||
TELEGRAM_TOKEN_PATHS = [
|
||||
Path.home() / ".config" / "timmy" / "telegram_bot_token",
|
||||
Path.home() / ".hermes" / "telegram_bot_token",
|
||||
Path.home() / ".hermes" / "telegram_token",
|
||||
]
|
||||
TELEGRAM_CHAT = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
|
||||
|
||||
HOSTS = {
|
||||
"ezra": {"ip": "143.198.27.163"},
|
||||
"allegro": {"ip": "167.99.126.228"},
|
||||
"bezalel": {"ip": "159.203.146.185"},
|
||||
}
|
||||
|
||||
|
||||
def _find_repo_root() -> Path:
|
||||
script_dir = Path(__file__).resolve().parent
|
||||
return script_dir.parent
|
||||
|
||||
|
||||
def _read_token() -> str | None:
|
||||
for p in TELEGRAM_TOKEN_PATHS:
|
||||
if p.exists():
|
||||
return p.read_text().strip()
|
||||
return os.environ.get("TELEGRAM_BOT_TOKEN") or None
|
||||
|
||||
|
||||
def telegram_send(text: str) -> bool:
|
||||
token = _read_token()
|
||||
if not token:
|
||||
print("[WARN] No Telegram token found.", file=sys.stderr)
|
||||
return False
|
||||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||||
body = json.dumps({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "HTML"}).encode()
|
||||
req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status == 200
|
||||
except Exception as e:
|
||||
print(f"[WARN] Telegram send failed: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
|
||||
class Milestone:
|
||||
def __init__(self, m_id: str, title: str, trigger: str, message: str):
|
||||
self.id = m_id
|
||||
self.title = title
|
||||
self.trigger = trigger
|
||||
self.message = message
|
||||
|
||||
|
||||
class Phase:
|
||||
def __init__(self, name: str, number: int, unlock_condition: str | None):
|
||||
self.name = name
|
||||
self.number = number
|
||||
self.unlock_condition = unlock_condition
|
||||
self.milestones: list[Milestone] = []
|
||||
|
||||
|
||||
class PhaseTracker:
|
||||
def __init__(self):
|
||||
# Find files relative to repo root
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
repo_root = os.path.dirname(script_dir)
|
||||
|
||||
self.milestones_path = os.path.join(repo_root, MILESTONES_FILE)
|
||||
self.completed_path = os.path.join(repo_root, COMPLETED_FILE)
|
||||
|
||||
self.milestones = self.parse_milestones()
|
||||
self.completed = self.load_completed()
|
||||
self.repo_root = _find_repo_root()
|
||||
self.milestones_path = self.repo_root / MILESTONES_FILE
|
||||
self.completed_path = self.repo_root / COMPLETED_FILE
|
||||
self.phases: list[Phase] = self._parse_milestones()
|
||||
self.completed: set[str] = self._load_completed()
|
||||
|
||||
def _parse_milestones(self) -> list[Phase]:
|
||||
if not self.milestones_path.exists():
|
||||
return []
|
||||
content = self.milestones_path.read_text()
|
||||
phases: list[Phase] = []
|
||||
current_phase: Phase | None = None
|
||||
|
||||
for line in content.splitlines():
|
||||
phase_match = re.match(r"##\s*Phase\s*(\d+):\s*(.+?)\s*(?:\(([^)]+)\))?\s*$", line)
|
||||
if phase_match:
|
||||
num = int(phase_match.group(1))
|
||||
name = phase_match.group(2).strip()
|
||||
unlock = phase_match.group(3)
|
||||
current_phase = Phase(name, num, unlock)
|
||||
phases.append(current_phase)
|
||||
continue
|
||||
|
||||
m_match = re.match(r"###\s*(M\d+):\s*(.+)$", line)
|
||||
if m_match and current_phase is not None:
|
||||
m_id = m_match.group(1)
|
||||
title = m_match.group(2).strip()
|
||||
current_phase.milestones.append(Milestone(m_id, title, "", ""))
|
||||
continue
|
||||
|
||||
if line.startswith("**Trigger:**") and current_phase and current_phase.milestones:
|
||||
current_phase.milestones[-1].trigger = line.replace("**Trigger:**", "").strip()
|
||||
continue
|
||||
|
||||
if line.startswith("**Message:**") and current_phase and current_phase.milestones:
|
||||
current_phase.milestones[-1].message = line.replace("**Message:**", "").strip().strip('"')
|
||||
continue
|
||||
|
||||
def parse_milestones(self):
|
||||
if not os.path.exists(self.milestones_path):
|
||||
return {}
|
||||
|
||||
with open(self.milestones_path, "r") as f:
|
||||
content = f.read()
|
||||
|
||||
phases = {}
|
||||
current_phase = None
|
||||
|
||||
for line in content.split("\n"):
|
||||
if line.startswith("## Phase"):
|
||||
current_phase = line.replace("## ", "").strip()
|
||||
phases[current_phase] = []
|
||||
elif line.startswith("### M"):
|
||||
m_id = line.split(":")[0].replace("### ", "").strip()
|
||||
title = line.split(":")[1].strip()
|
||||
phases[current_phase].append({"id": m_id, "title": title})
|
||||
|
||||
return phases
|
||||
|
||||
def load_completed(self):
|
||||
if os.path.exists(self.completed_path):
|
||||
with open(self.completed_path, "r") as f:
|
||||
try:
|
||||
return json.load(f)
|
||||
except:
|
||||
return []
|
||||
return []
|
||||
def _load_completed(self) -> set[str]:
|
||||
if self.completed_path.exists():
|
||||
try:
|
||||
data = json.loads(self.completed_path.read_text())
|
||||
if isinstance(data, list):
|
||||
return set(data)
|
||||
except Exception:
|
||||
pass
|
||||
return set()
|
||||
|
||||
def save_completed(self):
|
||||
with open(self.completed_path, "w") as f:
|
||||
json.dump(self.completed, f, indent=2)
|
||||
self.completed_path.write_text(json.dumps(sorted(self.completed), indent=2))
|
||||
|
||||
def show_progress(self):
|
||||
print("--- Fleet Phase Progression Tracker ---")
|
||||
total_milestones = 0
|
||||
total_completed = 0
|
||||
|
||||
if not self.milestones:
|
||||
print("[ERROR] No milestones found in fleet/milestones.md")
|
||||
return
|
||||
|
||||
for phase, ms in self.milestones.items():
|
||||
print(f"\n{phase}")
|
||||
for m in ms:
|
||||
total_milestones += 1
|
||||
done = m["id"] in self.completed
|
||||
if done:
|
||||
total_completed += 1
|
||||
status = "✅" if done else "⭕"
|
||||
print(f" {status} {m['id']}: {m['title']}")
|
||||
|
||||
percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0
|
||||
print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)")
|
||||
|
||||
def mark_complete(self, m_id: str):
|
||||
def mark_complete(self, m_id: str) -> bool:
|
||||
m_id = m_id.upper()
|
||||
exists = any(m.id == m_id for p in self.phases for m in p.milestones)
|
||||
if not exists:
|
||||
print(f"[ERROR] Unknown milestone: {m_id}")
|
||||
return False
|
||||
if m_id not in self.completed:
|
||||
self.completed.append(m_id)
|
||||
self.completed.add(m_id)
|
||||
self.save_completed()
|
||||
print(f"[SUCCESS] Marked {m_id} as complete.")
|
||||
return True
|
||||
print(f"[INFO] {m_id} is already complete.")
|
||||
return True
|
||||
|
||||
def _get_phase_state(self) -> tuple[int, float, list[str], list[str]]:
|
||||
"""Returns (current_phase_number, decimal_progress, blockers, next_milestones)."""
|
||||
blockers = []
|
||||
next_milestones = []
|
||||
|
||||
for phase in self.phases:
|
||||
phase_completed = sum(1 for m in phase.milestones if m.id in self.completed)
|
||||
phase_total = len(phase.milestones)
|
||||
if phase_total == 0:
|
||||
continue
|
||||
|
||||
if phase_completed < phase_total:
|
||||
progress = phase_completed / phase_total
|
||||
decimal = phase.number + progress
|
||||
# Find next incomplete milestone
|
||||
for m in phase.milestones:
|
||||
if m.id not in self.completed:
|
||||
next_milestones.append(f"{m.id}: {m.title}")
|
||||
if m.trigger:
|
||||
blockers.append(f"{m.id}: {m.trigger}")
|
||||
break
|
||||
# Phase unlock condition as blocker if near end
|
||||
if phase_completed == phase_total - 1 and phase.unlock_condition:
|
||||
blockers.append(f"Unlock Phase {phase.number + 1}: {phase.unlock_condition}")
|
||||
return phase.number, decimal, blockers, next_milestones
|
||||
|
||||
# All done
|
||||
last = self.phases[-1] if self.phases else None
|
||||
if last:
|
||||
return last.number, float(last.number) + 1.0, ["All phases complete."], []
|
||||
return 0, 0.0, ["No milestones defined."], []
|
||||
|
||||
def show_progress(self):
|
||||
phase_num, decimal, blockers, next_ms = self._get_phase_state()
|
||||
total_ms = sum(len(p.milestones) for p in self.phases)
|
||||
total_completed = len(self.completed)
|
||||
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
|
||||
|
||||
print("=" * 50)
|
||||
print(" Fleet Phase Progression Tracker")
|
||||
print("=" * 50)
|
||||
print(f"\nCurrent Phase: Phase {phase_num} — {self.phases[phase_num - 1].name if phase_num <= len(self.phases) else 'Complete'}")
|
||||
print(f"Decimal Progress: Phase {decimal:.1f}")
|
||||
print(f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)")
|
||||
|
||||
print("\n--- Milestones ---")
|
||||
for phase in self.phases:
|
||||
done = sum(1 for m in phase.milestones if m.id in self.completed)
|
||||
total = len(phase.milestones)
|
||||
status = "✅" if done == total else "⏳"
|
||||
print(f"\n{status} Phase {phase.number}: {phase.name} ({done}/{total})")
|
||||
for m in phase.milestones:
|
||||
mark = "✅" if m.id in self.completed else "⭕"
|
||||
print(f" {mark} {m.id}: {m.title}")
|
||||
|
||||
print("\n--- Next Up ---")
|
||||
for nm in next_ms[:3]:
|
||||
print(f" → {nm}")
|
||||
|
||||
print("\n--- Blockers ---")
|
||||
for b in blockers[:5]:
|
||||
print(f" ⚠️ {b}")
|
||||
if not blockers:
|
||||
print(" 🚀 Nothing blocking.")
|
||||
print()
|
||||
|
||||
def summary_text(self) -> str:
|
||||
phase_num, decimal, blockers, next_ms = self._get_phase_state()
|
||||
total_ms = sum(len(p.milestones) for p in self.phases)
|
||||
total_completed = len(self.completed)
|
||||
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
|
||||
|
||||
phase_name = self.phases[phase_num - 1].name if phase_num <= len(self.phases) else "Complete"
|
||||
next_phase = phase_num + 1 if phase_num < len(self.phases) else phase_num
|
||||
progress_to_next = (decimal - phase_num) * 100
|
||||
|
||||
lines = [
|
||||
f"Fleet: Phase {decimal:.1f} ({progress_to_next:.0f}% to Phase {next_phase})",
|
||||
f"Phase: {phase_num} — {phase_name}",
|
||||
f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)",
|
||||
]
|
||||
if next_ms:
|
||||
lines.append(f"Next: {next_ms[0]}")
|
||||
if blockers and blockers[0] != "All phases complete.":
|
||||
lines.append(f"Blocker: {blockers[0]}")
|
||||
return "\n".join(lines)
|
||||
|
||||
# === Auto-evaluation heuristics ===
|
||||
|
||||
def _eval_file_exists(self, path: str) -> bool:
|
||||
return (self.repo_root / path).exists()
|
||||
|
||||
def _eval_command(self, cmd: str) -> bool:
|
||||
try:
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True, timeout=10)
|
||||
return result.returncode == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _eval_uptime(self, target: float) -> bool:
|
||||
if not UPTIME_FILE.exists():
|
||||
return False
|
||||
try:
|
||||
data = json.loads(UPTIME_FILE.read_text())
|
||||
uptime = data.get("uptime_30d_percent", 0.0)
|
||||
return uptime >= target
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _eval_local_model_multi(self) -> bool:
|
||||
count = 0
|
||||
for host in HOSTS:
|
||||
if self._eval_command(f"ssh -o ConnectTimeout=5 {host} 'pgrep -f ollama >/dev/null 2>&1'"):
|
||||
count += 1
|
||||
return count >= 2
|
||||
|
||||
def _eval_zero_manual_restarts(self, days: int = 7) -> bool:
|
||||
log = LOG_DIR / "auto_restart.log"
|
||||
if not log.exists():
|
||||
return False
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
try:
|
||||
with open(log) as f:
|
||||
for line in f:
|
||||
if "manual restart" in line.lower():
|
||||
# crude timestamp parse
|
||||
try:
|
||||
ts = datetime.fromisoformat(line[:19])
|
||||
if ts > cutoff:
|
||||
return False
|
||||
except Exception:
|
||||
continue
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def evaluate(self):
|
||||
"""Auto-check milestones where we have heuristics."""
|
||||
print("[EVAL] Running automatic milestone checks...\n")
|
||||
checks = [
|
||||
("M1", self._eval_command, "python3 fleet/health_check.py --dry-run 2>/dev/null || python3 fleet/health_check.py 2>&1 | head -1 >/dev/null"),
|
||||
("M2", self._eval_command, "test -f ~/.local/timmy/fleet-health/auto_restart.log && grep -q 'restarted' ~/.local/timmy/fleet-health/auto_restart.log"),
|
||||
("M3", self._eval_command, "test -d ~/.local/timmy/backups && ls ~/.local/timmy/backups | grep -q ."),
|
||||
("M4", self._eval_uptime, 95.0),
|
||||
("M5", self._eval_uptime, 97.0),
|
||||
("M6", self._eval_zero_manual_restarts, 7),
|
||||
("M9", self._eval_uptime, 98.0),
|
||||
("M11", self._eval_local_model_multi, None),
|
||||
]
|
||||
newly_found = []
|
||||
for m_id, check_fn, arg in checks:
|
||||
if m_id in self.completed:
|
||||
continue
|
||||
result = check_fn(arg) if arg is not None else check_fn()
|
||||
if result:
|
||||
print(f" ✅ {m_id} appears satisfied — marking complete.")
|
||||
self.completed.add(m_id)
|
||||
newly_found.append(m_id)
|
||||
else:
|
||||
print(f" ⭕ {m_id} not yet satisfied.")
|
||||
|
||||
if newly_found:
|
||||
self.save_completed()
|
||||
print(f"\n[SUCCESS] Auto-completed {len(newly_found)} milestone(s): {', '.join(newly_found)}")
|
||||
else:
|
||||
print(f"[INFO] {m_id} is already complete.")
|
||||
print("\n[INFO] No new milestones auto-detected.")
|
||||
|
||||
def daily(self):
|
||||
self.evaluate()
|
||||
text = self.summary_text()
|
||||
print(text)
|
||||
ok = telegram_send(text)
|
||||
if ok:
|
||||
print("\n[TELEGRAM] Daily update sent.")
|
||||
else:
|
||||
print("\n[TELEGRAM] Failed to send update.")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Gemini Phase Tracker")
|
||||
parser = argparse.ArgumentParser(description="Fleet Phase Progression Tracker")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
|
||||
subparsers.add_parser("status", help="Show current progress")
|
||||
|
||||
subparsers.add_parser("evaluate", help="Auto-evaluate checkable milestones")
|
||||
subparsers.add_parser("telegram", help="Post summary to Telegram")
|
||||
subparsers.add_parser("daily", help="Evaluate then post to Telegram")
|
||||
|
||||
complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete")
|
||||
complete_parser.add_argument("id", help="Milestone ID (e.g. M1)")
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
tracker = PhaseTracker()
|
||||
|
||||
|
||||
if args.command == "status":
|
||||
tracker.show_progress()
|
||||
elif args.command == "evaluate":
|
||||
tracker.evaluate()
|
||||
elif args.command == "telegram":
|
||||
ok = telegram_send(tracker.summary_text())
|
||||
sys.exit(0 if ok else 1)
|
||||
elif args.command == "daily":
|
||||
tracker.daily()
|
||||
elif args.command == "complete":
|
||||
tracker.mark_complete(args.id)
|
||||
ok = tracker.mark_complete(args.id)
|
||||
sys.exit(0 if ok else 1)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -71,6 +71,43 @@ class TestCategorizeJob:
|
||||
r = categorize_job({"name": "t", "state": "paused", "enabled": False}, datetime.now(timezone.utc))
|
||||
assert r["category"] == "healthy"
|
||||
|
||||
def test_import_error_is_systemic_even_when_recent(self):
|
||||
from cron_audit_662 import categorize_job
|
||||
now = datetime.now(timezone.utc)
|
||||
r = categorize_job({
|
||||
"name": "t",
|
||||
"last_status": "error",
|
||||
"last_error": "cannot import name 'AIAgent' from 'run_agent'",
|
||||
"last_run_at": (now - timedelta(hours=1)).isoformat(),
|
||||
}, now)
|
||||
assert r["category"] == "systemic"
|
||||
assert r["action"] == "disable"
|
||||
|
||||
def test_empty_response_stays_transient(self):
|
||||
from cron_audit_662 import categorize_job
|
||||
now = datetime.now(timezone.utc)
|
||||
r = categorize_job({
|
||||
"name": "t",
|
||||
"last_status": "error",
|
||||
"last_error": "Agent completed but produced empty response (model error, timeout, or misconfiguration)",
|
||||
"last_run_at": (now - timedelta(hours=1)).isoformat(),
|
||||
}, now)
|
||||
assert r["category"] == "transient"
|
||||
|
||||
def test_delivery_failure_after_success_is_transient(self):
|
||||
from cron_audit_662 import categorize_job
|
||||
now = datetime.now(timezone.utc)
|
||||
r = categorize_job({
|
||||
"name": "t",
|
||||
"last_status": "ok",
|
||||
"last_delivery_error": "delivery error: Telegram send failed: Timed out",
|
||||
"last_run_at": now.isoformat(),
|
||||
"enabled": True,
|
||||
"state": "scheduled",
|
||||
}, now)
|
||||
assert r["category"] == "transient"
|
||||
assert "delivery failed" in r["reason"]
|
||||
|
||||
|
||||
class TestAuditFleet:
|
||||
def test_empty(self):
|
||||
@@ -116,3 +153,23 @@ class TestTimestampParsing:
|
||||
from cron_audit_662 import parse_timestamp
|
||||
assert parse_timestamp("") is None
|
||||
assert parse_timestamp(None) is None
|
||||
|
||||
|
||||
class TestIssueBody:
|
||||
def test_includes_schedule_state_and_delivery_error(self):
|
||||
from cron_audit_662 import generate_issue_body
|
||||
|
||||
body = generate_issue_body({
|
||||
"id": "job-1",
|
||||
"name": "Health Monitor",
|
||||
"schedule": "every 5m",
|
||||
"state": "scheduled",
|
||||
"last_error": "cannot import name 'tool' from 'tools.registry'",
|
||||
"last_delivery_error": "delivery error: Telegram send failed: Timed out",
|
||||
"reason": "Systemic error signature: cannot import (1.0h ago)",
|
||||
})
|
||||
|
||||
assert "Health Monitor" in body
|
||||
assert "every 5m" in body
|
||||
assert "scheduled" in body
|
||||
assert "Last Delivery Error" in body
|
||||
|
||||
Reference in New Issue
Block a user