#!/usr/bin/env python3
"""
config_drift_detector.py — Detect config drift across fleet nodes.

Collects the hermes config from every fleet node (a plain file read for the
local node, ``ssh <target> cat <path>`` for remote nodes) and diffs each one
against the canonical config at ``CANONICAL_CONFIG``.

Usage:
    python3 scripts/config_drift_detector.py --report   # Markdown report (default)
    python3 scripts/config_drift_detector.py --json     # JSON diff on stdout

NOTE(review): earlier usage text advertised ``--sync`` (auto-sync with
approval); no such option is defined by the argument parser in ``main``.
"""

import json
import os
import shlex
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any

# Fleet nodes. "ssh" is None for the machine this script runs on; otherwise it
# is the user@host target handed to the ssh client.
FLEET_NODES = {
    "local": {"host": "localhost", "ssh": None, "config": os.path.expanduser("~/.hermes/config.yaml")},
    "ezra": {"host": "143.198.27.163", "ssh": "root@143.198.27.163", "config": "/root/.hermes/config.yaml"},
    "bezalel": {"host": "167.99.126.228", "ssh": "root@167.99.126.228", "config": "/root/.hermes/config.yaml"},
}

# NOTE(review): this is the same path as FLEET_NODES["local"]["config"], so the
# local node is compared against itself and can never show drift. The original
# header described the canonical config as living in the timmy-config repo —
# confirm the intended path.
CANONICAL_CONFIG = os.path.expanduser("~/.hermes/config.yaml")
REPORT_DIR = os.path.expanduser("~/.hermes/reports")

# Values are stringified and clipped to this many characters in diff entries.
_MAX_VALUE_LEN = 100
# At most this many differences are listed per node in the Markdown report.
_MAX_DETAIL_ROWS = 30


def fetch_remote_config(ssh_target: str, remote_path: str) -> str:
    """Fetch a config file from a remote node via SSH.

    Args:
        ssh_target: ``user@host`` string passed to the ssh client.
        remote_path: Path of the file to ``cat`` on the remote side.

    Returns:
        The file contents, or ``""`` when the command fails, times out, or the
        ssh client cannot be started. Failures are reported on stderr so an
        unreachable node is not silently mistaken for an empty config.
    """
    command = [
        "ssh",
        "-o", "ConnectTimeout=10",
        # NOTE(review): this turns off host-key verification, which leaves the
        # connection open to man-in-the-middle attacks. Kept as-is so existing
        # automation keeps working; consider "accept-new" or pinned host keys.
        "-o", "StrictHostKeyChecking=no",
        ssh_target,
        # The remote command is interpreted by a shell on the far side, so the
        # path is quoted to survive spaces and shell metacharacters.
        f"cat {shlex.quote(remote_path)}",
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=15)
    except (subprocess.SubprocessError, OSError, ValueError) as exc:
        # Timeout, missing ssh binary, or undecodable output.
        print(f"WARNING: could not fetch {remote_path} from {ssh_target}: {exc}", file=sys.stderr)
        return ""

    if result.returncode != 0:
        detail = result.stderr.strip() or f"exit status {result.returncode}"
        print(f"WARNING: could not fetch {remote_path} from {ssh_target}: {detail}", file=sys.stderr)
        return ""
    return result.stdout


def load_yaml(text: str) -> dict:
    """Parse YAML text with ``yaml.safe_load`` and return the top-level mapping.

    Returns ``{}`` when the text is empty, cannot be parsed (a warning is
    printed to stderr), or its top-level value is not a mapping — the diffing
    code only understands dicts.
    """
    # PyYAML is imported lazily so the diff/report helpers in this module stay
    # usable when it is not installed.
    import yaml

    try:
        data = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        print(f"WARNING: could not parse YAML: {exc}", file=sys.stderr)
        return {}
    return data if isinstance(data, dict) else {}


def _read_yaml_file(path: str) -> dict:
    """Load a YAML file and return its top-level mapping (``{}`` if not a mapping).

    Raises:
        OSError: the file cannot be opened or read.
        ValueError: the file is not valid UTF-8 or not valid YAML.
        ImportError: PyYAML is not installed.
    """
    import yaml

    try:
        with open(path, encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
    except yaml.YAMLError as exc:
        # Re-raise as ValueError so callers need not import yaml to catch it.
        raise ValueError(f"invalid YAML in {path}: {exc}") from exc
    return data if isinstance(data, dict) else {}


def _clip(value: Any) -> str:
    """Stringify *value* and clip it to ``_MAX_VALUE_LEN`` characters."""
    return str(value)[:_MAX_VALUE_LEN]


def _sorted_keys(keys) -> list:
    """Sort mapping keys, tolerating mixed key types.

    YAML mappings may mix key types (e.g. ``1`` and ``"a"``), which a plain
    ``sorted`` rejects with ``TypeError``; in that case keys are ordered by
    type name, then by their string form.
    """
    try:
        return sorted(keys)
    except TypeError:
        return sorted(keys, key=lambda k: (type(k).__name__, str(k)))


def diff_configs(canonical: dict, other: dict, path: str = "") -> List[dict]:
    """Find differences between two config dicts, recursing into nested dicts.

    Args:
        canonical: The reference configuration.
        other: The configuration being checked.
        path: Dotted prefix of the keys above this level (used by recursion).

    Returns:
        A list of diff entries ordered by key. Each entry has a dotted
        ``"path"`` and a ``"type"``:

        * ``"extra"``   — key only in *other*; carries ``"value"``.
        * ``"missing"`` — key only in *canonical*; carries ``"value"``.
        * ``"changed"`` — key in both with unequal values; carries
          ``"canonical"`` and ``"actual"``.

        Values are stringified and clipped to 100 characters.
    """
    diffs = []
    for key in _sorted_keys(set(canonical) | set(other)):
        current_path = f"{path}.{key}" if path else str(key)
        if key not in canonical:
            diffs.append({"path": current_path, "type": "extra", "value": _clip(other[key])})
            continue
        if key not in other:
            diffs.append({"path": current_path, "type": "missing", "value": _clip(canonical[key])})
            continue

        c_val, o_val = canonical[key], other[key]
        if isinstance(c_val, dict) and isinstance(o_val, dict):
            diffs.extend(diff_configs(c_val, o_val, current_path))
        elif c_val != o_val:
            diffs.append({
                "path": current_path,
                "type": "changed",
                "canonical": _clip(c_val),
                "actual": _clip(o_val),
            })
    return diffs


def collect_fleet_configs() -> Dict[str, dict]:
    """Collect the canonical config and the config of every fleet node.

    Returns:
        A dict with a ``"canonical"`` entry plus one entry per key of
        ``FLEET_NODES``. Any config that cannot be loaded is represented as
        ``{}`` and the failure is reported on stderr.
    """
    load_errors = (OSError, ValueError, ImportError)
    configs: Dict[str, dict] = {}

    # Load canonical
    try:
        configs["canonical"] = _read_yaml_file(CANONICAL_CONFIG)
    except load_errors as e:
        print(f"ERROR loading canonical config: {e}", file=sys.stderr)
        configs["canonical"] = {}

    # Load from each node
    for name, node in FLEET_NODES.items():
        if node["ssh"] is None:
            # Local node: read the file directly instead of going through ssh.
            try:
                configs[name] = _read_yaml_file(node["config"])
            except load_errors as exc:
                print(f"WARNING: could not load local config for {name}: {exc}", file=sys.stderr)
                configs[name] = {}
        else:
            text = fetch_remote_config(node["ssh"], node["config"])
            configs[name] = load_yaml(text) if text else {}

    return configs


def _compute_node_diffs(configs: Dict[str, dict]) -> Dict[str, List[dict]]:
    """Diff every non-canonical entry of *configs* against ``configs["canonical"]``."""
    canonical = configs.get("canonical", {})
    return {
        name: diff_configs(canonical, config)
        for name, config in configs.items()
        if name != "canonical"
    }


def _md_cell(value: Any) -> str:
    """Render *value* as text that fits inside a single Markdown table cell.

    Line breaks would end the table row and a bare ``|`` would start a new
    column, so the former are replaced by spaces and the latter is escaped.
    """
    flat = " ".join(str(value).splitlines())
    return flat.replace("|", "\\|")


def _detail_row(diff: dict) -> str:
    """Format one diff entry as a row of the per-node detail table."""
    path = _md_cell(diff["path"])
    kind = diff["type"]
    if kind == "changed":
        return f"| `{path}` | changed | {_md_cell(diff['canonical'])} | {_md_cell(diff['actual'])} |"
    if kind == "extra":
        return f"| `{path}` | extra | — | {_md_cell(diff['value'])} |"
    return f"| `{path}` | missing | {_md_cell(diff['value'])} | — |"


def generate_report(configs: Dict[str, dict]) -> str:
    """Generate the Markdown drift report.

    Args:
        configs: Mapping as returned by ``collect_fleet_configs``; the
            ``"canonical"`` entry is the reference every other entry is
            diffed against.

    Returns:
        Markdown text: a summary table with one row per node, then a detail
        table for each node that has drift, listing at most 30 differences.
    """
    lines = [
        "# Config Drift Report",
        "",
        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"**Canonical:** {CANONICAL_CONFIG}",
        "",
        "## Summary",
        "",
        "| Node | Status | Drift Keys |",
        "|------|--------|------------|",
    ]

    node_diffs = _compute_node_diffs(configs)
    for name, diffs in node_diffs.items():
        status = "IN SYNC" if not diffs else f"DRIFT ({len(diffs)} keys)"
        lines.append(f"| {name} | {status} | {len(diffs)} |")

    total_drift = sum(len(diffs) for diffs in node_diffs.values())
    lines.extend([
        "",
        f"**Total drift:** {total_drift} keys across {len(node_diffs)} nodes",
        "",
    ])

    # Detail per node
    for name, diffs in node_diffs.items():
        if not diffs:
            continue
        lines.extend([
            f"## {name}",
            "",
            "| Path | Type | Canonical | Actual |",
            "|------|------|-----------|--------|",
        ])
        lines.extend(_detail_row(d) for d in diffs[:_MAX_DETAIL_ROWS])
        if len(diffs) > _MAX_DETAIL_ROWS:
            lines.append(f"\n... and {len(diffs) - _MAX_DETAIL_ROWS} more differences")
        lines.append("")

    return "\n".join(lines)


def main():
    """Command-line entry point.

    Collects all configs, then either prints the per-node diffs as JSON
    (``--json``) or prints the Markdown report and saves it under
    ``REPORT_DIR``. ``--report`` is accepted but not consulted: the Markdown
    report is what runs whenever ``--json`` is absent.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Config drift detection")
    parser.add_argument("--report", action="store_true", help="Generate drift report")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    print("Collecting fleet configs...", file=sys.stderr)
    configs = collect_fleet_configs()

    if args.json:
        payload = {
            "diffs": _compute_node_diffs(configs),
            "timestamp": datetime.now().isoformat(),
        }
        print(json.dumps(payload, indent=2))
        return

    report = generate_report(configs)
    print(report)

    # Save report
    os.makedirs(REPORT_DIR, exist_ok=True)
    report_path = os.path.join(REPORT_DIR, f"config-drift-{datetime.now().strftime('%Y%m%d')}.md")
    # The report contains non-ASCII characters ("—"), so the encoding is fixed
    # rather than left to the locale default.
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"\nReport saved: {report_path}", file=sys.stderr)


if __name__ == "__main__":
    main()