#!/usr/bin/env python3
"""
Config Drift Detection — Timmy Foundation Fleet

Collects hermes config from all fleet nodes via SSH, diffs against the
canonical timmy-config, and reports drift.

Usage:
    # Full drift report
    python3 scripts/config_drift_detector.py report

    # JSON output
    python3 scripts/config_drift_detector.py report --json

    # Check a single node
    python3 scripts/config_drift_detector.py check allegro

    # Auto-sync a node (dry-run by default)
    python3 scripts/config_drift_detector.py sync allegro
    python3 scripts/config_drift_detector.py sync allegro --apply

    # Preview the sync for all drifted nodes (always a dry-run)
    python3 scripts/config_drift_detector.py sync-all
"""

import argparse
import copy
import json
import os
import shlex
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    import yaml
except ImportError:
    # Keep the pure diff/format helpers importable without PyYAML;
    # load_yaml() raises a clear error the first time parsing is needed.
    yaml = None

# ── Fleet Definition ─────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
CANONICAL_CONFIG = ROOT_DIR / "config.yaml"
SYNC_LOG = ROOT_DIR / "fleet" / "drift-sync-log.json"

FLEET_NODES = {
    "timmy": {
        "host": "localhost",
        "ssh_user": None,
        "hermes_home": os.path.expanduser("~/.hermes"),
        "machine_type": "mac",
    },
    "allegro": {
        "host": "167.99.126.228",
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    },
    "bezalel": {
        "host": "159.203.146.185",
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    },
    "ezra": {
        "host": "143.198.27.163",
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    },
}

# Options shared by every ssh invocation. BatchMode makes ssh fail instead of
# waiting on a password/passphrase prompt that a captured run cannot answer.
_SSH_BASE_OPTS = ["-o", "BatchMode=yes", "-o", "ConnectTimeout=10"]

# Keys that legitimately differ between nodes and are never reported as drift.
_NODE_SPECIFIC_KEYS = frozenset({
    "terminal.cwd",   # varies per session
    "display.skin",   # cosmetic
})

# Final path segments that hold secrets; their values are kept out of reports.
_SECRET_LEAF_NAMES = frozenset({"api_key", "token", "api_token"})

_ABSENT_LABEL = "(absent)"
_MAX_DRIFT_ROWS = 30
_SYNC_LOG_LIMIT = 100


# ── Config Loading ───────────────────────────────────────────────────────────

def load_yaml(text: str) -> dict:
    """Parse YAML text into a dict.

    Returns an empty dict when the text is empty, fails to parse, or its top
    level is not a mapping.

    Raises:
        RuntimeError: if PyYAML is not installed.
    """
    if yaml is None:
        raise RuntimeError("PyYAML is required to parse configs (pip install pyyaml)")
    if not text:
        return {}
    try:
        data = yaml.safe_load(text)
    except Exception:
        # Deliberately broad: safe_load can raise more than yaml.YAMLError
        # (e.g. ValueError for an impossible date), and an unparseable config
        # is treated the same as an empty one.
        return {}
    return data if isinstance(data, dict) else {}


def load_canonical(path: Path = CANONICAL_CONFIG) -> dict:
    """Load the canonical timmy-config, or an empty dict if the file is absent."""
    if not path.exists():
        return {}
    with open(path, encoding="utf-8") as f:
        return load_yaml(f.read())


# ── Remote Config Collection ─────────────────────────────────────────────────

def _ssh_target(node_spec: dict) -> str:
    """Return the ``user@host`` (or bare ``host``) ssh destination for a node."""
    host = node_spec["host"]
    user = node_spec.get("ssh_user")
    return f"{user}@{host}" if user else host


def ssh_fetch_config(node_name: str, node_spec: dict) -> Optional[str]:
    """Fetch the raw text of a node's config.yaml.

    The ``localhost`` node is read straight from disk; every other node is read
    with ``ssh <target> cat <path>``.

    Args:
        node_name: Fleet name of the node (not used for the lookup itself).
        node_spec: Entry from FLEET_NODES with ``host``, ``hermes_home`` and
            optionally ``ssh_user``.

    Returns:
        The file content, or None if it could not be read for any reason
        (missing file, unreachable host, non-zero exit, timeout).
    """
    config_path = f"{node_spec['hermes_home']}/config.yaml"

    if node_spec["host"] == "localhost":
        try:
            with open(config_path, encoding="utf-8") as f:
                return f.read()
        except (OSError, ValueError):  # ValueError covers undecodable bytes
            return None

    # NOTE(review): StrictHostKeyChecking=no accepts changed host keys, which
    # permits man-in-the-middle on a root login. Kept to preserve existing
    # reachability; consider "accept-new" once known_hosts is populated.
    command = [
        "ssh",
        *_SSH_BASE_OPTS,
        "-o", "StrictHostKeyChecking=no",
        _ssh_target(node_spec),
        # ssh hands this string to the remote shell, so quote the path.
        f"cat {shlex.quote(config_path)}",
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=20)
    except (subprocess.SubprocessError, OSError, ValueError):
        return None
    return result.stdout if result.returncode == 0 else None


def collect_all_configs() -> Dict[str, Optional[dict]]:
    """Collect parsed configs from all fleet nodes.

    Returns:
        Mapping of node name to its parsed config, or None for a node whose
        config could not be fetched or was empty.
    """
    configs = {}
    for name, spec in FLEET_NODES.items():
        raw = ssh_fetch_config(name, spec)
        configs[name] = load_yaml(raw) if raw else None
    return configs


# ── Diff Engine ──────────────────────────────────────────────────────────────

def _flatten(d: dict, prefix: str = "") -> Dict[str, Any]:
    """Flatten a nested dict into dot-separated keys.

    Nested dicts are recursed into (an empty dict contributes no keys). Lists
    are stored as a JSON string so they compare as a single value.
    """
    items = {}
    for k, v in d.items():
        key = f"{prefix}.{k}" if prefix else str(k)
        if isinstance(v, dict):
            items.update(_flatten(v, key))
        elif isinstance(v, list):
            # default=str keeps YAML dates/timestamps from raising TypeError.
            items[key] = json.dumps(v, sort_keys=True, default=str)
        else:
            items[key] = v
    return items


def _is_secret_key(key: str) -> bool:
    """Return True if the last dotted segment of ``key`` names a secret."""
    return key.rpartition(".")[2] in _SECRET_LEAF_NAMES


def diff_configs(canonical: dict, remote: dict,
                 ignored_keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Diff two config dicts and return the list of drift entries.

    Each entry is ``{key, canonical_value, remote_value, drift_type}`` where
    drift_type is "changed" | "missing_remote" | "extra_remote". Entries are
    ordered by flattened key.

    Args:
        canonical: The reference config.
        remote: The node's config.
        ignored_keys: Extra flattened keys to skip, on top of the built-in
            node-specific keys and secret keys.

    A key that is absent on one side and null on the other is not drift.
    """
    ignored = set(ignored_keys or []) | _NODE_SPECIFIC_KEYS

    flat_canon = _flatten(canonical)
    flat_remote = _flatten(remote)

    drifts = []
    for key in sorted(set(flat_canon) | set(flat_remote)):
        # Secrets differ per node by design and must not appear in reports.
        if key in ignored or _is_secret_key(key):
            continue

        canon_val = flat_canon.get(key)
        remote_val = flat_remote.get(key)
        if canon_val == remote_val:
            continue

        # Classify by key presence, not by value: a canonical null that the
        # remote overrides is a change, not a remote-only key.
        if key not in flat_canon:
            drift_type = "extra_remote"
        elif key not in flat_remote:
            drift_type = "missing_remote"
        else:
            drift_type = "changed"

        drifts.append({
            "key": key,
            "canonical_value": canon_val,
            "remote_value": remote_val,
            "drift_type": drift_type,
        })

    return drifts


# ── Report ───────────────────────────────────────────────────────────────────

def check_node(node_name: str, canonical: dict) -> Dict[str, Any]:
    """Check a single node for config drift.

    Returns:
        For an unknown node: ``{node, error}``.
        For an unreachable node: ``reachable`` False, ``drift_count`` -1 and an
        ``error`` message.
        Otherwise: the drift list plus ``drift_count``, ``has_drift``,
        ``config_keys`` (number of flattened remote keys) and node metadata.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"node": node_name, "error": f"Unknown node: {node_name}"}

    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        return {
            "node": node_name,
            "host": spec["host"],
            "reachable": False,
            "drifts": [],
            "drift_count": -1,
            "error": "Could not fetch config",
        }

    remote_config = load_yaml(raw)
    drifts = diff_configs(canonical, remote_config)

    return {
        "node": node_name,
        "host": spec["host"],
        "machine_type": spec["machine_type"],
        "reachable": True,
        "config_keys": len(_flatten(remote_config)),
        "drifts": drifts,
        "drift_count": len(drifts),
        "has_drift": len(drifts) > 0,
    }


def report_all(canonical: dict, canonical_path: Optional[Path] = None) -> Dict[str, Any]:
    """Build the full fleet drift report.

    Args:
        canonical: The parsed canonical config to compare every node against.
        canonical_path: Path the canonical config was loaded from, recorded in
            the report. Defaults to CANONICAL_CONFIG.

    Returns:
        Fleet-wide counters plus the per-node results under ``nodes``.
    """
    now = datetime.now(timezone.utc).isoformat()
    results = {name: check_node(name, canonical) for name in FLEET_NODES}

    total_nodes = len(results)
    reachable_nodes = [r for r in results.values() if r.get("reachable", False)]
    reachable = len(reachable_nodes)
    drifted = sum(1 for r in reachable_nodes if r.get("has_drift", False))
    # Only reachable nodes count: unreachable ones carry drift_count == -1.
    total_drifts = sum(r.get("drift_count", 0) for r in reachable_nodes)

    return {
        "timestamp": now,
        "canonical_config": str(canonical_path if canonical_path is not None
                                else CANONICAL_CONFIG),
        "total_nodes": total_nodes,
        "reachable": reachable,
        "unreachable": total_nodes - reachable,
        "drifted_nodes": drifted,
        "clean_nodes": reachable - drifted,
        "total_drifts": total_drifts,
        "nodes": results,
    }


# ── Formatters ───────────────────────────────────────────────────────────────

def _display_value(value: Any) -> str:
    """Render a config value for the table; only None means the key is absent."""
    return _ABSENT_LABEL if value is None else str(value)


def format_drift_markdown(drifts: List[Dict[str, Any]]) -> str:
    """Format a list of drift entries as a fixed-width text table.

    At most 30 rows are shown; cells are truncated to fit their column. Falsy
    values such as ``False`` or ``0`` are shown as-is.
    """
    if not drifts:
        return " (no drift — config matches canonical)\n"

    lines = [
        f" {'Key':<40} {'Canonical':<20} {'Remote':<20} {'Type'}",
        f" {'─'*40} {'─'*20} {'─'*20} {'─'*12}",
    ]
    for d in drifts[:_MAX_DRIFT_ROWS]:
        key = d["key"][:39]
        canon = _display_value(d["canonical_value"])[:19]
        remote = _display_value(d["remote_value"])[:19]
        dtype = d["drift_type"]
        lines.append(f" {key:<40} {canon:<20} {remote:<20} {dtype}")

    if len(drifts) > _MAX_DRIFT_ROWS:
        lines.append(f" ... and {len(drifts) - _MAX_DRIFT_ROWS} more")

    return "\n".join(lines) + "\n"


def format_report(report: dict) -> str:
    """Format the full fleet report (as built by report_all) as terminal text."""
    lines = [
        "╔══════════════════════════════════════════════════════╗",
        "║ 🔍 CONFIG DRIFT DETECTION — TIMMY FOUNDATION ║",
        "╚══════════════════════════════════════════════════════╝",
        "",
        f"Timestamp: {report['timestamp'][:19]}Z",
        f"Canonical: {report['canonical_config']}",
        "",
        f"Nodes: {report['total_nodes']} total, {report['reachable']} reachable, "
        f"{report['unreachable']} unreachable",
        f"Drift: {report['drifted_nodes']} drifted, {report['clean_nodes']} clean, "
        f"{report['total_drifts']} total key differences",
        "",
    ]

    for node_name, node in report["nodes"].items():
        is_reachable = bool(node.get("reachable"))
        status = "🟢" if is_reachable else "⚫"
        if node.get("has_drift"):
            drift_icon = "🔴"
        elif is_reachable:
            drift_icon = "✅"
        else:
            # Drift state of an unreachable node is unknown, not clean.
            drift_icon = "❔"

        lines.append(f"── {node_name} ({node.get('host', '?')}) {status} {drift_icon} ──")

        if not is_reachable:
            lines.append(f" UNREACHABLE: {node.get('error', 'unknown')}")
            lines.append("")
            continue

        lines.append(f" Type: {node.get('machine_type', '?')}, "
                     f"Keys: {node.get('config_keys', 0)}, "
                     f"Drifts: {node.get('drift_count', 0)}")
        lines.append(format_drift_markdown(node.get("drifts", [])))
        lines.append("")

    return "\n".join(lines)


# ── Sync ─────────────────────────────────────────────────────────────────────

def generate_sync_patch(canonical: dict, remote: dict) -> Dict[str, Any]:
    """Return the flattened key → canonical value pairs the remote must adopt.

    Covers "changed" and "missing_remote" drift. "extra_remote" keys are left
    out on purpose — removing them would be destructive.
    """
    return {
        d["key"]: d["canonical_value"]
        for d in diff_configs(canonical, remote)
        if d["drift_type"] in ("changed", "missing_remote")
    }


def sync_node(node_name: str, canonical: dict, apply: bool = False) -> Dict[str, Any]:
    """Sync a node's config to match canonical (dry-run unless ``apply``).

    Args:
        node_name: Fleet name of the node.
        canonical: Parsed canonical config, used to compute the patch.
        apply: When True and there is something to change, overwrite the
            node's config.yaml with the content of CANONICAL_CONFIG.

    Returns:
        ``{error}`` for an unknown or unreachable node; otherwise a dict with
        ``node``, ``host``, ``changes_needed``, ``dry_run`` and ``patch``, plus
        ``applied`` and/or ``error`` after an apply attempt.

    NOTE(review): apply replaces the whole remote file with the canonical file,
    so remote-only keys and ignored keys (secrets, node-specific settings) are
    overwritten too, even though they are excluded from ``patch``.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"error": f"Unknown node: {node_name}"}

    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        return {"error": "Could not fetch remote config"}

    remote_config = load_yaml(raw)
    patch = generate_sync_patch(canonical, remote_config)

    result = {
        "node": node_name,
        "host": spec["host"],
        "changes_needed": len(patch),
        "dry_run": not apply,
        "patch": patch,
    }

    if not apply or not patch:
        return result

    # Apply: write the canonical config file to the node.
    config_content = (CANONICAL_CONFIG.read_text(encoding="utf-8")
                      if CANONICAL_CONFIG.exists() else "")
    if not config_content.strip():
        # Never blank out a node's config because the canonical file is gone.
        result["applied"] = False
        result["error"] = (f"Canonical config {CANONICAL_CONFIG} is missing or empty; "
                           "refusing to overwrite")
        return result

    config_path = f"{spec['hermes_home']}/config.yaml"

    if spec["host"] == "localhost":
        try:
            os.makedirs(os.path.dirname(config_path), exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(config_content)
            result["applied"] = True
        except Exception as e:
            result["error"] = str(e)
    else:
        command = [
            "ssh",
            *_SSH_BASE_OPTS,
            _ssh_target(spec),
            # Runs in the remote shell, so the path must be quoted.
            f"cat > {shlex.quote(config_path)}",
        ]
        try:
            proc = subprocess.run(command, input=config_content,
                                  capture_output=True, text=True, timeout=20)
            result["applied"] = proc.returncode == 0
            if proc.returncode != 0:
                result["error"] = proc.stderr[:200]
        except Exception as e:
            result["error"] = str(e)

    # Log every apply attempt, recording whether it actually succeeded.
    _append_sync_log({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "node": node_name,
        "changes": len(patch),
        "patch_keys": list(patch.keys()),
        "applied": bool(result.get("applied", False)),
    })

    return result


def _append_sync_log(entry: dict):
    """Append an entry to the JSON sync log, keeping the last 100 entries.

    An unreadable, corrupt, or non-list log file is replaced by a fresh list.
    """
    log = []
    if SYNC_LOG.exists():
        try:
            with open(SYNC_LOG, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):  # ValueError covers invalid JSON
            loaded = None
        if isinstance(loaded, list):
            log = loaded

    log.append(entry)
    log = log[-_SYNC_LOG_LIMIT:]

    SYNC_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(SYNC_LOG, "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2)


# ── CLI ──────────────────────────────────────────────────────────────────────

def main():
    """Parse the command line, run the chosen sub-command, and return 0."""
    parser = argparse.ArgumentParser(description="Config drift detection for the Timmy fleet")
    sub = parser.add_subparsers(dest="command", required=True)

    # report
    rpt = sub.add_parser("report", help="Full fleet drift report")
    rpt.add_argument("--json", action="store_true", help="JSON output")
    rpt.add_argument("--canonical", type=str, default=None, help="Canonical config path")

    # check
    chk = sub.add_parser("check", help="Check a single node")
    chk.add_argument("node", help="Node name (timmy, allegro, bezalel, ezra)")
    chk.add_argument("--json", action="store_true", help="JSON output")

    # sync
    syn = sub.add_parser("sync", help="Sync a node to canonical")
    syn.add_argument("node", help="Node name")
    syn.add_argument("--apply", action="store_true", help="Actually apply changes")

    # sync-all
    sub.add_parser("sync-all", help="Sync all drifted nodes (dry-run)")

    args = parser.parse_args()

    # Only the "report" sub-command defines --canonical.
    canonical_arg = getattr(args, "canonical", None)
    canon_path = Path(canonical_arg) if canonical_arg else CANONICAL_CONFIG
    canonical = load_canonical(canon_path)

    if args.command == "report":
        report = report_all(canonical, canonical_path=canon_path)
        if args.json:
            # default=str: YAML dates/timestamps are not JSON-serializable.
            print(json.dumps(report, indent=2, default=str))
        else:
            print(format_report(report))

    elif args.command == "check":
        result = check_node(args.node, canonical)
        if args.json:
            print(json.dumps(result, indent=2, default=str))
        else:
            print(f"Node: {result['node']}")
            print(f"Host: {result.get('host', '?')}")
            print(f"Reachable: {result.get('reachable', False)}")
            print(f"Drifts: {result.get('drift_count', 0)}")
            if result.get("error"):
                print(f"Error: {result['error']}")
            if result.get("drifts"):
                print(format_drift_markdown(result["drifts"]))

    elif args.command == "sync":
        result = sync_node(args.node, canonical, apply=args.apply)
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "sync-all":
        report = report_all(canonical)
        synced = 0
        for node_name, node in report["nodes"].items():
            if node.get("has_drift") and node.get("reachable"):
                # Same count a dry-run sync would report, without fetching the
                # node's config over SSH a second time.
                changes = sum(1 for d in node.get("drifts", [])
                              if d["drift_type"] != "extra_remote")
                print(f"{node_name}: {changes} changes needed (dry-run)")
                synced += 1
        if synced == 0:
            print("No drifted nodes found. Fleet is clean.")
        else:
            print(f"\n{synced} node(s) need syncing. Run 'sync --apply' to fix.")

    return 0


if __name__ == "__main__":
    sys.exit(main())