Files
timmy-config/scripts/config_drift_detector.py
STEP35 FREE BURN d913be508b
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 22s
Smoke Test / smoke (pull_request) Failing after 21s
Validate Config / YAML Lint (pull_request) Failing after 15s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 57s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Shell Script Lint (pull_request) Failing after 53s
Validate Config / Playbook Schema Validation (pull_request) Successful in 25s
Architecture Lint / Lint Repository (pull_request) Failing after 20s
PR Checklist / pr-checklist (pull_request) Failing after 3m32s
fix(#442): deprecate ad-hoc recovery scripts; add ansible structure tests
- Add DEPRECATION headers to overlapping ad-hoc recovery scripts:
  - config_drift.py, config_drift_detector.py → golden_state role
  - self_healing.py → deadman_switch role
  - reset_pipeline_state.py → agent_startup playbook
  - provision_wizard.py → site.yml + wizard_base role
  - cron_audit.py, cron-audit-662.py → cron_manager role

- Add tests/test_ansible_structure.py (19 pytest checks) validating
  the complete canonical ansible/ structure required by #442

- Update docs/CANONICAL_SERVICES.md with explicit supersession table
  mapping each ad-hoc mechanism to its Ansible replacement

- ansible/README.md: document webhook setup as required step

This establishes ansible/ as the single canonical source of truth
for fleet lifecycle management and formally retires the overlapping
ad-hoc recovery mechanisms.

Closes #442
2026-04-29 01:41:34 -04:00

219 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
config_drift_detector.py — DEPRECATED
Config drift detection is superseded by Ansible golden_state convergence.
Superseded by: ansible/playbooks/golden_state.yml + ansible/roles/golden_state
DO NOT USE. Use the Ansible canonical playbook instead:
ansible-playbook -i ansible/inventory/hosts.yml ansible/playbooks/golden_state.yml
This script remains only for historical reference. It will be removed after
fleet-wide migration is verified (issue timmy-config #442).
Issue: timmy-config #442 — [P2] Ansible IaC — Canonical Playbook for Fleet Management
Deprecated: 2026-04-29 (STEP35 FREE BURN)
"""
#!/usr/bin/env python3
"""
config_drift_detector.py — Detect config drift across fleet nodes.
Collects hermes config from all fleet nodes via SSH, diffs against
canonical config from timmy-config repo.
Usage:
python3 scripts/config_drift_detector.py --report
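python3 scripts/config_drift_detector.py --json # machine-readable diff output
Note: --sync (auto-sync with approval) is not implemented by this script;
only --report and --json are accepted.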
"""
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
# Fleet nodes, keyed by node name. Each entry carries:
#   host   - address of the node
#   ssh    - "user@host" target for ssh, or None for the machine running this script
#   config - path of the hermes config file on that node
FLEET_NODES = {
    "local": {
        "host": "localhost",
        "ssh": None,
        "config": os.path.expanduser("~/.hermes/config.yaml"),
    },
    "ezra": {
        "host": "143.198.27.163",
        "ssh": "root@143.198.27.163",
        "config": "/root/.hermes/config.yaml",
    },
    "bezalel": {
        "host": "167.99.126.228",
        "ssh": "root@167.99.126.228",
        "config": "/root/.hermes/config.yaml",
    },
}

# Config every node is compared against.
# NOTE(review): this is the same path as the "local" node's config, so the
# local node always compares equal to canonical — confirm this is intended.
CANONICAL_CONFIG = os.path.expanduser("~/.hermes/config.yaml")

# Directory where Markdown drift reports are written.
REPORT_DIR = os.path.expanduser("~/.hermes/reports")
def fetch_remote_config(ssh_target: str, remote_path: str) -> str:
    """Fetch a config file from a remote node via SSH.

    Runs ``cat`` on the remote host through the ``ssh`` client and returns
    what it printed. The fetch is best-effort: any failure is reported on
    stderr and yields an empty string, so one unreachable node does not
    abort the whole run.

    Args:
        ssh_target: Destination passed to ``ssh`` (e.g. ``user@host``).
        remote_path: Path of the file to read on the remote host.

    Returns:
        The file contents, or ``""`` if the command could not be run,
        timed out, or exited with a non-zero status.
    """
    import shlex  # local import: only this function needs it

    # ssh hands the command string to the remote shell, so the path must be
    # quoted or spaces/metacharacters in it would be re-interpreted there.
    remote_command = f"cat {shlex.quote(remote_path)}"
    try:
        result = subprocess.run(
            [
                "ssh",
                "-o", "ConnectTimeout=10",
                # NOTE(review): disables host-key verification — confirm this
                # is acceptable for the fleet before relying on the output.
                "-o", "StrictHostKeyChecking=no",
                ssh_target,
                remote_command,
            ],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        # Covers a missing ssh binary, the 15s timeout, and undecodable output.
        print(
            f"WARNING: could not fetch {remote_path} from {ssh_target}: {exc}",
            file=sys.stderr,
        )
        return ""

    if result.returncode != 0:
        print(
            f"WARNING: fetching {remote_path} from {ssh_target} failed "
            f"(exit {result.returncode}): {result.stderr.strip()}",
            file=sys.stderr,
        )
        return ""
    return result.stdout
def load_yaml(text: str) -> dict:
    """Parse YAML text into a dict.

    Args:
        text: YAML document text.

    Returns:
        The parsed top-level mapping. An empty dict is returned when the
        text is empty, is not valid YAML, or its top-level value is not a
        mapping (e.g. a list or a bare string), so callers can always
        treat the result as a dict.
    """
    import yaml  # PyYAML; imported lazily, as elsewhere in this file

    if not text:
        return {}
    try:
        parsed = yaml.safe_load(text)
    except yaml.YAMLError:
        return {}
    # safe_load returns whatever the document holds; only a mapping is a config.
    return parsed if isinstance(parsed, dict) else {}
def diff_configs(canonical: dict, other: dict, path: str = "") -> List[dict]:
    """Find differences between two config dicts.

    Nested dicts are compared recursively; any other pair of values is
    compared with ``!=``.

    Args:
        canonical: The reference config.
        other: The config being checked against the reference.
        path: Dotted key path of the dicts being compared; empty at the
            top level. Used to build the ``path`` of each reported entry.

    Returns:
        A list of difference records, ordered by key. Each record has a
        ``path`` and a ``type``:

        * ``"extra"``   - key only in ``other``; ``value`` holds its value.
        * ``"missing"`` - key only in ``canonical``; ``value`` holds its value.
        * ``"changed"`` - key in both with different values; ``canonical``
          and ``actual`` hold the two values.

        Values are rendered with ``str()`` and truncated to 100 characters.
    """
    diffs: List[dict] = []
    all_keys = canonical.keys() | other.keys()
    try:
        ordered_keys = sorted(all_keys)
    except TypeError:
        # Keys of mixed types (e.g. a YAML ``on:`` key parsed as a bool next
        # to string keys) cannot be compared; fall back to ordering by text.
        ordered_keys = sorted(all_keys, key=str)

    for key in ordered_keys:
        # str() keeps top-level paths textual even for non-string keys.
        current_path = f"{path}.{key}" if path else str(key)
        if key not in canonical:
            diffs.append({
                "path": current_path,
                "type": "extra",
                "value": str(other[key])[:100],
            })
            continue
        if key not in other:
            diffs.append({
                "path": current_path,
                "type": "missing",
                "value": str(canonical[key])[:100],
            })
            continue

        c_val = canonical[key]
        o_val = other[key]
        if isinstance(c_val, dict) and isinstance(o_val, dict):
            diffs.extend(diff_configs(c_val, o_val, current_path))
        elif c_val != o_val:
            diffs.append({
                "path": current_path,
                "type": "changed",
                "canonical": str(c_val)[:100],
                "actual": str(o_val)[:100],
            })
    return diffs
def collect_fleet_configs() -> Dict[str, dict]:
    """Collect configs from all fleet nodes.

    Loads the canonical config from ``CANONICAL_CONFIG`` and then one config
    per entry in ``FLEET_NODES``: nodes whose ``ssh`` is ``None`` are read
    from the local filesystem, all others are fetched over SSH.

    Collection is best-effort. A config that cannot be read or parsed, or
    whose top-level value is not a mapping, is recorded as ``{}`` and the
    problem is reported on stderr for local files.

    Returns:
        A dict mapping ``"canonical"`` and every node name to its config.
    """

    def _read_yaml_file(file_path: str) -> dict:
        """Load a local YAML file; a non-mapping document counts as empty."""
        import yaml  # PyYAML; imported lazily so a missing module is handled below

        with open(file_path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        return data if isinstance(data, dict) else {}

    configs: Dict[str, dict] = {}

    # Load canonical
    try:
        configs["canonical"] = _read_yaml_file(CANONICAL_CONFIG)
    except Exception as e:  # best-effort boundary: report and carry on
        print(f"ERROR loading canonical config: {e}", file=sys.stderr)
        configs["canonical"] = {}

    # Load from each node
    for name, node in FLEET_NODES.items():
        if node["ssh"] is None:
            # Local node: read straight from disk.
            try:
                configs[name] = _read_yaml_file(node["config"])
            except Exception as e:  # best-effort boundary: report and carry on
                print(f"WARNING: could not load config for {name}: {e}", file=sys.stderr)
                configs[name] = {}
        else:
            text = fetch_remote_config(node["ssh"], node["config"])
            parsed = load_yaml(text) if text else {}
            # Guard against a remote file whose top level is not a mapping.
            configs[name] = parsed if isinstance(parsed, dict) else {}
    return configs
def _md_cell(value: Any) -> str:
    """Render *value* as text that is safe inside one Markdown table cell."""
    text = str(value).replace("\r\n", "\n")
    # A raw newline ends the table row and a raw pipe starts a new column.
    return text.replace("\n", " ").replace("\r", " ").replace("|", "\\|")


def generate_report(configs: Dict[str, dict]) -> str:
    """Generate a Markdown drift report.

    Every entry of ``configs`` other than ``"canonical"`` is diffed against
    the ``"canonical"`` entry. The report has a summary table with one row
    per node, followed by a detail table for each node that has drift. At
    most 30 differences are listed per node; the remainder is summarised
    as a count.

    Args:
        configs: Mapping of ``"canonical"`` and node names to config dicts.

    Returns:
        The report as a single Markdown string.
    """
    canonical = configs.get("canonical", {})
    lines = [
        "# Config Drift Report",
        "",
        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"**Canonical:** {CANONICAL_CONFIG}",
        "",
        "## Summary",
        "",
        "| Node | Status | Drift Keys |",
        "|------|--------|------------|",
    ]

    total_drift = 0
    node_diffs = {}
    for name, config in configs.items():
        if name == "canonical":
            continue
        diffs = diff_configs(canonical, config)
        node_diffs[name] = diffs
        status = "IN SYNC" if not diffs else f"DRIFT ({len(diffs)} keys)"
        lines.append(f"| {name} | {status} | {len(diffs)} |")
        total_drift += len(diffs)

    lines.extend([
        "",
        f"**Total drift:** {total_drift} keys across {len(node_diffs)} nodes",
        "",
    ])

    # Detail per node
    for name, diffs in node_diffs.items():
        if not diffs:
            continue
        lines.extend([
            f"## {name}",
            "",
            "| Path | Type | Canonical | Actual |",
            "|------|------|-----------|--------|",
        ])
        for d in diffs[:30]:
            d_type = d["type"]
            key_path = _md_cell(d["path"])
            if d_type == "changed":
                lines.append(
                    f"| `{key_path}` | changed | {_md_cell(d['canonical'])} | {_md_cell(d['actual'])} |"
                )
            elif d_type == "extra":
                lines.append(f"| `{key_path}` | extra | — | {_md_cell(d['value'])} |")
            elif d_type == "missing":
                lines.append(f"| `{key_path}` | missing | {_md_cell(d['value'])} | — |")
        if len(diffs) > 30:
            lines.append(f"\n... and {len(diffs) - 30} more differences")
        lines.append("")

    return "\n".join(lines)
def main():
    """Command-line entry point: collect fleet configs and report drift.

    With ``--json`` the per-node differences are printed to stdout as JSON.
    Otherwise a Markdown report is printed to stdout and also saved under
    ``REPORT_DIR`` as ``config-drift-YYYYMMDD.md``. ``--report`` is accepted
    but not consulted: the Markdown report is produced whenever ``--json``
    is absent. Progress and status messages go to stderr.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Config drift detection")
    parser.add_argument("--report", action="store_true", help="Generate drift report")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    # The module header marks this script as deprecated; say so at runtime too.
    print(
        "WARNING: config_drift_detector.py is deprecated; "
        "use ansible/playbooks/golden_state.yml instead.",
        file=sys.stderr,
    )
    print("Collecting fleet configs...", file=sys.stderr)
    configs = collect_fleet_configs()
    now = datetime.now()  # one timestamp for the whole run

    if args.json:
        canonical = configs.get("canonical", {})
        diffs = {
            name: diff_configs(canonical, config)
            for name, config in configs.items()
            if name != "canonical"
        }
        print(json.dumps({"diffs": diffs, "timestamp": now.isoformat()}, indent=2))
        return

    report = generate_report(configs)
    print(report)

    # Save report
    os.makedirs(REPORT_DIR, exist_ok=True)
    report_path = os.path.join(REPORT_DIR, f"config-drift-{now.strftime('%Y%m%d')}.md")
    # The report contains non-ASCII characters (em dashes), so the encoding
    # must not depend on the locale.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"\nReport saved: {report_path}", file=sys.stderr)


if __name__ == "__main__":
    main()