#!/usr/bin/env python3
"""
sidecar_validator.py - Pre-deploy validation for timmy-config sidecar configs.

Validates YAML/JSON configs against expected schemas before deploy.

Usage:
    python3 scripts/sidecar_validator.py [config_path ...]
    python3 scripts/sidecar_validator.py --all
    python3 scripts/sidecar_validator.py --pre-deploy
"""
import json
import os
import sys
from pathlib import Path

# PyYAML is a hard requirement: every non-JSON config is parsed with it.
try:
    import yaml
except ImportError:
    print("ERROR: PyYAML not installed.", file=sys.stderr)
    sys.exit(2)
# Expected structure for each recognized config kind.
# "required"/"optional" map top-level keys to their expected types;
# "nested_required"/"nested_optional" do the same one level deeper
# (parent key -> {child key -> type}). A tuple of types means "any of".
SCHEMAS = {
    "wizard_config": {
        "description": "Wizard agent configuration",
        "required": {"model": dict},
        "optional": {
            "toolsets": list,
            "fallback_providers": list,
            "agent": dict,
            "providers": dict,
            "terminal": dict,
            "browser": dict,
            "compression": dict,
            "auxiliary": dict,
        },
        "nested_required": {"model": {"default": str, "provider": str}},
        "nested_optional": {
            "model": {"fallback": str},
            "agent": {"max_turns": (int, float), "reasoning_effort": str, "verbose": bool},
        },
    },
    "sidecar_config": {
        "description": "Sidecar agent configuration",
        "required": {"name": str, "role": str},
        "optional": {
            "capabilities": list,
            "instructions": str,
            "model": str,
            "provider": str,
            "toolsets": list,
        },
    },
    "main_config": {
        "description": "Main hermes configuration",
        "required": {"model": dict},
        "optional": {
            "toolsets": list,
            "agent": dict,
            "terminal": dict,
            "browser": dict,
            "checkpoints": dict,
            "compression": dict,
            "auxiliary": dict,
            "fallback_providers": list,
            "providers": dict,
        },
        "nested_required": {"model": {"default": str, "provider": str}},
    },
    "cron_pipeline": {
        "description": "Cron/pipeline schedule",
        "required": {"name": str},
        "optional": {
            "schedule": str,
            "cron": str,
            "tasks": list,
            "steps": list,
            "prompt": str,
            "model": dict,
        },
    },
    "playbook": {
        "description": "Agent playbook",
        "required": {"name": str},
        "optional": {
            "description": str,
            "model": str,
            "steps": list,
            "prompt": str,
        },
    },
}
def classify_config(filepath):
    """Map a config file path to its SCHEMAS key, or None if not validated.

    Classification is purely path-based: directory names in the path and
    the file name decide which schema applies.
    """
    name = filepath.name
    parts = filepath.parts
    if "wizards" in parts:
        # Sidecar files live under wizards/ and carry a "-sidecar." infix.
        if "-sidecar." in name:
            return "sidecar_config"
        if name in ("config.yaml", "config.yml"):
            return "wizard_config"
    # Top-level (or near top-level) config.yaml is the main config.
    if name == "config.yaml" and len(parts) <= 2:
        return "main_config"
    if "cron" in parts and name.endswith((".yml", ".yaml")):
        return "cron_pipeline"
    if "playbooks" in parts and name.endswith((".yaml", ".yml")):
        return "playbook"
    return None
def type_name(t):
    """Return a human-readable name for a type, or 'a or b' for a tuple of alternatives."""
    if isinstance(t, tuple):
        alternatives = [alt.__name__ for alt in t]
        return " or ".join(alternatives)
    return t.__name__
def validate_config(data, schema_name, schema):
    """Validate a parsed config mapping against one SCHEMAS entry.

    Args:
        data: the parsed config (a dict).
        schema_name: the SCHEMAS key, used to trigger schema-specific rules.
        schema: the schema dict (required/optional/nested_* declarations).

    Returns:
        A list of human-readable error strings; empty when the config is valid.
    """
    errors = []

    # Required top-level keys: must be present and of the declared type.
    for key, expected in schema["required"].items():
        if key not in data:
            errors.append(f"missing required key: '{key}' (expected {type_name(expected)})")
        elif not isinstance(data[key], expected):
            errors.append(f"'{key}' wrong type: got {type(data[key]).__name__}, expected {type_name(expected)}")

    # Optional top-level keys: type-checked only when present.
    # Bug fix: the "optional" type declarations in SCHEMAS were previously
    # never enforced (only 'capabilities' had an ad-hoc check, now subsumed).
    for key, expected in schema.get("optional", {}).items():
        if key in data and not isinstance(data[key], expected):
            errors.append(f"'{key}' wrong type: got {type(data[key]).__name__}, expected {type_name(expected)}")

    # Nested required keys: checked only when the parent exists and is a dict
    # (a missing/mistyped parent is already reported by the loops above).
    for parent, children in schema.get("nested_required", {}).items():
        if parent in data and isinstance(data[parent], dict):
            for child, expected in children.items():
                if child not in data[parent]:
                    errors.append(f"'{parent}' missing key: '{child}'")
                elif not isinstance(data[parent][child], expected):
                    errors.append(f"'{parent}.{child}' wrong type: {type(data[parent][child]).__name__}")

    # Nested optional keys: type-checked only when present.
    for parent, children in schema.get("nested_optional", {}).items():
        if parent in data and isinstance(data[parent], dict):
            for child, expected in children.items():
                if child in data[parent] and not isinstance(data[parent][child], expected):
                    errors.append(f"'{parent}.{child}' wrong type: {type(data[parent][child]).__name__}")

    # Wizard configs: each fallback entry needs both 'provider' and 'model'.
    if schema_name == "wizard_config" and isinstance(data.get("fallback_providers"), list):
        for i, fb in enumerate(data["fallback_providers"]):
            if not isinstance(fb, dict):
                errors.append(f"fallback_providers[{i}]: expected dict")
            elif "provider" not in fb:
                errors.append(f"fallback_providers[{i}]: missing 'provider'")
            elif "model" not in fb:
                errors.append(f"fallback_providers[{i}]: missing 'model'")

    # Cron pipelines: a non-empty schedule must have 5 whitespace-separated fields.
    if schema_name == "cron_pipeline":
        s = data.get("schedule") or data.get("cron", "")
        if isinstance(s, str) and s.strip():
            fields = s.strip().split()
            # NOTE(review): macros like "@daily" are rejected by this check —
            # confirm whether the scheduler supports them before relaxing.
            if len(fields) != 5:
                errors.append(f"schedule has {len(fields)} fields, expected 5")

    return errors
def validate_file(filepath):
    """Read, parse, and validate one config file.

    Returns (ok, errors): ok is True when the file passes (or needs no
    validation because it matches no schema); errors is a list of strings.
    """
    schema_name = classify_config(filepath)
    if schema_name is None:
        # Not a file we validate — treat as passing.
        return True, []
    schema = SCHEMAS[schema_name]

    try:
        text = filepath.read_text(encoding="utf-8", errors="replace")
    except Exception as e:
        return False, [f"cannot read: {e}"]

    # JSON files get the strict JSON parser; everything else goes through YAML.
    loader = json.loads if filepath.suffix == ".json" else yaml.safe_load
    try:
        data = loader(text)
    except Exception as e:
        return False, [f"parse error: {e}"]

    if not isinstance(data, dict):
        return False, [f"expected mapping, got {type(data).__name__}"]

    errors = validate_config(data, schema_name, schema)
    return not errors, errors
def find_deploy_targets(root):
    """Collect the config files that matter for a deploy under *root*, sorted."""
    patterns = ("config.yaml", "wizards/*/config.yaml", "wizards/*-sidecar.json")
    found = []
    for pattern in patterns:
        found += root.glob(pattern)
    return sorted(found)
def find_all_configs(root):
    """Walk *root* and return every file that maps to a known schema, sorted."""
    skip_dirs = {".git", "node_modules", "venv", "__pycache__"}
    matches = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune skipped directories in place so os.walk never descends into them.
        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
        for filename in filenames:
            if not filename.endswith((".yaml", ".yml", ".json")):
                continue
            candidate = Path(dirpath) / filename
            if classify_config(candidate) is not None:
                matches.append(candidate)
    return sorted(matches)
def main():
    """CLI entry point: choose files by mode, validate each, print a report.

    Exit status: 0 when everything validates (or nothing matched),
    1 when any file fails, 2 for usage/help.
    """
    # Repo root is assumed to be one level above this script's directory.
    root = Path(__file__).resolve().parent.parent
    args = sys.argv[1:]

    if not args or args == ["--help"]:
        print(__doc__)
        sys.exit(2)

    if "--all" in args:
        mode = "all configs"
        files = find_all_configs(root)
    elif "--pre-deploy" in args:
        mode = "deploy targets"
        files = find_deploy_targets(root)
    else:
        mode = "specified files"
        files = [Path(a) for a in args if not a.startswith("-")]

    if not files:
        print(f"No files found: {mode}")
        sys.exit(0)

    print(f"Sidecar Validator - {mode}")
    print("=" * 60)

    failed = []
    total = 0
    for fp in files:
        rel = fp.relative_to(root) if fp.is_absolute() else fp
        sn = classify_config(fp) or "unknown"
        ok, errs = validate_file(fp)
        if ok:
            print(f"PASS {rel} [{sn}]")
        else:
            failed.append((rel, sn, errs))
            total += len(errs)
            print(f"FAIL {rel} [{sn}]")

    print()
    print("=" * 60)
    print(f"Results: {len(files)-len(failed)}/{len(files)} valid")

    if failed:
        print(f"\n{total} error(s) in {len(failed)} file(s):\n")
        for rel, sn, errs in failed:
            print(f" {rel} ({sn}):")
            for e in errs:
                print(f" - {e}")
        print()
        sys.exit(1)
    else:
        print("\nAll configs valid!")
        sys.exit(0)


if __name__ == "__main__":
    main()