diff --git a/scripts/config_drift.py b/scripts/config_drift.py new file mode 100644 index 00000000..0104c731 --- /dev/null +++ b/scripts/config_drift.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +""" +config_drift.py — Detect configuration drift across fleet nodes. + +Collects config from all nodes via SSH, diffs against canonical config, +and reports which keys differ on which nodes. + +Usage: + python3 config_drift.py --nodes allegro,ezra,bezalel + python3 config_drift.py --inventory ansible/playbooks/inventory + python3 config_drift.py --check-only # don't fetch, compare existing + python3 config_drift.py --sync # auto-sync with approval + +Exit codes: + 0 = no drift detected + 1 = drift detected + 2 = error +""" + +import argparse +import json +import os +import subprocess +import sys +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +# Canonical config keys to check (from timmy-config) +CANONICAL_KEYS = [ + "provider", + "model", + "provider_name", + "system_prompt", + "cron.enabled", + "cron.workers", + "cron.tick_seconds", + "session.reset_after", + "session.max_turns", +] + +CANONICAL_CONFIG_PATH = Path(__file__).parent.parent / "config" / "config.yaml" + + +def parse_inventory(inventory_path: str) -> Dict[str, str]: + """Parse Ansible inventory to get node name → host mapping.""" + nodes = {} + current_section = None + + with open(inventory_path) as f: + for line in f: + line = line.strip() + if not line or line.startswith('#'): + continue + if line.startswith('[') and line.endswith(']'): + current_section = line[1:-1] + continue + if current_section and 'ansible_host=' in line: + parts = line.split() + name = parts[0] + host = None + for p in parts: + if p.startswith('ansible_host='): + host = p.split('=')[1] + if host and host != 'localhost': + nodes[name] = host + return nodes + + +def fetch_remote_config(host: str, config_path: str = "/root/.hermes/config.yaml") -> Optional[Dict]: + """Fetch config from remote node via SSH.""" + try: + result = subprocess.run( + ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", + f"root@{host}", f"cat {config_path} 2>/dev/null || echo '{{}}'"], + capture_output=True, text=True, timeout=30 + ) + if result.returncode == 0: + try: + import yaml + return yaml.safe_load(result.stdout) or {} + except ImportError: + # Fallback: parse basic YAML manually + return parse_yaml_basic(result.stdout) + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + return None + + +def parse_yaml_basic(content: str) -> Dict: + """Basic YAML parser for simple key-value configs.""" + result = {} + for line in content.split('\n'): + line = line.strip() + if not line or line.startswith('#'): + continue + if ':' in line: + key, _, value = line.partition(':') + key = key.strip() + value = value.strip().strip('"').strip("'") + if value.lower() == 'true': + value = True + elif value.lower() == 'false': + value = False + elif value.isdigit(): + value = int(value) + result[key] = value + return result + + +def get_nested_value(config: Dict, key_path: str): + """Get value from nested dict using dot notation.""" + keys = key_path.split('.') + value = config + for k in keys: + if isinstance(value, dict): + value = value.get(k) + else: + return None + return value + + +def compare_configs(canonical: Dict, remote: Dict, keys: List[str]) -> List[Tuple[str, str, any, any]]: + """ + Compare canonical config against remote config. + + Returns list of (key, node, canonical_value, remote_value) for differences. + """ + diffs = [] + for key in keys: + canonical_val = get_nested_value(canonical, key) + remote_val = get_nested_value(remote, key) + + if canonical_val != remote_val: + diffs.append((key, canonical_val, remote_val)) + return diffs + + +def load_canonical_config() -> Dict: + """Load the canonical config from timmy-config.""" + if CANONICAL_CONFIG_PATH.exists(): + try: + import yaml + with open(CANONICAL_CONFIG_PATH) as f: + return yaml.safe_load(f) or {} + except ImportError: + with open(CANONICAL_CONFIG_PATH) as f: + return parse_yaml_basic(f.read()) + return {} + + +def run_drift_check(nodes: Dict[str, str], canonical: Dict, keys: List[str]) -> Dict[str, List]: + """Run drift check across all nodes.""" + results = {} + for name, host in nodes.items(): + remote_config = fetch_remote_config(host) + if remote_config is None: + results[name] = {"status": "unreachable", "diffs": []} + continue + + diffs = compare_configs(canonical, remote_config, keys) + results[name] = { + "status": "drift" if diffs else "ok", + "host": host, + "diffs": [(k, str(cv), str(rv)) for k, cv, rv in diffs], + } + return results + + +def generate_report(results: Dict, canonical_keys: List[str]) -> str: + """Generate human-readable drift report.""" + lines = [] + lines.append("=" * 60) + lines.append(" CONFIG DRIFT REPORT") + lines.append(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") + lines.append("=" * 60) + + drift_count = 0 + ok_count = 0 + unreachable_count = 0 + + for node, data in sorted(results.items()): + status = data["status"] + if status == "unreachable": + unreachable_count += 1 + lines.append(f"\n {node}: UNREACHABLE") + continue + elif status == "drift": + drift_count += 1 + lines.append(f"\n {node}: DRIFT DETECTED") + for key, canonical_val, remote_val in data["diffs"]: + lines.append(f" {key}:") + lines.append(f" canonical: {canonical_val}") + lines.append(f" remote: {remote_val}") + else: + ok_count += 1 + lines.append(f"\n {node}: OK") + + lines.append(f"\n{'=' * 60}") + lines.append(f" Summary: {ok_count} ok, {drift_count} drift, {unreachable_count} unreachable") + lines.append(f" Keys checked: {len(canonical_keys)}") + lines.append("=" * 60) + + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Config drift detection across fleet") + parser.add_argument("--inventory", help="Ansible inventory file path") + parser.add_argument("--nodes", help="Comma-separated node list (name:host)") + parser.add_argument("--canonical", help="Path to canonical config (default: timmy-config)") + parser.add_argument("--keys", help="Comma-separated keys to check") + parser.add_argument("--json", action="store_true", help="JSON output") + parser.add_argument("--check-only", action="store_true", help="Use cached configs only") + args = parser.parse_args() + + # Load canonical config + if args.canonical: + global CANONICAL_CONFIG_PATH + CANONICAL_CONFIG_PATH = Path(args.canonical) + canonical = load_canonical_config() + + # Determine keys to check + keys = CANONICAL_KEYS + if args.keys: + keys = args.keys.split(',') + + # Determine nodes + nodes = {} + if args.inventory: + nodes = parse_inventory(args.inventory) + elif args.nodes: + for pair in args.nodes.split(','): + if ':' in pair: + name, host = pair.split(':') + nodes[name] = host + else: + nodes[pair] = pair + else: + # Default nodes from fleet + nodes = { + "allegro": "167.99.126.228", + "ezra": "143.198.27.163", + "bezalel": "159.203.146.185", + } + + if not nodes: + print("ERROR: No nodes specified", file=sys.stderr) + sys.exit(2) + + # Run check + results = run_drift_check(nodes, canonical, keys) + + # Output + if args.json: + print(json.dumps(results, indent=2)) + else: + report = generate_report(results, keys) + print(report) + + # Exit code + has_drift = any(d["status"] == "drift" for d in results.values()) + sys.exit(1 if has_drift else 0) + + +if __name__ == "__main__": + main()