fix: JSON schema + validator for scene description training data (#647)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Smoke Test / smoke (pull_request) Failing after 17s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 45s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 56s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
PR Checklist / pr-checklist (pull_request) Failing after 4m12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Validate Training Data / validate (pull_request) Successful in 18s
Architecture Lint / Lint Repository (pull_request) Failing after 23s
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Smoke Test / smoke (pull_request) Failing after 17s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 45s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 56s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
PR Checklist / pr-checklist (pull_request) Failing after 4m12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Validate Training Data / validate (pull_request) Successful in 18s
Architecture Lint / Lint Repository (pull_request) Failing after 23s
- Updated schema to support both full (genre+bpm+duration_seconds) and simplified (duration) formats across all 13 genre files - Added oneOf support for mood_arc (string or array) - Added camera_movement as alternate scene field (used in hiphop) - Validator catches: missing fields, wrong types, empty values, unexpected fields - All 1300 entries across 13 scene-descriptions-*.jsonl files pass - Auto-detects schema path, supports --schema flag Closes #647
This commit is contained in:
249
scripts/validate-scene-data.py
Normal file → Executable file
249
scripts/validate-scene-data.py
Normal file → Executable file
@@ -1,184 +1,161 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
validate-scene-data.py — Validate scene description JSONL files against schema.
|
||||
"""Validate JSONL training data files against the scene description schema.
|
||||
|
||||
Usage:
|
||||
python3 scripts/validate-scene-data.py training-data/*.jsonl
|
||||
python3 scripts/validate-scene-data.py training-data/scene-descriptions-rock.jsonl
|
||||
python3 scripts/validate-scene-data.py training-data/scene-descriptions-*.jsonl
|
||||
python3 scripts/validate-scene-data.py --schema training-data/schema.json training-data/scene-descriptions-pop.jsonl
|
||||
|
||||
Exit codes:
|
||||
0 = all entries valid
|
||||
0 = all files valid
|
||||
1 = validation errors found
|
||||
|
||||
Refs: timmy-config#647
|
||||
2 = bad arguments or missing files
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Try jsonschema, fall back to manual validation
|
||||
try:
|
||||
import jsonschema
|
||||
HAS_JSONSCHEMA = True
|
||||
except ImportError:
|
||||
HAS_JSONSCHEMA = False
|
||||
|
||||
|
||||
def load_schema():
|
||||
"""Load the JSON schema from training-data/schema.json."""
|
||||
schema_path = Path(__file__).parent.parent / "training-data" / "schema.json"
|
||||
if not schema_path.exists():
|
||||
# Try relative to CWD
|
||||
schema_path = Path("training-data/schema.json")
|
||||
if not schema_path.exists():
|
||||
print(f"ERROR: Schema not found at {schema_path}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
with open(schema_path) as f:
|
||||
def load_schema(path: str) -> dict:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def validate_entry_manual(entry, index):
|
||||
"""Manual validation without jsonschema dependency."""
|
||||
def _check(val, spec, loc, path):
|
||||
"""Check a value against a schema property. Returns list of error strings."""
|
||||
errors = []
|
||||
|
||||
# Required top-level fields
|
||||
for field in ["song", "artist", "beat", "timestamp", "lyric_line", "scene"]:
|
||||
if field not in entry:
|
||||
errors.append(f"Missing required field: {field}")
|
||||
# oneOf — at least one branch must pass
|
||||
if "oneOf" in spec:
|
||||
if not any(not _check(val, o, loc, path) for o in spec["oneOf"]):
|
||||
types = [o.get("type", "?") for o in spec["oneOf"]]
|
||||
errors.append(f"{loc}: '{path}' expected one of [{', '.join(types)}], got {type(val).__name__}")
|
||||
return errors
|
||||
|
||||
# Type checks
|
||||
if "song" in entry and not isinstance(entry["song"], str):
|
||||
errors.append("'song' must be a string")
|
||||
if "song" in entry and isinstance(entry["song"], str) and len(entry["song"].strip()) == 0:
|
||||
errors.append("'song' must not be empty")
|
||||
|
||||
if "artist" in entry and not isinstance(entry["artist"], str):
|
||||
errors.append("'artist' must be a string")
|
||||
if "artist" in entry and isinstance(entry["artist"], str) and len(entry["artist"].strip()) == 0:
|
||||
errors.append("'artist' must not be empty")
|
||||
|
||||
if "beat" in entry and not isinstance(entry["beat"], int):
|
||||
errors.append("'beat' must be an integer")
|
||||
if "beat" in entry and isinstance(entry["beat"], int) and entry["beat"] < 1:
|
||||
errors.append("'beat' must be >= 1")
|
||||
|
||||
if "timestamp" in entry:
|
||||
import re
|
||||
if not re.match(r'^[0-9]+:[0-5][0-9]$', str(entry["timestamp"])):
|
||||
errors.append(f"'timestamp' must be M:SS or MM:SS format, got: {entry['timestamp']}")
|
||||
|
||||
if "lyric_line" in entry and not isinstance(entry["lyric_line"], str):
|
||||
errors.append("'lyric_line' must be a string")
|
||||
if "lyric_line" in entry and isinstance(entry["lyric_line"], str) and len(entry["lyric_line"].strip()) == 0:
|
||||
errors.append("'lyric_line' must not be empty")
|
||||
|
||||
# Scene validation
|
||||
if "scene" in entry:
|
||||
scene = entry["scene"]
|
||||
if not isinstance(scene, dict):
|
||||
errors.append("'scene' must be an object")
|
||||
t = spec.get("type")
|
||||
if t == "string":
|
||||
if not isinstance(val, str):
|
||||
errors.append(f"{loc}: '{path}' expected string, got {type(val).__name__}")
|
||||
elif spec.get("minLength") and len(val) < spec["minLength"]:
|
||||
errors.append(f"{loc}: '{path}' is empty (min {spec['minLength']} chars)")
|
||||
elif spec.get("pattern") and not re.match(spec["pattern"], val):
|
||||
errors.append(f"{loc}: '{path}'='{val}' doesn't match {spec['pattern']}")
|
||||
elif t == "number":
|
||||
if not isinstance(val, (int, float)) or isinstance(val, bool):
|
||||
errors.append(f"{loc}: '{path}' expected number, got {type(val).__name__}")
|
||||
elif "minimum" in spec and val < spec["minimum"]:
|
||||
errors.append(f"{loc}: '{path}'={val} below minimum {spec['minimum']}")
|
||||
elif t == "integer":
|
||||
if not isinstance(val, int) or isinstance(val, bool):
|
||||
errors.append(f"{loc}: '{path}' expected integer, got {type(val).__name__}")
|
||||
elif t == "array":
|
||||
if not isinstance(val, list):
|
||||
errors.append(f"{loc}: '{path}' expected array, got {type(val).__name__}")
|
||||
elif spec.get("minItems") and len(val) < spec["minItems"]:
|
||||
errors.append(f"{loc}: '{path}' has {len(val)} items, need >= {spec['minItems']}")
|
||||
else:
|
||||
for field in ["mood", "colors", "composition", "description"]:
|
||||
if field not in scene:
|
||||
errors.append(f"Missing required scene field: {field}")
|
||||
for j, item in enumerate(val):
|
||||
errors.extend(_check(item, spec.get("items", {}), loc, f"{path}[{j}]"))
|
||||
elif t == "object":
|
||||
if not isinstance(val, dict):
|
||||
errors.append(f"{loc}: '{path}' expected object, got {type(val).__name__}")
|
||||
return errors
|
||||
for nf in spec.get("required", []):
|
||||
if nf not in val:
|
||||
errors.append(f"{loc}: '{path}.{nf}' is missing")
|
||||
for nf, ns in spec.get("properties", {}).items():
|
||||
if nf in val:
|
||||
errors.extend(_check(val[nf], ns, loc, f"{path}.{nf}"))
|
||||
if spec.get("additionalProperties") is False:
|
||||
extra = set(val.keys()) - set(spec.get("properties", {}).keys())
|
||||
if extra:
|
||||
errors.append(f"{loc}: '{path}' has unexpected fields: {extra}")
|
||||
return errors
|
||||
|
||||
if "mood" in scene and not isinstance(scene["mood"], str):
|
||||
errors.append("'scene.mood' must be a string")
|
||||
if "mood" in scene and isinstance(scene["mood"], str) and len(scene["mood"].strip()) == 0:
|
||||
errors.append("'scene.mood' must not be empty")
|
||||
|
||||
if "colors" in scene:
|
||||
if not isinstance(scene["colors"], list):
|
||||
errors.append("'scene.colors' must be an array")
|
||||
elif len(scene["colors"]) == 0:
|
||||
errors.append("'scene.colors' must have at least 1 element")
|
||||
else:
|
||||
for i, c in enumerate(scene["colors"]):
|
||||
if not isinstance(c, str) or len(c.strip()) == 0:
|
||||
errors.append(f"'scene.colors[{i}]' must be a non-empty string")
|
||||
def validate_entry(entry, schema, line_num, file_name):
|
||||
"""Validate one JSONL entry."""
|
||||
errors = []
|
||||
loc = f"{file_name}:{line_num}"
|
||||
props = schema.get("properties", {})
|
||||
|
||||
if "composition" in scene and not isinstance(scene["composition"], str):
|
||||
errors.append("'scene.composition' must be a string")
|
||||
if "composition" in scene and isinstance(scene["composition"], str) and len(scene["composition"].strip()) == 0:
|
||||
errors.append("'scene.composition' must not be empty")
|
||||
for field in schema.get("required", []):
|
||||
if field not in entry:
|
||||
errors.append(f"{loc}: missing required field '{field}'")
|
||||
|
||||
if "description" in scene and not isinstance(scene["description"], str):
|
||||
errors.append("'scene.description' must be a string")
|
||||
if "description" in scene and isinstance(scene["description"], str) and len(scene["description"]) < 10:
|
||||
errors.append(f"'scene.description' too short ({len(scene['description'])} chars, min 10)")
|
||||
for field, spec in props.items():
|
||||
if field in entry:
|
||||
errors.extend(_check(entry[field], spec, loc, field))
|
||||
|
||||
extra = set(entry.keys()) - set(props.keys())
|
||||
if extra:
|
||||
errors.append(f"{loc}: unexpected fields: {extra}")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def validate_file(filepath, schema):
|
||||
"""Validate all entries in a JSONL file."""
|
||||
def validate_file(path, schema):
|
||||
errors = []
|
||||
total = 0
|
||||
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
for line_num, line in enumerate(f, 1):
|
||||
count = 0
|
||||
with open(path) as f:
|
||||
for n, line in enumerate(f, 1):
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
total += 1
|
||||
|
||||
count += 1
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
except json.JSONDecodeError as e:
|
||||
errors.append(f" Line {line_num}: Invalid JSON — {e}")
|
||||
errors.append(f"{path}:{n}: invalid JSON: {e}")
|
||||
continue
|
||||
|
||||
if HAS_JSONSCHEMA:
|
||||
entry_errors = list(jsonschema.validate(entry, schema) or [])
|
||||
# jsonschema raises on error, so this path won't see errors
|
||||
else:
|
||||
entry_errors = validate_entry_manual(entry, line_num)
|
||||
|
||||
for err in entry_errors:
|
||||
errors.append(f" Line {line_num}: {err}")
|
||||
|
||||
return total, errors
|
||||
if not isinstance(entry, dict):
|
||||
errors.append(f"{path}:{n}: not a JSON object")
|
||||
continue
|
||||
errors.extend(validate_entry(entry, schema, n, path))
|
||||
return count, errors
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: python3 scripts/validate-scene-data.py <file.jsonl> [file2.jsonl ...]")
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument("files", nargs="+")
|
||||
p.add_argument("--schema", default=None)
|
||||
args = p.parse_args()
|
||||
|
||||
schema_path = args.schema
|
||||
if not schema_path:
|
||||
for c in [Path(args.files[0]).parent / "schema.json", Path("training-data/schema.json")]:
|
||||
if c.exists():
|
||||
schema_path = str(c)
|
||||
break
|
||||
if not schema_path or not Path(schema_path).exists():
|
||||
print("ERROR: schema not found. Use --schema path", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
schema = load_schema()
|
||||
total_entries = 0
|
||||
total_errors = 0
|
||||
files_checked = 0
|
||||
schema = load_schema(schema_path)
|
||||
tf = tl = te = 0
|
||||
|
||||
for filepath in sys.argv[1:]:
|
||||
if not os.path.exists(filepath):
|
||||
print(f"SKIP: {filepath} (not found)")
|
||||
for fp in args.files:
|
||||
if not Path(fp).exists():
|
||||
print(f"SKIP: {fp}")
|
||||
continue
|
||||
|
||||
files_checked += 1
|
||||
count, errors = validate_file(filepath, schema)
|
||||
total_entries += count
|
||||
|
||||
if errors:
|
||||
total_errors += len(errors)
|
||||
print(f"FAIL: {filepath} — {len(errors)} error(s) in {count} entries:")
|
||||
for err in errors[:20]: # Limit output
|
||||
print(err)
|
||||
if len(errors) > 20:
|
||||
print(f" ... and {len(errors) - 20} more errors")
|
||||
tf += 1
|
||||
n, errs = validate_file(fp, schema)
|
||||
tl += n
|
||||
if errs:
|
||||
te += len(errs)
|
||||
print(f"\n❌ {fp}: {len(errs)} errors in {n} entries")
|
||||
for e in errs[:10]:
|
||||
print(f" {e}")
|
||||
if len(errs) > 10:
|
||||
print(f" ... +{len(errs)-10} more")
|
||||
else:
|
||||
print(f"PASS: {filepath} — {count} entries valid")
|
||||
print(f"✅ {fp}: {n} entries valid")
|
||||
|
||||
print(f"\nSummary: {files_checked} files, {total_entries} entries, {total_errors} errors")
|
||||
|
||||
if total_errors > 0:
|
||||
print("VALIDATION FAILED")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("ALL VALID")
|
||||
sys.exit(0)
|
||||
print(f"\n--- Summary ---")
|
||||
print(f"Files: {tf} Entries: {tl} Errors: {te}")
|
||||
sys.exit(1 if te else 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user