Merge PR #776: scripts/config_drift.py (added)

This commit is contained in:
Merge Bot
2026-04-16 04:58:44 +00:00
parent 5c4b453687
commit 7f121d5591

267
scripts/config_drift.py Normal file
View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
config_drift.py — Detect configuration drift across fleet nodes.
Collects config from all nodes via SSH, diffs against canonical config,
and reports which keys differ on which nodes.
Usage:
python3 config_drift.py --nodes allegro,ezra,bezalel
python3 config_drift.py --inventory ansible/playbooks/inventory
python3 config_drift.py --check-only # don't fetch, compare existing
python3 config_drift.py --sync # auto-sync with approval
Exit codes:
0 = no drift detected
1 = drift detected
2 = error
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Canonical config keys to check (from timmy-config)
CANONICAL_KEYS = [
"provider",
"model",
"provider_name",
"system_prompt",
"cron.enabled",
"cron.workers",
"cron.tick_seconds",
"session.reset_after",
"session.max_turns",
]
CANONICAL_CONFIG_PATH = Path(__file__).parent.parent / "config" / "config.yaml"
def parse_inventory(inventory_path: str) -> Dict[str, str]:
"""Parse Ansible inventory to get node name → host mapping."""
nodes = {}
current_section = None
with open(inventory_path) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('[') and line.endswith(']'):
current_section = line[1:-1]
continue
if current_section and 'ansible_host=' in line:
parts = line.split()
name = parts[0]
host = None
for p in parts:
if p.startswith('ansible_host='):
host = p.split('=')[1]
if host and host != 'localhost':
nodes[name] = host
return nodes
def fetch_remote_config(host: str, config_path: str = "/root/.hermes/config.yaml") -> Optional[Dict]:
"""Fetch config from remote node via SSH."""
try:
result = subprocess.run(
["ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10",
f"root@{host}", f"cat {config_path} 2>/dev/null || echo '{{}}'"],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
try:
import yaml
return yaml.safe_load(result.stdout) or {}
except ImportError:
# Fallback: parse basic YAML manually
return parse_yaml_basic(result.stdout)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return None
def parse_yaml_basic(content: str) -> Dict:
"""Basic YAML parser for simple key-value configs."""
result = {}
for line in content.split('\n'):
line = line.strip()
if not line or line.startswith('#'):
continue
if ':' in line:
key, _, value = line.partition(':')
key = key.strip()
value = value.strip().strip('"').strip("'")
if value.lower() == 'true':
value = True
elif value.lower() == 'false':
value = False
elif value.isdigit():
value = int(value)
result[key] = value
return result
def get_nested_value(config: Dict, key_path: str):
"""Get value from nested dict using dot notation."""
keys = key_path.split('.')
value = config
for k in keys:
if isinstance(value, dict):
value = value.get(k)
else:
return None
return value
def compare_configs(canonical: Dict, remote: Dict, keys: List[str]) -> List[Tuple[str, str, any, any]]:
"""
Compare canonical config against remote config.
Returns list of (key, node, canonical_value, remote_value) for differences.
"""
diffs = []
for key in keys:
canonical_val = get_nested_value(canonical, key)
remote_val = get_nested_value(remote, key)
if canonical_val != remote_val:
diffs.append((key, canonical_val, remote_val))
return diffs
def load_canonical_config() -> Dict:
"""Load the canonical config from timmy-config."""
if CANONICAL_CONFIG_PATH.exists():
try:
import yaml
with open(CANONICAL_CONFIG_PATH) as f:
return yaml.safe_load(f) or {}
except ImportError:
with open(CANONICAL_CONFIG_PATH) as f:
return parse_yaml_basic(f.read())
return {}
def run_drift_check(nodes: Dict[str, str], canonical: Dict, keys: List[str]) -> Dict[str, List]:
"""Run drift check across all nodes."""
results = {}
for name, host in nodes.items():
remote_config = fetch_remote_config(host)
if remote_config is None:
results[name] = {"status": "unreachable", "diffs": []}
continue
diffs = compare_configs(canonical, remote_config, keys)
results[name] = {
"status": "drift" if diffs else "ok",
"host": host,
"diffs": [(k, str(cv), str(rv)) for k, cv, rv in diffs],
}
return results
def generate_report(results: Dict, canonical_keys: List[str]) -> str:
"""Generate human-readable drift report."""
lines = []
lines.append("=" * 60)
lines.append(" CONFIG DRIFT REPORT")
lines.append(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
lines.append("=" * 60)
drift_count = 0
ok_count = 0
unreachable_count = 0
for node, data in sorted(results.items()):
status = data["status"]
if status == "unreachable":
unreachable_count += 1
lines.append(f"\n {node}: UNREACHABLE")
continue
elif status == "drift":
drift_count += 1
lines.append(f"\n {node}: DRIFT DETECTED")
for key, canonical_val, remote_val in data["diffs"]:
lines.append(f" {key}:")
lines.append(f" canonical: {canonical_val}")
lines.append(f" remote: {remote_val}")
else:
ok_count += 1
lines.append(f"\n {node}: OK")
lines.append(f"\n{'=' * 60}")
lines.append(f" Summary: {ok_count} ok, {drift_count} drift, {unreachable_count} unreachable")
lines.append(f" Keys checked: {len(canonical_keys)}")
lines.append("=" * 60)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Config drift detection across fleet")
parser.add_argument("--inventory", help="Ansible inventory file path")
parser.add_argument("--nodes", help="Comma-separated node list (name:host)")
parser.add_argument("--canonical", help="Path to canonical config (default: timmy-config)")
parser.add_argument("--keys", help="Comma-separated keys to check")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--check-only", action="store_true", help="Use cached configs only")
args = parser.parse_args()
# Load canonical config
if args.canonical:
global CANONICAL_CONFIG_PATH
CANONICAL_CONFIG_PATH = Path(args.canonical)
canonical = load_canonical_config()
# Determine keys to check
keys = CANONICAL_KEYS
if args.keys:
keys = args.keys.split(',')
# Determine nodes
nodes = {}
if args.inventory:
nodes = parse_inventory(args.inventory)
elif args.nodes:
for pair in args.nodes.split(','):
if ':' in pair:
name, host = pair.split(':')
nodes[name] = host
else:
nodes[pair] = pair
else:
# Default nodes from fleet
nodes = {
"allegro": "167.99.126.228",
"ezra": "143.198.27.163",
"bezalel": "159.203.146.185",
}
if not nodes:
print("ERROR: No nodes specified", file=sys.stderr)
sys.exit(2)
# Run check
results = run_drift_check(nodes, canonical, keys)
# Output
if args.json:
print(json.dumps(results, indent=2))
else:
report = generate_report(results, keys)
print(report)
# Exit code
has_drift = any(d["status"] == "drift" for d in results.values())
sys.exit(1 if has_drift else 0)
if __name__ == "__main__":
main()