diff --git a/scripts/deploy_config_validator.py b/scripts/deploy_config_validator.py new file mode 100644 index 00000000..5ad57110 --- /dev/null +++ b/scripts/deploy_config_validator.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3 +""" +deploy_config_validator.py — Pre-deploy config validation for timmy-config sidecar. + +Validates YAML config before writing during deploy. Checks: + 1. YAML syntax (pyyaml safe_load) + 2. Required keys exist for the config type + 3. Value types match expected schema + 4. No banned providers referenced + 5. Provider chain is well-formed + +Usage: + # Validate a config file before deploy + python3 scripts/deploy_config_validator.py config.yaml + + # Validate stdin (piped from deploy script) + cat config.yaml | python3 scripts/deploy_config_validator.py - + + # Validate with expected type + python3 scripts/deploy_config_validator.py --type hermes config.yaml + + # JSON output for CI/CD + python3 scripts/deploy_config_validator.py --json config.yaml + +Exit codes: + 0 — config is valid + 1 — validation failed (errors printed to stderr) + 2 — usage error +""" + +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +try: + import yaml +except ImportError: + print("ERROR: PyYAML not installed. Run: pip install pyyaml", file=sys.stderr) + sys.exit(2) + + +# ── Schema Definitions ──────────────────────────────────────────────────────── + +# Required keys per config type +REQUIRED_KEYS = { + "hermes": { + "providers": {"type": list, "description": "List of provider configurations"}, + }, + "wizard": { + "providers": {"type": list, "description": "List of provider configurations"}, + }, + "ansible_inventory": { + "all": {"type": dict, "description": "Top-level inventory structure"}, + }, + "cron": { + "jobs": {"type": list, "description": "List of cron job definitions"}, + }, + "playbook": { + "name": {"type": str, "description": "Playbook name"}, + }, + "any": {}, # No required keys for generic validation +} + +# Provider schema — each provider must have these keys +PROVIDER_REQUIRED = {"name", "model", "base_url"} +PROVIDER_ALLOWED_TYPES = { + "name": str, + "model": str, + "base_url": str, + "api_key_env": str, + "timeout": (int, float), + "reason": str, +} + +# Banned provider patterns (from ansible inventory) +BANNED_PROVIDERS = {"anthropic", "claude"} +BANNED_MODEL_PATTERNS = ["claude-*", "anthropic/*", "*sonnet*", "*opus*", "*haiku*"] + + +# ── Validators ──────────────────────────────────────────────────────────────── + +class ValidationError: + def __init__(self, path: str, message: str, severity: str = "error"): + self.path = path + self.message = message + self.severity = severity + + def __str__(self): + prefix = {"error": "ERROR", "warning": "WARN", "info": "INFO"}.get(self.severity, "???") + return f"[{prefix}] {self.path}: {self.message}" + + +def validate_yaml_syntax(text: str) -> tuple[Any | None, list[ValidationError]]: + """Validate YAML syntax. Returns (parsed_data, errors).""" + errors = [] + + # Check for tabs + for i, line in enumerate(text.splitlines(), 1): + if "\t" in line: + errors.append(ValidationError(f"line {i}", "contains tab character (use spaces for YAML)", "warning")) + + # Parse + try: + data = yaml.safe_load(text) + except yaml.YAMLError as e: + mark = getattr(e, "problem_mark", None) + if mark: + errors.append(ValidationError( + f"line {mark.line + 1}, col {mark.column + 1}", + f"YAML syntax error: {e.problem}" + )) + else: + errors.append(ValidationError("(file)", f"YAML syntax error: {e}")) + return None, errors + + if data is None: + errors.append(ValidationError("(file)", "empty or null config", "warning")) + return None, errors + + return data, errors + + +def validate_required_keys(data: dict, config_type: str) -> list[ValidationError]: + """Check that required keys exist.""" + errors = [] + schema = REQUIRED_KEYS.get(config_type, REQUIRED_KEYS["any"]) + + for key, spec in schema.items(): + if key not in data: + errors.append(ValidationError( + f".{key}", + f"required key missing: {key} ({spec['description']})" + )) + elif not isinstance(data[key], spec["type"]): + errors.append(ValidationError( + f".{key}", + f"expected {spec['type'].__name__}, got {type(data[key]).__name__}" + )) + + return errors + + +def validate_provider_chain(data: dict) -> list[ValidationError]: + """Validate provider configurations.""" + errors = [] + + providers = data.get("providers", []) + if not isinstance(providers, list): + return errors # Caught by required_keys check + + for i, provider in enumerate(providers): + path = f".providers[{i}]" + + if not isinstance(provider, dict): + errors.append(ValidationError(path, "provider must be a dict")) + continue + + # Check required provider keys + for key in PROVIDER_REQUIRED: + if key not in provider: + errors.append(ValidationError(f"{path}.{key}", f"provider missing required key: {key}")) + elif not isinstance(provider[key], str): + errors.append(ValidationError( + f"{path}.{key}", + f"expected string, got {type(provider[key]).__name__}" + )) + + # Check for banned providers + name = provider.get("name", "").lower() + model = provider.get("model", "").lower() + + for banned in BANNED_PROVIDERS: + if banned in name: + errors.append(ValidationError( + f"{path}.name", + f"banned provider: '{provider.get('name')}' (contains '{banned}')" + )) + + import fnmatch + for pattern in BANNED_MODEL_PATTERNS: + if fnmatch.fnmatch(model, pattern.lower()): + errors.append(ValidationError( + f"{path}.model", + f"banned model pattern: '{provider.get('model')}' matches '{pattern}'" + )) + + # Check value types + for key, val in provider.items(): + expected = PROVIDER_ALLOWED_TYPES.get(key) + if expected and not isinstance(val, expected): + errors.append(ValidationError( + f"{path}.{key}", + f"expected {expected if isinstance(expected, type) else expected.__name__}, got {type(val).__name__}", + "warning" + )) + + # Check provider chain has at least one entry + if not providers: + errors.append(ValidationError(".providers", "provider chain is empty — no inference available")) + + return errors + + +def validate_value_types(data: dict, path: str = "") -> list[ValidationError]: + """Recursively check for obviously wrong value types.""" + errors = [] + + if isinstance(data, dict): + for key, val in data.items(): + full_path = f"{path}.{key}" if path else f".{key}" + + # Ports should be integers + if key in ("port", "api_port", "hermes_port", "timeout") and val is not None: + if not isinstance(val, (int, float)): + errors.append(ValidationError(full_path, f"expected number, got {type(val).__name__}", "warning")) + + # URLs should be strings starting with http + if key in ("base_url", "gitea_url", "url") and val is not None: + if isinstance(val, str) and not val.startswith(("http://", "https://")): + errors.append(ValidationError(full_path, f"URL should start with http:// or https://", "warning")) + + # Recurse + errors.extend(validate_value_types(val, full_path)) + + elif isinstance(data, list): + for i, item in enumerate(data): + errors.extend(validate_value_types(item, f"{path}[{i}]")) + + return errors + + +def validate_config(text: str, config_type: str = "any") -> list[ValidationError]: + """Run all validations on a config text.""" + # Step 1: YAML syntax + data, errors = validate_yaml_syntax(text) + if data is None: + return errors # Can't continue without parsed data + + if not isinstance(data, dict): + if config_type != "any": + errors.append(ValidationError("(file)", f"expected dict for {config_type} config, got {type(data).__name__}")) + return errors + + # Step 2: Required keys + errors.extend(validate_required_keys(data, config_type)) + + # Step 3: Provider chain validation (if providers exist) + if "providers" in data: + errors.extend(validate_provider_chain(data)) + + # Step 4: Value type checking + errors.extend(validate_value_types(data)) + + return errors + + +# ── Auto-detect config type ─────────────────────────────────────────────────── + +def detect_config_type(data: dict) -> str: + """Guess config type from contents.""" + if "providers" in data and "display" in data: + return "hermes" + if "providers" in data and "wizard_name" in data: + return "wizard" + if "all" in data and "children" in data.get("all", {}): + return "ansible_inventory" + if "jobs" in data: + return "cron" + if "name" in data and "hosts" in data: + return "playbook" + return "any" + + +# ── CLI ─────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Pre-deploy config validation") + parser.add_argument("file", help="Config file to validate (use - for stdin)") + parser.add_argument("--type", choices=list(REQUIRED_KEYS.keys()), + help="Expected config type (auto-detected if omitted)") + parser.add_argument("--json", action="store_true", help="JSON output") + args = parser.parse_args() + + # Read input + if args.file == "-": + text = sys.stdin.read() + filename = "" + else: + path = Path(args.file) + if not path.exists(): + print(f"ERROR: File not found: {path}", file=sys.stderr) + sys.exit(2) + text = path.read_text(encoding="utf-8", errors="replace") + filename = str(path) + + # Detect type + config_type = args.type + if not config_type: + data, _ = validate_yaml_syntax(text) + if data and isinstance(data, dict): + config_type = detect_config_type(data) + else: + config_type = "any" + + # Validate + errors = validate_config(text, config_type) + + # Output + if args.json: + result = { + "file": filename, + "type": config_type, + "valid": not any(e.severity == "error" for e in errors), + "error_count": sum(1 for e in errors if e.severity == "error"), + "warning_count": sum(1 for e in errors if e.severity == "warning"), + "errors": [{"path": e.path, "message": e.message, "severity": e.severity} for e in errors], + } + print(json.dumps(result, indent=2)) + else: + if errors: + print(f"Config validation FAILED: {filename} (type: {config_type})", file=sys.stderr) + for e in errors: + print(f" {e}", file=sys.stderr) + else: + print(f"Config validation PASSED: {filename} (type: {config_type})") + + # Exit code + if any(e.severity == "error" for e in errors): + sys.exit(1) + sys.exit(0) + + +if __name__ == "__main__": + main()