Some checks failed
Smoke Test / smoke (pull_request) Failing after 23s
Architecture Lint / Linter Tests (pull_request) Successful in 26s
Validate Config / YAML Lint (pull_request) Failing after 17s
Validate Config / JSON Validate (pull_request) Successful in 20s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 55s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m0s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Architecture Lint / Lint Repository (pull_request) Failing after 19s
PR Checklist / pr-checklist (pull_request) Successful in 3m0s
Split the audit into an importable cron_audit_662 module plus a CLI wrapper, classify recent systemic failures by error signature instead of age alone, and include enough metadata for issue filing and delivery-failure reporting. Add regression tests for import-path loading, systemic vs transient classification, and issue body generation.
176 lines
6.7 KiB
Python
176 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Tests for cron-audit-662.py — Cron Fleet Audit."""
|
|
|
|
import json
|
|
import tempfile
|
|
from datetime import datetime, timezone, timedelta
|
|
from pathlib import Path
|
|
import pytest
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
|
|
|
|
|
class TestCrontabParsing:
    """Checks on ``parse_crontab`` using small inline crontab snippets."""

    def test_standard_schedule(self):
        """A five-field entry yields one enabled job carrying its schedule."""
        from cron_audit_662 import parse_crontab

        parsed = parse_crontab("*/15 * * * * /root/heartbeat.sh", source="test")

        assert len(parsed) == 1
        entry = parsed[0]
        assert entry["schedule"] == "*/15 * * * *"
        assert entry["enabled"] is True

    def test_comment_name(self):
        """Text after a trailing ``#`` appears in the job's name."""
        from cron_audit_662 import parse_crontab

        line = "0 6 * * * /bin/backup.sh # Morning Backup"
        parsed = parse_crontab(line, source="test")

        assert "Morning Backup" in parsed[0]["name"]

    def test_reboot_entry(self):
        """An ``@reboot`` line is parsed as a single job with that schedule."""
        from cron_audit_662 import parse_crontab

        parsed = parse_crontab("@reboot /root/start.sh", source="test")

        assert len(parsed) == 1
        assert parsed[0]["schedule"] == "@reboot"

    def test_skips_comments(self):
        """A comment-only line does not count as a job."""
        from cron_audit_662 import parse_crontab

        text = "# comment\n0 * * * * /bin/real.sh"
        parsed = parse_crontab(text, source="test")

        assert len(parsed) == 1

    def test_multiple(self):
        """Three entries separated by newlines produce three jobs."""
        from cron_audit_662 import parse_crontab

        text = "*/5 * * * * /bin/a.sh\n0 6 * * * /bin/b.sh # B\n@reboot /bin/c.sh"
        parsed = parse_crontab(text, source="vps")

        assert len(parsed) == 3

    def test_source_tagged(self):
        """The ``source`` argument is reflected in the job's ``_source``."""
        from cron_audit_662 import parse_crontab

        parsed = parse_crontab("0 * * * * /bin/x.sh", source="allegro")

        assert "allegro" in parsed[0]["_source"]
|
|
|
|
|
|
class TestCategorizeJob:
    """Checks on how ``categorize_job`` labels individual job records."""

    def test_ok_is_healthy(self):
        """A scheduled, enabled job whose last status is ok is healthy."""
        from cron_audit_662 import categorize_job

        now = datetime.now(timezone.utc)
        job = {"name": "t", "last_status": "ok", "enabled": True, "state": "scheduled"}

        verdict = categorize_job(job, now)

        assert verdict["category"] == "healthy"

    def test_recent_error_transient(self):
        """A generic error from two hours ago is categorized as transient."""
        from cron_audit_662 import categorize_job

        now = datetime.now(timezone.utc)
        two_hours_ago = now - timedelta(hours=2)
        job = {
            "name": "t",
            "last_status": "error",
            "last_error": "fail",
            "last_run_at": two_hours_ago.isoformat(),
        }

        verdict = categorize_job(job, now)

        assert verdict["category"] == "transient"

    def test_old_error_systemic(self):
        """A generic error from 72 hours ago is categorized as systemic."""
        from cron_audit_662 import categorize_job

        now = datetime.now(timezone.utc)
        three_days_ago = now - timedelta(hours=72)
        job = {
            "name": "t",
            "last_status": "error",
            "last_error": "fail",
            "last_run_at": three_days_ago.isoformat(),
        }

        verdict = categorize_job(job, now)

        assert verdict["category"] == "systemic"

    def test_paused_healthy(self):
        """A paused, disabled job is categorized as healthy."""
        from cron_audit_662 import categorize_job

        job = {"name": "t", "state": "paused", "enabled": False}

        verdict = categorize_job(job, datetime.now(timezone.utc))

        assert verdict["category"] == "healthy"

    def test_import_error_is_systemic_even_when_recent(self):
        """A one-hour-old import error is systemic with a ``disable`` action."""
        from cron_audit_662 import categorize_job

        now = datetime.now(timezone.utc)
        one_hour_ago = now - timedelta(hours=1)
        job = {
            "name": "t",
            "last_status": "error",
            "last_error": "cannot import name 'AIAgent' from 'run_agent'",
            "last_run_at": one_hour_ago.isoformat(),
        }

        verdict = categorize_job(job, now)

        assert verdict["category"] == "systemic"
        assert verdict["action"] == "disable"

    def test_empty_response_stays_transient(self):
        """A one-hour-old empty-response error is categorized as transient."""
        from cron_audit_662 import categorize_job

        now = datetime.now(timezone.utc)
        one_hour_ago = now - timedelta(hours=1)
        job = {
            "name": "t",
            "last_status": "error",
            "last_error": "Agent completed but produced empty response (model error, timeout, or misconfiguration)",
            "last_run_at": one_hour_ago.isoformat(),
        }

        verdict = categorize_job(job, now)

        assert verdict["category"] == "transient"

    def test_delivery_failure_after_success_is_transient(self):
        """An ok run with a delivery error is transient and says so in the reason."""
        from cron_audit_662 import categorize_job

        now = datetime.now(timezone.utc)
        job = {
            "name": "t",
            "last_status": "ok",
            "last_delivery_error": "delivery error: Telegram send failed: Timed out",
            "last_run_at": now.isoformat(),
            "enabled": True,
            "state": "scheduled",
        }

        verdict = categorize_job(job, now)

        assert verdict["category"] == "transient"
        assert "delivery failed" in verdict["reason"]
|
|
|
|
|
|
class TestAuditFleet:
    """Checks on the aggregate report returned by ``audit_fleet``."""

    def test_empty(self):
        """Two empty job lists give a report with zero total jobs."""
        from cron_audit_662 import audit_fleet

        report = audit_fleet([], [])

        assert report["total_jobs"] == 0

    def test_mixed(self):
        """Two hermes jobs plus one crontab job are counted per source."""
        from cron_audit_662 import audit_fleet, parse_crontab

        now = datetime.now(timezone.utc)
        healthy_job = {
            "name": "good",
            "last_status": "ok",
            "enabled": True,
            "state": "scheduled",
        }
        failing_job = {
            "name": "bad",
            "last_status": "error",
            "last_error": "fail",
            "last_run_at": (now - timedelta(hours=72)).isoformat(),
        }
        hermes_jobs = [healthy_job, failing_job]
        crontab_jobs = parse_crontab("0 * * * * /bin/x.sh", source="vps")

        report = audit_fleet(hermes_jobs, crontab_jobs)

        assert report["total_jobs"] == 3
        assert report["hermes_jobs"] == 2
        assert report["crontab_jobs"] == 1
        assert len(report["systemic_jobs"]) == 1
|
|
|
|
|
|
class TestCrontabBackupLoading:
    """Checks on ``load_crontab_backups`` reading from a temporary directory."""

    def test_loads_directory(self, tmp_path):
        """Two backup files with one entry each produce two jobs."""
        from cron_audit_662 import load_crontab_backups

        allegro_backup = tmp_path / "allegro-crontab-backup.txt"
        allegro_backup.write_text("*/15 * * * * /root/hb.sh # HB\n")
        ezra_backup = tmp_path / "ezra-crontab-backup.txt"
        ezra_backup.write_text("0 6 * * * /root/rpt.sh\n")

        loaded = load_crontab_backups(tmp_path)

        assert len(loaded) == 2

    def test_empty_dir(self, tmp_path):
        """A directory with no files yields an empty list."""
        from cron_audit_662 import load_crontab_backups

        loaded = load_crontab_backups(tmp_path)

        assert loaded == []
|
|
|
|
|
|
class TestTimestampParsing:
    """Checks on ``parse_timestamp`` for present and missing values."""

    def test_iso_with_tz(self):
        """An ISO-8601 string with a UTC offset parses to a non-None value."""
        from cron_audit_662 import parse_timestamp

        parsed = parse_timestamp("2026-04-14T15:30:00+00:00")

        assert parsed is not None

    def test_empty(self):
        """The empty string and ``None`` both parse to ``None``."""
        from cron_audit_662 import parse_timestamp

        from_blank = parse_timestamp("")
        assert from_blank is None

        from_none = parse_timestamp(None)
        assert from_none is None
|
|
|
|
|
|
class TestIssueBody:
    """Checks on the text produced by ``generate_issue_body``."""

    def test_includes_schedule_state_and_delivery_error(self):
        """The body mentions name, schedule, state and a delivery-error heading."""
        from cron_audit_662 import generate_issue_body

        job = {
            "id": "job-1",
            "name": "Health Monitor",
            "schedule": "every 5m",
            "state": "scheduled",
            "last_error": "cannot import name 'tool' from 'tools.registry'",
            "last_delivery_error": "delivery error: Telegram send failed: Timed out",
            "reason": "Systemic error signature: cannot import (1.0h ago)",
        }

        body = generate_issue_body(job)

        # Checked in this order so the first missing fragment is the one reported.
        expected_fragments = (
            "Health Monitor",
            "every 5m",
            "scheduled",
            "Last Delivery Error",
        )
        for fragment in expected_fragments:
            assert fragment in body
|