Compare commits

..

6 Commits

Author SHA1 Message Date
63d03a2ae3 feat: add sidecar config validator (#690)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 44s
Smoke Test / smoke (pull_request) Failing after 29s
Validate Config / YAML Lint (pull_request) Failing after 21s
Validate Config / JSON Validate (pull_request) Successful in 21s
PR Checklist / pr-checklist (pull_request) Failing after 4m21s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 2m16s
Validate Config / Cron Syntax Check (pull_request) Successful in 17s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 20s
Validate Config / Shell Script Lint (pull_request) Failing after 58s
Validate Config / Playbook Schema Validation (pull_request) Successful in 30s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Pre-deploy validation for timmy-config sidecar configs.
Validates YAML/JSON configs against expected schemas.
Checks required keys, value types, nested structures.

Supported config types:
- Wizard configs (wizards/*/config.yaml): model, provider, fallback_providers
- Sidecar configs (wizards/*-sidecar.json): name, role, capabilities
- Main config (config.yaml): model, provider, toolsets
- Cron pipelines (cron/*.yml): name, schedule format
- Playbooks (playbooks/*.yaml): name, steps

Usage:
  python3 scripts/sidecar_validator.py --pre-deploy
  python3 scripts/sidecar_validator.py --all
  python3 scripts/sidecar_validator.py config.yaml

Closes #690
2026-04-14 23:29:54 -04:00
d120526244 fix: add python3 shebang to scripts/visual_pr_reviewer.py (#681) 2026-04-15 02:57:53 +00:00
8596ff761b fix: add python3 shebang to scripts/diagram_meaning_extractor.py (#681) 2026-04-15 02:57:40 +00:00
7553fd4f3e fix: add python3 shebang to scripts/captcha_bypass_handler.py (#681) 2026-04-15 02:57:25 +00:00
71082fe06f fix: add python3 shebang to bin/soul_eval_gate.py (#681) 2026-04-15 02:57:14 +00:00
6d678e938e fix: add python3 shebang to bin/nostr-agent-demo.py (#681) 2026-04-15 02:57:00 +00:00
10 changed files with 155 additions and 547 deletions

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
"""
Full Nostr agent-to-agent communication demo - FINAL WORKING
"""

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
"""
Soul Eval Gate — The Conscience of the Training Pipeline

View File

@@ -1,104 +0,0 @@
{
"audit_time": "2026-04-15T01:13:31.126215+00:00",
"total_jobs": 7,
"summary": {
"healthy": 7,
"transient_errors": 0,
"systemic_failures": 0
},
"systemic_jobs": [],
"transient_jobs": [],
"all_jobs": [
{
"id": "9e0624269ba7",
"name": "Triage Heartbeat",
"schedule": "every 15m",
"state": "paused",
"enabled": false,
"last_status": "ok",
"last_error": null,
"last_run_at": "2026-03-24T15:33:57.749458-04:00",
"category": "healthy",
"reason": "Dashboard repo frozen - loops redirected to the-nexus",
"action": "none \u2014 paused intentionally"
},
{
"id": "e29eda4a8548",
"name": "PR Review Sweep",
"schedule": "every 30m",
"state": "paused",
"enabled": false,
"last_status": "ok",
"last_error": null,
"last_run_at": "2026-03-24T15:21:42.995715-04:00",
"category": "healthy",
"reason": "Dashboard repo frozen - loops redirected to the-nexus",
"action": "none \u2014 paused intentionally"
},
{
"id": "a77a87392582",
"name": "Health Monitor",
"schedule": "every 5m",
"state": "scheduled",
"enabled": true,
"last_status": "ok",
"last_error": null,
"last_run_at": "2026-03-24T15:34:39.045945-04:00",
"category": "healthy",
"reason": "Last run succeeded",
"action": ""
},
{
"id": "36fb2f630a17",
"name": "Hermes Philosophy Loop",
"schedule": "every 1440m",
"state": "unknown",
"enabled": false,
"last_status": null,
"last_error": null,
"last_run_at": null,
"category": "healthy",
"reason": "Never run, no errors",
"action": ""
},
{
"id": "muda-audit-weekly",
"name": "Muda Audit",
"schedule": "0 21 * * 0",
"state": "scheduled",
"enabled": true,
"last_status": null,
"last_error": null,
"last_run_at": null,
"category": "healthy",
"reason": "Never run, no errors",
"action": ""
},
{
"id": "kaizen-retro-349",
"name": "Kaizen Retro",
"schedule": "daily at 07:30",
"state": "scheduled",
"enabled": true,
"last_status": null,
"last_error": null,
"last_run_at": null,
"category": "healthy",
"reason": "Never run, no errors",
"action": ""
},
{
"id": "overnight-rd-nightly",
"name": "Overnight R&D Loop",
"schedule": "Nightly at 10 PM EDT",
"state": "scheduled",
"enabled": true,
"last_status": null,
"last_error": null,
"last_run_at": null,
"category": "healthy",
"reason": "Never run, no errors",
"action": ""
}
]
}

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,333 +0,0 @@
#!/usr/bin/env python3
"""
Cron Fleet Audit Script — #662
Reads hermes cron job state, categorizes all jobs into:
- healthy: last_status=ok or never-run-and-enabled
- transient: recent errors (likely network/timeout)
- systemic: repeated errors over 48+ hours
Outputs a JSON report and optionally:
--disable Disable systemic jobs erroring 48+ hours
--issues File Gitea issues for systemic failures
"""
import json
import sys
import os
import argparse
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Dict, Any
# --- Config ---
# Hours a job must have been continuously erroring before it is
# classified "systemic" (vs. a recent, possibly transient, error).
ERROR_THRESHOLD_HOURS = 48
# Candidate locations of the hermes cron state file, probed in order.
# The explicit /root/... entries duplicate the Path.home() ones —
# presumably to cover runs where $HOME is not /root (TODO confirm).
CRON_STATE_PATHS = [
    Path.home() / ".hermes" / "cron" / "jobs.json",
    Path.home() / ".hermes" / "cron" / "state.json",
    Path("/root/.hermes/cron/jobs.json"),
    Path("/root/.hermes/cron/state.json"),
]
def load_cron_state() -> List[Dict[str, Any]]:
    """Return the cron job list from the first readable state file.

    Falls back to `hermes cron list --json` when no state file is usable,
    and to an empty list when that fails too.  Both sources may yield either
    a bare list of jobs or a {"jobs": [...]} wrapper.
    """
    for candidate in CRON_STATE_PATHS:
        if not candidate.exists():
            continue
        try:
            parsed = json.loads(candidate.read_text())
        except (json.JSONDecodeError, IOError):
            # Unreadable or corrupt state file — try the next location.
            continue
        if isinstance(parsed, dict) and "jobs" in parsed:
            return parsed["jobs"]
        if isinstance(parsed, list):
            return parsed
    # Fallback: ask the hermes CLI directly.
    try:
        import subprocess
        proc = subprocess.run(
            ["hermes", "cron", "list", "--json"],
            capture_output=True, text=True, timeout=30,
        )
        if proc.returncode == 0:
            parsed = json.loads(proc.stdout)
            if isinstance(parsed, dict) and "jobs" in parsed:
                return parsed["jobs"]
            if isinstance(parsed, list):
                return parsed
    except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError):
        pass
    return []
def parse_timestamp(ts: "str | None") -> "datetime | None":
    """Parse an ISO-8601 timestamp into an aware datetime.

    Returns None for empty/missing input or an unparseable string.
    Naive timestamps are assumed to be UTC so age arithmetic against an
    aware "now" does not raise.
    """
    if not ts:
        return None
    # Normalize a trailing 'Z' (Zulu/UTC) suffix: datetime.fromisoformat()
    # only accepts it natively on Python 3.11+.  The original code called
    # ts.replace("+00:00", "+00:00"), a no-op — this was the intended fix.
    ts = ts.replace("Z", "+00:00")
    try:
        dt = datetime.fromisoformat(ts)
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def categorize_job(job: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Classify one cron job as healthy / transient / systemic.

    Rules, in precedence order:
      1. never run (no status, no error)       -> healthy
      2. paused                                -> healthy
      3. completed one-shot                    -> healthy
      4. error status: >= threshold hours old  -> systemic  (action: disable)
                       newer / unknown age     -> transient (action: monitor)
      5. last run ok                           -> healthy
      6. scheduled & enabled, no errors        -> healthy
      7. anything else                         -> transient (action: investigate)
    """
    # No default here: a job missing the key entirely must read as "never
    # ran" so the `status is None` check below fires.  The old "" default
    # skipped rule 1 and dumped such jobs into the catch-all bucket.
    status = job.get("last_status")
    last_error = job.get("last_error", "")
    last_run = parse_timestamp(job.get("last_run_at"))
    enabled = job.get("enabled", False)
    state = job.get("state", "unknown")
    name = job.get("name", job.get("id", "unknown"))
    entry = {
        "id": job.get("id", ""),
        "name": name,
        # Prefer the human-readable schedule when the state file has one.
        "schedule": job.get("schedule_display", str(job.get("schedule", ""))),
        "state": state,
        "enabled": enabled,
        "last_status": status,
        "last_error": last_error,
        "last_run_at": job.get("last_run_at"),
        "category": "healthy",  # default; overridden below where needed
        "reason": "",
        "action": "",
    }
    # 1. Never run and never errored.
    if status is None and not last_error:
        entry["reason"] = "Never run, no errors"
        return entry
    # 2. Paused jobs are healthy by definition; surface the operator's reason.
    if state == "paused":
        entry["reason"] = job.get("paused_reason", "Manually paused")
        entry["action"] = "none — paused intentionally"
        return entry
    # 3. One-shot jobs that finished.
    if state == "completed":
        entry["reason"] = "Completed (one-shot)"
        return entry
    # 4. Erroring jobs: systemic past the threshold, otherwise transient.
    if status == "error" and last_error:
        age_hours = None
        if last_run:
            age_hours = (now - last_run).total_seconds() / 3600
        if age_hours is not None and age_hours >= ERROR_THRESHOLD_HOURS:
            entry["category"] = "systemic"
            entry["reason"] = f"Erroring for {age_hours:.1f}h (>{ERROR_THRESHOLD_HOURS}h threshold)"
            entry["action"] = "disable"
        else:
            entry["category"] = "transient"
            age_str = f"{age_hours:.1f}h ago" if age_hours is not None else "unknown age"
            entry["reason"] = f"Recent error ({age_str}), may be transient"
            entry["action"] = "monitor"
        return entry
    # 5. Last run succeeded.
    if status == "ok":
        entry["reason"] = "Last run succeeded"
        return entry
    # 6. Scheduled and enabled with no recorded problems.
    if state == "scheduled" and enabled:
        entry["reason"] = "Scheduled and running"
        return entry
    # 7. Anything unrecognized needs a human look.
    entry["category"] = "transient"
    entry["reason"] = f"Unknown state: {state}, status: {status}"
    entry["action"] = "investigate"
    return entry
def audit_jobs(jobs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Categorize every job and assemble the audit report dict."""
    now = datetime.now(timezone.utc)
    entries = [categorize_job(job, now) for job in jobs]
    # Bucket by category (categorize_job only emits these three).
    buckets = {"healthy": [], "transient": [], "systemic": []}
    for entry in entries:
        buckets[entry["category"]].append(entry)
    # Condensed views for the report's top-level sections.
    systemic_view = [
        {
            "id": e["id"],
            "name": e["name"],
            "reason": e["reason"],
            "last_error": e["last_error"],
        }
        for e in buckets["systemic"]
    ]
    transient_view = [
        {"id": e["id"], "name": e["name"], "reason": e["reason"]}
        for e in buckets["transient"]
    ]
    return {
        "audit_time": now.isoformat(),
        "total_jobs": len(jobs),
        "summary": {
            "healthy": len(buckets["healthy"]),
            "transient_errors": len(buckets["transient"]),
            "systemic_failures": len(buckets["systemic"]),
        },
        "systemic_jobs": systemic_view,
        "transient_jobs": transient_view,
        "all_jobs": entries,
    }
def generate_issue_body(job: Dict[str, Any]) -> str:
    """Render the Gitea issue body for one systemically failing cron job."""
    template = """## Systemic Cron Failure — Auto-Filed by Audit #662
**Job:** {name} (`{id}`)
**Schedule:** {schedule}
**State:** {state}
**Last Error:**
```
{error}
```
**Audit Finding:** {reason}
### Action Required
- [ ] Diagnose root cause of repeated failure
- [ ] Fix configuration or remove broken job
- [ ] Verify job resumes healthy after fix
*Auto-generated by cron-audit-662.py*
"""
    return template.format(
        name=job["name"],
        id=job["id"],
        schedule=job["schedule"],
        state=job["state"],
        error=job["last_error"] or "No error details available",
        reason=job["reason"],
    )
def main():
    """CLI driver: load cron state, audit it, report, optionally remediate.

    Flags:
      --jobs-file PATH  read jobs from an explicit JSON file
      --disable         pause systemic jobs via the hermes CLI
      --issues          file a Gitea issue per systemic failure
      --output PATH     also write the JSON report to a file
      --json            print the raw JSON report instead of the summary

    Exits 1 when systemic failures are found (or no jobs load), 0 otherwise,
    so it can be used as a CI gate.
    """
    parser = argparse.ArgumentParser(description="Cron fleet audit (#662)")
    parser.add_argument("--jobs-file", help="Path to jobs.json override")
    parser.add_argument("--disable", action="store_true",
                        help="Disable systemic jobs (requires hermes CLI)")
    parser.add_argument("--issues", action="store_true",
                        help="File Gitea issues for systemic failures")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--json", action="store_true", help="JSON output only")
    args = parser.parse_args()
    # Load jobs
    jobs = []
    if args.jobs_file:
        with open(args.jobs_file) as f:
            data = json.load(f)
            # Accept either {"jobs": [...]} or a bare list of jobs.
            jobs = data.get("jobs", data) if isinstance(data, dict) else data
    else:
        jobs = load_cron_state()
    if not jobs:
        print("ERROR: No cron jobs found. Check ~/.hermes/cron/ or run 'hermes cron list'.")
        sys.exit(1)
    # Run audit
    report = audit_jobs(jobs)
    # Output: raw JSON for machines, a banner summary for humans.
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(f"\n{'='*60}")
        print(f" CRON FLEET AUDIT — {report['total_jobs']} jobs")
        print(f"{'='*60}")
        print(f" Healthy: {report['summary']['healthy']}")
        print(f" Transient errors: {report['summary']['transient_errors']}")
        print(f" Systemic failures: {report['summary']['systemic_failures']}")
        print(f"{'='*60}")
        if report["systemic_jobs"]:
            print(f"\n SYSTEMIC FAILURES (>{ERROR_THRESHOLD_HOURS}h):")
            for j in report["systemic_jobs"]:
                print(f" - {j['name']} ({j['id']}): {j['reason']}")
                if j["last_error"]:
                    # Truncate long errors to keep the console summary readable.
                    print(f" Error: {j['last_error'][:100]}")
        if report["transient_jobs"]:
            print(f"\n TRANSIENT ERRORS:")
            for j in report["transient_jobs"]:
                print(f" - {j['name']} ({j['id']}): {j['reason']}")
        print()
    # Write report file
    if args.output:
        with open(args.output, "w") as f:
            json.dump(report, f, indent=2)
        print(f"Report written to {args.output}")
    # Disable systemic jobs via the hermes CLI (best-effort per job).
    if args.disable and report["systemic_jobs"]:
        import subprocess
        for j in report["systemic_jobs"]:
            print(f"Disabling: {j['name']} ({j['id']})")
            try:
                subprocess.run(
                    ["hermes", "cron", "pause", j["id"]],
                    capture_output=True, text=True, timeout=10
                )
                print(f" → Disabled")
            except Exception as e:
                print(f" → Failed: {e}")
    # File issues for systemic failures via the Gitea REST API.
    if args.issues and report["systemic_jobs"]:
        # Token resolution: env var first, then ~/.config/gitea/token.
        gitea_token = os.environ.get("GITEA_TOKEN") or ""
        if not gitea_token:
            token_path = Path.home() / ".config" / "gitea" / "token"
            if token_path.exists():
                gitea_token = token_path.read_text().strip()
        if not gitea_token:
            print("ERROR: No Gitea token found. Set GITEA_TOKEN or ~/.config/gitea/token")
            sys.exit(1)
        import urllib.request
        base = "https://forge.alexanderwhitestone.com/api/v1"
        headers = {
            "Authorization": f"token {gitea_token}",
            "Content-Type": "application/json",
        }
        for j in report["systemic_jobs"]:
            title = f"CRON FAIL: {j['name']} — systemic error ({j['id']})"
            body = generate_issue_body(j)
            data = json.dumps({"title": title, "body": body}).encode()
            req = urllib.request.Request(
                f"{base}/repos/Timmy_Foundation/timmy-config/issues",
                data=data, headers=headers, method="POST"
            )
            # Best-effort: one failed issue does not abort the rest.
            try:
                resp = urllib.request.urlopen(req)
                result = json.loads(resp.read())
                print(f"Issued #{result['number']}: {title}")
            except Exception as e:
                print(f"Failed to file issue for {j['name']}: {e}")
    # Exit code: non-zero if systemic failures found
    sys.exit(1 if report["systemic_jobs"] else 0)
if __name__ == "__main__":
    main()

View File

@@ -1 +0,0 @@
cron-audit-662.py

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

150
scripts/sidecar_validator.py Executable file
View File

@@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""
sidecar_validator.py - Pre-deploy validation for timmy-config sidecar configs.
Validates YAML/JSON configs against expected schemas before deploy.
Usage:
python3 scripts/sidecar_validator.py [config_path ...]
python3 scripts/sidecar_validator.py --all
python3 scripts/sidecar_validator.py --pre-deploy
"""
import json, os, sys
from pathlib import Path
try:
import yaml
except ImportError:
print("ERROR: PyYAML not installed.", file=sys.stderr); sys.exit(2)
# Schema registry: one entry per config family recognized by classify_config().
# Each schema may define:
#   required        - top-level keys that must exist, mapped to expected types
#   optional        - recognized top-level keys, type-checked only when present
#   nested_required - keys that must exist inside a (dict-valued) parent key
#   nested_optional - keys type-checked inside a parent dict when present
# Type values may be a single type or a tuple of acceptable types.
SCHEMAS = {
    "wizard_config": {
        "description": "Wizard agent configuration",
        "required": {"model": dict},
        "optional": {"toolsets": list, "fallback_providers": list, "agent": dict, "providers": dict, "terminal": dict, "browser": dict, "compression": dict, "auxiliary": dict},
        "nested_required": {"model": {"default": str, "provider": str}},
        "nested_optional": {"model": {"fallback": str}, "agent": {"max_turns": (int, float), "reasoning_effort": str, "verbose": bool}},
    },
    "sidecar_config": {
        "description": "Sidecar agent configuration",
        "required": {"name": str, "role": str},
        "optional": {"capabilities": list, "instructions": str, "model": str, "provider": str, "toolsets": list},
    },
    "main_config": {
        "description": "Main hermes configuration",
        "required": {"model": dict},
        "optional": {"toolsets": list, "agent": dict, "terminal": dict, "browser": dict, "checkpoints": dict, "compression": dict, "auxiliary": dict, "fallback_providers": list, "providers": dict},
        "nested_required": {"model": {"default": str, "provider": str}},
    },
    "cron_pipeline": {
        "description": "Cron/pipeline schedule",
        "required": {"name": str},
        "optional": {"schedule": str, "cron": str, "tasks": list, "steps": list, "prompt": str, "model": dict},
    },
    "playbook": {
        "description": "Agent playbook",
        "required": {"name": str},
        "optional": {"description": str, "model": str, "steps": list, "prompt": str},
    },
}
def classify_config(filepath):
    """Map a config path to its SCHEMAS key, or None for unvalidated files."""
    parts = filepath.parts
    name = filepath.name
    # Wizard directory: sidecar JSON files and per-wizard config.yaml.
    if "wizards" in parts:
        if "-sidecar." in name:
            return "sidecar_config"
        if name in ("config.yaml", "config.yml"):
            return "wizard_config"
    # Repo-root (or one level deep) config.yaml is the main config.
    if name == "config.yaml" and len(parts) <= 2:
        return "main_config"
    if "cron" in parts and name.endswith((".yml", ".yaml")):
        return "cron_pipeline"
    if "playbooks" in parts and name.endswith((".yaml", ".yml")):
        return "playbook"
    return None
def type_name(t):
    """Human-readable name for a type, or 'a or b' for a tuple of types."""
    if isinstance(t, tuple):
        return " or ".join(alt.__name__ for alt in t)
    return t.__name__
def validate_config(data, schema_name, schema):
    """Check *data* against *schema*; return a list of error strings (empty = valid)."""
    errors = []
    report = errors.append

    # Top-level required keys: presence, then type.
    for key, want in schema["required"].items():
        if key not in data:
            report(f"missing required key: '{key}' (expected {type_name(want)})")
        elif not isinstance(data[key], want):
            report(f"'{key}' wrong type: got {type(data[key]).__name__}, expected {type_name(want)}")

    # Required children inside dict-valued parents.
    for parent, children in schema.get("nested_required", {}).items():
        if isinstance(data.get(parent), dict):
            sub = data[parent]
            for child, want in children.items():
                if child not in sub:
                    report(f"'{parent}' missing key: '{child}'")
                elif not isinstance(sub[child], want):
                    report(f"'{parent}.{child}' wrong type: {type(sub[child]).__name__}")

    # Optional children: type-checked only when present.
    for parent, children in schema.get("nested_optional", {}).items():
        if isinstance(data.get(parent), dict):
            sub = data[parent]
            for child, want in children.items():
                if child in sub and not isinstance(sub[child], want):
                    report(f"'{parent}.{child}' wrong type: {type(sub[child]).__name__}")

    # Wizard configs: each fallback provider must be a dict with provider+model.
    if schema_name == "wizard_config":
        fallbacks = data.get("fallback_providers")
        if isinstance(fallbacks, list):
            for i, fb in enumerate(fallbacks):
                if not isinstance(fb, dict):
                    report(f"fallback_providers[{i}]: expected dict")
                elif "provider" not in fb:
                    report(f"fallback_providers[{i}]: missing 'provider'")
                elif "model" not in fb:
                    report(f"fallback_providers[{i}]: missing 'model'")

    # Sidecar configs: capabilities, when given, must be a list.
    if schema_name == "sidecar_config" and "capabilities" in data:
        if not isinstance(data["capabilities"], list):
            report("'capabilities' must be a list")

    # Cron pipelines: a non-empty schedule string must have 5 cron fields.
    if schema_name == "cron_pipeline":
        raw = data.get("schedule") or data.get("cron", "")
        if isinstance(raw, str):
            fields = raw.split()
            if fields and len(fields) != 5:
                report(f"schedule has {len(fields)} fields, expected 5")

    return errors
def validate_file(filepath):
    """Validate one file; return (ok, errors).  Unclassified files pass trivially."""
    schema_name = classify_config(filepath)
    if schema_name is None:
        return True, []
    schema = SCHEMAS[schema_name]
    try:
        text = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        return False, [f"cannot read: {e}"]
    try:
        if filepath.suffix == ".json":
            data = json.loads(text)
        else:
            data = yaml.safe_load(text)
    except Exception as e:
        return False, [f"parse error: {e}"]
    # A valid document must be a top-level mapping.
    if not isinstance(data, dict):
        return False, [f"expected mapping, got {type(data).__name__}"]
    problems = validate_config(data, schema_name, schema)
    return not problems, problems
def find_deploy_targets(root):
    """Return the sorted list of deploy-critical config files under *root*."""
    patterns = ("config.yaml", "wizards/*/config.yaml", "wizards/*-sidecar.json")
    return sorted(hit for pattern in patterns for hit in root.glob(pattern))
def find_all_configs(root):
    """Walk *root* and collect every file that classify_config() recognizes."""
    skip_dirs = {".git", "node_modules", "venv", "__pycache__"}
    found = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune skipped directories in place so os.walk never descends into them.
        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
        for filename in filenames:
            if not filename.endswith((".yaml", ".yml", ".json")):
                continue
            candidate = Path(dirpath) / filename
            if classify_config(candidate) is not None:
                found.append(candidate)
    return sorted(found)
def main():
    """CLI entry point.

    Modes:
      --all         validate every recognized config under the repo root
      --pre-deploy  validate only deploy-critical configs
      paths...      validate explicitly named files

    Exits 0 when everything passes (or nothing matched), 1 on validation
    errors, 2 on usage error (no args / --help).
    """
    root = Path(__file__).resolve().parent.parent
    args = sys.argv[1:]
    if not args or args == ["--help"]:
        print(__doc__)
        sys.exit(2)
    if "--all" in args:
        files, mode = find_all_configs(root), "all configs"
    elif "--pre-deploy" in args:
        files, mode = find_deploy_targets(root), "deploy targets"
    else:
        files, mode = [Path(a) for a in args if not a.startswith("-")], "specified files"
    if not files:
        print(f"No files found: {mode}")
        sys.exit(0)
    print(f"Sidecar Validator - {mode}")
    print("=" * 60)
    total = 0
    failed = []
    for fp in files:
        # Show paths relative to the repo root when possible.  An absolute
        # path outside the repo used to crash here: Path.relative_to()
        # raises ValueError when fp is not under root.
        rel = fp
        if fp.is_absolute():
            try:
                rel = fp.relative_to(root)
            except ValueError:
                pass  # keep the absolute path as-is
        sn = classify_config(fp) or "unknown"
        ok, errs = validate_file(fp)
        if ok:
            print(f"PASS {rel} [{sn}]")
        else:
            failed.append((rel, sn, errs))
            total += len(errs)
            print(f"FAIL {rel} [{sn}]")
    print()
    print("=" * 60)
    print(f"Results: {len(files)-len(failed)}/{len(files)} valid")
    if failed:
        print(f"\n{total} error(s) in {len(failed)} file(s):\n")
        for rel, sn, errs in failed:
            print(f" {rel} ({sn}):")
            for e in errs:
                print(f" - {e}")
        print()
        sys.exit(1)
    print("\nAll configs valid!")
    sys.exit(0)

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,109 +0,0 @@
"""
Tests for scripts/cron-audit-662.py — cron fleet audit.
"""
import json
import sys
import unittest
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Add scripts to path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from cron_audit_662 import categorize_job, audit_jobs
class TestCategorizeJob(unittest.TestCase):
    """Exercise each classification rule of categorize_job()."""

    def setUp(self):
        # Pin "now" so error-age arithmetic is reproducible.
        self.now = datetime(2026, 4, 14, 20, 0, 0, tzinfo=timezone.utc)

    def test_healthy_ok(self):
        """A last_status of 'ok' classifies as healthy."""
        verdict = categorize_job(
            {"id": "a1", "name": "Test", "last_status": "ok",
             "enabled": True, "state": "scheduled"},
            self.now,
        )
        self.assertEqual(verdict["category"], "healthy")

    def test_healthy_never_run(self):
        """No status and no error means the job simply never ran."""
        verdict = categorize_job(
            {"id": "a2", "name": "Never", "last_status": None, "last_error": None},
            self.now,
        )
        self.assertEqual(verdict["category"], "healthy")

    def test_healthy_paused(self):
        """Paused jobs are healthy regardless of history."""
        verdict = categorize_job(
            {"id": "a3", "name": "Paused", "state": "paused",
             "paused_reason": "intentional"},
            self.now,
        )
        self.assertEqual(verdict["category"], "healthy")

    def test_healthy_completed(self):
        """One-shot jobs that finished are healthy."""
        verdict = categorize_job(
            {"id": "a4", "name": "Done", "state": "completed"}, self.now
        )
        self.assertEqual(verdict["category"], "healthy")

    def test_transient_recent_error(self):
        """Errors younger than the threshold are transient."""
        two_hours_ago = (self.now - timedelta(hours=2)).isoformat()
        verdict = categorize_job(
            {"id": "t1", "name": "RecentErr", "last_status": "error",
             "last_error": "Connection timeout", "last_run_at": two_hours_ago,
             "enabled": True, "state": "scheduled"},
            self.now,
        )
        self.assertEqual(verdict["category"], "transient")
        self.assertIn("transient", verdict["reason"].lower())

    def test_systemic_old_error(self):
        """Errors older than the threshold are systemic and get disabled."""
        three_days_ago = (self.now - timedelta(hours=72)).isoformat()
        verdict = categorize_job(
            {"id": "s1", "name": "OldErr", "last_status": "error",
             "last_error": "ConfigError: bad config", "last_run_at": three_days_ago,
             "enabled": True, "state": "scheduled"},
            self.now,
        )
        self.assertEqual(verdict["category"], "systemic")
        self.assertEqual(verdict["action"], "disable")

    def test_systemic_boundary(self):
        """48.1 hours should be systemic."""
        just_past = (self.now - timedelta(hours=48, minutes=6)).isoformat()
        verdict = categorize_job(
            {"id": "s2", "name": "Boundary", "last_status": "error",
             "last_error": "fail", "last_run_at": just_past,
             "enabled": True, "state": "scheduled"},
            self.now,
        )
        self.assertEqual(verdict["category"], "systemic")
class TestAuditJobs(unittest.TestCase):
    """End-to-end checks of audit_jobs() report assembly."""

    def test_empty(self):
        """An empty fleet yields a zeroed report."""
        report = audit_jobs([])
        self.assertEqual(report["total_jobs"], 0)
        self.assertEqual(report["summary"]["healthy"], 0)

    def test_mixed_report(self):
        """One job of each category lands in the right summary bucket."""
        anchor = datetime(2026, 4, 14, 20, 0, 0, tzinfo=timezone.utc)
        stale = (anchor - timedelta(hours=72)).isoformat()
        fresh = (anchor - timedelta(hours=1)).isoformat()
        fleet = [
            {"id": "h1", "name": "Healthy", "last_status": "ok",
             "enabled": True, "state": "scheduled"},
            {"id": "t1", "name": "Transient", "last_status": "error",
             "last_error": "timeout", "last_run_at": fresh,
             "enabled": True, "state": "scheduled"},
            {"id": "s1", "name": "Systemic", "last_status": "error",
             "last_error": "config bad", "last_run_at": stale,
             "enabled": True, "state": "scheduled"},
            {"id": "p1", "name": "Paused", "state": "paused",
             "paused_reason": "frozen"},
        ]
        report = audit_jobs(fleet)
        self.assertEqual(report["summary"]["healthy"], 2)
        self.assertEqual(report["summary"]["transient_errors"], 1)
        self.assertEqual(report["summary"]["systemic_failures"], 1)
        self.assertEqual(len(report["systemic_jobs"]), 1)
        self.assertEqual(report["systemic_jobs"][0]["name"], "Systemic")


if __name__ == "__main__":
    unittest.main()