scripts/config_validator.py — standalone validator for all YAML/JSON config files in the repo. Checks: - YAML syntax (pyyaml safe_load) - JSON syntax (json.loads) - Duplicate keys in YAML/JSON - Trailing whitespace - Tabs in YAML (should use spaces) - Cron expression validity (if present) Reports PASS/FAIL per file with line numbers. Exit 0 if all valid, 1 if any invalid.
307 lines · 10 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
config_validator.py — Validate all YAML/JSON config files in timmy-config.
|
|
|
|
Checks:
|
|
1. YAML syntax (pyyaml safe_load)
|
|
2. JSON syntax (json.loads)
|
|
3. Duplicate keys in YAML/JSON
|
|
4. Trailing whitespace in YAML
|
|
5. Tabs in YAML (should use spaces)
|
|
6. Cron expression validity (if present)
|
|
|
|
Exit 0 if all valid, 1 if any invalid.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
|
|
sys.exit(1)
|
|
|
|
|
|
# ── Cron validation ──────────────────────────────────────────────────────────
|
|
|
|
# Symbolic-name → numeric-value maps for cron fields. These MUST be dicts,
# not sets: _resolve_name() looks up names[token.lower()] to obtain the
# numeric value. (With sets, any cron using "jan"/"mon" etc. raised a
# TypeError that escaped validate_cron's ValueError handler.)
# Day-of-week: 0 = Sunday … 6 = Saturday (7 is also accepted numerically).
DOW_NAMES = {"sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6}
# Month: 1 = January … 12 = December.
MONTH_NAMES = {"jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
               "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12}
|
|
|
|
|
|
def _expand_cron_field(field: str, lo: int, hi: int, names: dict | None = None) -> set[int]:
|
|
"""Expand a single cron field into a set of valid integers."""
|
|
result: set[int] = set()
|
|
for part in field.split(","):
|
|
# Handle step: */N or 1-5/N
|
|
step = 1
|
|
if "/" in part:
|
|
part, step_str = part.split("/", 1)
|
|
if not step_str.isdigit() or int(step_str) < 1:
|
|
raise ValueError(f"invalid step value: {step_str}")
|
|
step = int(step_str)
|
|
|
|
if part == "*":
|
|
rng = range(lo, hi + 1, step)
|
|
elif "-" in part:
|
|
a, b = part.split("-", 1)
|
|
a = _resolve_name(a, names, lo, hi)
|
|
b = _resolve_name(b, names, lo, hi)
|
|
if a > b:
|
|
raise ValueError(f"range {a}-{b} is reversed")
|
|
rng = range(a, b + 1, step)
|
|
else:
|
|
val = _resolve_name(part, names, lo, hi)
|
|
rng = range(val, val + 1)
|
|
|
|
for v in rng:
|
|
if v < lo or v > hi:
|
|
raise ValueError(f"value {v} out of range [{lo}-{hi}]")
|
|
result.add(v)
|
|
return result
|
|
|
|
|
|
def _resolve_name(token: str, names: dict | None, lo: int, hi: int) -> int:
|
|
if names and token.lower() in names:
|
|
return names[token.lower()]
|
|
if not token.isdigit():
|
|
raise ValueError(f"unrecognized token: {token}")
|
|
val = int(token)
|
|
if val < lo or val > hi:
|
|
raise ValueError(f"value {val} out of range [{lo}-{hi}]")
|
|
return val
|
|
|
|
|
|
def validate_cron(expr: str) -> list[str]:
    """Validate a 5-field cron expression. Returns list of errors (empty = ok)."""
    fields = expr.strip().split()
    if len(fields) != 5:
        return [f"expected 5 fields, got {len(fields)}"]

    problems: list[str] = []
    # (label, field text, low bound, high bound, symbolic-name map)
    field_specs = (
        ("minute", fields[0], 0, 59, None),
        ("hour", fields[1], 0, 23, None),
        ("day-of-month", fields[2], 1, 31, None),
        ("month", fields[3], 1, 12, MONTH_NAMES),
        ("day-of-week", fields[4], 0, 7, DOW_NAMES),  # 0 and 7 both mean Sunday
    )
    for label, text, lo, hi, names in field_specs:
        try:
            _expand_cron_field(text, lo, hi, names)
        except ValueError as err:
            problems.append(f"{label}: {err}")
    return problems
|
|
|
|
|
|
# ── Duplicate key detection ──────────────────────────────────────────────────
|
|
|
|
class DuplicateKeyError(Exception):
    """Raised by the strict YAML loader when a mapping repeats a key."""
    pass
|
|
|
|
|
|
class _StrictYAMLLoader(yaml.SafeLoader):
    """YAML loader that rejects duplicate keys.

    Subclasses SafeLoader (no arbitrary object construction); the actual
    duplicate check is installed below via add_constructor for the default
    mapping tag.
    """
    pass
|
|
|
|
|
|
def _no_duplicates_constructor(loader, node, deep=False):
|
|
mapping = {}
|
|
for key_node, value_node in node.value:
|
|
key = loader.construct_object(key_node, deep=deep)
|
|
if key in mapping:
|
|
raise DuplicateKeyError(
|
|
f"duplicate key '{key}' (line {key_node.start_mark.line + 1})"
|
|
)
|
|
mapping[key] = loader.construct_object(value_node, deep=deep)
|
|
return mapping
|
|
|
|
|
|
# Route every plain YAML mapping through the duplicate-rejecting constructor,
# so loading with _StrictYAMLLoader raises DuplicateKeyError on repeats.
_StrictYAMLLoader.add_constructor(
    yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
    _no_duplicates_constructor,
)
|
|
|
|
|
|
def _json_has_duplicates(text: str) -> list[str]:
|
|
"""Check for duplicate keys in JSON by scanning for repeated quoted keys at same depth."""
|
|
errors: list[str] = []
|
|
# Use a custom approach: parse with object_pairs_hook
|
|
seen_stack: list[set[str]] = []
|
|
|
|
def _check_pairs(pairs):
|
|
level_keys: set[str] = set()
|
|
for k, _ in pairs:
|
|
if k in level_keys:
|
|
errors.append(f"duplicate JSON key: '{k}'")
|
|
level_keys.add(k)
|
|
return dict(pairs)
|
|
|
|
try:
|
|
json.loads(text, object_pairs_hook=_check_pairs)
|
|
except json.JSONDecodeError:
|
|
pass # syntax errors caught elsewhere
|
|
return errors
|
|
|
|
|
|
# ── Main validator ───────────────────────────────────────────────────────────
|
|
|
|
def find_config_files(root: Path) -> list[Path]:
    """Recursively find .yaml, .yml, .json files (skip .git, node_modules, venv)."""
    ignored = {".git", "node_modules", "venv", "__pycache__", ".venv"}
    found: list[Path] = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune in place so os.walk never descends into ignored directories.
        dirnames[:] = [name for name in dirnames if name not in ignored]
        found.extend(
            Path(dirpath) / name
            for name in filenames
            if name.endswith((".yaml", ".yml", ".json"))
        )
    return sorted(found)
|
|
|
|
|
|
def validate_yaml_file(filepath: Path, text: str) -> list[str]:
    """Validate a YAML file. Returns list of errors."""
    problems: list[str] = []

    # Whitespace hygiene: tabs are not valid YAML indentation, and trailing
    # whitespace is flagged for cleanliness.
    for lineno, raw in enumerate(text.splitlines(), 1):
        if "\t" in raw:
            problems.append(f" line {lineno}: contains tab character (use spaces for YAML)")
        if raw != raw.rstrip():
            problems.append(f" line {lineno}: trailing whitespace")

    # Parse with the strict loader: catches duplicate keys as well as
    # ordinary syntax errors.
    try:
        yaml.load(text, Loader=_StrictYAMLLoader)
    except DuplicateKeyError as dup:
        problems.append(f" {dup}")
    except yaml.YAMLError as err:
        mark = getattr(err, "problem_mark", None)
        if mark:
            problems.append(f" YAML syntax error at line {mark.line + 1}, col {mark.column + 1}: {err.problem}")
        else:
            problems.append(f" YAML syntax error: {err}")

    # Textual scan for cron/schedule values with five space-separated fields.
    cron_re = re.compile(
        r'(?:cron|schedule)\s*:\s*["\']?([*0-9/,a-zA-Z-]+(?:\s+[*0-9/,a-zA-Z-]+){4})["\']?'
    )
    for lineno, raw in enumerate(text.splitlines(), 1):
        found = cron_re.search(raw)
        if found:
            expr = found.group(1)
            for issue in validate_cron(expr):
                problems.append(f" line {lineno}: invalid cron '{expr}': {issue}")

    return problems
|
|
|
|
|
|
def validate_json_file(filepath: Path, text: str) -> list[str]:
    """Validate a JSON file: syntax, duplicate keys, trailing whitespace,
    and cron expressions under cron/schedule/cron_expression keys.

    Returns a list of error strings (empty = file is valid).
    """
    errors: list[str] = []

    # Syntax check.
    try:
        json.loads(text)
    except json.JSONDecodeError as e:
        errors.append(f" JSON syntax error at line {e.lineno}, col {e.colno}: {e.msg}")

    # Duplicate keys (needs its own parse with object_pairs_hook).
    errors.extend(_json_has_duplicates(text))

    # Trailing whitespace (informational).
    for i, line in enumerate(text.splitlines(), 1):
        if line != line.rstrip():
            errors.append(f" line {i}: trailing whitespace")

    # Cron expressions: one recursive walk over the parsed document covers
    # both top-level and nested schedule keys. (A previous regex-based text
    # scan reported the same expressions a second time, duplicating errors.)
    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        pass  # unparseable — syntax error already reported above
    else:
        _scan_obj_for_cron(obj, errors)

    return errors
|
|
|
|
|
|
def _scan_obj_for_cron(obj, errors: list[str], path: str = ""):
|
|
"""Recursively scan dict/list for cron expressions."""
|
|
if isinstance(obj, dict):
|
|
for k, v in obj.items():
|
|
if k in ("cron", "schedule", "cron_expression") and isinstance(v, str):
|
|
fields = v.strip().split()
|
|
if len(fields) == 5:
|
|
cron_errs = validate_cron(v)
|
|
for ce in cron_errs:
|
|
errors.append(f" {path}.{k}: invalid cron '{v}': {ce}")
|
|
_scan_obj_for_cron(v, errors, f"{path}.{k}")
|
|
elif isinstance(obj, list):
|
|
for i, item in enumerate(obj):
|
|
_scan_obj_for_cron(item, errors, f"{path}[{i}]")
|
|
|
|
|
|
def main():
    """Entry point: validate every config file under the repo root, print a
    PASS/FAIL report per file, and exit 1 on any error (0 when all pass)."""
    # The script lives in scripts/, so the repo root is one level up.
    repo_root = Path(__file__).resolve().parent.parent

    print(f"Config Validator — scanning {repo_root}")
    print("=" * 60)

    files = find_config_files(repo_root)
    print(f"Found {len(files)} config files to validate.\n")

    total_errors = 0
    failed_files: list[tuple[Path, list[str]]] = []

    for filepath in files:
        rel = filepath.relative_to(repo_root)
        try:
            text = filepath.read_text(encoding="utf-8", errors="replace")
        except Exception as e:
            # Unreadable file counts as a single failure.
            failed_files.append((rel, [f" cannot read file: {e}"]))
            total_errors += 1
            continue

        # Dispatch on extension: .json gets the JSON checks, everything
        # else found by find_config_files is YAML.
        validator = validate_json_file if filepath.suffix == ".json" else validate_yaml_file
        errors = validator(filepath, text)

        if errors:
            failed_files.append((rel, errors))
            total_errors += len(errors)
            print(f"FAIL {rel}")
        else:
            print(f"PASS {rel}")

    print("\n" + "=" * 60)
    print(f"Results: {len(files) - len(failed_files)}/{len(files)} files passed")

    if not failed_files:
        print("\nAll config files valid!")
        sys.exit(0)

    # Detailed error listing, one section per failing file.
    print(f"\n{total_errors} error(s) in {len(failed_files)} file(s):\n")
    for relpath, errs in failed_files:
        print(f" {relpath}:")
        for e in errs:
            print(f" {e}")
        print()
    sys.exit(1)
|
|
|
|
|
|
# Run the validator only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|