Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Smoke Test / smoke (pull_request) Failing after 17s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 45s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 56s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
PR Checklist / pr-checklist (pull_request) Failing after 4m12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Validate Training Data / validate (pull_request) Successful in 18s
Architecture Lint / Lint Repository (pull_request) Failing after 23s
- Updated schema to support both full (genre+bpm+duration_seconds) and simplified (duration) formats across all 13 genre files
- Added oneOf support for mood_arc (string or array)
- Added camera_movement as alternate scene field (used in hiphop)
- Validator catches: missing fields, wrong types, empty values, unexpected fields
- All 1300 entries across 13 scene-descriptions-*.jsonl files pass
- Auto-detects schema path, supports --schema flag

Closes #647
163 lines
5.6 KiB
Python
Executable File
163 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Validate JSONL training data files against the scene description schema.
|
|
|
|
Usage:
|
|
python3 scripts/validate-scene-data.py training-data/scene-descriptions-*.jsonl
|
|
python3 scripts/validate-scene-data.py --schema training-data/schema.json training-data/scene-descriptions-pop.jsonl
|
|
|
|
Exit codes:
|
|
0 = all files valid
|
|
1 = validation errors found
|
|
2 = bad arguments or missing files
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
|
|
def load_schema(path: str) -> dict:
    """Load and parse the JSON schema file at *path*.

    Args:
        path: Filesystem path to a JSON schema document.

    Returns:
        The parsed schema as a dict.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Schema files are UTF-8 JSON; pin the encoding so parsing does not
    # depend on the platform's default locale encoding.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
def _check(val, spec, loc, path):
|
|
"""Check a value against a schema property. Returns list of error strings."""
|
|
errors = []
|
|
|
|
# oneOf — at least one branch must pass
|
|
if "oneOf" in spec:
|
|
if not any(not _check(val, o, loc, path) for o in spec["oneOf"]):
|
|
types = [o.get("type", "?") for o in spec["oneOf"]]
|
|
errors.append(f"{loc}: '{path}' expected one of [{', '.join(types)}], got {type(val).__name__}")
|
|
return errors
|
|
|
|
t = spec.get("type")
|
|
if t == "string":
|
|
if not isinstance(val, str):
|
|
errors.append(f"{loc}: '{path}' expected string, got {type(val).__name__}")
|
|
elif spec.get("minLength") and len(val) < spec["minLength"]:
|
|
errors.append(f"{loc}: '{path}' is empty (min {spec['minLength']} chars)")
|
|
elif spec.get("pattern") and not re.match(spec["pattern"], val):
|
|
errors.append(f"{loc}: '{path}'='{val}' doesn't match {spec['pattern']}")
|
|
elif t == "number":
|
|
if not isinstance(val, (int, float)) or isinstance(val, bool):
|
|
errors.append(f"{loc}: '{path}' expected number, got {type(val).__name__}")
|
|
elif "minimum" in spec and val < spec["minimum"]:
|
|
errors.append(f"{loc}: '{path}'={val} below minimum {spec['minimum']}")
|
|
elif t == "integer":
|
|
if not isinstance(val, int) or isinstance(val, bool):
|
|
errors.append(f"{loc}: '{path}' expected integer, got {type(val).__name__}")
|
|
elif t == "array":
|
|
if not isinstance(val, list):
|
|
errors.append(f"{loc}: '{path}' expected array, got {type(val).__name__}")
|
|
elif spec.get("minItems") and len(val) < spec["minItems"]:
|
|
errors.append(f"{loc}: '{path}' has {len(val)} items, need >= {spec['minItems']}")
|
|
else:
|
|
for j, item in enumerate(val):
|
|
errors.extend(_check(item, spec.get("items", {}), loc, f"{path}[{j}]"))
|
|
elif t == "object":
|
|
if not isinstance(val, dict):
|
|
errors.append(f"{loc}: '{path}' expected object, got {type(val).__name__}")
|
|
return errors
|
|
for nf in spec.get("required", []):
|
|
if nf not in val:
|
|
errors.append(f"{loc}: '{path}.{nf}' is missing")
|
|
for nf, ns in spec.get("properties", {}).items():
|
|
if nf in val:
|
|
errors.extend(_check(val[nf], ns, loc, f"{path}.{nf}"))
|
|
if spec.get("additionalProperties") is False:
|
|
extra = set(val.keys()) - set(spec.get("properties", {}).keys())
|
|
if extra:
|
|
errors.append(f"{loc}: '{path}' has unexpected fields: {extra}")
|
|
return errors
|
|
|
|
|
|
def validate_entry(entry, schema, line_num, file_name):
    """Validate a single parsed JSONL entry against the top-level schema.

    Args:
        entry: Parsed JSON object (dict) from one JSONL line.
        schema: Top-level schema dict with "required" and "properties".
        line_num: 1-based line number within the file (for messages).
        file_name: Name of the file being validated (for messages).

    Returns:
        A list of error strings; empty when the entry is valid.
    """
    loc = f"{file_name}:{line_num}"
    props = schema.get("properties", {})

    # Required fields that are absent from the entry entirely.
    errors = [
        f"{loc}: missing required field '{field}'"
        for field in schema.get("required", [])
        if field not in entry
    ]

    # Type/shape checks for every field that is present and has a spec.
    for field, spec in props.items():
        if field in entry:
            errors += _check(entry[field], spec, loc, field)

    # Fields present in the entry but never declared in the schema.
    extra = set(entry) - set(props)
    if extra:
        errors.append(f"{loc}: unexpected fields: {extra}")

    return errors
|
|
|
|
|
|
def validate_file(path, schema):
    """Validate every non-blank line of a JSONL file against *schema*.

    Args:
        path: Path to the .jsonl file.
        schema: Top-level schema dict.

    Returns:
        (count, errors): the number of non-blank lines processed and the
        list of all error strings collected across the file.
    """
    errors = []
    count = 0
    # JSONL training data is UTF-8; pin the encoding so validation does not
    # depend on the platform's default locale encoding.
    with open(path, encoding="utf-8") as f:
        for n, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue  # blank lines are tolerated and not counted
            count += 1
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                errors.append(f"{path}:{n}: invalid JSON: {e}")
                continue
            # A syntactically valid line could still be a scalar or array;
            # only objects are meaningful entries.
            if not isinstance(entry, dict):
                errors.append(f"{path}:{n}: not a JSON object")
                continue
            errors.extend(validate_entry(entry, schema, n, path))
    return count, errors
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, locate the schema, validate files.

    Exits 0 when every entry validates, 1 when validation errors were found,
    and 2 when the schema file cannot be located.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("files", nargs="+")
    parser.add_argument("--schema", default=None)
    opts = parser.parse_args()

    # Resolve the schema path: the explicit flag wins; otherwise look next to
    # the first data file, then fall back to the default training-data path.
    schema_path = opts.schema
    if not schema_path:
        candidates = (
            Path(opts.files[0]).parent / "schema.json",
            Path("training-data/schema.json"),
        )
        schema_path = next((str(c) for c in candidates if c.exists()), None)
    if not schema_path or not Path(schema_path).exists():
        print("ERROR: schema not found. Use --schema path", file=sys.stderr)
        sys.exit(2)

    schema = load_schema(schema_path)

    total_files = 0
    total_entries = 0
    total_errors = 0

    for fp in opts.files:
        if not Path(fp).exists():
            print(f"SKIP: {fp}")
            continue
        total_files += 1
        n, errs = validate_file(fp, schema)
        total_entries += n
        if not errs:
            print(f"✅ {fp}: {n} entries valid")
            continue
        total_errors += len(errs)
        print(f"\n❌ {fp}: {len(errs)} errors in {n} entries")
        # Cap per-file output at 10 errors to keep CI logs readable.
        for e in errs[:10]:
            print(f" {e}")
        if len(errs) > 10:
            print(f" ... +{len(errs)-10} more")

    print(f"\n--- Summary ---")
    print(f"Files: {total_files} Entries: {total_entries} Errors: {total_errors}")
    sys.exit(1 if total_errors else 0)
|
|
|
|
|
|
# Run the CLI only when executed as a script (not when imported).
if __name__ == "__main__":
    main()
|