Compare commits

...

2 Commits

Author SHA1 Message Date
ade82f2e67 test: config drift detector (#686)
28 tests covering:
- YAML loading (valid, empty, invalid, non-dict)
- Nested flattening (dot paths, lists, deep nesting)
- Config diffing (changed, missing, extra, ignored keys, secrets)
- Sync patch generation (changed, missing, non-destructive)
- Markdown and report formatting

Closes #686
2026-04-17 05:20:08 +00:00
ef76901243 feat: config drift detection across all fleet nodes (#686)
Collects hermes config from all nodes via SSH, diffs against
canonical timmy-config, reports drift, optionally syncs.

Closes #686
2026-04-17 05:18:41 +00:00
2 changed files with 722 additions and 0 deletions

View File

@@ -0,0 +1,494 @@
#!/usr/bin/env python3
"""
Config Drift Detection — Timmy Foundation Fleet
Collects hermes config from all fleet nodes via SSH, diffs against
the canonical timmy-config, and reports drift.
Usage:
# Full drift report
python3 scripts/config_drift_detector.py report
# JSON output
python3 scripts/config_drift_detector.py report --json
# Check a single node
python3 scripts/config_drift_detector.py check allegro
# Auto-sync a node (dry-run by default)
python3 scripts/config_drift_detector.py sync allegro
python3 scripts/config_drift_detector.py sync allegro --apply
# Sync all drifted nodes
python3 scripts/config_drift_detector.py sync-all
"""
import argparse
import copy
import json
import os
import shlex
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml
# ── Fleet Definition ─────────────────────────────────────────────────────────
# Filesystem anchors: the repository root is the parent of this script's
# directory.
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
# Canonical config every node is compared against, and the JSON log that
# sync operations append to.
CANONICAL_CONFIG = ROOT_DIR / "config.yaml"
SYNC_LOG = ROOT_DIR / "fleet" / "drift-sync-log.json"


def _vps_node(host: str) -> dict:
    """Return the node spec shared by every root-managed VPS."""
    return {
        "host": host,
        "ssh_user": "root",
        "hermes_home": "/root/.hermes",
        "machine_type": "vps",
    }


# Node name -> connection details for each fleet member.
FLEET_NODES = {
    "timmy": {
        "host": "localhost",
        "ssh_user": None,
        "hermes_home": os.path.expanduser("~/.hermes"),
        "machine_type": "mac",
    },
    "allegro": _vps_node("167.99.126.228"),
    "bezalel": _vps_node("159.203.146.185"),
    "ezra": _vps_node("143.198.27.163"),
}
# ── Config Loading ───────────────────────────────────────────────────────────
def load_yaml(text: str) -> dict:
    """Parse YAML text into a mapping.

    Returns the parsed document when its root is a dict. Any parse failure,
    or a document whose root is not a dict (empty, list, scalar), yields an
    empty dict instead.
    """
    try:
        parsed = yaml.safe_load(text)
    except Exception:
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}
def load_canonical(path: Path = CANONICAL_CONFIG) -> dict:
    """Read and parse the canonical timmy-config at *path*.

    Returns an empty dict when the file does not exist; otherwise returns
    whatever ``load_yaml`` makes of its UTF-8 content.
    """
    if not path.exists():
        return {}
    return load_yaml(path.read_text(encoding="utf-8"))
# ── Remote Config Collection ─────────────────────────────────────────────────
def ssh_fetch_config(node_name: str, node_spec: dict) -> Optional[str]:
    """Fetch the raw text of a node's ``config.yaml``.

    Args:
        node_name: Fleet name of the node. Not used by the fetch itself.
        node_spec: Node entry providing ``host``, ``hermes_home`` and,
            optionally, ``ssh_user``.

    Returns:
        The file content as text, or ``None`` when it could not be read
        (missing or unreadable file, SSH failure, non-zero exit, timeout).
    """
    host = node_spec["host"]
    user = node_spec.get("ssh_user")
    config_path = f"{node_spec['hermes_home']}/config.yaml"

    if host == "localhost":
        # Local node: read straight from disk, no SSH round-trip.
        try:
            with open(config_path, encoding="utf-8") as f:
                return f.read()
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: undecodable bytes
            # (UnicodeDecodeError) or an invalid path.
            return None

    target = f"{user}@{host}" if user else host
    try:
        # NOTE(review): StrictHostKeyChecking=no disables host-key
        # verification, so a spoofed host would go unnoticed — confirm this
        # is acceptable for the fleet.
        result = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=no",
             # ssh joins the trailing arguments into one command line for the
             # remote shell, so the path must be shell-quoted.
             target, "cat", shlex.quote(config_path)],
            capture_output=True, text=True, timeout=20,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # Timeout, missing ssh binary, or undecodable output.
        return None
    return result.stdout if result.returncode == 0 else None
def collect_all_configs() -> Dict[str, Optional[dict]]:
    """Collect the parsed config of every node in ``FLEET_NODES``.

    Returns:
        Mapping of node name to its parsed config dict, or ``None`` for a
        node whose config could not be fetched.
    """
    configs: Dict[str, Optional[dict]] = {}
    for name, spec in FLEET_NODES.items():
        raw = ssh_fetch_config(name, spec)
        # Compare against None rather than truthiness: an empty file is a
        # reachable node with an empty config, which is how check_node() and
        # sync_node() treat it as well.
        configs[name] = None if raw is None else load_yaml(raw)
    return configs
# ── Diff Engine ──────────────────────────────────────────────────────────────
def _flatten(d: dict, prefix: str = "") -> Dict[str, Any]:
"""Flatten nested dict into dot-separated keys."""
items = {}
for k, v in d.items():
key = f"{prefix}.{k}" if prefix else str(k)
if isinstance(v, dict):
items.update(_flatten(v, key))
elif isinstance(v, list):
items[key] = json.dumps(v, sort_keys=True)
else:
items[key] = v
return items
def diff_configs(canonical: dict, remote: dict,
                 ignored_keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Diff two config dicts and return the list of drift entries.

    Args:
        canonical: The reference config.
        remote: The config found on a node.
        ignored_keys: Extra flattened (dot-separated) keys to skip, on top of
            the built-in node-specific keys.

    Returns:
        Entries sorted by key, each shaped as
        ``{key, canonical_value, remote_value, drift_type}`` where
        ``drift_type`` is ``"changed"``, ``"missing_remote"`` or
        ``"extra_remote"``. A side that lacks the key reports ``None`` as
        its value.
    """
    ignored = set(ignored_keys or [])
    # Keys that are inherently node-specific.
    ignored.update({
        "terminal.cwd",   # varies per session
        "display.skin",   # cosmetic
    })
    # Final path segments that hold secrets: expected to differ per node and
    # must never be echoed into a report.
    secret_names = {"api_key", "token", "api_token"}

    flat_canon = _flatten(canonical)
    flat_remote = _flatten(remote)

    drifts: List[Dict[str, Any]] = []
    for key in sorted(flat_canon.keys() | flat_remote.keys()):
        if key in ignored:
            continue
        # Matching the last segment also covers a top-level "api_key"/"token",
        # which a ".api_key" suffix test would let through.
        if key.rsplit(".", 1)[-1] in secret_names:
            continue

        canon_val = flat_canon.get(key)
        remote_val = flat_remote.get(key)
        # An absent key and an explicit null are deliberately treated alike.
        if canon_val == remote_val:
            continue

        if key in flat_canon and key in flat_remote:
            # Present on both sides, so this is a value change even when one
            # side is an explicit null (otherwise it would be reported as
            # extra/missing and skipped by the sync patch).
            drift_type = "changed"
        elif canon_val is None:
            drift_type = "extra_remote"
        else:
            drift_type = "missing_remote"

        drifts.append({
            "key": key,
            "canonical_value": canon_val,
            "remote_value": remote_val,
            "drift_type": drift_type,
        })
    return drifts
# ── Report ───────────────────────────────────────────────────────────────────
def check_node(node_name: str, canonical: dict) -> Dict[str, Any]:
    """Compare one node's config against *canonical*.

    Returns a result dict. For an unknown node it carries only ``node`` and
    ``error``. For a node whose config cannot be fetched, ``reachable`` is
    False and ``drift_count`` is -1. Otherwise it carries the drift list and
    summary counters.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"node": node_name, "error": f"Unknown node: {node_name}"}

    raw_text = ssh_fetch_config(node_name, spec)
    if raw_text is None:
        unreachable: Dict[str, Any] = {"node": node_name, "host": spec["host"]}
        unreachable["reachable"] = False
        unreachable["drifts"] = []
        unreachable["drift_count"] = -1
        unreachable["error"] = "Could not fetch config"
        return unreachable

    node_config = load_yaml(raw_text)
    found = diff_configs(canonical, node_config)
    count = len(found)
    return {
        "node": node_name,
        "host": spec["host"],
        "machine_type": spec["machine_type"],
        "reachable": True,
        "config_keys": len(_flatten(node_config)),
        "drifts": found,
        "drift_count": count,
        "has_drift": count > 0,
    }
def report_all(canonical: dict) -> Dict[str, Any]:
    """Build the full fleet drift report.

    Runs ``check_node`` for every entry in ``FLEET_NODES`` and aggregates
    the results.

    Returns:
        Dict with a UTC ISO timestamp, the canonical config path, node
        counters (total/reachable/unreachable/drifted/clean), the total
        number of drifting keys, and the per-node results under ``nodes``.
    """
    now = datetime.now(timezone.utc).isoformat()
    results = {name: check_node(name, canonical) for name in FLEET_NODES}

    total_nodes = len(results)
    reachable = sum(1 for r in results.values() if r.get("reachable", False))
    drifted = sum(1 for r in results.values() if r.get("has_drift", False))
    # Unreachable nodes report drift_count == -1 as a sentinel; summing it
    # would subtract one from the total per unreachable node.
    total_drifts = sum(
        r.get("drift_count", 0) for r in results.values() if r.get("reachable", False)
    )
    return {
        "timestamp": now,
        "canonical_config": str(CANONICAL_CONFIG),
        "total_nodes": total_nodes,
        "reachable": reachable,
        "unreachable": total_nodes - reachable,
        "drifted_nodes": drifted,
        "clean_nodes": reachable - drifted,
        "total_drifts": total_drifts,
        "nodes": results,
    }
# ── Formatters ───────────────────────────────────────────────────────────────
def format_drift_markdown(drifts: List[Dict[str, Any]]) -> str:
    """Render drift entries as a fixed-width text table.

    Args:
        drifts: Entries as produced by ``diff_configs``.

    Returns:
        A newline-terminated table with a header, a rule line and at most
        30 rows (followed by an "... and N more" line when truncated), or a
        one-line "no drift" notice when *drifts* is empty.
    """
    if not drifts:
        return " (no drift — config matches canonical)\n"

    def _cell(value: Any) -> str:
        # Only None means "absent"; falsy values such as False, 0 or ""
        # are real settings and must be shown as-is.
        return ("(absent)" if value is None else str(value))[:19]

    lines = [
        f" {'Key':<40} {'Canonical':<20} {'Remote':<20} {'Type'}",
        f" {'─' * 40} {'─' * 20} {'─' * 20} {'─' * 12}",
    ]
    for d in drifts[:30]:  # cap at 30 rows
        key = d["key"][:39]
        canon = _cell(d["canonical_value"])
        remote = _cell(d["remote_value"])
        lines.append(f" {key:<40} {canon:<20} {remote:<20} {d['drift_type']}")
    if len(drifts) > 30:
        lines.append(f" ... and {len(drifts) - 30} more")
    return "\n".join(lines) + "\n"
def format_report(report: dict) -> str:
    """Render a fleet report (as built by ``report_all``) as terminal text.

    The output is a banner, a summary of node and drift counters, then one
    section per node: either an UNREACHABLE line or the node's key counters
    followed by its drift table.
    """
    out = [
        "╔══════════════════════════════════════════════════════╗",
        "║ 🔍 CONFIG DRIFT DETECTION — TIMMY FOUNDATION ║",
        "╚══════════════════════════════════════════════════════╝",
        "",
        f"Timestamp: {report['timestamp'][:19]}Z",
        f"Canonical: {report['canonical_config']}",
        "",
        f"Nodes: {report['total_nodes']} total, {report['reachable']} reachable, "
        f"{report['unreachable']} unreachable",
        f"Drift: {report['drifted_nodes']} drifted, {report['clean_nodes']} clean, "
        f"{report['total_drifts']} total key differences",
        "",
    ]
    for name, info in report["nodes"].items():
        is_up = info.get("reachable")
        up_mark = "🟢" if is_up else ""
        drift_mark = "🔴" if info.get("has_drift") else ""
        out.append(f"── {name} ({info.get('host', '?')}) {up_mark} {drift_mark} ──")
        if is_up:
            out.append(
                f" Type: {info.get('machine_type', '?')}, "
                f"Keys: {info.get('config_keys', 0)}, "
                f"Drifts: {info.get('drift_count', 0)}"
            )
            out.append(format_drift_markdown(info.get("drifts", [])))
        else:
            out.append(f" UNREACHABLE: {info.get('error', 'unknown')}")
        # Blank line closes every node section.
        out.append("")
    return "\n".join(out)
# ── Sync ─────────────────────────────────────────────────────────────────────
def generate_sync_patch(canonical: dict, remote: dict) -> Dict[str, Any]:
    """Return the flattened key -> canonical value pairs a node is missing.

    Only ``changed`` and ``missing_remote`` drifts are included. Keys that
    exist solely on the remote (``extra_remote``) are left out because
    removing them would be destructive.
    """
    return {
        d["key"]: d["canonical_value"]
        for d in diff_configs(canonical, remote)
        if d["drift_type"] in ("changed", "missing_remote")
    }
def sync_node(node_name: str, canonical: dict, apply: bool = False) -> Dict[str, Any]:
    """Sync a node's config to canonical, as a dry run unless *apply* is set.

    Args:
        node_name: Key into ``FLEET_NODES``.
        canonical: Parsed canonical config, used to compute the patch.
        apply: When True and there is something to change, push the
            canonical config file to the node and record it in the sync log.

    Returns:
        ``{"error": ...}`` for an unknown or unreachable node. Otherwise a
        dict with ``node``, ``host``, ``changes_needed``, ``dry_run`` and
        ``patch``; after an apply attempt it also has ``applied`` and, on
        failure, ``error``.
    """
    spec = FLEET_NODES.get(node_name)
    if not spec:
        return {"error": f"Unknown node: {node_name}"}
    raw = ssh_fetch_config(node_name, spec)
    if raw is None:
        return {"error": "Could not fetch remote config"}

    patch = generate_sync_patch(canonical, load_yaml(raw))
    host = spec["host"]
    result: Dict[str, Any] = {
        "node": node_name,
        "host": host,
        "changes_needed": len(patch),
        "dry_run": not apply,
        "patch": patch,
    }
    if not apply or not patch:
        return result

    # NOTE(review): apply replaces the node's whole config.yaml with the
    # canonical file. Unlike the patch, that also drops extra remote keys and
    # overwrites the secret/ignored keys the diff skips — confirm intended.
    try:
        config_content = CANONICAL_CONFIG.read_text(encoding="utf-8")
    except OSError as e:
        # Never fall back to empty content: pushing it would wipe the node's
        # config.
        result["applied"] = False
        result["error"] = f"Cannot read canonical config {CANONICAL_CONFIG}: {e}"
        return result
    if not config_content.strip():
        result["applied"] = False
        result["error"] = f"Canonical config {CANONICAL_CONFIG} is empty; refusing to sync"
        return result

    config_path = f"{spec['hermes_home']}/config.yaml"
    if host == "localhost":
        try:
            os.makedirs(os.path.dirname(config_path), exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(config_content)
            result["applied"] = True
        except OSError as e:
            result["applied"] = False
            result["error"] = str(e)
    else:
        target = f"{spec['ssh_user']}@{host}" if spec.get("ssh_user") else host
        try:
            proc = subprocess.run(
                # The redirect runs in the remote shell, so quote the path.
                ["ssh", "-o", "ConnectTimeout=10", target,
                 f"cat > {shlex.quote(config_path)}"],
                input=config_content, capture_output=True, text=True, timeout=20,
            )
        except (subprocess.SubprocessError, OSError, ValueError) as e:
            result["applied"] = False
            result["error"] = str(e)
        else:
            result["applied"] = proc.returncode == 0
            if proc.returncode != 0:
                result["error"] = proc.stderr[:200]

    # Record every apply attempt, including whether it succeeded.
    _append_sync_log({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "node": node_name,
        "changes": len(patch),
        "patch_keys": list(patch),
        "applied": result["applied"],
    })
    return result
def _append_sync_log(entry: dict) -> None:
    """Append *entry* to the JSON sync log, keeping only the last 100 entries.

    An existing log that is unreadable, is not valid JSON, or does not hold
    a list is discarded and the log restarts with *entry*.

    Raises:
        OSError: If the log directory or file cannot be written.
    """
    log: List[Any] = []
    if SYNC_LOG.exists():
        try:
            with open(SYNC_LOG, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and decode errors; a
            # corrupt log must not block recording the current sync.
            loaded = None
        # Guard against a well-formed but non-list document, which would
        # otherwise fail on .append().
        if isinstance(loaded, list):
            log = loaded
    log.append(entry)
    log = log[-100:]  # keep last 100
    SYNC_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(SYNC_LOG, "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2)
# ── CLI ──────────────────────────────────────────────────────────────────────
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        Process exit code (always 0; argparse itself exits on bad usage).
    """
    parser = argparse.ArgumentParser(description="Config drift detection for the Timmy fleet")
    sub = parser.add_subparsers(dest="command", required=True)
    # report
    rpt = sub.add_parser("report", help="Full fleet drift report")
    rpt.add_argument("--json", action="store_true", help="JSON output")
    rpt.add_argument("--canonical", type=str, default=None, help="Canonical config path")
    # check
    chk = sub.add_parser("check", help="Check a single node")
    chk.add_argument("node", help="Node name (timmy, allegro, bezalel, ezra)")
    chk.add_argument("--json", action="store_true", help="JSON output")
    # sync
    syn = sub.add_parser("sync", help="Sync a node to canonical")
    syn.add_argument("node", help="Node name")
    syn.add_argument("--apply", action="store_true", help="Actually apply changes")
    # sync-all
    sub.add_parser("sync-all", help="Sync all drifted nodes (dry-run)")
    args = parser.parse_args(argv)

    # Only the "report" sub-command defines --canonical.
    canon_arg = getattr(args, "canonical", None)
    canon_path = Path(canon_arg) if canon_arg else CANONICAL_CONFIG
    canonical = load_canonical(canon_path)

    if args.command == "report":
        report = report_all(canonical)
        if args.json:
            # default=str: config values parsed from YAML may include types
            # json cannot encode natively (e.g. dates).
            print(json.dumps(report, indent=2, default=str))
        else:
            print(format_report(report))
    elif args.command == "check":
        result = check_node(args.node, canonical)
        if args.json:
            print(json.dumps(result, indent=2, default=str))
        else:
            print(f"Node: {result['node']}")
            print(f"Host: {result.get('host', '?')}")
            print(f"Reachable: {result.get('reachable', False)}")
            print(f"Drifts: {result.get('drift_count', 0)}")
            if result.get("error"):
                # Surface why the check produced no drift data.
                print(f"Error: {result['error']}")
            if result.get("drifts"):
                print(format_drift_markdown(result["drifts"]))
    elif args.command == "sync":
        result = sync_node(args.node, canonical, apply=args.apply)
        print(json.dumps(result, indent=2, default=str))
    elif args.command == "sync-all":
        report = report_all(canonical)
        pending = 0
        for node_name, node in report["nodes"].items():
            if node.get("has_drift") and node.get("reachable"):
                # Always a dry run: applying is left to an explicit
                # 'sync <node> --apply'.
                result = sync_node(node_name, canonical, apply=False)
                print(f"{node_name}: {result.get('changes_needed', 0)} changes needed (dry-run)")
                pending += 1
        if pending == 0:
            print("No drifted nodes found. Fleet is clean.")
        else:
            print(f"\n{pending} node(s) need syncing. Run 'sync <node> --apply' to fix.")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,228 @@
"""Tests for config drift detection."""
import json
import sys
from pathlib import Path
import pytest
# Put the sibling scripts/ directory (one level above this test file's
# directory) on sys.path so the detector can be imported as a plain module.
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
from config_drift_detector import (
load_yaml,
_flatten,
diff_configs,
generate_sync_patch,
format_drift_markdown,
format_report,
)
# ── YAML Loading ─────────────────────────────────────────────────────────────
class TestLoadYaml:
    """load_yaml yields a dict for mapping documents and {} for anything else."""

    def test_valid_yaml(self):
        parsed = load_yaml("model:\n default: hermes\n provider: nous")
        assert parsed["model"]["default"] == "hermes"

    def test_empty_string(self):
        parsed = load_yaml("")
        assert parsed == {}

    def test_invalid_yaml(self):
        parsed = load_yaml("{invalid yaml ][")
        assert parsed == {}

    def test_non_dict_root(self):
        parsed = load_yaml("- item1\n- item2")
        assert parsed == {}
# ── Flatten ──────────────────────────────────────────────────────────────────
class TestFlatten:
    """_flatten turns nested dicts into dot-path keys and lists into JSON text."""

    def test_flat_dict(self):
        source = {"a": 1, "b": 2}
        assert _flatten(source) == {"a": 1, "b": 2}

    def test_nested_dict(self):
        flat = _flatten({"model": {"default": "hermes", "provider": "nous"}})
        assert flat["model.default"] == "hermes"
        assert flat["model.provider"] == "nous"

    def test_deeply_nested(self):
        flat = _flatten({"a": {"b": {"c": 42}}})
        assert flat["a.b.c"] == 42

    def test_list_serialized_as_json(self):
        flat = _flatten({"toolsets": ["all", "terminal"]})
        assert flat["toolsets"] == '["all", "terminal"]'

    def test_empty_dict(self):
        assert _flatten({}) == {}
# ── Diff Configs ─────────────────────────────────────────────────────────────
class TestDiffConfigs:
    """diff_configs classifies drift and honours the ignore rules."""

    def test_identical_configs(self):
        cfg = {"model": {"default": "hermes"}, "agent": {"max_turns": 30}}
        assert diff_configs(cfg, cfg) == []

    def test_changed_value(self):
        found = diff_configs(
            {"model": {"default": "hermes"}},
            {"model": {"default": "gpt-4"}},
        )
        assert len(found) == 1
        entry = found[0]
        assert entry["key"] == "model.default"
        assert entry["drift_type"] == "changed"
        assert entry["canonical_value"] == "hermes"
        assert entry["remote_value"] == "gpt-4"

    def test_missing_remote_key(self):
        canon = {"model": {"default": "hermes"}, "agent": {"max_turns": 30}}
        remote = {"model": {"default": "hermes"}}
        found = diff_configs(canon, remote)
        assert len(found) == 1
        assert found[0]["key"] == "agent.max_turns"
        assert found[0]["drift_type"] == "missing_remote"

    def test_extra_remote_key(self):
        canon = {"model": {"default": "hermes"}}
        remote = {"model": {"default": "hermes"}, "extra": {"key": 1}}
        found = diff_configs(canon, remote)
        assert len(found) == 1
        assert found[0]["drift_type"] == "extra_remote"

    def test_ignored_keys(self):
        # terminal.cwd is in the default ignore list
        canon = {"terminal": {"cwd": "/a"}, "model": {"default": "hermes"}}
        remote = {"terminal": {"cwd": "/b"}, "model": {"default": "hermes"}}
        assert len(diff_configs(canon, remote)) == 0

    def test_api_keys_ignored(self):
        canon = {"auxiliary": {"vision": {"api_key": "sk-old"}}}
        remote = {"auxiliary": {"vision": {"api_key": "sk-new"}}}
        assert len(diff_configs(canon, remote)) == 0

    def test_custom_ignored_keys(self):
        found = diff_configs({"a": 1, "b": 2}, {"a": 99, "b": 99}, ignored_keys=["a"])
        assert len(found) == 1
        assert found[0]["key"] == "b"

    def test_multiple_drifts(self):
        canon = {"model": {"default": "hermes", "provider": "nous"}, "agent": {"max_turns": 30}}
        remote = {"model": {"default": "gpt-4", "provider": "openai"}, "agent": {"max_turns": 30}}
        found = diff_configs(canon, remote)
        assert len(found) == 2
        drifted_keys = {entry["key"] for entry in found}
        assert "model.default" in drifted_keys
        assert "model.provider" in drifted_keys

    def test_list_values_compared_by_content(self):
        found = diff_configs({"toolsets": ["all"]}, {"toolsets": ["terminal", "file"]})
        assert len(found) == 1
        assert found[0]["drift_type"] == "changed"
# ── Sync Patch ───────────────────────────────────────────────────────────────
class TestGenerateSyncPatch:
    """generate_sync_patch covers changed and missing keys, never deletions."""

    def test_empty_when_identical(self):
        cfg = {"model": {"default": "hermes"}}
        assert generate_sync_patch(cfg, cfg) == {}

    def test_patches_changed_values(self):
        result = generate_sync_patch(
            {"model": {"default": "hermes"}},
            {"model": {"default": "gpt-4"}},
        )
        assert result["model.default"] == "hermes"

    def test_patches_missing_keys(self):
        canon = {"model": {"default": "hermes"}, "agent": {"max_turns": 30}}
        remote = {"model": {"default": "hermes"}}
        result = generate_sync_patch(canon, remote)
        assert result["agent.max_turns"] == 30

    def test_does_not_delete_extra_keys(self):
        canon = {"model": {"default": "hermes"}}
        remote = {"model": {"default": "hermes"}, "extra": 1}
        result = generate_sync_patch(canon, remote)
        assert "extra" not in result
# ── Formatting ───────────────────────────────────────────────────────────────
class TestFormatting:
    """The text formatters include the expected sections and values."""

    def test_drift_markdown_no_drift(self):
        rendered = format_drift_markdown([])
        assert "no drift" in rendered.lower()

    def test_drift_markdown_with_drifts(self):
        entry = {
            "key": "model.default",
            "canonical_value": "hermes",
            "remote_value": "gpt-4",
            "drift_type": "changed",
        }
        rendered = format_drift_markdown([entry])
        assert "model.default" in rendered
        assert "hermes" in rendered
        assert "gpt-4" in rendered
        assert "changed" in rendered

    def test_drift_markdown_caps_at_30(self):
        entries = []
        for i in range(50):
            entries.append({
                "key": f"key.{i}",
                "canonical_value": "a",
                "remote_value": "b",
                "drift_type": "changed",
            })
        rendered = format_drift_markdown(entries)
        assert "and 20 more" in rendered

    def test_report_format_has_sections(self):
        # One drifted node, one clean node and one unreachable node.
        allegro = {
            "host": "167.99.126.228",
            "reachable": True,
            "machine_type": "vps",
            "config_keys": 25,
            "has_drift": True,
            "drift_count": 3,
            "drifts": [
                {"key": "model.default", "canonical_value": "hermes", "remote_value": "gpt-4", "drift_type": "changed"},
                {"key": "agent.max_turns", "canonical_value": 30, "remote_value": 50, "drift_type": "changed"},
                {"key": "terminal.timeout", "canonical_value": 180, "remote_value": None, "drift_type": "missing_remote"},
            ],
        }
        timmy = {
            "host": "localhost",
            "reachable": True,
            "machine_type": "mac",
            "config_keys": 30,
            "has_drift": False,
            "drift_count": 0,
            "drifts": [],
        }
        ezra = {
            "host": "143.198.27.163",
            "reachable": False,
            "drift_count": -1,
            "error": "Could not fetch config",
            "drifts": [],
        }
        report = {
            "timestamp": "2026-04-15T12:00:00+00:00",
            "canonical_config": "config.yaml",
            "total_nodes": 4,
            "reachable": 3,
            "unreachable": 1,
            "drifted_nodes": 1,
            "clean_nodes": 2,
            "total_drifts": 3,
            "nodes": {"allegro": allegro, "timmy": timmy, "ezra": ezra},
        }
        rendered = format_report(report)
        assert "CONFIG DRIFT DETECTION" in rendered
        assert "allegro" in rendered.lower()
        assert "UNREACHABLE" in rendered
        assert "model.default" in rendered
        assert "no drift" in rendered.lower()  # timmy