feat: add config validator script
scripts/config_validator.py — standalone validator for all YAML/JSON config files in the repo. Checks: - YAML syntax (pyyaml safe_load) - JSON syntax (json.loads) - Duplicate keys in YAML/JSON - Trailing whitespace - Tabs in YAML (should use spaces) - Cron expression validity (if present) Reports PASS/FAIL per file with line numbers. Exit 0 if all valid, 1 if any invalid.
This commit is contained in:
306
scripts/config_validator.py
Normal file
306
scripts/config_validator.py
Normal file
@@ -0,0 +1,306 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
config_validator.py — Validate all YAML/JSON config files in timmy-config.
|
||||
|
||||
Checks:
|
||||
1. YAML syntax (pyyaml safe_load)
|
||||
2. JSON syntax (json.loads)
|
||||
3. Duplicate keys in YAML/JSON
|
||||
4. Trailing whitespace in YAML
|
||||
5. Tabs in YAML (should use spaces)
|
||||
6. Cron expression validity (if present)
|
||||
|
||||
Exit 0 if all valid, 1 if any invalid.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ── Cron validation ──────────────────────────────────────────────────────────
|
||||
|
||||
DOW_NAMES = {"sun", "mon", "tue", "wed", "thu", "fri", "sat"}
|
||||
MONTH_NAMES = {"jan", "feb", "mar", "apr", "may", "jun",
|
||||
"jul", "aug", "sep", "oct", "nov", "dec"}
|
||||
|
||||
|
||||
def _expand_cron_field(field: str, lo: int, hi: int, names: dict | None = None) -> set[int]:
|
||||
"""Expand a single cron field into a set of valid integers."""
|
||||
result: set[int] = set()
|
||||
for part in field.split(","):
|
||||
# Handle step: */N or 1-5/N
|
||||
step = 1
|
||||
if "/" in part:
|
||||
part, step_str = part.split("/", 1)
|
||||
if not step_str.isdigit() or int(step_str) < 1:
|
||||
raise ValueError(f"invalid step value: {step_str}")
|
||||
step = int(step_str)
|
||||
|
||||
if part == "*":
|
||||
rng = range(lo, hi + 1, step)
|
||||
elif "-" in part:
|
||||
a, b = part.split("-", 1)
|
||||
a = _resolve_name(a, names, lo, hi)
|
||||
b = _resolve_name(b, names, lo, hi)
|
||||
if a > b:
|
||||
raise ValueError(f"range {a}-{b} is reversed")
|
||||
rng = range(a, b + 1, step)
|
||||
else:
|
||||
val = _resolve_name(part, names, lo, hi)
|
||||
rng = range(val, val + 1)
|
||||
|
||||
for v in rng:
|
||||
if v < lo or v > hi:
|
||||
raise ValueError(f"value {v} out of range [{lo}-{hi}]")
|
||||
result.add(v)
|
||||
return result
|
||||
|
||||
|
||||
def _resolve_name(token: str, names: dict | None, lo: int, hi: int) -> int:
|
||||
if names and token.lower() in names:
|
||||
return names[token.lower()]
|
||||
if not token.isdigit():
|
||||
raise ValueError(f"unrecognized token: {token}")
|
||||
val = int(token)
|
||||
if val < lo or val > hi:
|
||||
raise ValueError(f"value {val} out of range [{lo}-{hi}]")
|
||||
return val
|
||||
|
||||
|
||||
def validate_cron(expr: str) -> list[str]:
|
||||
"""Validate a 5-field cron expression. Returns list of errors (empty = ok)."""
|
||||
errors: list[str] = []
|
||||
fields = expr.strip().split()
|
||||
if len(fields) != 5:
|
||||
return [f"expected 5 fields, got {len(fields)}"]
|
||||
|
||||
specs = [
|
||||
(fields[0], 0, 59, None, "minute"),
|
||||
(fields[1], 0, 23, None, "hour"),
|
||||
(fields[2], 1, 31, None, "day-of-month"),
|
||||
(fields[3], 1, 12, MONTH_NAMES, "month"),
|
||||
(fields[4], 0, 7, DOW_NAMES, "day-of-week"),
|
||||
]
|
||||
for field, lo, hi, names, label in specs:
|
||||
try:
|
||||
_expand_cron_field(field, lo, hi, names)
|
||||
except ValueError as e:
|
||||
errors.append(f"{label}: {e}")
|
||||
return errors
|
||||
|
||||
|
||||
# ── Duplicate key detection ──────────────────────────────────────────────────
|
||||
|
||||
class DuplicateKeyError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class _StrictYAMLLoader(yaml.SafeLoader):
|
||||
"""YAML loader that rejects duplicate keys."""
|
||||
pass
|
||||
|
||||
|
||||
def _no_duplicates_constructor(loader, node, deep=False):
|
||||
mapping = {}
|
||||
for key_node, value_node in node.value:
|
||||
key = loader.construct_object(key_node, deep=deep)
|
||||
if key in mapping:
|
||||
raise DuplicateKeyError(
|
||||
f"duplicate key '{key}' (line {key_node.start_mark.line + 1})"
|
||||
)
|
||||
mapping[key] = loader.construct_object(value_node, deep=deep)
|
||||
return mapping
|
||||
|
||||
|
||||
_StrictYAMLLoader.add_constructor(
|
||||
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
|
||||
_no_duplicates_constructor,
|
||||
)
|
||||
|
||||
|
||||
def _json_has_duplicates(text: str) -> list[str]:
|
||||
"""Check for duplicate keys in JSON by scanning for repeated quoted keys at same depth."""
|
||||
errors: list[str] = []
|
||||
# Use a custom approach: parse with object_pairs_hook
|
||||
seen_stack: list[set[str]] = []
|
||||
|
||||
def _check_pairs(pairs):
|
||||
level_keys: set[str] = set()
|
||||
for k, _ in pairs:
|
||||
if k in level_keys:
|
||||
errors.append(f"duplicate JSON key: '{k}'")
|
||||
level_keys.add(k)
|
||||
return dict(pairs)
|
||||
|
||||
try:
|
||||
json.loads(text, object_pairs_hook=_check_pairs)
|
||||
except json.JSONDecodeError:
|
||||
pass # syntax errors caught elsewhere
|
||||
return errors
|
||||
|
||||
|
||||
# ── Main validator ───────────────────────────────────────────────────────────
|
||||
|
||||
def find_config_files(root: Path) -> list[Path]:
|
||||
"""Recursively find .yaml, .yml, .json files (skip .git, node_modules, venv)."""
|
||||
skip_dirs = {".git", "node_modules", "venv", "__pycache__", ".venv"}
|
||||
results: list[Path] = []
|
||||
for dirpath, dirnames, filenames in os.walk(root):
|
||||
dirnames[:] = [d for d in dirnames if d not in skip_dirs]
|
||||
for fname in filenames:
|
||||
if fname.endswith((".yaml", ".yml", ".json")):
|
||||
results.append(Path(dirpath) / fname)
|
||||
return sorted(results)
|
||||
|
||||
|
||||
def validate_yaml_file(filepath: Path, text: str) -> list[str]:
|
||||
"""Validate a YAML file. Returns list of errors."""
|
||||
errors: list[str] = []
|
||||
|
||||
# Check for tabs
|
||||
for i, line in enumerate(text.splitlines(), 1):
|
||||
if "\t" in line:
|
||||
errors.append(f" line {i}: contains tab character (use spaces for YAML)")
|
||||
if line != line.rstrip():
|
||||
errors.append(f" line {i}: trailing whitespace")
|
||||
|
||||
# Check syntax + duplicate keys
|
||||
try:
|
||||
yaml.load(text, Loader=_StrictYAMLLoader)
|
||||
except DuplicateKeyError as e:
|
||||
errors.append(f" {e}")
|
||||
except yaml.YAMLError as e:
|
||||
mark = getattr(e, "problem_mark", None)
|
||||
if mark:
|
||||
errors.append(f" YAML syntax error at line {mark.line + 1}, col {mark.column + 1}: {e.problem}")
|
||||
else:
|
||||
errors.append(f" YAML syntax error: {e}")
|
||||
|
||||
# Check cron expressions in schedule fields
|
||||
for i, line in enumerate(text.splitlines(), 1):
|
||||
cron_match = re.search(r'(?:cron|schedule)\s*:\s*["\']?([*0-9/,a-zA-Z-]+(?:\s+[*0-9/,a-zA-Z-]+){4})["\']?', line)
|
||||
if cron_match:
|
||||
cron_errs = validate_cron(cron_match.group(1))
|
||||
for ce in cron_errs:
|
||||
errors.append(f" line {i}: invalid cron '{cron_match.group(1)}': {ce}")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def validate_json_file(filepath: Path, text: str) -> list[str]:
|
||||
"""Validate a JSON file. Returns list of errors."""
|
||||
errors: list[str] = []
|
||||
|
||||
# Check syntax
|
||||
try:
|
||||
json.loads(text)
|
||||
except json.JSONDecodeError as e:
|
||||
errors.append(f" JSON syntax error at line {e.lineno}, col {e.colno}: {e.msg}")
|
||||
|
||||
# Check duplicate keys
|
||||
dup_errors = _json_has_duplicates(text)
|
||||
errors.extend(dup_errors)
|
||||
|
||||
# Check for trailing whitespace (informational)
|
||||
for i, line in enumerate(text.splitlines(), 1):
|
||||
if line != line.rstrip():
|
||||
errors.append(f" line {i}: trailing whitespace")
|
||||
|
||||
# Check cron expressions
|
||||
cron_pattern = re.compile(r'"(?:cron|schedule)"?\s*:\s*"([^"]{5,})"')
|
||||
for match in cron_pattern.finditer(text):
|
||||
candidate = match.group(1).strip()
|
||||
fields = candidate.split()
|
||||
if len(fields) == 5 and all(re.match(r'^[*0-9/,a-zA-Z-]+$', f) for f in fields):
|
||||
cron_errs = validate_cron(candidate)
|
||||
for ce in cron_errs:
|
||||
errors.append(f" invalid cron '{candidate}': {ce}")
|
||||
|
||||
# Also check nested schedule objects with cron fields
|
||||
try:
|
||||
obj = json.loads(text)
|
||||
_scan_obj_for_cron(obj, errors)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def _scan_obj_for_cron(obj, errors: list[str], path: str = ""):
|
||||
"""Recursively scan dict/list for cron expressions."""
|
||||
if isinstance(obj, dict):
|
||||
for k, v in obj.items():
|
||||
if k in ("cron", "schedule", "cron_expression") and isinstance(v, str):
|
||||
fields = v.strip().split()
|
||||
if len(fields) == 5:
|
||||
cron_errs = validate_cron(v)
|
||||
for ce in cron_errs:
|
||||
errors.append(f" {path}.{k}: invalid cron '{v}': {ce}")
|
||||
_scan_obj_for_cron(v, errors, f"{path}.{k}")
|
||||
elif isinstance(obj, list):
|
||||
for i, item in enumerate(obj):
|
||||
_scan_obj_for_cron(item, errors, f"{path}[{i}]")
|
||||
|
||||
|
||||
def main():
|
||||
# Determine repo root (script lives in scripts/)
|
||||
script_path = Path(__file__).resolve()
|
||||
repo_root = script_path.parent.parent
|
||||
|
||||
print(f"Config Validator — scanning {repo_root}")
|
||||
print("=" * 60)
|
||||
|
||||
files = find_config_files(repo_root)
|
||||
print(f"Found {len(files)} config files to validate.\n")
|
||||
|
||||
total_errors = 0
|
||||
failed_files: list[tuple[Path, list[str]]] = []
|
||||
|
||||
for filepath in files:
|
||||
rel = filepath.relative_to(repo_root)
|
||||
try:
|
||||
text = filepath.read_text(encoding="utf-8", errors="replace")
|
||||
except Exception as e:
|
||||
failed_files.append((rel, [f" cannot read file: {e}"]))
|
||||
total_errors += 1
|
||||
continue
|
||||
|
||||
if filepath.suffix == ".json":
|
||||
errors = validate_json_file(filepath, text)
|
||||
else:
|
||||
errors = validate_yaml_file(filepath, text)
|
||||
|
||||
if errors:
|
||||
failed_files.append((rel, errors))
|
||||
total_errors += len(errors)
|
||||
print(f"FAIL {rel}")
|
||||
else:
|
||||
print(f"PASS {rel}")
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print(f"Results: {len(files) - len(failed_files)}/{len(files)} files passed")
|
||||
|
||||
if failed_files:
|
||||
print(f"\n{total_errors} error(s) in {len(failed_files)} file(s):\n")
|
||||
for relpath, errs in failed_files:
|
||||
print(f" {relpath}:")
|
||||
for e in errs:
|
||||
print(f" {e}")
|
||||
print()
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("\nAll config files valid!")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user