#!/usr/bin/env python3
"""
config_validator.py — Validate all YAML/JSON config files in timmy-config.

Checks:
1. YAML syntax (pyyaml safe_load)
2. JSON syntax (json.loads)
3. Duplicate keys in YAML/JSON
4. Trailing whitespace in YAML
5. Tabs in YAML (should use spaces)
6. Cron expression validity (if present)

Exit 0 if all valid, 1 if any invalid.
"""

import json
import os
import re
import sys
from pathlib import Path

try:
    import yaml
except ImportError:
    print("ERROR: PyYAML not installed. Run: pip install pyyaml")
    sys.exit(1)

# ── Cron validation ──────────────────────────────────────────────────────────

# Name → numeric value maps for the month and day-of-week cron fields.
# BUG FIX: these were plain sets, but _resolve_name subscripts them
# (names[token.lower()]), so any named token like "mon" or "jan" raised
# TypeError — which escaped validate_cron (it only catches ValueError) and
# crashed the validator. The `names: dict | None` annotations already
# declared the intended type; they are now real dicts.
DOW_NAMES = {"sun": 0, "mon": 1, "tue": 2, "wed": 3,
             "thu": 4, "fri": 5, "sat": 6}
MONTH_NAMES = {"jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
               "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12}


def _expand_cron_field(field: str, lo: int, hi: int,
                       names: dict | None = None) -> set[int]:
    """Expand a single cron field into a set of valid integers.

    Supports comma lists, "*", ranges ("a-b"), steps ("*/N", "a-b/N"),
    and symbolic names resolved through *names* (e.g. "mon", "jan").

    Raises:
        ValueError: for any malformed token, bad step, reversed range,
            or value outside [lo, hi].
    """
    result: set[int] = set()
    for part in field.split(","):
        # Handle step: */N or 1-5/N
        step = 1
        if "/" in part:
            part, step_str = part.split("/", 1)
            if not step_str.isdigit() or int(step_str) < 1:
                raise ValueError(f"invalid step value: {step_str}")
            step = int(step_str)
        if part == "*":
            rng = range(lo, hi + 1, step)
        elif "-" in part:
            a, b = part.split("-", 1)
            a = _resolve_name(a, names, lo, hi)
            b = _resolve_name(b, names, lo, hi)
            if a > b:
                raise ValueError(f"range {a}-{b} is reversed")
            rng = range(a, b + 1, step)
        else:
            # A single value; step (if any) is accepted but has no effect.
            val = _resolve_name(part, names, lo, hi)
            rng = range(val, val + 1)
        for v in rng:
            if v < lo or v > hi:
                raise ValueError(f"value {v} out of range [{lo}-{hi}]")
            result.add(v)
    return result


def _resolve_name(token: str, names: dict | None, lo: int, hi: int) -> int:
    """Resolve a cron token (symbolic name or integer) to an int in [lo, hi].

    Raises:
        ValueError: if the token is neither a known name nor an in-range int.
    """
    if names and token.lower() in names:
        return names[token.lower()]
    if not token.isdigit():
        raise ValueError(f"unrecognized token: {token}")
    val = int(token)
    if val < lo or val > hi:
        raise ValueError(f"value {val} out of range [{lo}-{hi}]")
    return val


def validate_cron(expr: str) -> list[str]:
    """Validate a 5-field cron expression. Returns list of errors (empty = ok)."""
    errors: list[str] = []
    fields = expr.strip().split()
    if len(fields) != 5:
        return [f"expected 5 fields, got {len(fields)}"]
    specs = [
        (fields[0], 0, 59, None, "minute"),
        (fields[1], 0, 23, None, "hour"),
        (fields[2], 1, 31, None, "day-of-month"),
        (fields[3], 1, 12, MONTH_NAMES, "month"),
        # 0 and 7 are both accepted for Sunday, as in standard cron.
        (fields[4], 0, 7, DOW_NAMES, "day-of-week"),
    ]
    for field, lo, hi, names, label in specs:
        try:
            _expand_cron_field(field, lo, hi, names)
        except ValueError as e:
            errors.append(f"{label}: {e}")
    return errors


# ── Duplicate key detection ──────────────────────────────────────────────────

class DuplicateKeyError(Exception):
    """Raised when a YAML mapping contains the same key twice."""
    pass


class _StrictYAMLLoader(yaml.SafeLoader):
    """YAML loader that rejects duplicate keys."""
    pass


def _no_duplicates_constructor(loader, node, deep=False):
    """Mapping constructor that raises DuplicateKeyError on repeated keys.

    NOTE(review): does not call loader.flatten_mapping, so YAML merge
    keys ("<<") are not expanded — confirm none of the configs rely on them.
    """
    mapping = {}
    for key_node, value_node in node.value:
        key = loader.construct_object(key_node, deep=deep)
        if key in mapping:
            raise DuplicateKeyError(
                f"duplicate key '{key}' (line {key_node.start_mark.line + 1})"
            )
        mapping[key] = loader.construct_object(value_node, deep=deep)
    return mapping


_StrictYAMLLoader.add_constructor(
    yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
    _no_duplicates_constructor,
)


def _json_has_duplicates(text: str) -> list[str]:
    """Check for duplicate keys in JSON. Returns list of error strings.

    Uses json.loads with object_pairs_hook, which sees every key/value
    pair of each object before deduplication.
    """
    errors: list[str] = []

    def _check_pairs(pairs):
        level_keys: set[str] = set()
        for k, _ in pairs:
            if k in level_keys:
                errors.append(f"duplicate JSON key: '{k}'")
            level_keys.add(k)
        return dict(pairs)

    try:
        json.loads(text, object_pairs_hook=_check_pairs)
    except json.JSONDecodeError:
        pass  # syntax errors caught elsewhere
    return errors


# ── Main validator ───────────────────────────────────────────────────────────

def find_config_files(root: Path) -> list[Path]:
    """Recursively find .yaml, .yml, .json files (skip .git, node_modules, venv)."""
    skip_dirs = {".git", "node_modules", "venv", "__pycache__", ".venv"}
    results: list[Path] = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune skipped directories in place so os.walk never descends.
        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
        for fname in filenames:
            if fname.endswith((".yaml", ".yml", ".json")):
                results.append(Path(dirpath) / fname)
    return sorted(results)


# Matches `cron:`/`schedule:` keys followed by a 5-field cron expression;
# hoisted out of validate_yaml_file's per-line loop.
_YAML_CRON_RE = re.compile(
    r'(?:cron|schedule)\s*:\s*["\']?([*0-9/,a-zA-Z-]+(?:\s+[*0-9/,a-zA-Z-]+){4})["\']?'
)


def validate_yaml_file(filepath: Path, text: str) -> list[str]:
    """Validate a YAML file. Returns list of errors."""
    errors: list[str] = []
    # Check for tabs and trailing whitespace.
    for i, line in enumerate(text.splitlines(), 1):
        if "\t" in line:
            errors.append(f" line {i}: contains tab character (use spaces for YAML)")
        if line != line.rstrip():
            errors.append(f" line {i}: trailing whitespace")
    # Check syntax + duplicate keys (DuplicateKeyError must be caught first:
    # it is not a yaml.YAMLError subclass).
    try:
        yaml.load(text, Loader=_StrictYAMLLoader)
    except DuplicateKeyError as e:
        errors.append(f" {e}")
    except yaml.YAMLError as e:
        mark = getattr(e, "problem_mark", None)
        if mark:
            errors.append(f" YAML syntax error at line {mark.line + 1}, col {mark.column + 1}: {e.problem}")
        else:
            errors.append(f" YAML syntax error: {e}")
    # Check cron expressions in schedule fields.
    for i, line in enumerate(text.splitlines(), 1):
        cron_match = _YAML_CRON_RE.search(line)
        if cron_match:
            cron_errs = validate_cron(cron_match.group(1))
            for ce in cron_errs:
                errors.append(f" line {i}: invalid cron '{cron_match.group(1)}': {ce}")
    return errors


# Sentinel distinguishing "parse failed" from a file whose content is `null`.
_PARSE_FAILED = object()


def validate_json_file(filepath: Path, text: str) -> list[str]:
    """Validate a JSON file. Returns list of errors."""
    errors: list[str] = []
    # Check syntax (keep the parsed object for the cron scan below).
    parsed = _PARSE_FAILED
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as e:
        errors.append(f" JSON syntax error at line {e.lineno}, col {e.colno}: {e.msg}")
    # Check duplicate keys
    errors.extend(_json_has_duplicates(text))
    # Check for trailing whitespace (informational)
    for i, line in enumerate(text.splitlines(), 1):
        if line != line.rstrip():
            errors.append(f" line {i}: trailing whitespace")
    # Check cron expressions. BUG FIX: previously both the regex scan and
    # _scan_obj_for_cron ran on parseable JSON, reporting every cron error
    # twice. Now the structural scan runs when the JSON parses, and the
    # regex scan is only a fallback for syntactically broken files.
    if parsed is not _PARSE_FAILED:
        _scan_obj_for_cron(parsed, errors)
    else:
        cron_pattern = re.compile(r'"(?:cron|schedule)"?\s*:\s*"([^"]{5,})"')
        for match in cron_pattern.finditer(text):
            candidate = match.group(1).strip()
            fields = candidate.split()
            if len(fields) == 5 and all(re.match(r'^[*0-9/,a-zA-Z-]+$', f) for f in fields):
                for ce in validate_cron(candidate):
                    errors.append(f" invalid cron '{candidate}': {ce}")
    return errors


def _scan_obj_for_cron(obj, errors: list[str], path: str = ""):
    """Recursively scan dict/list for cron expressions.

    Only 5-field string values under cron-ish keys are validated; other
    values (e.g. "daily") are deliberately left alone.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k in ("cron", "schedule", "cron_expression") and isinstance(v, str):
                fields = v.strip().split()
                if len(fields) == 5:
                    cron_errs = validate_cron(v)
                    for ce in cron_errs:
                        errors.append(f" {path}.{k}: invalid cron '{v}': {ce}")
            _scan_obj_for_cron(v, errors, f"{path}.{k}")
    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            _scan_obj_for_cron(item, errors, f"{path}[{i}]")


def main():
    """Validate every config file under the repo root; exit 0 (ok) or 1."""
    # Determine repo root (script lives in scripts/)
    script_path = Path(__file__).resolve()
    repo_root = script_path.parent.parent
    print(f"Config Validator — scanning {repo_root}")
    print("=" * 60)
    files = find_config_files(repo_root)
    print(f"Found {len(files)} config files to validate.\n")
    total_errors = 0
    failed_files: list[tuple[Path, list[str]]] = []
    for filepath in files:
        rel = filepath.relative_to(repo_root)
        try:
            # errors="replace" so undecodable bytes don't abort validation.
            text = filepath.read_text(encoding="utf-8", errors="replace")
        except Exception as e:
            failed_files.append((rel, [f" cannot read file: {e}"]))
            total_errors += 1
            continue
        if filepath.suffix == ".json":
            errors = validate_json_file(filepath, text)
        else:
            errors = validate_yaml_file(filepath, text)
        if errors:
            failed_files.append((rel, errors))
            total_errors += len(errors)
            print(f"FAIL {rel}")
        else:
            print(f"PASS {rel}")
    print("\n" + "=" * 60)
    print(f"Results: {len(files) - len(failed_files)}/{len(files)} files passed")
    if failed_files:
        print(f"\n{total_errors} error(s) in {len(failed_files)} file(s):\n")
        for relpath, errs in failed_files:
            print(f" {relpath}:")
            for e in errs:
                print(f" {e}")
            print()
        sys.exit(1)
    else:
        print("\nAll config files valid!")
        sys.exit(0)


if __name__ == "__main__":
    main()