diff --git a/scripts/config_validator.py b/scripts/config_validator.py new file mode 100644 index 00000000..d5b6b9d4 --- /dev/null +++ b/scripts/config_validator.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +""" +config_validator.py — Validate all YAML/JSON config files in timmy-config. + +Checks: + 1. YAML syntax (pyyaml safe_load) + 2. JSON syntax (json.loads) + 3. Duplicate keys in YAML/JSON + 4. Trailing whitespace in YAML + 5. Tabs in YAML (should use spaces) + 6. Cron expression validity (if present) + +Exit 0 if all valid, 1 if any invalid. +""" + +import json +import os +import re +import sys +from pathlib import Path + +try: + import yaml +except ImportError: + print("ERROR: PyYAML not installed. Run: pip install pyyaml") + sys.exit(1) + + +# ── Cron validation ────────────────────────────────────────────────────────── + +DOW_NAMES = {"sun", "mon", "tue", "wed", "thu", "fri", "sat"} +MONTH_NAMES = {"jan", "feb", "mar", "apr", "may", "jun", + "jul", "aug", "sep", "oct", "nov", "dec"} + + +def _expand_cron_field(field: str, lo: int, hi: int, names: dict | None = None) -> set[int]: + """Expand a single cron field into a set of valid integers.""" + result: set[int] = set() + for part in field.split(","): + # Handle step: */N or 1-5/N + step = 1 + if "/" in part: + part, step_str = part.split("/", 1) + if not step_str.isdigit() or int(step_str) < 1: + raise ValueError(f"invalid step value: {step_str}") + step = int(step_str) + + if part == "*": + rng = range(lo, hi + 1, step) + elif "-" in part: + a, b = part.split("-", 1) + a = _resolve_name(a, names, lo, hi) + b = _resolve_name(b, names, lo, hi) + if a > b: + raise ValueError(f"range {a}-{b} is reversed") + rng = range(a, b + 1, step) + else: + val = _resolve_name(part, names, lo, hi) + rng = range(val, val + 1) + + for v in rng: + if v < lo or v > hi: + raise ValueError(f"value {v} out of range [{lo}-{hi}]") + result.add(v) + return result + + +def _resolve_name(token: str, names: dict | None, lo: int, hi: int) -> int: + if names and token.lower() in names: + return names[token.lower()] + if not token.isdigit(): + raise ValueError(f"unrecognized token: {token}") + val = int(token) + if val < lo or val > hi: + raise ValueError(f"value {val} out of range [{lo}-{hi}]") + return val + + +def validate_cron(expr: str) -> list[str]: + """Validate a 5-field cron expression. Returns list of errors (empty = ok).""" + errors: list[str] = [] + fields = expr.strip().split() + if len(fields) != 5: + return [f"expected 5 fields, got {len(fields)}"] + + specs = [ + (fields[0], 0, 59, None, "minute"), + (fields[1], 0, 23, None, "hour"), + (fields[2], 1, 31, None, "day-of-month"), + (fields[3], 1, 12, MONTH_NAMES, "month"), + (fields[4], 0, 7, DOW_NAMES, "day-of-week"), + ] + for field, lo, hi, names, label in specs: + try: + _expand_cron_field(field, lo, hi, names) + except ValueError as e: + errors.append(f"{label}: {e}") + return errors + + +# ── Duplicate key detection ────────────────────────────────────────────────── + +class DuplicateKeyError(Exception): + pass + + +class _StrictYAMLLoader(yaml.SafeLoader): + """YAML loader that rejects duplicate keys.""" + pass + + +def _no_duplicates_constructor(loader, node, deep=False): + mapping = {} + for key_node, value_node in node.value: + key = loader.construct_object(key_node, deep=deep) + if key in mapping: + raise DuplicateKeyError( + f"duplicate key '{key}' (line {key_node.start_mark.line + 1})" + ) + mapping[key] = loader.construct_object(value_node, deep=deep) + return mapping + + +_StrictYAMLLoader.add_constructor( + yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, + _no_duplicates_constructor, +) + + +def _json_has_duplicates(text: str) -> list[str]: + """Check for duplicate keys in JSON by scanning for repeated quoted keys at same depth.""" + errors: list[str] = [] + # Use a custom approach: parse with object_pairs_hook + seen_stack: list[set[str]] = [] + + def _check_pairs(pairs): + level_keys: set[str] = set() + for k, _ in pairs: + if k in level_keys: + errors.append(f"duplicate JSON key: '{k}'") + level_keys.add(k) + return dict(pairs) + + try: + json.loads(text, object_pairs_hook=_check_pairs) + except json.JSONDecodeError: + pass # syntax errors caught elsewhere + return errors + + +# ── Main validator ─────────────────────────────────────────────────────────── + +def find_config_files(root: Path) -> list[Path]: + """Recursively find .yaml, .yml, .json files (skip .git, node_modules, venv).""" + skip_dirs = {".git", "node_modules", "venv", "__pycache__", ".venv"} + results: list[Path] = [] + for dirpath, dirnames, filenames in os.walk(root): + dirnames[:] = [d for d in dirnames if d not in skip_dirs] + for fname in filenames: + if fname.endswith((".yaml", ".yml", ".json")): + results.append(Path(dirpath) / fname) + return sorted(results) + + +def validate_yaml_file(filepath: Path, text: str) -> list[str]: + """Validate a YAML file. Returns list of errors.""" + errors: list[str] = [] + + # Check for tabs + for i, line in enumerate(text.splitlines(), 1): + if "\t" in line: + errors.append(f" line {i}: contains tab character (use spaces for YAML)") + if line != line.rstrip(): + errors.append(f" line {i}: trailing whitespace") + + # Check syntax + duplicate keys + try: + yaml.load(text, Loader=_StrictYAMLLoader) + except DuplicateKeyError as e: + errors.append(f" {e}") + except yaml.YAMLError as e: + mark = getattr(e, "problem_mark", None) + if mark: + errors.append(f" YAML syntax error at line {mark.line + 1}, col {mark.column + 1}: {e.problem}") + else: + errors.append(f" YAML syntax error: {e}") + + # Check cron expressions in schedule fields + for i, line in enumerate(text.splitlines(), 1): + cron_match = re.search(r'(?:cron|schedule)\s*:\s*["\']?([*0-9/,a-zA-Z-]+(?:\s+[*0-9/,a-zA-Z-]+){4})["\']?', line) + if cron_match: + cron_errs = validate_cron(cron_match.group(1)) + for ce in cron_errs: + errors.append(f" line {i}: invalid cron '{cron_match.group(1)}': {ce}") + + return errors + + +def validate_json_file(filepath: Path, text: str) -> list[str]: + """Validate a JSON file. Returns list of errors.""" + errors: list[str] = [] + + # Check syntax + try: + json.loads(text) + except json.JSONDecodeError as e: + errors.append(f" JSON syntax error at line {e.lineno}, col {e.colno}: {e.msg}") + + # Check duplicate keys + dup_errors = _json_has_duplicates(text) + errors.extend(dup_errors) + + # Check for trailing whitespace (informational) + for i, line in enumerate(text.splitlines(), 1): + if line != line.rstrip(): + errors.append(f" line {i}: trailing whitespace") + + # Check cron expressions + cron_pattern = re.compile(r'"(?:cron|schedule)"?\s*:\s*"([^"]{5,})"') + for match in cron_pattern.finditer(text): + candidate = match.group(1).strip() + fields = candidate.split() + if len(fields) == 5 and all(re.match(r'^[*0-9/,a-zA-Z-]+$', f) for f in fields): + cron_errs = validate_cron(candidate) + for ce in cron_errs: + errors.append(f" invalid cron '{candidate}': {ce}") + + # Also check nested schedule objects with cron fields + try: + obj = json.loads(text) + _scan_obj_for_cron(obj, errors) + except Exception: + pass + + return errors + + +def _scan_obj_for_cron(obj, errors: list[str], path: str = ""): + """Recursively scan dict/list for cron expressions.""" + if isinstance(obj, dict): + for k, v in obj.items(): + if k in ("cron", "schedule", "cron_expression") and isinstance(v, str): + fields = v.strip().split() + if len(fields) == 5: + cron_errs = validate_cron(v) + for ce in cron_errs: + errors.append(f" {path}.{k}: invalid cron '{v}': {ce}") + _scan_obj_for_cron(v, errors, f"{path}.{k}") + elif isinstance(obj, list): + for i, item in enumerate(obj): + _scan_obj_for_cron(item, errors, f"{path}[{i}]") + + +def main(): + # Determine repo root (script lives in scripts/) + script_path = Path(__file__).resolve() + repo_root = script_path.parent.parent + + print(f"Config Validator — scanning {repo_root}") + print("=" * 60) + + files = find_config_files(repo_root) + print(f"Found {len(files)} config files to validate.\n") + + total_errors = 0 + failed_files: list[tuple[Path, list[str]]] = [] + + for filepath in files: + rel = filepath.relative_to(repo_root) + try: + text = filepath.read_text(encoding="utf-8", errors="replace") + except Exception as e: + failed_files.append((rel, [f" cannot read file: {e}"])) + total_errors += 1 + continue + + if filepath.suffix == ".json": + errors = validate_json_file(filepath, text) + else: + errors = validate_yaml_file(filepath, text) + + if errors: + failed_files.append((rel, errors)) + total_errors += len(errors) + print(f"FAIL {rel}") + else: + print(f"PASS {rel}") + + print("\n" + "=" * 60) + print(f"Results: {len(files) - len(failed_files)}/{len(files)} files passed") + + if failed_files: + print(f"\n{total_errors} error(s) in {len(failed_files)} file(s):\n") + for relpath, errs in failed_files: + print(f" {relpath}:") + for e in errs: + print(f" {e}") + print() + sys.exit(1) + else: + print("\nAll config files valid!") + sys.exit(0) + + +if __name__ == "__main__": + main()