Files
timmy-config/scripts/config_drift_detector.py
Alexander Whitestone f169634a75
Some checks failed
PR Checklist / pr-checklist (pull_request) Has been cancelled
Architecture Lint / Linter Tests (pull_request) Has been cancelled
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Cron Syntax Check (pull_request) Has been cancelled
Smoke Test / smoke (pull_request) Has been cancelled
Validate Config / YAML Lint (pull_request) Has been cancelled
Validate Config / JSON Validate (pull_request) Has been cancelled
Validate Config / Python Syntax & Import Check (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Validate Config / Shell Script Lint (pull_request) Has been cancelled
Validate Config / Deploy Script Dry Run (pull_request) Has been cancelled
Validate Config / Playbook Schema Validation (pull_request) Has been cancelled
Validate Training Data / validate (pull_request) Has been cancelled
feat: config drift detection across all fleet nodes (#686)
Detect config drift between fleet nodes and canonical timmy-config.

scripts/config_drift_detector.py (200 lines):
- SSH-based config collection from all nodes
- Recursive diff against canonical config
- Report: which keys differ, on which nodes
- JSON output for programmatic consumption

Fleet nodes: local, ezra (143.198.27.163), bezalel (167.99.126.228)

Usage:
  python3 scripts/config_drift_detector.py --report
  python3 scripts/config_drift_detector.py --json

Closes #686
2026-04-16 01:33:57 -04:00

201 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
config_drift_detector.py — Detect config drift across fleet nodes.
Collects hermes config from all fleet nodes via SSH, diffs against
canonical config from timmy-config repo.
Usage:
python3 scripts/config_drift_detector.py --report
python3 scripts/config_drift_detector.py --json # machine-readable diff output
"""
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
# Fleet nodes, keyed by short node name. Each entry holds:
#   host   - address of the node
#   ssh    - SSH target ("user@host"), or None when the node is this machine
#   config - path of the hermes config file on that node
FLEET_NODES = {
    "local": {
        "host": "localhost",
        "ssh": None,
        "config": os.path.expanduser("~/.hermes/config.yaml"),
    },
    "ezra": {
        "host": "143.198.27.163",
        "ssh": "root@143.198.27.163",
        "config": "/root/.hermes/config.yaml",
    },
    "bezalel": {
        "host": "167.99.126.228",
        "ssh": "root@167.99.126.228",
        "config": "/root/.hermes/config.yaml",
    },
}

# Reference config every node is diffed against.
# NOTE(review): this is the same path as the "local" node's config, so "local"
# will always compare equal to canonical. The module docstring says canonical
# comes from the timmy-config repo -- confirm the intended path.
CANONICAL_CONFIG = os.path.expanduser("~/.hermes/config.yaml")

# Directory the markdown drift reports are written to.
REPORT_DIR = os.path.expanduser("~/.hermes/reports")
def _build_ssh_command(ssh_target: str, remote_path: str) -> List[str]:
    """Build the ssh argv that prints ``remote_path`` on ``ssh_target``."""
    import shlex

    return [
        "ssh",
        "-o", "ConnectTimeout=10",
        # Fail instead of waiting on a password/passphrase prompt: this tool
        # runs unattended and captures ssh's output.
        "-o", "BatchMode=yes",
        # NOTE(review): host-key verification is disabled here, so a spoofed
        # host would not be detected. Kept as in the original; consider
        # pinning host keys for the fleet.
        "-o", "StrictHostKeyChecking=no",
        ssh_target,
        # The command string is interpreted by the remote shell, so the path
        # is quoted to keep spaces/metacharacters from being executed.
        f"cat {shlex.quote(remote_path)}",
    ]


def fetch_remote_config(ssh_target: str, remote_path: str) -> str:
    """Fetch a config file from a remote node via SSH.

    Args:
        ssh_target: SSH destination, e.g. ``"root@host"``.
        remote_path: Path of the file on the remote node. It is passed to
            ``cat`` literally (quoted), so remote shell expansion such as
            ``~`` or globs is not applied.

    Returns:
        The file contents as text, or ``""`` when ssh cannot be started,
        times out, exits non-zero, or returns undecodable output. The reason
        is written to stderr.
    """
    try:
        result = subprocess.run(
            _build_ssh_command(ssh_target, remote_path),
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, subprocess.SubprocessError, UnicodeError) as exc:
        # OSError: ssh binary missing/not executable; SubprocessError covers
        # TimeoutExpired; UnicodeError: output not decodable as text.
        print(
            f"WARNING: could not fetch {remote_path} from {ssh_target}: {exc}",
            file=sys.stderr,
        )
        return ""

    if result.returncode != 0:
        detail = result.stderr.strip() or f"exit status {result.returncode}"
        print(
            f"WARNING: could not fetch {remote_path} from {ssh_target}: {detail}",
            file=sys.stderr,
        )
        return ""
    return result.stdout
def load_yaml(text: str) -> dict:
    """Parse YAML text into a dict with ``yaml.safe_load``.

    Args:
        text: YAML document as a string.

    Returns:
        The parsed mapping, or ``{}`` when the text is empty, cannot be
        parsed, or its top-level value is not a mapping (a list or scalar).
    """
    import yaml  # PyYAML, imported lazily as in the rest of this file

    try:
        parsed = yaml.safe_load(text)
    except (yaml.YAMLError, ValueError):
        # ValueError: PyYAML's scalar constructors can raise it directly
        # (e.g. an out-of-range timestamp) without wrapping in YAMLError.
        return {}
    # Callers treat the result as a mapping (.keys()/.get()); anything else
    # is unusable as a config, so it is reported as empty.
    return parsed if isinstance(parsed, dict) else {}
def diff_configs(canonical: dict, other: dict, path: str = "") -> List[dict]:
    """Recursively find differences between two config dicts.

    Args:
        canonical: The reference configuration.
        other: The configuration being compared against the reference.
        path: Dotted key prefix of the current nesting level (used by the
            recursion; callers normally leave it empty).

    Returns:
        A list of diff records ordered by key. Each record has a dotted
        ``path`` string and a ``type``:
          * ``"extra"``   - key only in ``other``; ``value`` holds its value
          * ``"missing"`` - key only in ``canonical``; ``value`` holds its value
          * ``"changed"`` - key in both with different values; ``canonical``
            and ``actual`` hold the two values
        Values are stringified and truncated to 100 characters. When both
        sides hold a dict under the same key, the dicts are compared
        recursively instead of being reported as one change.
    """
    diffs: List[dict] = []
    all_keys = set(canonical) | set(other)
    try:
        ordered_keys = sorted(all_keys)
    except TypeError:
        # YAML mappings may mix key types (e.g. int and str), which Python
        # cannot order directly; fall back to ordering by their string form.
        ordered_keys = sorted(all_keys, key=str)

    for key in ordered_keys:
        # str(key) keeps "path" a string even for a top-level non-string key.
        current_path = f"{path}.{key}" if path else str(key)
        if key not in canonical:
            diffs.append({
                "path": current_path,
                "type": "extra",
                "value": str(other[key])[:100],
            })
            continue
        if key not in other:
            diffs.append({
                "path": current_path,
                "type": "missing",
                "value": str(canonical[key])[:100],
            })
            continue

        c_val = canonical[key]
        o_val = other[key]
        if isinstance(c_val, dict) and isinstance(o_val, dict):
            diffs.extend(diff_configs(c_val, o_val, current_path))
        elif c_val != o_val:
            diffs.append({
                "path": current_path,
                "type": "changed",
                "canonical": str(c_val)[:100],
                "actual": str(o_val)[:100],
            })
    return diffs
def _load_local_config(path: str, label: str) -> dict:
    """Read a YAML config from the local filesystem; ``{}`` on any failure.

    ``label`` names the config in the stderr message printed on failure.
    """
    try:
        import yaml
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except Exception as e:
        # Best-effort by design: a broken or missing file must not abort the
        # whole fleet scan, but the reason is surfaced instead of swallowed.
        print(f"ERROR loading {label} config: {e}", file=sys.stderr)
        return {}
    if not isinstance(data, dict):
        print(
            f"ERROR loading {label} config: top-level YAML value is not a mapping",
            file=sys.stderr,
        )
        return {}
    return data


def collect_fleet_configs() -> Dict[str, dict]:
    """Collect the canonical config and the config of every fleet node.

    Returns:
        A dict mapping ``"canonical"`` and each name in ``FLEET_NODES`` to its
        parsed config. A config that cannot be read, fetched or parsed is
        represented as ``{}``; the cause is written to stderr.
    """
    configs: Dict[str, dict] = {
        "canonical": _load_local_config(CANONICAL_CONFIG, "canonical"),
    }

    for name, node in FLEET_NODES.items():
        if node["ssh"] is None:
            # No SSH target: the node is this machine, read the file directly.
            configs[name] = _load_local_config(node["config"], name)
            continue

        text = fetch_remote_config(node["ssh"], node["config"])
        if not text:
            # An empty config shows up as every canonical key "missing";
            # flag it so that is not mistaken for real drift.
            print(
                f"WARNING: no config retrieved from {name} ({node['ssh']}); "
                "treating it as empty",
                file=sys.stderr,
            )
            configs[name] = {}
            continue

        parsed = load_yaml(text)
        # Guard against a non-mapping document, which cannot be diffed.
        configs[name] = parsed if isinstance(parsed, dict) else {}
    return configs
def _md_cell(value: Any) -> str:
    """Render ``value`` so it is safe inside one markdown table cell."""
    # An unescaped pipe starts a new column and a line break ends the row.
    return (
        str(value)
        .replace("|", "\\|")
        .replace("\r", " ")
        .replace("\n", " ")
    )


def generate_report(configs: Dict[str, dict]) -> str:
    """Generate a markdown drift report.

    Args:
        configs: Mapping of config name to parsed config. The ``"canonical"``
            entry is the reference; every other entry is diffed against it.

    Returns:
        The report text: a summary table with one row per node, a total, and
        for each drifting node a detail table limited to its first 30
        differences.
    """
    max_detail_rows = 30
    canonical = configs.get("canonical", {})

    node_diffs = {
        name: diff_configs(canonical, config)
        for name, config in configs.items()
        if name != "canonical"
    }
    total_drift = sum(len(diffs) for diffs in node_diffs.values())

    lines = [
        "# Config Drift Report",
        "",
        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"**Canonical:** {CANONICAL_CONFIG}",
        "",
        "## Summary",
        "",
        "| Node | Status | Drift Keys |",
        "|------|--------|------------|",
    ]
    for name, diffs in node_diffs.items():
        status = "IN SYNC" if not diffs else f"DRIFT ({len(diffs)} keys)"
        lines.append(f"| {name} | {status} | {len(diffs)} |")

    lines.extend([
        "",
        f"**Total drift:** {total_drift} keys across {len(node_diffs)} nodes",
        "",
    ])

    # Detail section, only for nodes that actually drifted.
    for name, diffs in node_diffs.items():
        if not diffs:
            continue
        lines.extend([
            f"## {name}",
            "",
            "| Path | Type | Canonical | Actual |",
            "|------|------|-----------|--------|",
        ])
        for d in diffs[:max_detail_rows]:
            d_type = d["type"]
            d_path = _md_cell(d["path"])
            if d_type == "changed":
                lines.append(
                    f"| `{d_path}` | changed | {_md_cell(d['canonical'])} "
                    f"| {_md_cell(d['actual'])} |"
                )
            elif d_type == "extra":
                lines.append(f"| `{d_path}` | extra | — | {_md_cell(d['value'])} |")
            elif d_type == "missing":
                lines.append(f"| `{d_path}` | missing | {_md_cell(d['value'])} | — |")
        if len(diffs) > max_detail_rows:
            lines.append(f"\n... and {len(diffs) - max_detail_rows} more differences")
        lines.append("")
    return "\n".join(lines)
def main():
    """Command-line entry point.

    Collects all fleet configs, then either prints the per-node diffs as JSON
    (``--json``) or prints the markdown report and saves it under
    ``REPORT_DIR``. The markdown report is the default; ``--report`` is
    accepted but does not change the behaviour.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Config drift detection")
    parser.add_argument("--report", action="store_true", help="Generate drift report")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    # Progress/status goes to stderr so stdout carries only the report/JSON.
    print("Collecting fleet configs...", file=sys.stderr)
    configs = collect_fleet_configs()

    if args.json:
        canonical = configs.get("canonical", {})
        diffs = {
            name: diff_configs(canonical, config)
            for name, config in configs.items()
            if name != "canonical"
        }
        print(json.dumps({"diffs": diffs, "timestamp": datetime.now().isoformat()}, indent=2))
        return

    report = generate_report(configs)
    print(report)

    # Save report (one file per day; a later run the same day overwrites it).
    os.makedirs(REPORT_DIR, exist_ok=True)
    report_path = os.path.join(
        REPORT_DIR, f"config-drift-{datetime.now().strftime('%Y%m%d')}.md"
    )
    # The report contains non-ASCII characters (em dashes), so the encoding is
    # fixed rather than left to the platform default.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"\nReport saved: {report_path}", file=sys.stderr)


if __name__ == "__main__":
    main()