feat: config drift detection across all fleet nodes (#686)

Collects hermes config from all nodes via SSH, diffs against
canonical timmy-config, reports drift, optionally syncs.

Closes #686
This commit is contained in:
2026-04-17 05:18:41 +00:00
parent 6fbf5bb649
commit ef76901243

View File

@@ -0,0 +1,494 @@
#!/usr/bin/env python3
"""
Config Drift Detection — Timmy Foundation Fleet
Collects hermes config from all fleet nodes via SSH, diffs against
the canonical timmy-config, and reports drift.
Usage:
# Full drift report
python3 scripts/config_drift_detector.py report
# JSON output
python3 scripts/config_drift_detector.py report --json
# Check a single node
python3 scripts/config_drift_detector.py check allegro
# Auto-sync a node (dry-run by default)
python3 scripts/config_drift_detector.py sync allegro
python3 scripts/config_drift_detector.py sync allegro --apply
# Preview the sync for all drifted nodes (always a dry-run; nothing is applied)
python3 scripts/config_drift_detector.py sync-all
"""
import argparse
import copy
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
# ── Fleet Definition ─────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent

# Reference config that node configs are diffed against, and the JSON file
# that records applied syncs.
CANONICAL_CONFIG = ROOT_DIR / "config.yaml"
SYNC_LOG = ROOT_DIR / "fleet" / "drift-sync-log.json"

# The VPS nodes share the same SSH user and hermes home; only the address
# differs, so their specs are generated from this (name, host) table.
_VPS_HOSTS = (
    ("allegro", "167.99.126.228"),
    ("bezalel", "159.203.146.185"),
    ("ezra", "143.198.27.163"),
)

# Node name -> spec with keys: host, ssh_user, hermes_home, machine_type.
FLEET_NODES = {
    "timmy": {
        "host": "localhost",
        "ssh_user": None,
        "hermes_home": os.path.expanduser("~/.hermes"),
        "machine_type": "mac",
    },
    **{
        vps_name: {
            "host": vps_host,
            "ssh_user": "root",
            "hermes_home": "/root/.hermes",
            "machine_type": "vps",
        }
        for vps_name, vps_host in _VPS_HOSTS
    },
}
# ── Config Loading ───────────────────────────────────────────────────────────
def load_yaml(text: str) -> dict:
    """Parse *text* as YAML and return the top-level mapping.

    Returns an empty dict when parsing raises, or when the parsed document
    is anything other than a dict (for example an empty document or a list).
    """
    try:
        parsed = yaml.safe_load(text)
    except Exception:
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}
def load_canonical(path: Path = CANONICAL_CONFIG) -> dict:
    """Read and parse the canonical config at *path*.

    Returns an empty dict when *path* does not exist; otherwise the result
    of ``load_yaml`` on the file's UTF-8 text.
    """
    if not path.exists():
        return {}
    with open(path, encoding="utf-8") as handle:
        contents = handle.read()
    return load_yaml(contents)
# ── Remote Config Collection ─────────────────────────────────────────────────
def ssh_fetch_config(node_name: str, node_spec: dict) -> Optional[str]:
    """Return the raw text of a node's ``config.yaml``, or None on any failure.

    When ``node_spec["host"]`` is ``"localhost"`` the file is read straight
    from disk; otherwise it is fetched by running ``cat`` on the node over
    SSH. A missing file, a non-zero SSH exit status, a timeout, or any other
    exception all yield None. ``node_name`` is accepted but not used here.
    """
    host = node_spec["host"]
    user = node_spec.get("ssh_user")
    config_path = f"{node_spec['hermes_home']}/config.yaml"

    if host == "localhost":
        try:
            with open(config_path, encoding="utf-8") as handle:
                return handle.read()
        except Exception:
            return None

    # Remote node: log in as ssh_user when one is configured.
    target = host if not user else f"{user}@{host}"
    ssh_cmd = [
        "ssh", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=no",
        target, "cat", config_path,
    ]
    try:
        completed = subprocess.run(
            ssh_cmd, capture_output=True, text=True, timeout=20
        )
    except Exception:
        return None
    return completed.stdout if completed.returncode == 0 else None
def collect_all_configs() -> Dict[str, Optional[dict]]:
    """Fetch and parse the config of every node in ``FLEET_NODES``.

    Returns:
        Mapping of node name to its parsed config dict, or None when the
        config could not be fetched at all.
    """
    configs: Dict[str, Optional[dict]] = {}
    for name, spec in FLEET_NODES.items():
        raw = ssh_fetch_config(name, spec)
        # Only a failed fetch (None) means "unavailable". An empty file was
        # fetched successfully and parses to {}, matching how check_node and
        # sync_node test ``raw is None``.
        configs[name] = None if raw is None else load_yaml(raw)
    return configs
# ── Diff Engine ──────────────────────────────────────────────────────────────
def _flatten(d: dict, prefix: str = "") -> Dict[str, Any]:
"""Flatten nested dict into dot-separated keys."""
items = {}
for k, v in d.items():
key = f"{prefix}.{k}" if prefix else str(k)
if isinstance(v, dict):
items.update(_flatten(v, key))
elif isinstance(v, list):
items[key] = json.dumps(v, sort_keys=True)
else:
items[key] = v
return items
def diff_configs(canonical: dict, remote: dict,
                 ignored_keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Diff two config dicts and return the list of drift entries.

    Both configs are flattened to dotted keys and compared key by key, in
    sorted key order. A value of None is treated the same as an absent key.

    Args:
        canonical: Reference config.
        remote: Config collected from a node.
        ignored_keys: Extra dotted keys to skip, on top of the built-in
            node-specific keys and secret-bearing keys.

    Returns:
        One dict per differing key with ``key``, ``canonical_value``,
        ``remote_value`` and ``drift_type``, where drift_type is
        ``"changed"``, ``"missing_remote"`` or ``"extra_remote"``.
    """
    ignored = set(ignored_keys or [])
    # Keys that are inherently node-specific.
    ignored.update({
        "terminal.cwd",  # varies per session
        "display.skin",  # cosmetic
    })
    # Secrets are never drift and must not end up in a report. Match on the
    # final path segment so a top-level ``api_key`` is skipped as well as a
    # nested ``provider.api_key``.
    secret_leaves = ("api_key", "token", "api_token")

    flat_canon = _flatten(canonical)
    flat_remote = _flatten(remote)

    drifts: List[Dict[str, Any]] = []
    for key in sorted(flat_canon.keys() | flat_remote.keys()):
        if key in ignored:
            continue
        if key.rpartition(".")[2] in secret_leaves:
            continue
        canon_val = flat_canon.get(key)
        remote_val = flat_remote.get(key)
        if canon_val == remote_val:
            continue
        if canon_val is None:
            drift_type = "extra_remote"
        elif remote_val is None:
            drift_type = "missing_remote"
        else:
            drift_type = "changed"
        drifts.append({
            "key": key,
            "canonical_value": canon_val,
            "remote_value": remote_val,
            "drift_type": drift_type,
        })
    return drifts
# ── Report ───────────────────────────────────────────────────────────────────
def check_node(node_name: str, canonical: dict) -> Dict[str, Any]:
    """Compare one node's config against *canonical*.

    Returns a dict describing the node. Three shapes are possible:
    an unknown node (``node`` and ``error`` only), an unreachable node
    (``reachable`` False, ``drift_count`` -1, ``error`` set), or a reachable
    node with its drift list, drift count and ``has_drift`` flag.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"node": node_name, "error": f"Unknown node: {node_name}"}

    outcome: Dict[str, Any] = {"node": node_name, "host": spec["host"]}

    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        outcome.update(
            reachable=False,
            drifts=[],
            drift_count=-1,
            error="Could not fetch config",
        )
        return outcome

    remote_config = load_yaml(raw)
    drifts = diff_configs(canonical, remote_config)
    drift_total = len(drifts)
    outcome.update(
        machine_type=spec["machine_type"],
        reachable=True,
        config_keys=len(_flatten(remote_config)),
        drifts=drifts,
        drift_count=drift_total,
        has_drift=drift_total > 0,
    )
    return outcome
def report_all(canonical: dict) -> Dict[str, Any]:
    """Build the fleet-wide drift report.

    Runs ``check_node`` for every node in ``FLEET_NODES`` and aggregates the
    results.

    Returns:
        Dict with a UTC ISO timestamp, the canonical config path, node
        counts (total / reachable / unreachable / drifted / clean), the
        total number of drifting keys, and the per-node results under
        ``nodes``.
    """
    now = datetime.now(timezone.utc).isoformat()
    results = {name: check_node(name, canonical) for name in FLEET_NODES}

    reachable_results = [r for r in results.values() if r.get("reachable", False)]
    reachable = len(reachable_results)
    total_nodes = len(results)
    drifted = sum(1 for r in results.values() if r.get("has_drift", False))
    # Unreachable nodes carry the sentinel drift_count == -1; summing over
    # reachable nodes only keeps that sentinel out of the total.
    total_drifts = sum(r.get("drift_count", 0) for r in reachable_results)

    return {
        "timestamp": now,
        "canonical_config": str(CANONICAL_CONFIG),
        "total_nodes": total_nodes,
        "reachable": reachable,
        "unreachable": total_nodes - reachable,
        "drifted_nodes": drifted,
        "clean_nodes": reachable - drifted,
        "total_drifts": total_drifts,
        "nodes": results,
    }
# ── Formatters ───────────────────────────────────────────────────────────────
def format_drift_markdown(drifts: List[Dict[str, Any]]) -> str:
    """Render drift entries as a fixed-width text table.

    Only the first 30 entries are listed; a trailing line states how many
    more were omitted. Keys and values are truncated to fit their columns.

    Args:
        drifts: Entries as produced by ``diff_configs``.

    Returns:
        The table text ending in a newline, or a one-line "no drift"
        message when *drifts* is empty.
    """
    if not drifts:
        return " (no drift — config matches canonical)\n"

    def cell(value: Any) -> str:
        # Only None means "absent"; False, 0 and "" are real config values
        # and must be shown as such.
        text = "(absent)" if value is None else str(value)
        return text[:19]

    lines = [
        f" {'Key':<40} {'Canonical':<20} {'Remote':<20} {'Type'}",
        f" {'-' * 40} {'-' * 20} {'-' * 20} {'-' * 12}",
    ]
    for d in drifts[:30]:  # cap at 30 rows
        key = d["key"][:39]
        canon = cell(d["canonical_value"])
        remote = cell(d["remote_value"])
        dtype = d["drift_type"]
        lines.append(f" {key:<40} {canon:<20} {remote:<20} {dtype}")
    if len(drifts) > 30:
        lines.append(f" ... and {len(drifts) - 30} more")
    return "\n".join(lines) + "\n"
def format_report(report: dict) -> str:
    """Render a ``report_all`` result as human-readable terminal text.

    Emits a banner and fleet summary, then one section per node: either an
    UNREACHABLE line carrying the node's error, or the node's type, key
    count, drift count and drift table.
    """
    out = [
        "╔══════════════════════════════════════════════════════╗",
        "║ 🔍 CONFIG DRIFT DETECTION — TIMMY FOUNDATION ║",
        "╚══════════════════════════════════════════════════════╝",
        "",
        f"Timestamp: {report['timestamp'][:19]}Z",
        f"Canonical: {report['canonical_config']}",
        "",
        f"Nodes: {report['total_nodes']} total, {report['reachable']} reachable, "
        f"{report['unreachable']} unreachable",
        f"Drift: {report['drifted_nodes']} drifted, {report['clean_nodes']} clean, "
        f"{report['total_drifts']} total key differences",
        "",
    ]

    for node_name, node in report["nodes"].items():
        is_up = node.get("reachable")
        status = "🟢" if is_up else ""
        drift_icon = "🔴" if node.get("has_drift") else ""
        out.append(f"── {node_name} ({node.get('host', '?')}) {status} {drift_icon} ──")

        if is_up:
            out.append(f" Type: {node.get('machine_type', '?')}, "
                       f"Keys: {node.get('config_keys', 0)}, "
                       f"Drifts: {node.get('drift_count', 0)}")
            out.append(format_drift_markdown(node.get("drifts", [])))
        else:
            out.append(f" UNREACHABLE: {node.get('error', 'unknown')}")
        out.append("")

    return "\n".join(out)
# ── Sync ─────────────────────────────────────────────────────────────────────
def generate_sync_patch(canonical: dict, remote: dict) -> Dict[str, Any]:
    """Return the flattened key/value pairs a node needs to match canonical.

    Covers keys whose value differs ("changed") and keys the node lacks
    ("missing_remote"). Keys present only on the node ("extra_remote") are
    left out on purpose, since removing them would be destructive.

    Returns:
        Mapping of dotted key to the canonical value. Values come from the
        flattened diff, so list values appear as JSON text.
    """
    return {
        d["key"]: d["canonical_value"]
        for d in diff_configs(canonical, remote)
        if d["drift_type"] in ("changed", "missing_remote")
    }
def sync_node(node_name: str, canonical: dict, apply: bool = False) -> Dict[str, Any]:
    """Sync a node's config to match canonical (dry-run unless *apply*).

    Args:
        node_name: Key into ``FLEET_NODES``.
        canonical: Parsed canonical config, used to compute the patch.
        apply: When False (default) only report what would change.

    Returns:
        ``{"error": ...}`` for an unknown or unreachable node. Otherwise a
        dict with ``node``, ``host``, ``changes_needed``, ``dry_run`` and
        ``patch``. After an apply attempt it also has ``applied`` (bool)
        and, on failure, ``error``.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"error": f"Unknown node: {node_name}"}

    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        return {"error": "Could not fetch remote config"}

    patch = generate_sync_patch(canonical, load_yaml(raw))
    result: Dict[str, Any] = {
        "node": node_name,
        "host": spec["host"],
        "changes_needed": len(patch),
        "dry_run": not apply,
        "patch": patch,
    }
    if not apply or not patch:
        return result

    # NOTE(review): apply replaces the node's whole config.yaml with the
    # text of CANONICAL_CONFIG rather than merging `patch`, so keys that
    # exist only on the node are not preserved.
    try:
        config_content = CANONICAL_CONFIG.read_text(encoding="utf-8")
    except OSError as exc:
        # Never push an empty file just because the canonical is unreadable.
        error: Optional[str] = f"Could not read canonical config: {exc}"
    else:
        error = _push_config(spec, config_content)

    result["applied"] = error is None
    if error is not None:
        result["error"] = error

    # Log every apply attempt together with its outcome.
    _append_sync_log({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "node": node_name,
        "changes": len(patch),
        "patch_keys": list(patch.keys()),
        "applied": result["applied"],
    })
    return result


def _push_config(spec: dict, content: str) -> Optional[str]:
    """Write *content* to the node's config.yaml; return an error text or None."""
    import shlex  # local import: the module header does not import shlex

    config_path = f"{spec['hermes_home']}/config.yaml"
    host = spec["host"]

    if host == "localhost":
        try:
            os.makedirs(os.path.dirname(config_path), exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(content)
        except Exception as e:
            return str(e)
        return None

    target = f"{spec['ssh_user']}@{host}" if spec.get("ssh_user") else host
    try:
        proc = subprocess.run(
            # The last argument is run by the remote shell, so quote the path.
            ["ssh", "-o", "ConnectTimeout=10", target,
             f"cat > {shlex.quote(config_path)}"],
            input=content, capture_output=True, text=True, timeout=20
        )
    except Exception as e:
        return str(e)
    if proc.returncode != 0:
        return proc.stderr[:200] or f"ssh exited with status {proc.returncode}"
    return None
def _append_sync_log(entry: dict) -> None:
    """Append *entry* to the JSON sync log, keeping only the last 100 entries.

    A missing, unreadable or corrupt log file — or one whose JSON content
    is not a list — is treated as empty and overwritten.
    """
    try:
        with open(SYNC_LOG, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        loaded = None

    # Guard against valid JSON that is not a list (e.g. a hand-edited
    # object), which would otherwise fail on .append().
    log: List[Any] = loaded if isinstance(loaded, list) else []
    log.append(entry)
    log = log[-100:]  # keep last 100

    SYNC_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(SYNC_LOG, "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2)
# ── CLI ──────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    Parses the sub-command (report, check, sync, sync-all), runs it, and
    prints the result to stdout.

    Returns:
        Process exit status: 0 on success, 1 when a ``check`` or ``sync``
        result carries an error (unknown node, unreachable node, failed
        apply).
    """
    parser = argparse.ArgumentParser(description="Config drift detection for the Timmy fleet")
    sub = parser.add_subparsers(dest="command", required=True)

    # report
    rpt = sub.add_parser("report", help="Full fleet drift report")
    rpt.add_argument("--json", action="store_true", help="JSON output")
    rpt.add_argument("--canonical", type=str, default=None, help="Canonical config path")

    # check
    chk = sub.add_parser("check", help="Check a single node")
    chk.add_argument("node", help="Node name (timmy, allegro, bezalel, ezra)")
    chk.add_argument("--json", action="store_true", help="JSON output")

    # sync
    syn = sub.add_parser("sync", help="Sync a node to canonical")
    syn.add_argument("node", help="Node name")
    syn.add_argument("--apply", action="store_true", help="Actually apply changes")

    # sync-all
    sub.add_parser("sync-all", help="Sync all drifted nodes (dry-run)")

    args = parser.parse_args()

    # Only the report sub-command defines --canonical.
    canonical_arg = getattr(args, "canonical", None)
    canon_path = Path(canonical_arg) if canonical_arg else CANONICAL_CONFIG
    if not canon_path.exists():
        # load_canonical() returns {} here, which would make every remote
        # key look like drift; say so rather than reporting silently.
        print(f"warning: canonical config not found: {canon_path}", file=sys.stderr)
    canonical = load_canonical(canon_path)

    exit_code = 0
    if args.command == "report":
        report = report_all(canonical)
        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print(format_report(report))

    elif args.command == "check":
        result = check_node(args.node, canonical)
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Node: {result['node']}")
            print(f"Host: {result.get('host', '?')}")
            print(f"Reachable: {result.get('reachable', False)}")
            print(f"Drifts: {result.get('drift_count', 0)}")
            if "error" in result:
                print(f"Error: {result['error']}")
            if result.get("drifts"):
                print(format_drift_markdown(result["drifts"]))
        if "error" in result:
            exit_code = 1

    elif args.command == "sync":
        result = sync_node(args.node, canonical, apply=args.apply)
        print(json.dumps(result, indent=2))
        if "error" in result:
            exit_code = 1

    elif args.command == "sync-all":
        report = report_all(canonical)
        synced = 0
        for node_name, node in report["nodes"].items():
            if node.get("has_drift") and node.get("reachable"):
                # Always a dry-run; applying is done per node via 'sync'.
                result = sync_node(node_name, canonical, apply=False)
                print(f"{node_name}: {result.get('changes_needed', 0)} changes needed (dry-run)")
                synced += 1
        if synced == 0:
            print("No drifted nodes found. Fleet is clean.")
        else:
            print(f"\n{synced} node(s) need syncing. Run 'sync <node> --apply' to fix.")

    return exit_code
# Run the CLI when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())