Compare commits

..

1 Commit

Author SHA1 Message Date
c7002b5218 feat: config drift detection across fleet nodes (#686)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 37s
Smoke Test / smoke (pull_request) Failing after 26s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 16s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m26s
Validate Config / Shell Script Lint (pull_request) Failing after 37s
Validate Config / Cron Syntax Check (pull_request) Successful in 6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 8s
Validate Config / Playbook Schema Validation (pull_request) Successful in 25s
PR Checklist / pr-checklist (pull_request) Failing after 11m24s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-15 03:13:54 +00:00
3 changed files with 480 additions and 351 deletions

View File

@@ -1,160 +0,0 @@
#!/usr/bin/env python3
"""
auto-scene-descriptions.py — Generate scene descriptions from image/video assets.
Scans an assets directory, uses vision model to describe each asset,
outputs training pairs in timmy-config format.
Usage:
python3 scripts/auto-scene-descriptions.py --scan ~/assets/
python3 scripts/auto-scene-descriptions.py --scan ~/assets/ --output training-data/scene-from-media.jsonl
python3 scripts/auto-scene-descriptions.py --scan ~/assets/ --dry-run
"""
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
VIDEO_EXTS = {".mp4", ".webm", ".mov", ".avi", ".mkv"}
SUPPORTED_EXTS = IMAGE_EXTS | VIDEO_EXTS


def scan_assets(directory: str) -> list[Path]:
    """Find all image/video assets in a directory.

    Walks the tree top-down, skipping VCS/build directories, and returns
    matching files with names sorted within each directory.
    """
    skip_dirs = {".git", "node_modules", "__pycache__"}
    found: list[Path] = []
    for root, dirs, files in os.walk(directory):
        # Prune ignored directories in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if d not in skip_dirs]
        found.extend(
            Path(root) / filename
            for filename in sorted(files)
            if Path(filename).suffix.lower() in SUPPORTED_EXTS
        )
    return found
def extract_video_frame(video_path: Path) -> Path | None:
"""Extract a representative frame from a video using ffmpeg."""
frame_path = video_path.with_suffix(".frame.jpg")
try:
subprocess.run(
["ffmpeg", "-i", str(video_path), "-vframes", "1", "-ss", "5", "-y", str(frame_path)],
capture_output=True, timeout=30
)
if frame_path.exists():
return frame_path
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return None
def describe_with_ollama(image_path: Path) -> str:
    """Generate a scene description using the local Ollama vision model.

    Shells out to `ollama run llava`; returns the trimmed stdout on
    success, or "" when ollama is missing, times out, or exits nonzero.
    """
    prompt = (
        "Describe this image as a visual scene for a film. Include mood, "
        "colors, composition, and camera angle. Be specific and vivid in "
        f"2-3 sentences. Image: {image_path}"
    )
    try:
        proc = subprocess.run(
            ["ollama", "run", "llava", prompt],
            capture_output=True, text=True, timeout=60,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return ""
    return proc.stdout.strip() if proc.returncode == 0 else ""
def describe_with_fallback(image_path: Path) -> str:
    """Generate a basic scene description from filename/path alone."""
    def humanize(text: str) -> str:
        # Filenames use _ and - as word separators.
        return text.replace("_", " ").replace("-", " ")

    subject = humanize(image_path.stem)
    setting = humanize(image_path.parent.name)
    return (
        f"A scene depicting {subject} in a {setting} setting. "
        "Visual composition inferred from asset location and naming conventions."
    )
def build_training_pair(asset_path: Path, description: str, asset_index: int) -> dict:
    """Build a training pair in timmy-config scene description format.

    Args:
        asset_path: Path to the source media file.
        description: Scene description text (vision-model or fallback).
        asset_index: Position of the asset in the scan; used for the song id.

    Returns:
        Dict matching the timmy-config training-pair schema.
    """
    name = asset_path.stem
    parent = asset_path.parent.name
    # Infer mood from path/name keywords (first match in declaration order wins).
    mood_keywords = {
        "dark": "melancholic", "light": "hopeful", "warm": "nostalgic",
        "cold": "isolated", "bright": "energetic", "sunset": "bittersweet",
        "night": "mysterious", "morning": "refreshing", "rain": "contemplative",
    }
    mood = "neutral"
    # BUGFIX: the original concatenated name + parent into one string before
    # searching, so a keyword could falsely match across the boundary
    # (e.g. name "...su" + parent "nset..." matched "sunset"). Search each
    # component separately instead.
    name_lower = name.lower()
    parent_lower = parent.lower()
    for keyword, m in mood_keywords.items():
        if keyword in name_lower or keyword in parent_lower:
            mood = m
            break
    return {
        "song": f"asset-{asset_index:04d}",
        "beat": 1,
        "lyric_line": f"[Visual asset: {asset_path.name}]",
        "scene": {
            "mood": mood,
            "colors": ["inferred"],
            "composition": "frame",
            "camera": "static",
            "description": description,
            "source": "auto-generated",
            "asset_path": str(asset_path),
        }
    }
def main():
    """CLI entry point: scan assets, describe them, emit JSONL training pairs."""
    parser = argparse.ArgumentParser(description="Auto-generate scene descriptions from assets")
    parser.add_argument("--scan", required=True, help="Assets directory to scan")
    parser.add_argument("--output", default=None, help="Output JSONL path")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be generated")
    parser.add_argument("--vision", action="store_true", help="Use Ollama vision model (slow)")
    args = parser.parse_args()

    assets = scan_assets(args.scan)
    print(f"Found {len(assets)} assets in {args.scan}", file=sys.stderr)
    if not assets:
        print("No assets found.", file=sys.stderr)
        sys.exit(0)

    pairs = []
    for index, asset in enumerate(assets):
        description = ""
        if args.vision:
            vision_input = asset
            if asset.suffix.lower() in VIDEO_EXTS:
                # Vision models need a still image; pull one frame from the video.
                extracted = extract_video_frame(asset)
                if extracted:
                    vision_input = extracted
            description = describe_with_ollama(vision_input)
        if not description:
            # Non-vision mode, or the vision model returned nothing.
            description = describe_with_fallback(asset)
        pairs.append(build_training_pair(asset, description, index))
        if args.dry_run and index < 5:
            print(f" {asset.name}: {description[:80]}")

    if args.dry_run:
        print(f"\nWould generate {len(pairs)} training pairs")
        return

    # Write output
    output_path = args.output or os.path.expanduser("~/.hermes/training-data/scene-from-media.jsonl")
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w") as f:
        f.writelines(json.dumps(p, ensure_ascii=False) + "\n" for p in pairs)
    print(f"Generated {len(pairs)} scene descriptions → {output_path}", file=sys.stderr)

View File

@@ -0,0 +1,480 @@
#!/usr/bin/env python3
"""
config_drift_detector.py — Detect config drift across fleet nodes.
Collects config from all wizard nodes via SSH, compares against
canonical timmy-config golden state, and reports differences.
Usage:
python3 scripts/config_drift_detector.py # Report only
python3 scripts/config_drift_detector.py --auto-sync # Auto-fix drift with golden state
python3 scripts/config_drift_detector.py --node allegro # Check single node
python3 scripts/config_drift_detector.py --json # JSON output for automation
Exit codes:
0 — no drift detected
1 — drift detected
2 — error (SSH failure, missing deps, etc.)
"""
import argparse
import json
import os
import subprocess
import sys
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
# ── Constants ─────────────────────────────────────────────────────────────────
# Paths are resolved relative to this script's location so the tool works
# from any working directory.
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
ANSIBLE_INVENTORY = REPO_ROOT / "ansible" / "inventory" / "hosts.yml"
GOLDEN_STATE_PLAYBOOK = REPO_ROOT / "ansible" / "playbooks" / "golden_state.yml"
# Config files to check on each node. "{name}" is substituted with the node
# name in ssh_collect(), where relative paths are also resolved against the
# node's hermes_home / wizard_home.
CONFIG_PATHS = [
    ".hermes/config.yaml",
    "wizards/{name}/config.yaml",
]
# Keys that define golden state (from ansible inventory vars).
# NOTE(review): GOLDEN_KEYS is not referenced anywhere else in this file —
# confirm whether another module consumes it or it is dead code.
GOLDEN_KEYS = [
    "providers",
    "provider",
    "model",
    "base_url",
    "api_key_env",
    "banned_providers",
    "banned_models_patterns",
]
# ── Data Models ───────────────────────────────────────────────────────────────
@dataclass
class NodeConfig:
    """Configuration state collected from a single fleet node."""
    name: str  # inventory node name
    host: str  # ansible_host (hostname or IP); localhost means run locally
    # Resolved config-file path -> parsed YAML dict. The special key
    # "__banned_scan__" holds grep results for banned provider patterns.
    configs: dict[str, Any] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)  # e.g. YAML parse errors
    reachable: bool = True  # whether the node responded to collection
@dataclass
class DriftResult:
    """One detected difference between a node's config and the golden state."""
    node: str       # node name the drift was found on
    file_path: str  # config file where it was seen, or a pseudo-path marker
    # "missing", "value_mismatch", "key_missing", "extra_key", or
    # "banned_provider_found" (the latter is emitted by detect_drift)
    diff_type: str
    key: str  # config key (or dotted provider path) that drifted
    canonical_value: Any = None  # expected value from golden state, if known
    node_value: Any = None       # actual value observed on the node
    severity: str = "warning"  # "info", "warning", "critical"
# ── Inventory Parsing ─────────────────────────────────────────────────────────
def load_inventory() -> tuple[dict, dict]:
    """Load the Ansible inventory and extract wizard node definitions.

    Returns:
        ``(nodes, global_vars)`` — ``nodes`` maps node name -> connection and
        layout info; ``global_vars`` is the inventory's ``all.vars`` mapping.
        (BUGFIX: the annotation previously claimed a single ``dict`` although
        the function has always returned this 2-tuple.)

    Exits with status 2 when the inventory file is missing.
    """
    if not ANSIBLE_INVENTORY.exists():
        print(f"ERROR: Inventory not found at {ANSIBLE_INVENTORY}", file=sys.stderr)
        sys.exit(2)
    with open(ANSIBLE_INVENTORY) as f:
        # An empty inventory file parses to None; normalize to {} so the
        # .get() chains below cannot raise AttributeError.
        inventory = yaml.safe_load(f) or {}
    wizards = inventory.get("all", {}).get("children", {}).get("wizards", {}).get("hosts", {})
    global_vars = inventory.get("all", {}).get("vars", {})
    nodes = {}
    for name, config in wizards.items():
        nodes[name] = {
            "host": config.get("ansible_host", "localhost"),
            "user": config.get("ansible_user", ""),
            "wizard_name": config.get("wizard_name", name),
            "hermes_home": config.get("hermes_home", "~/.hermes"),
            "wizard_home": config.get("wizard_home", f"~/wizards/{name}"),
            "machine_type": config.get("machine_type", "unknown"),
        }
    return nodes, global_vars
def load_golden_state(inventory_vars: dict) -> dict:
    """Extract golden state from inventory vars.

    Missing inventory keys default to empty lists so callers can iterate
    unconditionally.
    """
    source_keys = {
        "providers": "golden_state_providers",
        "banned_providers": "banned_providers",
        "banned_models_patterns": "banned_models_patterns",
    }
    return {out: inventory_vars.get(src, []) for out, src in source_keys.items()}
# ── SSH Collection ────────────────────────────────────────────────────────────
def _ssh_probe(ssh_target: str, timeout: int) -> bool:
    """Return True when a trivial remote command (`true`) succeeds over SSH."""
    try:
        proc = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
             ssh_target, "true"],
            capture_output=True, timeout=timeout,
        )
        return proc.returncode == 0
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False


def ssh_collect(node_name: str, node_info: dict, timeout: int = 15) -> NodeConfig:
    """SSH into a node and collect config files.

    Args:
        node_name: Inventory name of the node (used for path templating).
        node_info: Dict from load_inventory() with host/user/home locations.
        timeout: Per-command timeout in seconds.

    Returns:
        NodeConfig with parsed YAML configs, a "__banned_scan__" grep result,
        and ``reachable=False`` when a remote host does not answer SSH.
    """
    host = node_info["host"]
    user = node_info.get("user", "")
    hermes_home = node_info.get("hermes_home", "~/.hermes")
    wizard_home = node_info.get("wizard_home", f"~/wizards/{node_name}")
    result = NodeConfig(name=node_name, host=host)
    # Build SSH target (None means run commands locally)
    if host in ("localhost", "127.0.0.1"):
        ssh_target = None  # local
    else:
        ssh_target = f"{user}@{host}" if user else host
    # BUGFIX: `reachable` defaulted to True and was never updated anywhere,
    # so main() could never actually report UNREACHABLE. Probe remote hosts
    # once up front and bail out early on failure.
    if ssh_target is not None and not _ssh_probe(ssh_target, timeout):
        result.reachable = False
        result.errors.append(f"SSH probe to {ssh_target} failed")
        return result
    # Collect each config path
    for path_template in CONFIG_PATHS:
        # Resolve path template
        remote_path = path_template.replace("{name}", node_name)
        if not remote_path.startswith("/"):
            # Resolve relative to the node's home directories.
            if "wizards/" in remote_path:
                full_path = f"{wizard_home}/config.yaml"
            else:
                full_path = f"{hermes_home}/config.yaml" if ".hermes" in remote_path else f"~/{remote_path}"
        else:
            full_path = remote_path
        config_content = _remote_cat(ssh_target, full_path, timeout)
        if config_content is not None:
            try:
                parsed = yaml.safe_load(config_content)
                if parsed:
                    result.configs[full_path] = parsed
            except yaml.YAMLError as e:
                result.errors.append(f"YAML parse error in {full_path}: {e}")
        # Don't flag missing files as errors — some paths may not exist on all nodes
    # Also collect banned provider scan
    banned_check = _remote_grep(
        ssh_target,
        hermes_home,
        r"anthropic|claude-sonnet|claude-opus|claude-haiku",
        timeout
    )
    if banned_check:
        result.configs["__banned_scan__"] = banned_check
    return result
def _remote_cat(ssh_target: str | None, path: str, timeout: int) -> str | None:
"""Cat a file remotely (or locally)."""
if ssh_target is None:
cmd = ["cat", path]
else:
cmd = ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
ssh_target, f"cat {path}"]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if proc.returncode == 0:
return proc.stdout
except subprocess.TimeoutExpired:
pass
except FileNotFoundError:
pass
return None
def _remote_grep(ssh_target: str | None, base_path: str, pattern: str, timeout: int) -> dict:
"""Grep for banned patterns in config files."""
if ssh_target is None:
cmd = ["grep", "-rn", "-i", pattern, base_path, "--include=*.yaml", "--include=*.yml"]
else:
cmd = ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
ssh_target, f"grep -rn -i '{pattern}' {base_path} --include='*.yaml' --include='*.yml' 2>/dev/null || true"]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if proc.stdout.strip():
lines = proc.stdout.strip().split("\n")
return {"matches": lines, "count": len(lines)}
except subprocess.TimeoutExpired:
pass
return {}
# ── Drift Detection ───────────────────────────────────────────────────────────
def detect_drift(nodes: list[NodeConfig], golden: dict) -> list[DriftResult]:
    """Compare each node's config against golden state.

    For every reachable node this emits DriftResults for:
      * banned-provider references found by the remote grep scan,
      * a provider chain whose ordered names differ from golden state,
      * providers whose name/model contain a banned provider string,
      * critical keys flagged by the heuristic below.
    Unreachable nodes are skipped entirely.
    """
    results = []
    for node in nodes:
        if not node.reachable:
            continue
        # Check for banned providers (results of the grep scan done in ssh_collect)
        banned_scan = node.configs.get("__banned_scan__", {})
        if banned_scan.get("count", 0) > 0:
            for match in banned_scan.get("matches", []):
                results.append(DriftResult(
                    node=node.name,
                    file_path="(config files)",
                    diff_type="banned_provider_found",
                    key="banned_provider_reference",
                    node_value=match,
                    severity="critical"
                ))
        # Check each config file
        for path, config in node.configs.items():
            if path == "__banned_scan__":
                continue
            # Check provider chain
            if isinstance(config, dict):
                node_providers = _extract_provider_chain(config)
                golden_providers = golden.get("providers", [])
                if node_providers and golden_providers:
                    # Compare provider names in order — ordering matters
                    # because it defines the failover chain.
                    node_names = [p.get("name", "") for p in node_providers]
                    golden_names = [p.get("name", "") for p in golden_providers]
                    if node_names != golden_names:
                        results.append(DriftResult(
                            node=node.name,
                            file_path=path,
                            diff_type="value_mismatch",
                            key="provider_chain",
                            canonical_value=golden_names,
                            node_value=node_names,
                            severity="critical"
                        ))
                # Check for banned providers in node config (substring match
                # on both the provider name and its model string)
                for banned in golden.get("banned_providers", []):
                    for provider in node_providers:
                        prov_name = provider.get("name", "").lower()
                        prov_model = provider.get("model", "").lower()
                        if banned in prov_name or banned in prov_model:
                            results.append(DriftResult(
                                node=node.name,
                                file_path=path,
                                diff_type="banned_provider_found",
                                key=f"provider.{provider.get('name', 'unknown')}",
                                node_value=provider,
                                severity="critical"
                            ))
                # Check for missing critical keys
                critical_keys = ["display", "providers", "tools", "delegation"]
                for key in critical_keys:
                    # NOTE(review): this fires only when the key is absent at
                    # the top level yet its name appears somewhere in the
                    # serialized config (e.g. nested or as a value). Confirm
                    # this is intended rather than a plain `key not in config`
                    # presence check.
                    if key not in config and key in str(config):
                        results.append(DriftResult(
                            node=node.name,
                            file_path=path,
                            diff_type="key_missing",
                            key=key,
                            canonical_value="(present in golden state)",
                            severity="warning"
                        ))
    return results
def _extract_provider_chain(config: dict) -> list[dict]:
"""Extract provider list from a config dict (handles multiple formats)."""
# Direct providers key
if "providers" in config:
providers = config["providers"]
if isinstance(providers, list):
return providers
# Nested in display or model config
for key in ["model", "inference", "llm"]:
if key in config and isinstance(config[key], dict):
if "providers" in config[key]:
return config[key]["providers"]
# Single provider format
if "provider" in config and "model" in config:
return [{"name": config["provider"], "model": config["model"]}]
return []
# ── Auto-Sync ─────────────────────────────────────────────────────────────────
def auto_sync(drifts: list[DriftResult], nodes: list[NodeConfig]) -> list[str]:
    """Auto-sync drifted nodes using golden state playbook.

    Only nodes with at least one critical drift are synced; returns a log
    of human-readable action lines.
    """
    critical_nodes = {d.node for d in drifts if d.severity == "critical"}
    if not critical_nodes:
        return ["No critical drift to sync."]
    log = []
    for node_name in critical_nodes:
        # Skip names we never collected a NodeConfig for.
        if next((n for n in nodes if n.name == node_name), None) is None:
            continue
        log.append(f"[{node_name}] Running golden state sync...")
        # Run ansible-playbook limited to this node and the golden tag.
        playbook_cmd = [
            "ansible-playbook",
            str(GOLDEN_STATE_PLAYBOOK),
            "-i", str(ANSIBLE_INVENTORY),
            "-l", node_name,
            "--tags", "golden",
        ]
        try:
            proc = subprocess.run(
                playbook_cmd, capture_output=True, text=True, timeout=120,
                cwd=str(REPO_ROOT)
            )
        except subprocess.TimeoutExpired:
            log.append(f"[{node_name}] Sync timed out after 120s.")
            continue
        except FileNotFoundError:
            log.append(f"[{node_name}] ansible-playbook not found. Install Ansible or run manually.")
            continue
        if proc.returncode == 0:
            log.append(f"[{node_name}] Sync completed successfully.")
        else:
            log.append(f"[{node_name}] Sync FAILED: {proc.stderr[:200]}")
    return log
# ── Reporting ─────────────────────────────────────────────────────────────────
def print_report(drifts: "list[DriftResult]", nodes: "list[NodeConfig]", golden: dict):
    """Print a human-readable drift report to stdout.

    Shows a summary header, then the drifts grouped per node with a
    severity icon, and finally a critical/warning tally.
    """
    print("=" * 70)
    print("CONFIG DRIFT DETECTION REPORT")
    print("=" * 70)
    print()
    # Summary
    reachable = sum(1 for n in nodes if n.reachable)
    print(f"Nodes checked: {len(nodes)} (reachable: {reachable})")
    # BUGFIX: provider names were joined with '' (no separator), printing
    # them mashed together; use ', ' like the banned-providers line below.
    print(f"Golden state providers: {', '.join(p['name'] for p in golden.get('providers', []))}")
    print(f"Banned providers: {', '.join(golden.get('banned_providers', []))}")
    print()
    if not drifts:
        print("[OK] No config drift detected. All nodes match golden state.")
        return
    # Group by node
    by_node: dict[str, list] = {}
    for d in drifts:
        by_node.setdefault(d.node, []).append(d)
    for node_name, node_drifts in sorted(by_node.items()):
        print(f"--- {node_name} ---")
        for d in node_drifts:
            severity_icon = {"critical": "[!!]", "warning": "[!]", "info": "[i]"}.get(d.severity, "[?]")
            print(f" {severity_icon} {d.diff_type}: {d.key}")
            if d.canonical_value is not None:
                print(f" canonical: {d.canonical_value}")
            if d.node_value is not None:
                print(f" actual: {d.node_value}")
        print()
    # Severity summary
    critical = sum(1 for d in drifts if d.severity == "critical")
    warning = sum(1 for d in drifts if d.severity == "warning")
    print(f"Total: {len(drifts)} drift(s) — {critical} critical, {warning} warning")
def print_json_report(drifts: list[DriftResult], nodes: list[NodeConfig], golden: dict):
    """Print a machine-readable JSON drift report to stdout."""
    drift_entries = [
        {
            "node": d.node,
            "file": d.file_path,
            "type": d.diff_type,
            "key": d.key,
            "canonical": d.canonical_value,
            "actual": d.node_value,
            "severity": d.severity,
        }
        for d in drifts
    ]
    report = {
        "nodes_checked": len(nodes),
        "reachable": sum(1 for n in nodes if n.reachable),
        "golden_providers": [p["name"] for p in golden.get("providers", [])],
        "drift_count": len(drifts),
        "critical_count": sum(1 for d in drifts if d.severity == "critical"),
        "drifts": drift_entries,
    }
    # default=str stringifies any non-JSON-native values in drift payloads.
    print(json.dumps(report, indent=2, default=str))
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: load inventory, collect configs, report/sync drift.

    Exit codes (see module docstring): 0 = no drift, 1 = drift detected,
    2 = error (missing inventory, unknown node).
    """
    parser = argparse.ArgumentParser(description="Detect config drift across fleet nodes")
    parser.add_argument("--node", help="Check only this node")
    parser.add_argument("--auto-sync", action="store_true", help="Auto-fix critical drift with golden state")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--timeout", type=int, default=15, help="SSH timeout per node (seconds)")
    args = parser.parse_args()
    # Load inventory
    print("Loading inventory...", file=sys.stderr)
    node_defs, global_vars = load_inventory()
    golden = load_golden_state(global_vars)
    # Filter to single node if requested
    if args.node:
        if args.node not in node_defs:
            print(f"ERROR: Node '{args.node}' not in inventory. Available: {', '.join(node_defs.keys())}")
            sys.exit(2)
        node_defs = {args.node: node_defs[args.node]}
    # Collect configs from each node (progress goes to stderr so that
    # --json output on stdout stays parseable)
    print(f"Collecting configs from {len(node_defs)} node(s)...", file=sys.stderr)
    nodes = []
    for name, info in node_defs.items():
        print(f" {name} ({info['host']})...", file=sys.stderr, end=" ", flush=True)
        node_config = ssh_collect(name, info, timeout=args.timeout)
        if node_config.reachable:
            print(f"OK ({len(node_config.configs)} files)", file=sys.stderr)
        else:
            print("UNREACHABLE", file=sys.stderr)
        nodes.append(node_config)
    # Detect drift
    print("\nAnalyzing drift...", file=sys.stderr)
    drifts = detect_drift(nodes, golden)
    # Output
    if args.json:
        print_json_report(drifts, nodes, golden)
    else:
        print()
        print_report(drifts, nodes, golden)
    # Auto-sync if requested
    if args.auto_sync and drifts:
        print("\n--- AUTO-SYNC ---")
        actions = auto_sync(drifts, nodes)
        for a in actions:
            print(a)
    # Exit code: any drift (critical or not) exits 1, per the module docstring.
    # NOTE(review): the critical/elif branches below are redundant — both
    # exit 1 — presumably a placeholder for distinct codes; confirm before
    # simplifying.
    if any(d.severity == "critical" for d in drifts):
        sys.exit(1)
    elif drifts:
        sys.exit(1)
    else:
        sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,191 +0,0 @@
#!/usr/bin/env python3
"""
Auto-generate scene descriptions from image/video assets.
Scans a directory for media files, generates scene descriptions using
a local vision model (Ollama), and outputs training pairs in JSONL format.
Usage:
python scripts/generate_scene_descriptions.py --input ./assets --output training-data/scene-descriptions-auto.jsonl
python scripts/generate_scene_descriptions.py --input ./assets --model gemma3:12b --limit 50
python scripts/generate_scene_descriptions.py --dry-run # List files without generating
"""
import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Supported media extensions
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
VIDEO_EXTS = {".mp4", ".webm", ".mov", ".avi", ".mkv"}
ALL_EXTS = IMAGE_EXTS | VIDEO_EXTS
# Vision model prompt template — asks for strict JSON so describe_image()
# can parse the response.
SCENE_PROMPT = """Describe this image for a visual scene database. Output JSON:
{
"mood": "one of: calm, energetic, dark, warm, cool, chaotic, serene, tense, joyful, melancholic",
"colors": ["dominant color 1", "dominant color 2", "dominant color 3"],
"composition": "one of: close-up, wide-shot, medium-shot, low-angle, high-angle, bird-eye, profile, over-shoulder",
"camera": "one of: static, slow-pan, tracking, handheld, crane, dolly, steady, locked-off",
"lighting": "one of: natural, artificial, mixed, dramatic, soft, harsh, backlit",
"description": "2-3 sentence visual description of the scene"
}
Be specific. Describe what you see, not what you imagine."""


def scan_media(input_dir: str) -> list[Path]:
    """Scan a directory recursively for media files.

    BUGFIX: the original rglob'ed both lower- and upper-case patterns, so
    mixed-case extensions (".JpG") were silently missed and *directories*
    whose names matched the glob were included. A single walk with an
    is_file() + lowercased-suffix check handles both.

    Returns a sorted, de-duplicated list; empty (with a message on stderr)
    when the directory does not exist.
    """
    input_path = Path(input_dir)
    if not input_path.exists():
        print(f"Error: {input_dir} does not exist", file=sys.stderr)
        return []
    media_files = [
        p for p in input_path.rglob("*")
        if p.is_file() and p.suffix.lower() in ALL_EXTS
    ]
    return sorted(set(media_files))
def extract_video_frame(video_path: Path, output_path: Path) -> bool:
    """Extract a representative frame from a video.

    Returns True only when the output file exists after the ffmpeg call;
    any failure (missing ffmpeg, timeout, bad input) yields False.
    """
    ffmpeg_cmd = [
        "ffmpeg", "-i", str(video_path),
        "-vframes", "1", "-q:v", "2",
        str(output_path), "-y",
    ]
    try:
        subprocess.run(ffmpeg_cmd, capture_output=True, timeout=30)
        return output_path.exists()
    except Exception:
        # Deliberate best-effort: callers treat False as "skip this video".
        return False
def describe_image(image_path: Path, model: str = "gemma3:12b",
                   ollama_url: str = "http://localhost:11434") -> Optional[dict]:
    """Generate scene description using Ollama vision model.

    Base64-encodes the image, POSTs it to Ollama's /api/generate endpoint,
    and parses the JSON object out of the model's text response. Returns
    None on any error (unreachable server, unreadable file, bad response);
    the error is logged to stderr.
    """
    try:
        # Imports stay function-local as in the original, keeping module
        # import time minimal for --dry-run usage.
        import base64 as b64
        import re
        import urllib.request

        with open(image_path, "rb") as f:
            image_b64 = b64.b64encode(f.read()).decode()
        req = urllib.request.Request(
            f"{ollama_url}/api/generate",
            data=json.dumps({
                "model": model,
                "prompt": SCENE_PROMPT,
                "images": [image_b64],
                "stream": False,
                "options": {"temperature": 0.3, "num_predict": 512}
            }).encode(),
            headers={"Content-Type": "application/json"},
        )
        # BUGFIX: the response object was never closed, leaking the HTTP
        # connection on every call — use it as a context manager.
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = json.loads(resp.read())
        response_text = data.get("response", "")
        # The model may wrap its JSON in prose; grab the outermost {...} span.
        json_match = re.search(r"\{[\s\S]*\}", response_text)
        if json_match:
            return json.loads(json_match.group())
        # No JSON found — fall back to a stub record preserving the raw text.
        return {"description": response_text[:500], "mood": "unknown",
                "colors": [], "composition": "unknown", "camera": "unknown", "lighting": "unknown"}
    except Exception as e:
        print(f" Error describing {image_path.name}: {e}", file=sys.stderr)
        return None
def generate_training_pairs(media_files: list[Path], model: str, ollama_url: str,
                            limit: int = 0, dry_run: bool = False) -> list[dict]:
    """Generate training pairs from media files.

    Videos get a temporary frame extracted first; the frame is removed
    after description. In dry-run mode no model calls are made.
    """
    selected = media_files[:limit] if limit > 0 else media_files
    total = len(selected)
    print(f"Processing {total} files...", file=sys.stderr)
    pairs: list[dict] = []
    for position, media_path in enumerate(selected, start=1):
        print(f" [{position}/{total}] {media_path.name}...", file=sys.stderr, end=" ")
        if dry_run:
            print("(dry run)", file=sys.stderr)
            pairs.append({"source": str(media_path), "status": "dry-run"})
            continue
        is_video = media_path.suffix.lower() in VIDEO_EXTS
        work_path = media_path
        if is_video:
            # Vision model needs a still image — extract one frame.
            frame_path = media_path.with_suffix(".frame.jpg")
            if not extract_video_frame(media_path, frame_path):
                print("SKIP (frame extraction failed)", file=sys.stderr)
                continue
            work_path = frame_path
        scene = describe_image(work_path, model, ollama_url)
        if scene:
            pairs.append({
                "source": str(media_path),
                "media_type": "video" if is_video else "image",
                "description": scene,
                "model": model,
                "generated_at": datetime.now(timezone.utc).isoformat(),
            })
            print("OK", file=sys.stderr)
        else:
            print("FAIL", file=sys.stderr)
        # Cleanup temp frame (only exists for videos)
        if is_video and work_path != media_path:
            try:
                work_path.unlink()
            except Exception:
                pass
    return pairs
def main():
    """CLI: scan media, describe via Ollama, write JSONL training pairs."""
    parser = argparse.ArgumentParser(description="Auto-generate scene descriptions from media")
    parser.add_argument("--input", "-i", required=True, help="Input directory with media files")
    parser.add_argument("--output", "-o", default="training-data/scene-descriptions-auto.jsonl")
    parser.add_argument("--model", "-m", default="gemma3:12b", help="Ollama model name")
    parser.add_argument("--ollama-url", default="http://localhost:11434")
    parser.add_argument("--limit", "-l", type=int, default=0, help="Max files to process (0=all)")
    parser.add_argument("--dry-run", action="store_true", help="List files without generating")
    args = parser.parse_args()

    media_files = scan_media(args.input)
    print(f"Found {len(media_files)} media files", file=sys.stderr)
    if not media_files:
        print("No media files found.", file=sys.stderr)
        sys.exit(1)

    pairs = generate_training_pairs(media_files, args.model, args.ollama_url,
                                    args.limit, args.dry_run)

    # Write JSONL output, creating parent directories as needed.
    destination = Path(args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with open(destination, "w") as handle:
        handle.writelines(json.dumps(pair, ensure_ascii=False) + "\n" for pair in pairs)
    print(f"\nWrote {len(pairs)} pairs to {destination}", file=sys.stderr)
if __name__ == "__main__":
main()