#!/usr/bin/env python3
"""
Config Drift Detection — Timmy Foundation Fleet

Collects hermes config from all fleet nodes via SSH, diffs against
the canonical timmy-config, and reports drift.

Usage:
    # Full drift report
    python3 scripts/config_drift_detector.py report

    # JSON output
    python3 scripts/config_drift_detector.py report --json

    # Check a single node
    python3 scripts/config_drift_detector.py check allegro

    # Auto-sync a node (dry-run by default)
    python3 scripts/config_drift_detector.py sync allegro
    python3 scripts/config_drift_detector.py sync allegro --apply

    # Sync all drifted nodes
    python3 scripts/config_drift_detector.py sync-all
"""

import argparse
import copy
import json
import os
import shlex
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml

# ── Fleet Definition ─────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
CANONICAL_CONFIG = ROOT_DIR / "config.yaml"
SYNC_LOG = ROOT_DIR / "fleet" / "drift-sync-log.json"

FLEET_NODES = {
    "timmy": {
        "host": "localhost",
        "ssh_user": None,
        "hermes_home": os.path.expanduser("~/.hermes"),
        "machine_type": "mac",
    },
    "allegro": {
        "host": "167.99.126.228",
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    },
    "bezalel": {
        "host": "159.203.146.185",
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    },
    "ezra": {
        "host": "143.198.27.163",
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    },
}


# ── Config Loading ───────────────────────────────────────────────────────────

def load_yaml(text: str) -> dict:
    """Parse YAML text into a dict.

    Returns an empty dict when parsing fails or when the document's
    top-level value is not a mapping.
    """
    try:
        data = yaml.safe_load(text)
    except Exception:
        # Deliberately best-effort: the documented contract is "empty dict on
        # failure", whatever the parser raises.
        return {}
    return data if isinstance(data, dict) else {}


def load_canonical(path: Optional[Path] = None) -> dict:
    """Load the canonical timmy-config.

    Args:
        path: Config file to read. ``None`` means ``CANONICAL_CONFIG``; the
            constant is looked up at call time rather than at definition time.

    Returns:
        The parsed config, or an empty dict if the file does not exist.
    """
    if path is None:
        path = CANONICAL_CONFIG
    if path.exists():
        with open(path, encoding="utf-8") as f:
            return load_yaml(f.read())
    return {}


# ── Remote Config Collection ─────────────────────────────────────────────────

def _ssh_target(node_spec: dict) -> str:
    """Return the ``user@host`` (or bare ``host``) SSH destination for a node."""
    user = node_spec.get("ssh_user")
    host = node_spec["host"]
    return f"{user}@{host}" if user else host


def ssh_fetch_config(node_name: str, node_spec: dict) -> Optional[str]:
    """Fetch ``config.yaml`` from a fleet node.

    Nodes whose host is ``"localhost"`` are read straight from disk; every
    other node is read with ``ssh <target> cat <path>``.

    Args:
        node_name: Node label (currently unused; kept for the call signature).
        node_spec: Node entry with ``host``, ``hermes_home`` and optionally
            ``ssh_user``.

    Returns:
        The raw config text, or ``None`` if it could not be read (missing
        file, SSH failure, timeout, undecodable bytes).
    """
    config_path = f"{node_spec['hermes_home']}/config.yaml"

    if node_spec["host"] == "localhost":
        try:
            with open(config_path, encoding="utf-8") as f:
                return f.read()
        except (OSError, UnicodeDecodeError):
            return None

    # NOTE(security): StrictHostKeyChecking=no accepts unknown host keys without
    # verification. Consider "accept-new" or pre-seeded known_hosts — left
    # unchanged here because tightening it can break existing deployments.
    command = [
        "ssh", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=no",
        _ssh_target(node_spec),
        # ssh joins the remaining arguments into one remote shell command line,
        # so the path is quoted for that shell.
        "cat", shlex.quote(config_path),
    ]
    try:
        result = subprocess.run(
            command,
            capture_output=True, text=True, encoding="utf-8", timeout=20,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # Timeout, ssh binary missing, or output that is not valid UTF-8.
        return None
    return result.stdout if result.returncode == 0 else None


def collect_all_configs() -> Dict[str, Optional[dict]]:
    """Collect parsed configs from all fleet nodes.

    Returns:
        Mapping of node name to parsed config, or ``None`` for nodes whose
        config could not be fetched. An empty-but-readable config maps to
        ``{}`` (consistent with ``check_node``), not ``None``.
    """
    configs: Dict[str, Optional[dict]] = {}
    for name, spec in FLEET_NODES.items():
        raw = ssh_fetch_config(name, spec)
        configs[name] = load_yaml(raw) if raw is not None else None
    return configs


# ── Diff Engine ──────────────────────────────────────────────────────────────

# Keys that legitimately differ between nodes and are never reported as drift.
_NODE_SPECIFIC_KEYS = frozenset({
    "terminal.cwd",   # varies per session
    "display.skin",   # cosmetic
})

# Final key segments that hold secrets; they are skipped so their values never
# show up in reports or sync patches.
_SECRET_KEY_NAMES = frozenset({"api_key", "token", "api_token"})


def _is_secret_key(key: str) -> bool:
    """Return True if the last dot-separated segment of *key* names a secret."""
    return key.rsplit(".", 1)[-1] in _SECRET_KEY_NAMES


def _flatten(d: dict, prefix: str = "") -> Dict[str, Any]:
    """Flatten a nested dict into dot-separated keys.

    Nested dicts are recursed into; lists are stored as a JSON string so they
    compare by value; every other value is stored unchanged.
    """
    items: Dict[str, Any] = {}
    for k, v in d.items():
        key = f"{prefix}.{k}" if prefix else str(k)
        if isinstance(v, dict):
            items.update(_flatten(v, key))
        elif isinstance(v, list):
            # default=str keeps non-JSON values (e.g. YAML dates) from raising.
            items[key] = json.dumps(v, sort_keys=True, default=str)
        else:
            items[key] = v
    return items


def diff_configs(canonical: dict, remote: dict,
                 ignored_keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Diff two config dicts and return the list of drift entries.

    Each entry is ``{key, canonical_value, remote_value, drift_type}`` with
    ``drift_type`` one of ``"changed"``, ``"missing_remote"``, ``"extra_remote"``.

    A key whose value is ``null`` on one side and absent on the other is not
    drift. When the values do differ, the type is decided by key *presence*,
    so ``null`` in canonical vs. a real value on the remote is ``"changed"``
    (and therefore syncable), not ``"extra_remote"``.

    Args:
        canonical: Reference config.
        remote: Config fetched from a node.
        ignored_keys: Extra flattened keys to skip, on top of the built-in
            node-specific keys and secret keys.
    """
    ignored = set(ignored_keys or []) | _NODE_SPECIFIC_KEYS

    flat_canon = _flatten(canonical)
    flat_remote = _flatten(remote)
    drifts: List[Dict[str, Any]] = []

    for key in sorted(flat_canon.keys() | flat_remote.keys()):
        if key in ignored or _is_secret_key(key):
            continue

        canon_val = flat_canon.get(key)
        remote_val = flat_remote.get(key)
        if canon_val == remote_val:
            continue

        if key not in flat_canon:
            drift_type = "extra_remote"
        elif key not in flat_remote:
            drift_type = "missing_remote"
        else:
            drift_type = "changed"

        drifts.append({
            "key": key,
            "canonical_value": canon_val,
            "remote_value": remote_val,
            "drift_type": drift_type,
        })

    return drifts


# ── Report ───────────────────────────────────────────────────────────────────

def check_node(node_name: str, canonical: dict) -> Dict[str, Any]:
    """Check a single node for config drift.

    Returns:
        For an unknown node: ``{node, error}``.
        For an unreachable node: ``reachable=False``, ``drift_count=-1`` and
        an ``error`` message.
        Otherwise: the drift list plus ``drift_count``, ``has_drift`` and the
        number of flattened remote keys.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"node": node_name, "error": f"Unknown node: {node_name}"}

    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        return {
            "node": node_name,
            "host": spec["host"],
            "reachable": False,
            "drifts": [],
            "drift_count": -1,
            "error": "Could not fetch config",
        }

    remote_config = load_yaml(raw)
    drifts = diff_configs(canonical, remote_config)

    return {
        "node": node_name,
        "host": spec["host"],
        "machine_type": spec["machine_type"],
        "reachable": True,
        "config_keys": len(_flatten(remote_config)),
        "drifts": drifts,
        "drift_count": len(drifts),
        "has_drift": len(drifts) > 0,
    }


def report_all(canonical: dict, canonical_path: Optional[Path] = None) -> Dict[str, Any]:
    """Build the full fleet drift report.

    Args:
        canonical: Parsed canonical config to diff every node against.
        canonical_path: Path the canonical config was loaded from, recorded in
            the report. Defaults to ``CANONICAL_CONFIG``.
    """
    now = datetime.now(timezone.utc).isoformat()
    results = {name: check_node(name, canonical) for name in FLEET_NODES}

    total_nodes = len(results)
    reachable = sum(1 for r in results.values() if r.get("reachable", False))
    drifted = sum(1 for r in results.values() if r.get("has_drift", False))
    # Unreachable nodes carry drift_count == -1 as a marker; they must not be
    # subtracted from the fleet-wide total.
    total_drifts = sum(
        r.get("drift_count", 0) for r in results.values() if r.get("reachable", False)
    )

    return {
        "timestamp": now,
        "canonical_config": str(canonical_path if canonical_path is not None
                                else CANONICAL_CONFIG),
        "total_nodes": total_nodes,
        "reachable": reachable,
        "unreachable": total_nodes - reachable,
        "drifted_nodes": drifted,
        "clean_nodes": reachable - drifted,
        "total_drifts": total_drifts,
        "nodes": results,
    }


# ── Formatters ───────────────────────────────────────────────────────────────

def _display_value(value: Any, absent: bool) -> str:
    """Render one side of a drift entry for the table."""
    if absent:
        return "(absent)"
    # Falsy values (False, 0, "") are real settings and must be shown as-is.
    return "null" if value is None else str(value)


def format_drift_markdown(drifts: List[Dict[str, Any]]) -> str:
    """Format a list of drift entries as a fixed-width text table.

    At most 30 rows are rendered; a trailing line reports how many were cut.
    """
    if not drifts:
        return " (no drift — config matches canonical)\n"

    lines = [
        f" {'Key':<40} {'Canonical':<20} {'Remote':<20} {'Type'}",
        f" {'─'*40} {'─'*20} {'─'*20} {'─'*12}",
    ]

    for d in drifts[:30]:  # cap at 30 rows
        dtype = d["drift_type"]
        key = d["key"][:39]
        canon = _display_value(d["canonical_value"], dtype == "extra_remote")[:19]
        remote = _display_value(d["remote_value"], dtype == "missing_remote")[:19]
        lines.append(f" {key:<40} {canon:<20} {remote:<20} {dtype}")

    if len(drifts) > 30:
        lines.append(f" ... and {len(drifts) - 30} more")

    return "\n".join(lines) + "\n"


def format_report(report: dict) -> str:
    """Format the full fleet report (as built by ``report_all``) as terminal text."""
    lines = [
        "╔══════════════════════════════════════════════════════╗",
        "║ 🔍 CONFIG DRIFT DETECTION — TIMMY FOUNDATION ║",
        "╚══════════════════════════════════════════════════════╝",
        "",
        f"Timestamp: {report['timestamp'][:19]}Z",
        f"Canonical: {report['canonical_config']}",
        "",
        f"Nodes: {report['total_nodes']} total, {report['reachable']} reachable, "
        f"{report['unreachable']} unreachable",
        f"Drift: {report['drifted_nodes']} drifted, {report['clean_nodes']} clean, "
        f"{report['total_drifts']} total key differences",
        "",
    ]

    for node_name, node in report["nodes"].items():
        is_reachable = bool(node.get("reachable"))
        status = "🟢" if is_reachable else "⚫"
        if not is_reachable:
            # Drift state is unknown for a node we could not read; do not
            # present it as clean.
            drift_icon = "❔"
        else:
            drift_icon = "🔴" if node.get("has_drift") else "✅"
        lines.append(f"── {node_name} ({node.get('host', '?')}) {status} {drift_icon} ──")

        if not is_reachable:
            lines.append(f" UNREACHABLE: {node.get('error', 'unknown')}")
            lines.append("")
            continue

        lines.append(f" Type: {node.get('machine_type', '?')}, "
                     f"Keys: {node.get('config_keys', 0)}, "
                     f"Drifts: {node.get('drift_count', 0)}")
        lines.append(format_drift_markdown(node.get("drifts", [])))
        lines.append("")

    return "\n".join(lines)


# ── Sync ─────────────────────────────────────────────────────────────────────

def generate_sync_patch(canonical: dict, remote: dict) -> Dict[str, Any]:
    """Return the flattened ``key -> canonical value`` pairs that need syncing.

    Only ``changed`` and ``missing_remote`` drifts are included; keys that
    exist only on the remote are left out because removing them would be
    destructive.
    """
    return {
        d["key"]: d["canonical_value"]
        for d in diff_configs(canonical, remote)
        if d["drift_type"] in ("changed", "missing_remote")
    }


def sync_node(node_name: str, canonical: dict, apply: bool = False) -> Dict[str, Any]:
    """Sync a node's config to match canonical (dry-run unless *apply*).

    NOTE(review): applying writes the whole ``CANONICAL_CONFIG`` file over the
    node's ``config.yaml``. Keys that exist only on the node — including the
    secret keys the diff skips — are therefore replaced too, even though the
    returned ``patch`` preview does not list them. Confirm this is intended.

    Args:
        node_name: Key into ``FLEET_NODES``.
        canonical: Parsed canonical config used to compute the patch preview.
        apply: When False (default) nothing is written.

    Returns:
        ``{node, host, changes_needed, dry_run, patch}``; after an apply
        attempt also ``applied`` and, on failure, ``error``. Unknown or
        unreachable nodes yield ``{error}`` only.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"error": f"Unknown node: {node_name}"}

    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        return {"error": "Could not fetch remote config"}

    patch = generate_sync_patch(canonical, load_yaml(raw))

    result: Dict[str, Any] = {
        "node": node_name,
        "host": spec["host"],
        "changes_needed": len(patch),
        "dry_run": not apply,
        "patch": patch,
    }

    if not apply or not patch:
        return result

    config_path = f"{spec['hermes_home']}/config.yaml"
    try:
        config_content = CANONICAL_CONFIG.read_text(encoding="utf-8")
    except OSError:
        config_content = ""

    if not config_content.strip():
        # Writing an empty file would wipe the node's config; refuse instead.
        result["applied"] = False
        result["error"] = (
            f"Canonical config {CANONICAL_CONFIG} is missing or empty; "
            "refusing to overwrite remote config"
        )
    elif spec["host"] == "localhost":
        try:
            os.makedirs(os.path.dirname(config_path), exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(config_content)
            result["applied"] = True
        except OSError as e:
            result["applied"] = False
            result["error"] = str(e)
    else:
        try:
            proc = subprocess.run(
                ["ssh", "-o", "ConnectTimeout=10", _ssh_target(spec),
                 # Single remote shell command; quote the path for that shell.
                 f"cat > {shlex.quote(config_path)}"],
                input=config_content, capture_output=True, text=True,
                encoding="utf-8", timeout=20,
            )
            result["applied"] = proc.returncode == 0
            if proc.returncode != 0:
                result["error"] = proc.stderr[:200]
        except (subprocess.SubprocessError, OSError, ValueError) as e:
            result["applied"] = False
            result["error"] = str(e)

    # Log every apply attempt, including whether it actually succeeded.
    log_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "node": node_name,
        "changes": len(patch),
        "patch_keys": list(patch.keys()),
        "applied": bool(result.get("applied", False)),
    }
    if "error" in result:
        log_entry["error"] = result["error"]
    _append_sync_log(log_entry)

    return result


def _append_sync_log(entry: dict) -> None:
    """Append *entry* to the JSON sync log, keeping only the last 100 entries.

    A missing, unreadable or malformed log (including one whose top-level
    value is not a list) is replaced by a fresh one.
    """
    log: List[Any] = []
    if SYNC_LOG.exists():
        try:
            with open(SYNC_LOG, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and decoding errors.
            loaded = None
        if isinstance(loaded, list):
            log = loaded
    log.append(entry)
    log = log[-100:]  # keep last 100
    SYNC_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(SYNC_LOG, "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2)


# ── CLI ──────────────────────────────────────────────────────────────────────

def main() -> int:
    """Command-line entry point; returns the process exit code."""
    parser = argparse.ArgumentParser(description="Config drift detection for the Timmy fleet")
    sub = parser.add_subparsers(dest="command", required=True)

    # report
    rpt = sub.add_parser("report", help="Full fleet drift report")
    rpt.add_argument("--json", action="store_true", help="JSON output")
    rpt.add_argument("--canonical", type=str, default=None, help="Canonical config path")

    # check
    chk = sub.add_parser("check", help="Check a single node")
    chk.add_argument("node", help="Node name (timmy, allegro, bezalel, ezra)")
    chk.add_argument("--json", action="store_true", help="JSON output")

    # sync
    syn = sub.add_parser("sync", help="Sync a node to canonical")
    syn.add_argument("node", help="Node name")
    syn.add_argument("--apply", action="store_true", help="Actually apply changes")

    # sync-all
    sub.add_parser("sync-all", help="Sync all drifted nodes (dry-run)")

    args = parser.parse_args()

    # Only the "report" sub-command defines --canonical.
    override = getattr(args, "canonical", None)
    canon_path = Path(override) if override else CANONICAL_CONFIG
    canonical = load_canonical(canon_path)

    if args.command == "report":
        report = report_all(canonical, canon_path)
        if args.json:
            # default=str: YAML values such as dates are not JSON-native.
            print(json.dumps(report, indent=2, default=str))
        else:
            print(format_report(report))

    elif args.command == "check":
        result = check_node(args.node, canonical)
        if args.json:
            print(json.dumps(result, indent=2, default=str))
        else:
            print(f"Node: {result['node']}")
            print(f"Host: {result.get('host', '?')}")
            print(f"Reachable: {result.get('reachable', False)}")
            print(f"Drifts: {result.get('drift_count', 0)}")
            if result.get("error"):
                print(f"Error: {result['error']}")
            if result.get("drifts"):
                print(format_drift_markdown(result["drifts"]))

    elif args.command == "sync":
        result = sync_node(args.node, canonical, apply=args.apply)
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "sync-all":
        report = report_all(canonical, canon_path)
        synced = 0
        for node_name, node in report["nodes"].items():
            if node.get("has_drift") and node.get("reachable"):
                # Same count a dry-run sync_node would report, derived from the
                # drifts already fetched instead of a second SSH round-trip.
                changes_needed = sum(
                    1 for d in node.get("drifts", [])
                    if d["drift_type"] in ("changed", "missing_remote")
                )
                print(f"{node_name}: {changes_needed} changes needed (dry-run)")
                synced += 1
        if synced == 0:
            print("No drifted nodes found. Fleet is clean.")
        else:
            print(f"\n{synced} node(s) need syncing. Run 'sync --apply' to fix.")

    return 0


if __name__ == "__main__":
    sys.exit(main())