# Source: timmy-config/scripts/config_validate.py (Python, ~357 lines)
#!/usr/bin/env python3
"""
config_validate.py — Pre-deploy validation for timmy-config YAML files (Issue #690).
Validates YAML syntax, required keys, value types, and forbidden keys before
writing config to disk. Prevents broken deploys from bad config.
Usage:
python3 scripts/config_validate.py config.yaml # Validate only
python3 scripts/config_validate.py config.yaml --deploy # Validate then write
python3 scripts/config_validate.py --schema # Print expected schema
python3 scripts/config_validate.py --check-dir config/ # Validate all YAML in dir
python3 scripts/config_validate.py --json config.yaml # Output as JSON
"""
import argparse
import json
import sys
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# PyYAML is an optional dependency at import time so modes that do not parse
# YAML (e.g. --schema) still work; validate_yaml_syntax() reports a clear
# error when it is missing instead of crashing on import.
try:
    import yaml
    HAS_YAML = True
except ImportError:
    HAS_YAML = False
# ─── Schema definition ───────────────────────────────────────────────
# Each top-level entry describes one config key:
#   "type"         — a type (or tuple of types) passed to isinstance()
#   "required"     — True if the key must be present (default False)
#   "description"  — human-readable text shown by --schema
#   "item_type"    — for lists: required type of each element
#   "value_schema" — for dicts: nested schema validated recursively
SCHEMA = {
    "model": {
        "type": str,
        "required": True,
        "description": "Default model identifier (e.g. 'nousresearch/hermes-4-14b')",
    },
    "provider": {
        "type": str,
        "required": False,
        "description": "Default provider name",
    },
    "providers": {
        "type": dict,
        "required": False,
        "description": "Provider configurations keyed by name",
        "value_schema": {
            "base_url": {"type": str, "required": False},
            "api_key_env": {"type": str, "required": False},
        },
    },
    "fallback_providers": {
        "type": list,
        "required": False,
        "description": "Ordered fallback provider chain",
        "item_type": str,
    },
    "toolsets": {
        "type": list,
        "required": False,
        "description": "Enabled toolset names",
        "item_type": str,
    },
    "agent": {
        "type": dict,
        "required": False,
        "description": "Agent behavior configuration",
        "value_schema": {
            "max_iterations": {"type": int, "required": False},
            # temperature accepts either int or float (e.g. 0 or 0.7)
            "temperature": {"type": (int, float), "required": False},
            "save_trajectories": {"type": bool, "required": False},
            "quiet_mode": {"type": bool, "required": False},
        },
    },
    "display": {
        "type": dict,
        "required": False,
        "description": "CLI display settings",
        "value_schema": {
            "spinner": {"type": bool, "required": False},
            "colors": {"type": bool, "required": False},
            "skin": {"type": str, "required": False},
            "tool_progress": {"type": bool, "required": False},
        },
    },
    "gateway": {
        "type": dict,
        "required": False,
        "description": "Gateway/messaging settings",
        "value_schema": {
            "enabled": {"type": bool, "required": False},
            "port": {"type": int, "required": False},
            "cors_origins": {"type": list, "required": False},
        },
    },
    "cron": {
        "type": dict,
        "required": False,
        "description": "Cron scheduler settings",
        "value_schema": {
            "enabled": {"type": bool, "required": False},
            "interval_seconds": {"type": int, "required": False},
            "max_concurrent": {"type": int, "required": False},
        },
    },
    "logging": {
        "type": dict,
        "required": False,
        "description": "Logging configuration",
        "value_schema": {
            "level": {"type": str, "required": False},
            # file may be a path string or explicit null (disable file logging)
            "file": {"type": (str, type(None)), "required": False},
        },
    },
    "session": {
        "type": dict,
        "required": False,
        "description": "Session behavior",
        "value_schema": {
            "save_trajectories": {"type": bool, "required": False},
            "max_iterations": {"type": int, "required": False},
            "context_compression": {"type": bool, "required": False},
        },
    },
}
# Keys (matched case-insensitively, at any nesting depth) that must never
# appear in a config file; the value is the error message shown to the user.
FORBIDDEN_KEYS = {
    "anthropic_api_key": "Use ANTHROPIC_API_KEY env var — never store keys in config",
    "openai_api_key": "Use OPENAI_API_KEY env var — never store keys in config",
    "openrouter_api_key": "Use OPENROUTER_API_KEY env var — never store keys in config",
    "password": "Never store passwords in config",
    "secret": "Never store secrets in config",
    "token": "Never store tokens in config — use env vars",
}
# ─── Validation errors ───────────────────────────────────────────────
class ValidationError:
    """A single validation finding: a dotted config path, a message, and a severity.

    Severity is "error" (blocks deploy) or "warning" (informational only).
    """

    def __init__(self, path: str, message: str, severity: str = "error"):
        self.path = path          # dotted key path, e.g. "agent.temperature"
        self.message = message    # human-readable explanation
        self.severity = severity  # "error" or "warning"

    def to_dict(self) -> dict:
        """Serialize this finding for the --json output mode."""
        return {"path": self.path, "message": self.message, "severity": self.severity}

    def __str__(self):
        label = "WARN" if self.severity != "error" else "ERROR"
        return f"[{label}] {self.path}: {self.message}"

    def __repr__(self):
        return f"ValidationError({self.path!r}, {self.message!r}, {self.severity!r})"
# ─── Core validators ─────────────────────────────────────────────────
def validate_yaml_syntax(content: str) -> Tuple[Optional[dict], List[ValidationError]]:
    """Parse *content* as YAML and return (data, errors).

    data is None when parsing failed outright (syntax error, non-mapping
    document, or PyYAML unavailable); an empty document yields {} plus a
    warning so downstream checks can still run.
    """
    if not HAS_YAML:
        return None, [ValidationError("root", "PyYAML not installed — pip install pyyaml")]
    try:
        parsed = yaml.safe_load(content)
    except yaml.YAMLError as exc:
        return None, [ValidationError("syntax", str(exc))]
    if parsed is None:
        # An empty (or comment-only) document parses to None.
        return {}, [ValidationError("root", "Config file is empty", "warning")]
    if not isinstance(parsed, dict):
        return None, [ValidationError("root", f"Expected mapping, got {type(parsed).__name__}")]
    return parsed, []
def validate_required_keys(data: dict) -> List[ValidationError]:
    """Check that every required schema key exists and has the correct type.

    Args:
        data: Parsed top-level config mapping.

    Returns:
        One ValidationError per missing or mis-typed required key.
    """
    errors: List[ValidationError] = []
    for key, spec in SCHEMA.items():
        if not spec.get("required"):
            continue
        if key not in data:
            errors.append(ValidationError(key, f"Required key '{key}' is missing"))
        elif not isinstance(data[key], spec["type"]):
            expected = spec["type"]
            # Fix: spec["type"] may be a tuple of acceptable types; tuples
            # have no __name__, so join the member names instead of crashing.
            name = (" or ".join(t.__name__ for t in expected)
                    if isinstance(expected, tuple) else expected.__name__)
            errors.append(ValidationError(
                key,
                f"Expected {name}, got {type(data[key]).__name__}"
            ))
    return errors
def validate_value_types(data: dict, schema: dict = None, prefix: str = "") -> List[ValidationError]:
    """Check all known keys have correct types, recursively.

    Args:
        data: Parsed config mapping (or a nested sub-mapping on recursion).
        schema: Schema to validate against; defaults to the module SCHEMA.
        prefix: Dotted path of enclosing keys, used to build error paths.

    Returns:
        One ValidationError per type mismatch found.
    """
    if schema is None:
        schema = SCHEMA
    errors: List[ValidationError] = []

    def type_name(t) -> str:
        # Fix: spec["type"] may be a tuple of types (e.g. (int, float) for
        # agent.temperature). Tuples have no __name__, so the old
        # f"{expected.__name__}" raised AttributeError on any mismatch
        # against a tuple-typed field. Join the member names instead.
        return " or ".join(m.__name__ for m in t) if isinstance(t, tuple) else t.__name__

    for key, spec in schema.items():
        full_key = f"{prefix}.{key}" if prefix else key
        if key not in data:
            continue
        value = data[key]
        expected = spec["type"]
        # NOTE(review): isinstance(True, int) is True, so booleans currently
        # satisfy int-typed fields (e.g. gateway.port: true passes) —
        # preserved for backward compatibility.
        if not isinstance(value, expected):
            errors.append(ValidationError(
                full_key,
                f"Expected {type_name(expected)}, got {type(value).__name__}"
            ))
            continue
        # Check list item types
        if isinstance(value, list) and "item_type" in spec:
            for i, item in enumerate(value):
                if not isinstance(item, spec["item_type"]):
                    errors.append(ValidationError(
                        f"{full_key}[{i}]",
                        f"Expected {spec['item_type'].__name__}, got {type(item).__name__}"
                    ))
        # Recurse into nested dicts
        if isinstance(value, dict) and "value_schema" in spec:
            errors.extend(validate_value_types(value, spec["value_schema"], full_key))
    return errors
def validate_no_forbidden_keys(data: dict, prefix: str = "") -> List[ValidationError]:
    """Recursively flag keys that must never appear in a config file.

    Matching is case-insensitive against FORBIDDEN_KEYS; nested mappings are
    searched at every depth.
    """
    found: List[ValidationError] = []
    for name, val in data.items():
        dotted = f"{prefix}.{name}" if prefix else name
        lowered = name.lower()
        if lowered in FORBIDDEN_KEYS:
            found.append(ValidationError(dotted, FORBIDDEN_KEYS[lowered]))
        if isinstance(val, dict):
            found.extend(validate_no_forbidden_keys(val, dotted))
    return found
def validate_unknown_keys(data: dict, schema: dict = None, prefix: str = "") -> List[ValidationError]:
    """Warn (never error) about keys the schema does not define, recursively."""
    active = SCHEMA if schema is None else schema
    notes: List[ValidationError] = []
    for name in data:
        dotted = f"{prefix}.{name}" if prefix else name
        spec = active.get(name)
        if spec is None:
            # Unknown key: report as a warning, not a hard error.
            notes.append(ValidationError(dotted, "Unknown key — not in schema", "warning"))
        elif isinstance(data[name], dict) and "value_schema" in spec:
            notes.extend(validate_unknown_keys(data[name], spec["value_schema"], dotted))
    return notes
# ─── Public API ───────────────────────────────────────────────────────
def validate_config(content: str) -> Tuple[bool, List[ValidationError]]:
    """Run the full validation pipeline on raw YAML text.

    Returns (is_valid, findings); is_valid is False only when at least one
    finding has severity "error" (warnings alone do not fail validation).
    """
    data, findings = validate_yaml_syntax(content)
    if data is None:
        # Unparseable input: nothing further to check.
        return False, findings
    for check in (validate_required_keys, validate_value_types,
                  validate_no_forbidden_keys, validate_unknown_keys):
        findings.extend(check(data))
    is_valid = all(f.severity != "error" for f in findings)
    return is_valid, findings
def validate_file(path: str) -> Tuple[bool, List[ValidationError]]:
    """Validate a YAML file on disk.

    Args:
        path: Filesystem path to the YAML file.

    Returns:
        (is_valid, findings). Missing or unreadable files produce a single
        error finding rather than raising.
    """
    p = Path(path)
    if not p.exists():
        return False, [ValidationError(str(p), "File not found")]
    try:
        content = p.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        # Robustness fix: a permission error or non-UTF-8 file previously
        # escaped as a traceback; report it as a validation error instead.
        return False, [ValidationError(str(p), f"Cannot read file: {e}")]
    return validate_config(content)
# ─── Schema dump ──────────────────────────────────────────────────────
def dump_schema(schema: dict = None, prefix: str = "", indent: int = 0) -> List[str]:
    """Pretty-print a schema as text lines, one per key, recursing into nests.

    Args:
        schema: Schema mapping to render; defaults to the module SCHEMA.
        prefix: Dotted path of enclosing keys.
        indent: Leading spaces for this nesting level.

    Returns:
        List of formatted lines, e.g. "model: str (required) — Default model…".
    """
    if schema is None:
        schema = SCHEMA
    lines: List[str] = []
    for key, spec in schema.items():
        full_key = f"{prefix}.{key}" if prefix else key
        req = " (required)" if spec.get("required") else ""
        desc = spec.get("description", "")
        t = spec["type"]
        if isinstance(t, tuple):
            # Fix: tuple types used to fall through to str(t), printing raw
            # "(<class 'int'>, <class 'float'>)"; join the member names.
            type_name = " | ".join(m.__name__ for m in t)
        elif hasattr(t, "__name__"):
            type_name = t.__name__
        else:
            type_name = str(t)
        # Fix: the description was concatenated with no separator, producing
        # "model: str (required)Default model identifier…".
        suffix = f" — {desc}" if desc else ""
        lines.append(f"{' ' * indent}{full_key}: {type_name}{req}{suffix}")
        if "value_schema" in spec:
            lines.extend(dump_schema(spec["value_schema"], full_key, indent + 1))
    return lines
# ─── CLI ──────────────────────────────────────────────────────────────
def main():
    """CLI entry point: dispatch on --schema / --check-dir / single-file modes."""
    parser = argparse.ArgumentParser(description="Validate timmy-config YAML before deploy")
    parser.add_argument("file", nargs="?", help="YAML file to validate")
    parser.add_argument("--deploy", metavar="DEST", help="Validate then copy to DEST")
    parser.add_argument("--schema", action="store_true", help="Print expected schema")
    parser.add_argument("--check-dir", metavar="DIR", help="Validate all .yaml/.yml in directory")
    parser.add_argument("--json", action="store_true", dest="as_json", help="Output as JSON")
    args = parser.parse_args()
    # Mode 1: print the expected schema and exit.
    if args.schema:
        lines = dump_schema()
        print("timmy-config schema:\n")
        for line in lines:
            print(f" {line}")
        return
    # Mode 2: validate every YAML file in a directory; exit 1 if any invalid.
    if args.check_dir:
        d = Path(args.check_dir)
        if not d.is_dir():
            print(f"ERROR: {d} is not a directory", file=sys.stderr)
            sys.exit(1)
        all_valid = True
        results = []
        # NOTE(review): "*.y*ml" matches .yaml and .yml (and any ".y…ml" name).
        for yf in sorted(d.glob("*.y*ml")):
            valid, errors = validate_file(str(yf))
            results.append({"file": str(yf), "valid": valid, "errors": [e.to_dict() for e in errors]})
            if not valid:
                all_valid = False
                # Human-readable error lines are printed even in --json mode,
                # ahead of the JSON payload.
                for e in errors:
                    if e.severity == "error":
                        print(f" {yf.name}: {e}")
        if args.as_json:
            print(json.dumps(results, indent=2))
        elif all_valid:
            print(f"OK: All YAML in {d} valid")
        sys.exit(0 if all_valid else 1)
    # Mode 3: validate a single file (positional FILE required from here on).
    if not args.file:
        parser.error("FILE required (or use --schema / --check-dir)")
    valid, errors = validate_file(args.file)
    if args.as_json:
        out = {"file": args.file, "valid": valid, "errors": [e.to_dict() for e in errors]}
        print(json.dumps(out, indent=2))
    else:
        for e in errors:
            print(f" {e}")
        if valid:
            print(f"OK: {args.file} is valid")
        else:
            print(f"FAIL: {args.file} has {sum(1 for e in errors if e.severity == 'error')} errors")
    if not valid:
        sys.exit(1)
    # Deploy mode: copy validated file to destination
    # (only reached when validation passed — sys.exit(1) above otherwise).
    if args.deploy:
        dest = Path(args.deploy)
        shutil.copy2(args.file, dest)
        print(f"DEPLOYED: {args.file} -> {dest}")


if __name__ == "__main__":
    main()