Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
6b38c2b5aa fix: JSON schema + validation for scene description data #647
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 34s
Smoke Test / smoke (pull_request) Failing after 12s
Validate Config / YAML Lint (pull_request) Failing after 9s
Validate Config / JSON Validate (pull_request) Successful in 14s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m30s
Validate Config / Shell Script Lint (pull_request) Failing after 1m33s
Validate Config / Cron Syntax Check (pull_request) Successful in 18s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 21s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
PR Checklist / pr-checklist (pull_request) Failing after 10m43s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-14 21:16:31 -04:00
4 changed files with 269 additions and 523 deletions

View File

@@ -1,362 +0,0 @@
#!/usr/bin/env python3
"""PR Triage Automation -- Categorize, deduplicate, report, auto-merge (#659).
Enhancements over base implementation:
- Auto-merge for safe PRs (training data with passing tests)
- --all-repos flag for org-wide triage
- JSON output with structured data
- Age-based risk scoring
- Better duplicate detection (title similarity)
- Tests in tests/test_pr_triage.py
Usage:
python scripts/pr_triage.py --repo hermes-agent
python scripts/pr_triage.py --repo hermes-agent --json
python scripts/pr_triage.py --repo hermes-agent --auto-merge --dry-run
python scripts/pr_triage.py --all-repos --owner Timmy_Foundation
"""
import argparse
import json
import os
import re
import sys
import subprocess
from collections import Counter, defaultdict
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.request import Request, urlopen
from urllib.error import HTTPError
def _token():
t = os.environ.get("GITEA_TOKEN", "")
if not t:
p = os.path.expanduser("~/.config/gitea/token")
if os.path.exists(p):
t = open(p).read().strip()
return t
def _api(url, token, method="GET", data=None):
h = {"Authorization": "token " + token, "Accept": "application/json"}
body = json.dumps(data).encode() if data else None
if data:
h["Content-Type"] = "application/json"
req = Request(url, data=body, headers=h, method=method)
try:
return json.loads(urlopen(req, timeout=30).read())
except HTTPError:
return None
def fetch_prs(base, token, owner, repo):
prs, page = [], 1
while True:
b = _api(f"{base}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50&page={page}", token)
if not b:
break
prs.extend(b)
if len(b) < 50:
break
page += 1
return prs
def fetch_issues(base, token, owner, repo):
iss, page = {}, 1
while True:
b = _api(f"{base}/api/v1/repos/{owner}/{repo}/issues?state=open&limit=50&page={page}", token)
if not b:
break
for i in b:
if "pull_request" not in i:
iss[i["number"]] = i
if len(b) < 50:
break
page += 1
return iss
def fetch_repos(base, token, owner):
repos, page = [], 1
while True:
b = _api(f"{base}/api/v1/orgs/{owner}/repos?limit=50&page={page}", token)
if not b:
break
repos.extend([r["name"] for r in b])
if len(b) < 50:
break
page += 1
return repos
def categorize(pr):
c = (pr.get("title", "") + " " + pr.get("body", "") + " " +
" ".join(l.get("name", "") for l in pr.get("labels", []))).lower()
for kw, cat in [
("training data", "training-data"), ("dpo", "training-data"), ("grpo", "training-data"),
("fix:", "bug-fix"), ("bug", "bug-fix"), ("hotfix", "bug-fix"),
("feat:", "feature"), ("feature", "feature"), ("enhancement", "feature"),
("refactor", "maintenance"), ("cleanup", "maintenance"), ("chore:", "maintenance"),
("doc", "documentation"), ("test", "testing"), ("ci", "infrastructure"),
("infra", "infrastructure"), ("deploy", "infrastructure"),
]:
if kw in c:
return cat
return "other"
def refs(pr):
return [int(m) for m in re.findall(r"#(\d+)", pr.get("title", "") + " " + pr.get("body", ""))]
def find_duplicates(prs):
by_ref = defaultdict(list)
for p in prs:
for r in refs(p):
by_ref[r].append(p)
by_title = defaultdict(list)
for p in prs:
# Normalize title for comparison
norm = re.sub(r"^(fix|feat|chore|docs|test|refactor)[\(:].*?[\):]\s*", "", p.get("title", "").lower())
norm = re.sub(r"#\d+", "", norm).strip()
by_title[norm].append(p)
dup_groups = []
seen = set()
# Ref-based duplicates
for r, group in by_ref.items():
if len(group) > 1:
key = tuple(sorted(p["number"] for p in group))
if key not in seen:
seen.add(key)
dup_groups.append({"type": "ref", "ref": r, "prs": group})
# Title-similarity duplicates (threshold 0.85)
for i, p1 in enumerate(prs):
for p2 in prs[i + 1:]:
key = tuple(sorted([p1["number"], p2["number"]]))
if key in seen:
continue
sim = SequenceMatcher(None, p1.get("title", "").lower(), p2.get("title", "").lower()).ratio()
if sim > 0.85:
seen.add(key)
dup_groups.append({"type": "similarity", "similarity": round(sim, 2), "prs": [p1, p2]})
return dup_groups
def health(pr, issues):
r = refs(pr)
created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00"))
updated = datetime.fromisoformat(pr["updated_at"].replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
age_days = (now - created).days
stale_days = (now - updated).days
# Risk score: age + staleness + no refs + not mergeable
risk = 0
risk += min(age_days, 30) # max 30 for age
risk += min(stale_days * 2, 40) # max 40 for staleness
risk += 10 if not r else 0 # no issue refs
risk += 15 if pr.get("mergeable") is False else 0 # conflicts
risk = min(risk, 100)
return {
"pr": pr["number"],
"title": pr["title"],
"head": pr["head"]["ref"],
"category": categorize(pr),
"refs": r,
"open_issues": [x for x in r if x in issues],
"closed_issues": [x for x in r if x not in issues],
"age_days": age_days,
"stale_days": stale_days,
"risk_score": risk,
"mergeable": pr.get("mergeable"),
"author": pr.get("user", {}).get("login", ""),
"labels": [l.get("name", "") for l in pr.get("labels", [])],
}
def is_safe_to_merge(h):
"""Determine if a PR is safe to auto-merge."""
if h["category"] != "training-data":
return False, "not training-data"
if h["mergeable"] is False:
return False, "has conflicts"
if h["mergeable"] is None:
return False, "mergeable status unknown"
if h["stale_days"] > 30:
return False, f"too stale ({h['stale_days']}d)"
if h["risk_score"] > 50:
return False, f"risk too high ({h['risk_score']})"
return True, "safe"
def auto_merge(base, token, owner, repo, pr_num, dry_run=True):
"""Attempt to merge a PR."""
if dry_run:
return {"merged": False, "dry_run": True, "pr": pr_num}
url = f"{base}/api/v1/repos/{owner}/{repo}/pulls/{pr_num}/merge"
result = _api(url, token, method="POST", data={
"MergeTitleField": "auto",
"MergeMessageField": "auto",
"Do": "merge",
})
return {"merged": result is not None, "pr": pr_num, "result": result}
def report(repo, checks, dups):
lines = [
f"# PR Triage -- {repo}",
f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"Open PRs: {len(checks)}",
"",
"## Summary",
"",
]
cats = Counter(h["category"] for h in checks)
lines.append("| Category | Count |")
lines.append("|----------|-------|")
for c, n in cats.most_common():
lines.append(f"| {c} | {n} |")
stale = [h for h in checks if h["stale_days"] > 7]
high_risk = [h for h in checks if h["risk_score"] > 50]
safe_merge = [h for h in checks if is_safe_to_merge(h)[0]]
lines.extend([
"",
f"Stale (>7d): {len(stale)}",
f"High risk (>50): {len(high_risk)}",
f"Safe to merge: {len(safe_merge)}",
f"Duplicate groups: {len(dups)}",
"",
])
if safe_merge:
lines.append("## Safe to Auto-Merge")
for h in safe_merge:
ok, reason = is_safe_to_merge(h)
lines.append(f"- #{h['pr']}: {h['title'][:60]} ({reason})")
lines.append("")
if dups:
lines.append("## Duplicates")
for g in dups:
pr_nums = [str(p["number"]) for p in g["prs"]]
lines.append(f"[{g['type']}] PRs {', '.join('#' + n for n in pr_nums)}:")
for p in g["prs"]:
lines.append(f" - #{p['number']}: {p['title']}")
lines.append("")
if stale:
lines.append("## Stale (>7d)")
for h in sorted(stale, key=lambda x: x["stale_days"], reverse=True):
lines.append(f"- #{h['pr']}: {h['title'][:50]} -- {h['stale_days']}d (risk: {h['risk_score']})")
lines.append("")
lines.append("## All PRs")
lines.append("| # | Title | Category | Age | Stale | Risk | Merge |")
lines.append("|---|-------|----------|-----|-------|------|-------|")
for h in sorted(checks, key=lambda x: x["pr"]):
m = "Y" if h["mergeable"] else ("N" if h["mergeable"] is False else "?")
s = f"{h['stale_days']}d" if h["stale_days"] > 7 else "-"
lines.append(f"| {h['pr']} | {h['title'][:45]} | {h['category']} | {h['age_days']}d | {s} | {h['risk_score']} | {m} |")
return "\n".join(lines)
def main():
p = argparse.ArgumentParser(description="PR Triage Automation")
p.add_argument("--base-url", default="https://forge.alexanderwhitestone.com")
p.add_argument("--owner", default="Timmy_Foundation")
p.add_argument("--repo", default="")
p.add_argument("--all-repos", action="store_true", help="Triage all org repos")
p.add_argument("--json", action="store_true", dest="js")
p.add_argument("--output", default="")
p.add_argument("--auto-merge", action="store_true", help="Auto-merge safe PRs")
p.add_argument("--dry-run", action="store_true", help="Show what would be merged without merging")
a = p.parse_args()
token = _token()
if not token:
print("No token"); sys.exit(1)
if a.all_repos:
repos = fetch_repos(a.base_url, token, a.owner)
all_checks = []
all_dups = []
for repo in repos:
prs = fetch_prs(a.base_url, token, a.owner, repo)
issues = fetch_issues(a.base_url, token, a.owner, repo)
checks = [health(pr, issues) for pr in prs]
dups = find_duplicates(prs)
for c in checks:
c["repo"] = repo
all_checks.extend(checks)
all_dups.extend(dups)
if a.js:
print(json.dumps({"repos": repos, "prs": all_checks, "duplicates_count": len(all_dups)}, indent=2))
else:
print(f"Org-wide triage: {len(repos)} repos, {len(all_checks)} PRs, {len(all_dups)} duplicate groups")
cats = Counter(h["category"] for h in all_checks)
for c, n in cats.most_common():
print(f" {c}: {n}")
return
repo = a.repo
if not repo:
try:
remote = subprocess.check_output(["git", "remote", "get-url", "origin"], text=True).strip()
m = re.search(r"[/:](\w[\w-]*)/(\w[\w-]*?)(?:\.git)?$", remote)
if m:
a.owner, repo = m.group(1), m.group(2)
except Exception:
pass
if not repo:
print("No repo specified"); sys.exit(1)
print(f"Triaging {a.owner}/{repo}...", file=sys.stderr)
prs = fetch_prs(a.base_url, token, a.owner, repo)
issues = fetch_issues(a.base_url, token, a.owner, repo)
checks = [health(pr, issues) for pr in prs]
dups = find_duplicates(prs)
# Auto-merge
merge_results = []
if a.auto_merge or a.dry_run:
safe = [h for h in checks if is_safe_to_merge(h)[0]]
if safe:
print(f"Auto-merge: {len(safe)} safe PRs ({'dry-run' if a.dry_run else 'live'})", file=sys.stderr)
for h in safe:
result = auto_merge(a.base_url, token, a.owner, repo, h["pr"], dry_run=a.dry_run)
merge_results.append(result)
status = "WOULD MERGE" if a.dry_run else ("MERGED" if result["merged"] else "FAILED")
print(f" #{h['pr']}: {status}", file=sys.stderr)
if a.js:
out = {
"repo": repo, "prs": checks,
"duplicates": [{"type": g["type"], "prs": [p["number"] for p in g["prs"]]} for g in dups],
"merge_results": merge_results,
}
print(json.dumps(out, indent=2))
else:
r = report(repo, checks, dups)
print(r)
if a.output:
with open(a.output, "w") as f:
f.write(r)
print(f"\n{len(checks)} PRs, {len(dups)} duplicate groups, {len(merge_results)} merges",
file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
validate-scene-data.py — Validate scene description JSONL files against schema.
Usage:
python3 scripts/validate-scene-data.py training-data/*.jsonl
python3 scripts/validate-scene-data.py training-data/scene-descriptions-rock.jsonl
Exit codes:
0 = all entries valid
1 = validation errors found
Refs: timmy-config#647
"""
import json
import sys
import os
from pathlib import Path
# Try jsonschema, fall back to manual validation
try:
import jsonschema
HAS_JSONSCHEMA = True
except ImportError:
HAS_JSONSCHEMA = False
def load_schema():
"""Load the JSON schema from training-data/schema.json."""
schema_path = Path(__file__).parent.parent / "training-data" / "schema.json"
if not schema_path.exists():
# Try relative to CWD
schema_path = Path("training-data/schema.json")
if not schema_path.exists():
print(f"ERROR: Schema not found at {schema_path}", file=sys.stderr)
sys.exit(2)
with open(schema_path) as f:
return json.load(f)
def validate_entry_manual(entry, index):
"""Manual validation without jsonschema dependency."""
errors = []
# Required top-level fields
for field in ["song", "artist", "beat", "timestamp", "lyric_line", "scene"]:
if field not in entry:
errors.append(f"Missing required field: {field}")
# Type checks
if "song" in entry and not isinstance(entry["song"], str):
errors.append("'song' must be a string")
if "song" in entry and isinstance(entry["song"], str) and len(entry["song"].strip()) == 0:
errors.append("'song' must not be empty")
if "artist" in entry and not isinstance(entry["artist"], str):
errors.append("'artist' must be a string")
if "artist" in entry and isinstance(entry["artist"], str) and len(entry["artist"].strip()) == 0:
errors.append("'artist' must not be empty")
if "beat" in entry and not isinstance(entry["beat"], int):
errors.append("'beat' must be an integer")
if "beat" in entry and isinstance(entry["beat"], int) and entry["beat"] < 1:
errors.append("'beat' must be >= 1")
if "timestamp" in entry:
import re
if not re.match(r'^[0-9]+:[0-5][0-9]$', str(entry["timestamp"])):
errors.append(f"'timestamp' must be M:SS or MM:SS format, got: {entry['timestamp']}")
if "lyric_line" in entry and not isinstance(entry["lyric_line"], str):
errors.append("'lyric_line' must be a string")
if "lyric_line" in entry and isinstance(entry["lyric_line"], str) and len(entry["lyric_line"].strip()) == 0:
errors.append("'lyric_line' must not be empty")
# Scene validation
if "scene" in entry:
scene = entry["scene"]
if not isinstance(scene, dict):
errors.append("'scene' must be an object")
else:
for field in ["mood", "colors", "composition", "description"]:
if field not in scene:
errors.append(f"Missing required scene field: {field}")
if "mood" in scene and not isinstance(scene["mood"], str):
errors.append("'scene.mood' must be a string")
if "mood" in scene and isinstance(scene["mood"], str) and len(scene["mood"].strip()) == 0:
errors.append("'scene.mood' must not be empty")
if "colors" in scene:
if not isinstance(scene["colors"], list):
errors.append("'scene.colors' must be an array")
elif len(scene["colors"]) == 0:
errors.append("'scene.colors' must have at least 1 element")
else:
for i, c in enumerate(scene["colors"]):
if not isinstance(c, str) or len(c.strip()) == 0:
errors.append(f"'scene.colors[{i}]' must be a non-empty string")
if "composition" in scene and not isinstance(scene["composition"], str):
errors.append("'scene.composition' must be a string")
if "composition" in scene and isinstance(scene["composition"], str) and len(scene["composition"].strip()) == 0:
errors.append("'scene.composition' must not be empty")
if "description" in scene and not isinstance(scene["description"], str):
errors.append("'scene.description' must be a string")
if "description" in scene and isinstance(scene["description"], str) and len(scene["description"]) < 10:
errors.append(f"'scene.description' too short ({len(scene['description'])} chars, min 10)")
return errors
def validate_file(filepath, schema):
"""Validate all entries in a JSONL file."""
errors = []
total = 0
with open(filepath, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
total += 1
try:
entry = json.loads(line)
except json.JSONDecodeError as e:
errors.append(f" Line {line_num}: Invalid JSON — {e}")
continue
if HAS_JSONSCHEMA:
entry_errors = list(jsonschema.validate(entry, schema) or [])
# jsonschema raises on error, so this path won't see errors
else:
entry_errors = validate_entry_manual(entry, line_num)
for err in entry_errors:
errors.append(f" Line {line_num}: {err}")
return total, errors
def main():
if len(sys.argv) < 2:
print("Usage: python3 scripts/validate-scene-data.py <file.jsonl> [file2.jsonl ...]")
sys.exit(2)
schema = load_schema()
total_entries = 0
total_errors = 0
files_checked = 0
for filepath in sys.argv[1:]:
if not os.path.exists(filepath):
print(f"SKIP: {filepath} (not found)")
continue
files_checked += 1
count, errors = validate_file(filepath, schema)
total_entries += count
if errors:
total_errors += len(errors)
print(f"FAIL: {filepath}{len(errors)} error(s) in {count} entries:")
for err in errors[:20]: # Limit output
print(err)
if len(errors) > 20:
print(f" ... and {len(errors) - 20} more errors")
else:
print(f"PASS: {filepath}{count} entries valid")
print(f"\nSummary: {files_checked} files, {total_entries} entries, {total_errors} errors")
if total_errors > 0:
print("VALIDATION FAILED")
sys.exit(1)
else:
print("ALL VALID")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,161 +0,0 @@
"""Tests for PR triage automation (#659)."""
from __future__ import annotations
import pytest
from datetime import datetime, timezone, timedelta
from scripts.pr_triage import categorize, refs, find_duplicates, health, is_safe_to_merge
class TestCategorize:
"""PR categorization from title/body/labels."""
def test_training_data(self):
pr = {"title": "Add DPO training data", "body": "", "labels": []}
assert categorize(pr) == "training-data"
def test_bug_fix(self):
pr = {"title": "fix: resolve crash on startup", "body": "", "labels": []}
assert categorize(pr) == "bug-fix"
def test_feature(self):
pr = {"title": "feat: add dark mode", "body": "", "labels": []}
assert categorize(pr) == "feature"
def test_maintenance(self):
pr = {"title": "refactor: simplify auth flow", "body": "", "labels": []}
assert categorize(pr) == "maintenance"
def test_other(self):
pr = {"title": "Update readme", "body": "", "labels": []}
assert categorize(pr) == "other"
class TestRefs:
"""Issue reference extraction."""
def test_extracts_from_title(self):
pr = {"title": "fix: resolve #123", "body": ""}
assert refs(pr) == [123]
def test_extracts_from_body(self):
pr = {"title": "Fix", "body": "Closes #456, refs #789"}
assert refs(pr) == [456, 789]
def test_no_refs(self):
pr = {"title": "Fix", "body": "No issue refs"}
assert refs(pr) == []
def test_multiple_refs(self):
pr = {"title": "#1 and #2", "body": "Also #3"}
assert refs(pr) == [1, 2, 3]
class TestFindDuplicates:
"""Duplicate PR detection."""
def test_ref_based_duplicates(self):
prs = [
{"number": 1, "title": "Fix #100", "body": "Closes #100"},
{"number": 2, "title": "Fix #100 too", "body": "Closes #100"},
]
dups = find_duplicates(prs)
assert len(dups) == 1
assert dups[0]["type"] == "ref"
def test_title_similarity_duplicates(self):
prs = [
{"number": 1, "title": "feat: add dark mode support", "body": ""},
{"number": 2, "title": "feat: add dark mode support", "body": "different body"},
]
dups = find_duplicates(prs)
assert len(dups) >= 1
assert any(d["type"] == "similarity" for d in dups)
def test_no_duplicates(self):
prs = [
{"number": 1, "title": "Fix auth bug", "body": "Closes #100"},
{"number": 2, "title": "Add dark mode", "body": "Closes #200"},
]
dups = find_duplicates(prs)
assert len(dups) == 0
class TestHealth:
"""PR health assessment."""
def _make_pr(self, **overrides):
now = datetime.now(timezone.utc).isoformat()
pr = {
"number": 1,
"title": "test",
"body": "Closes #100",
"created_at": now,
"updated_at": now,
"head": {"ref": "fix/test"},
"mergeable": True,
"user": {"login": "agent"},
"labels": [],
}
pr.update(overrides)
return pr
def test_basic_health(self):
pr = self._make_pr()
h = health(pr, {100: {"number": 100}})
assert h["pr"] == 1
assert h["refs"] == [100]
assert h["open_issues"] == [100]
assert h["age_days"] == 0
def test_stale_detection(self):
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
pr = self._make_pr(created_at=old, updated_at=old)
h = health(pr, {})
assert h["stale_days"] >= 29
assert h["risk_score"] > 30
class TestIsSafeToMerge:
"""Auto-merge safety checks."""
def _make_health(self, **overrides):
h = {
"pr": 1, "title": "test", "head": "fix/test",
"category": "training-data", "refs": [100],
"open_issues": [100], "closed_issues": [],
"age_days": 1, "stale_days": 1,
"risk_score": 10, "mergeable": True,
"author": "agent", "labels": [],
}
h.update(overrides)
return h
def test_safe_training_data(self):
h = self._make_health()
ok, reason = is_safe_to_merge(h)
assert ok
def test_unsafe_not_training(self):
h = self._make_health(category="bug-fix")
ok, reason = is_safe_to_merge(h)
assert not ok
assert "not training-data" in reason
def test_unsafe_conflicts(self):
h = self._make_health(mergeable=False)
ok, reason = is_safe_to_merge(h)
assert not ok
assert "conflicts" in reason
def test_unsafe_too_stale(self):
h = self._make_health(stale_days=31)
ok, reason = is_safe_to_merge(h)
assert not ok
assert "stale" in reason
def test_unsafe_high_risk(self):
h = self._make_health(risk_score=60)
ok, reason = is_safe_to_merge(h)
assert not ok
assert "risk" in reason

84
training-data/schema.json Normal file
View File

@@ -0,0 +1,84 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Scene Description Training Entry",
"description": "Schema for lyrics-to-visual-scene description training data entries.",
"type": "object",
"required": ["song", "artist", "beat", "timestamp", "lyric_line", "scene"],
"properties": {
"song": {
"type": "string",
"minLength": 1,
"description": "Song title"
},
"artist": {
"type": "string",
"minLength": 1,
"description": "Artist name"
},
"mood_arc": {
"type": "string",
"description": "Overall mood progression of the song (optional)"
},
"beat": {
"type": "integer",
"minimum": 1,
"description": "Beat number within the song (1-indexed)"
},
"timestamp": {
"type": "string",
"pattern": "^[0-9]+:[0-5][0-9]$",
"description": "Timestamp in M:SS or MM:SS format"
},
"duration": {
"type": "string",
"description": "Duration of the beat (e.g. '30s', '15s')"
},
"duration_seconds": {
"type": "integer",
"minimum": 1,
"description": "Duration in seconds (integer alternative)"
},
"lyric_line": {
"type": "string",
"minLength": 1,
"description": "The lyric line for this beat"
},
"scene": {
"type": "object",
"required": ["mood", "colors", "composition", "description"],
"properties": {
"mood": {
"type": "string",
"minLength": 1,
"description": "Emotional mood of the scene"
},
"colors": {
"type": "array",
"items": { "type": "string", "minLength": 1 },
"minItems": 1,
"description": "Color palette for the scene"
},
"composition": {
"type": "string",
"minLength": 1,
"description": "Shot composition (e.g. 'wide shot', 'close-up', 'low angle')"
},
"camera": {
"type": "string",
"description": "Camera movement (e.g. 'static', 'slow zoom', 'tracking')"
},
"camera_movement": {
"type": "string",
"description": "Alternative field name for camera movement"
},
"description": {
"type": "string",
"minLength": 10,
"description": "Full scene description text"
}
},
"additionalProperties": true
}
},
"additionalProperties": true
}