Compare commits
1 Commits
fix/689
...
fix/config
| Author | SHA1 | Date | |
|---|---|---|---|
| c7002b5218 |
@@ -1,160 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
auto-scene-descriptions.py — Generate scene descriptions from image/video assets.
|
||||
|
||||
Scans an assets directory, uses vision model to describe each asset,
|
||||
outputs training pairs in timmy-config format.
|
||||
|
||||
Usage:
|
||||
python3 scripts/auto-scene-descriptions.py --scan ~/assets/
|
||||
python3 scripts/auto-scene-descriptions.py --scan ~/assets/ --output training-data/scene-from-media.jsonl
|
||||
python3 scripts/auto-scene-descriptions.py --scan ~/assets/ --dry-run
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Lower-case file suffixes (leading dot included) treated as still images.
IMAGE_EXTS = {".bmp", ".gif", ".jpeg", ".jpg", ".png", ".webp"}
# Lower-case file suffixes treated as videos.
VIDEO_EXTS = {".avi", ".mkv", ".mov", ".mp4", ".webm"}
# Every suffix the scanner accepts: the union of the two sets above.
SUPPORTED_EXTS = IMAGE_EXTS.union(VIDEO_EXTS)
|
||||
|
||||
|
||||
def scan_assets(directory: str) -> list[Path]:
    """Find all image/video assets below *directory*.

    Walks the tree with ``os.walk``, skipping ``.git``, ``node_modules`` and
    ``__pycache__`` directories, and keeps every file whose lower-cased
    suffix is in ``SUPPORTED_EXTS``.

    Args:
        directory: Root directory to scan.

    Returns:
        Asset paths in a deterministic order: directories are visited in
        sorted order and files are sorted within each directory.
    """
    skipped_dirs = {".git", "node_modules", "__pycache__"}
    assets: list[Path] = []
    for root, dirs, files in os.walk(directory):
        # Assigning to dirs[:] prunes the walk in place. Sorting makes the
        # traversal order (and therefore downstream asset indices) stable
        # instead of depending on the OS directory listing order.
        dirs[:] = sorted(d for d in dirs if d not in skipped_dirs)
        assets.extend(
            Path(root) / filename
            for filename in sorted(files)
            if Path(filename).suffix.lower() in SUPPORTED_EXTS
        )
    return assets
|
||||
|
||||
|
||||
def extract_video_frame(video_path: Path) -> Path | None:
|
||||
"""Extract a representative frame from a video using ffmpeg."""
|
||||
frame_path = video_path.with_suffix(".frame.jpg")
|
||||
try:
|
||||
subprocess.run(
|
||||
["ffmpeg", "-i", str(video_path), "-vframes", "1", "-ss", "5", "-y", str(frame_path)],
|
||||
capture_output=True, timeout=30
|
||||
)
|
||||
if frame_path.exists():
|
||||
return frame_path
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def describe_with_ollama(image_path: Path) -> str:
    """Ask the local ``ollama`` CLI (model ``llava``) to describe an image.

    The image path is embedded in the prompt text passed to
    ``ollama run llava``.

    Args:
        image_path: Path of the image to describe.

    Returns:
        The stripped stdout of the command on exit status 0, otherwise an
        empty string (non-zero exit, timeout after 60 s, or ``ollama`` not
        installed).
    """
    prompt = (
        "Describe this image as a visual scene for a film. "
        "Include mood, colors, composition, and camera angle. "
        "Be specific and vivid in 2-3 sentences. "
        f"Image: {image_path}"
    )
    try:
        completed = subprocess.run(
            ["ollama", "run", "llava", prompt],
            capture_output=True, text=True, timeout=60,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return ""
    if completed.returncode != 0:
        return ""
    return completed.stdout.strip()
|
||||
|
||||
|
||||
def describe_with_fallback(image_path: Path) -> str:
    """Build a generic scene description from the asset's file and folder names.

    Underscores and hyphens in the file stem and in the parent directory
    name are replaced with spaces before being inserted into a fixed
    sentence template.

    Args:
        image_path: Path of the asset.

    Returns:
        The templated description string.
    """
    separators_to_spaces = str.maketrans({"_": " ", "-": " "})
    subject = image_path.stem.translate(separators_to_spaces)
    setting = image_path.parent.name.translate(separators_to_spaces)
    return (
        f"A scene depicting {subject} in a {setting} setting. "
        "Visual composition inferred from asset location and naming conventions."
    )
|
||||
|
||||
|
||||
def build_training_pair(asset_path: Path, description: str, asset_index: int) -> dict:
    """Build a training pair in timmy-config scene description format.

    The mood is inferred by substring-matching keywords against the
    lower-cased file stem and parent directory name; the first keyword (in
    table order) that matches wins, and ``"neutral"`` is used when none do.

    Args:
        asset_path: Path of the asset the pair describes.
        description: Scene description text to embed.
        asset_index: Index used to build the ``asset-NNNN`` song id.

    Returns:
        A dict with ``song``, ``beat``, ``lyric_line`` and a nested
        ``scene`` dict.
    """
    mood_keywords = {
        "dark": "melancholic", "light": "hopeful", "warm": "nostalgic",
        "cold": "isolated", "bright": "energetic", "sunset": "bittersweet",
        "night": "mysterious", "morning": "refreshing", "rain": "contemplative",
    }
    # The separating space keeps a keyword from matching across the boundary
    # between the file name and the directory name (e.g. "col" + "d").
    haystack = f"{asset_path.stem} {asset_path.parent.name}".lower()
    mood = next(
        (label for keyword, label in mood_keywords.items() if keyword in haystack),
        "neutral",
    )

    return {
        "song": f"asset-{asset_index:04d}",
        "beat": 1,
        "lyric_line": f"[Visual asset: {asset_path.name}]",
        "scene": {
            "mood": mood,
            "colors": ["inferred"],
            "composition": "frame",
            "camera": "static",
            "description": description,
            "source": "auto-generated",
            "asset_path": str(asset_path),
        },
    }
|
||||
|
||||
|
||||
def main():
    """CLI entry point: scan assets, describe each one, write JSONL training pairs.

    With ``--vision`` each asset is described by the Ollama vision model
    (videos via an extracted frame); whenever that yields nothing, or when
    ``--vision`` is not given, the filename-based fallback description is
    used. With ``--dry-run`` the first five descriptions are printed and no
    file is written.
    """
    parser = argparse.ArgumentParser(description="Auto-generate scene descriptions from assets")
    parser.add_argument("--scan", required=True, help="Assets directory to scan")
    parser.add_argument("--output", default=None, help="Output JSONL path")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be generated")
    parser.add_argument("--vision", action="store_true", help="Use Ollama vision model (slow)")
    args = parser.parse_args()

    assets = scan_assets(args.scan)
    print(f"Found {len(assets)} assets in {args.scan}", file=sys.stderr)

    if not assets:
        print("No assets found.", file=sys.stderr)
        sys.exit(0)

    pairs = []
    for i, asset in enumerate(assets):
        description = ""
        if args.vision:
            image_for_vision = asset
            if asset.suffix.lower() in VIDEO_EXTS:
                # The vision model is only given a still frame, never the
                # raw video; a failed extraction falls through to the
                # filename-based description below.
                image_for_vision = extract_video_frame(asset)
            if image_for_vision is not None:
                description = describe_with_ollama(image_for_vision)
        if not description:
            description = describe_with_fallback(asset)

        pairs.append(build_training_pair(asset, description, i))

        if args.dry_run and i < 5:
            print(f" {asset.name}: {description[:80]}")

    if args.dry_run:
        print(f"\nWould generate {len(pairs)} training pairs")
        return

    # Write output
    output_path = Path(
        args.output or "~/.hermes/training-data/scene-from-media.jsonl"
    ).expanduser()
    # Path.parent is "." for a bare filename, so this also works when
    # --output has no directory component.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII text, so pin the encoding
    # instead of relying on the locale default.
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(p, ensure_ascii=False) + "\n" for p in pairs)

    print(f"Generated {len(pairs)} scene descriptions → {output_path}", file=sys.stderr)


if __name__ == "__main__":
    main()
|
||||
480
scripts/config_drift_detector.py
Normal file
480
scripts/config_drift_detector.py
Normal file
@@ -0,0 +1,480 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
config_drift_detector.py — Detect config drift across fleet nodes.
|
||||
|
||||
Collects config from all wizard nodes via SSH, compares against
|
||||
canonical timmy-config golden state, and reports differences.
|
||||
|
||||
Usage:
|
||||
python3 scripts/config_drift_detector.py # Report only
|
||||
python3 scripts/config_drift_detector.py --auto-sync # Auto-fix drift with golden state
|
||||
python3 scripts/config_drift_detector.py --node allegro # Check single node
|
||||
python3 scripts/config_drift_detector.py --json # JSON output for automation
|
||||
|
||||
Exit codes:
|
||||
0 — no drift detected
|
||||
1 — drift detected
|
||||
2 — error (SSH failure, missing deps, etc.)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────────
|
||||
|
||||
# Repository locations, derived from where this script file lives.
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
ANSIBLE_INVENTORY = REPO_ROOT.joinpath("ansible", "inventory", "hosts.yml")
GOLDEN_STATE_PLAYBOOK = REPO_ROOT.joinpath("ansible", "playbooks", "golden_state.yml")

# Config files to check on each node ("{name}" is replaced by the node name).
CONFIG_PATHS = [".hermes/config.yaml", "wizards/{name}/config.yaml"]

# Keys that define golden state (from ansible inventory vars).
GOLDEN_KEYS = [
    "providers",
    "provider",
    "model",
    "base_url",
    "api_key_env",
    "banned_providers",
    "banned_models_patterns",
]
|
||||
|
||||
|
||||
# ── Data Models ───────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class NodeConfig:
    """Configuration collected from one fleet node."""

    # Node name as it appears in the inventory.
    name: str
    # Host the configs were collected from.
    host: str
    # Parsed config documents keyed by the path they were read from; the
    # special key "__banned_scan__" holds the banned-pattern grep result.
    configs: dict[str, Any] = field(default_factory=dict)
    # Human-readable problems hit while collecting (e.g. YAML parse errors).
    errors: list[str] = field(default_factory=list)
    # Whether the node could be contacted.
    reachable: bool = True
|
||||
|
||||
|
||||
@dataclass
class DriftResult:
    """One detected difference between a node's config and the golden state."""

    # Name of the node the drift was found on.
    node: str
    # Config file the drift was found in.
    file_path: str
    # Kind of drift: "missing", "value_mismatch", "key_missing", "extra_key"
    # or "banned_provider_found".
    diff_type: str
    # Config key (or logical item) the drift concerns.
    key: str
    # Expected value according to the golden state, when applicable.
    canonical_value: Any = None
    # Value actually found on the node, when applicable.
    node_value: Any = None
    # One of "info", "warning", "critical".
    severity: str = "warning"
|
||||
|
||||
|
||||
# ── Inventory Parsing ─────────────────────────────────────────────────────────
|
||||
|
||||
def load_inventory() -> tuple[dict, dict]:
    """Load the Ansible inventory and extract wizard node definitions.

    Reads ``ANSIBLE_INVENTORY`` and looks up ``all.children.wizards.hosts``
    and ``all.vars``. Sections that are missing or empty (YAML null) are
    treated as empty mappings, as are hosts declared without any variables.

    Returns:
        ``(nodes, global_vars)`` where ``nodes`` maps each wizard name to a
        dict with ``host``, ``user``, ``wizard_name``, ``hermes_home``,
        ``wizard_home`` and ``machine_type``, and ``global_vars`` is the
        inventory's ``all.vars`` mapping.

    Exits the process with status 2 when the inventory file does not exist
    or its top level is not a mapping.
    """
    if not ANSIBLE_INVENTORY.exists():
        print(f"ERROR: Inventory not found at {ANSIBLE_INVENTORY}", file=sys.stderr)
        sys.exit(2)

    with open(ANSIBLE_INVENTORY, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        inventory = yaml.safe_load(f) or {}

    if not isinstance(inventory, dict):
        print(f"ERROR: Inventory at {ANSIBLE_INVENTORY} is not a mapping", file=sys.stderr)
        sys.exit(2)

    # "key:" with no value parses as None, so `.get(key, {})` is not enough;
    # `or {}` covers both missing and null sections.
    all_group = inventory.get("all") or {}
    children = all_group.get("children") or {}
    wizards = (children.get("wizards") or {}).get("hosts") or {}
    global_vars = all_group.get("vars") or {}

    nodes = {}
    for name, config in wizards.items():
        config = config or {}  # host listed without variables
        nodes[name] = {
            "host": config.get("ansible_host", "localhost"),
            "user": config.get("ansible_user", ""),
            "wizard_name": config.get("wizard_name", name),
            "hermes_home": config.get("hermes_home", "~/.hermes"),
            "wizard_home": config.get("wizard_home", f"~/wizards/{name}"),
            "machine_type": config.get("machine_type", "unknown"),
        }

    return nodes, global_vars
|
||||
|
||||
|
||||
def load_golden_state(inventory_vars: dict) -> dict:
    """Extract the golden state from the inventory's global vars.

    Args:
        inventory_vars: The inventory's ``all.vars`` mapping (may be empty
            or ``None``).

    Returns:
        A dict with ``providers`` (from ``golden_state_providers``),
        ``banned_providers`` and ``banned_models_patterns``. Each value is
        a list; keys that are missing or null in the inventory become
        ``[]`` so consumers can iterate and join them without checks.
    """
    inventory_vars = inventory_vars or {}
    return {
        "providers": inventory_vars.get("golden_state_providers") or [],
        "banned_providers": inventory_vars.get("banned_providers") or [],
        "banned_models_patterns": inventory_vars.get("banned_models_patterns") or [],
    }
|
||||
|
||||
|
||||
# ── SSH Collection ────────────────────────────────────────────────────────────
|
||||
|
||||
def ssh_collect(node_name: str, node_info: dict, timeout: int = 15) -> NodeConfig:
    """Collect config files from a node, over SSH or locally.

    Hosts ``localhost`` and ``127.0.0.1`` are read locally; every other
    host is contacted as ``user@host`` (or just ``host`` when no user is
    set). A remote host is probed first: if the probe fails, the node is
    returned with ``reachable=False`` and the reason in ``errors``.

    For each template in ``CONFIG_PATHS`` the file is fetched and parsed as
    YAML; non-empty documents are stored in ``configs`` under the resolved
    path. Files that cannot be read are skipped silently because not every
    path exists on every node. The result of a grep for banned provider
    names under the hermes home is stored under ``"__banned_scan__"``.

    Args:
        node_name: Inventory name of the node.
        node_info: Node definition with ``host`` and optionally ``user``,
            ``hermes_home`` and ``wizard_home``.
        timeout: Per-command timeout in seconds.

    Returns:
        The populated ``NodeConfig``.
    """
    host = node_info["host"]
    user = node_info.get("user", "")
    hermes_home = node_info.get("hermes_home", "~/.hermes")
    wizard_home = node_info.get("wizard_home", f"~/wizards/{node_name}")

    result = NodeConfig(name=node_name, host=host)

    # Build SSH target
    if host in ("localhost", "127.0.0.1"):
        ssh_target = None  # local
    else:
        ssh_target = f"{user}@{host}" if user else host

        # Probe connectivity once up front. The file fetches below cannot
        # tell "file missing" from "host down", so without this probe an
        # unreachable node would look like a node with zero config files.
        probe_error = None
        try:
            probe = subprocess.run(
                ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
                 ssh_target, "true"],
                capture_output=True, text=True, timeout=timeout,
            )
            if probe.returncode != 0:
                probe_error = probe.stderr.strip() or f"ssh exited with status {probe.returncode}"
        except subprocess.TimeoutExpired:
            probe_error = f"timed out after {timeout}s"
        except FileNotFoundError:
            probe_error = "ssh client not found"

        if probe_error is not None:
            result.reachable = False
            result.errors.append(f"SSH to {ssh_target} failed: {probe_error}")
            return result

    # Collect each config path
    for path_template in CONFIG_PATHS:
        remote_path = path_template.replace("{name}", node_name)
        if remote_path.startswith("/"):
            full_path = remote_path
        elif "wizards/" in remote_path:
            full_path = f"{wizard_home}/config.yaml"
        elif ".hermes" in remote_path:
            full_path = f"{hermes_home}/config.yaml"
        else:
            full_path = f"~/{remote_path}"

        config_content = _remote_cat(ssh_target, full_path, timeout)
        if config_content is None:
            # Don't flag missing files as errors — some paths may not exist on all nodes
            continue
        try:
            parsed = yaml.safe_load(config_content)
        except yaml.YAMLError as e:
            result.errors.append(f"YAML parse error in {full_path}: {e}")
            continue
        if parsed:
            result.configs[full_path] = parsed

    # Also collect banned provider scan
    banned_check = _remote_grep(
        ssh_target,
        hermes_home,
        r"anthropic|claude-sonnet|claude-opus|claude-haiku",
        timeout,
    )
    if banned_check:
        result.configs["__banned_scan__"] = banned_check

    return result
|
||||
|
||||
|
||||
def _remote_cat(ssh_target: str | None, path: str, timeout: int) -> str | None:
|
||||
"""Cat a file remotely (or locally)."""
|
||||
if ssh_target is None:
|
||||
cmd = ["cat", path]
|
||||
else:
|
||||
cmd = ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
|
||||
ssh_target, f"cat {path}"]
|
||||
|
||||
try:
|
||||
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||
if proc.returncode == 0:
|
||||
return proc.stdout
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _remote_grep(ssh_target: str | None, base_path: str, pattern: str, timeout: int) -> dict:
|
||||
"""Grep for banned patterns in config files."""
|
||||
if ssh_target is None:
|
||||
cmd = ["grep", "-rn", "-i", pattern, base_path, "--include=*.yaml", "--include=*.yml"]
|
||||
else:
|
||||
cmd = ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
|
||||
ssh_target, f"grep -rn -i '{pattern}' {base_path} --include='*.yaml' --include='*.yml' 2>/dev/null || true"]
|
||||
|
||||
try:
|
||||
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||
if proc.stdout.strip():
|
||||
lines = proc.stdout.strip().split("\n")
|
||||
return {"matches": lines, "count": len(lines)}
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
# ── Drift Detection ───────────────────────────────────────────────────────────
|
||||
|
||||
def detect_drift(nodes: list[NodeConfig], golden: dict) -> list[DriftResult]:
    """Compare each reachable node's configs against the golden state.

    Three checks are made per node:

    * every line reported by the banned-pattern scan
      (``configs["__banned_scan__"]``) becomes a critical
      ``banned_provider_found`` drift;
    * for each config dict, the ordered list of provider names is compared
      with the golden provider names (critical ``value_mismatch`` on
      difference, only when both lists are non-empty), and each provider
      whose name or model contains a banned provider string
      (case-insensitive) becomes a critical ``banned_provider_found``;
    * the critical-key check described at the bottom of the loop.

    Args:
        nodes: Collected node configs; unreachable nodes are skipped.
        golden: Golden state with ``providers`` and ``banned_providers``.

    Returns:
        All detected drifts, in node order.
    """
    def _provider_name(provider: Any) -> Any:
        # Providers are normally dicts with a "name"; tolerate plain values.
        return provider.get("name", "") if isinstance(provider, dict) else str(provider)

    results = []

    # Golden-state values do not change per node, so compute them once.
    golden_providers = golden.get("providers") or []
    golden_names = [_provider_name(p) for p in golden_providers]
    # Provider fields are lower-cased before matching, so the banned names
    # must be too. Empty entries are dropped: "" is a substring of anything.
    banned_names = [str(b).lower() for b in (golden.get("banned_providers") or []) if b]
    critical_keys = ["display", "providers", "tools", "delegation"]

    for node in nodes:
        if not node.reachable:
            continue

        # Check for banned providers
        banned_scan = node.configs.get("__banned_scan__", {})
        if banned_scan.get("count", 0) > 0:
            for match in banned_scan.get("matches", []):
                results.append(DriftResult(
                    node=node.name,
                    file_path="(config files)",
                    diff_type="banned_provider_found",
                    key="banned_provider_reference",
                    node_value=match,
                    severity="critical",
                ))

        # Check each config file
        for path, config in node.configs.items():
            if path == "__banned_scan__" or not isinstance(config, dict):
                continue

            # Check provider chain; entries that are not dicts are ignored.
            node_providers = [
                p for p in _extract_provider_chain(config) if isinstance(p, dict)
            ]

            if node_providers and golden_providers:
                # Compare provider names in order
                node_names = [_provider_name(p) for p in node_providers]
                if node_names != golden_names:
                    results.append(DriftResult(
                        node=node.name,
                        file_path=path,
                        diff_type="value_mismatch",
                        key="provider_chain",
                        canonical_value=golden_names,
                        node_value=node_names,
                        severity="critical",
                    ))

            # Check for banned providers in node config
            for banned in banned_names:
                for provider in node_providers:
                    prov_name = str(provider.get("name") or "").lower()
                    prov_model = str(provider.get("model") or "").lower()
                    if banned in prov_name or banned in prov_model:
                        results.append(DriftResult(
                            node=node.name,
                            file_path=path,
                            diff_type="banned_provider_found",
                            key=f"provider.{provider.get('name', 'unknown')}",
                            node_value=provider,
                            severity="critical",
                        ))

            # Check for missing critical keys.
            # NOTE(review): a key is only reported when it is absent at the
            # top level AND its name appears somewhere in the config's
            # string form. Presumably this is meant to catch keys nested at
            # the wrong level — confirm the intent before simplifying.
            config_text = str(config)
            for key in critical_keys:
                if key not in config and key in config_text:
                    results.append(DriftResult(
                        node=node.name,
                        file_path=path,
                        diff_type="key_missing",
                        key=key,
                        canonical_value="(present in golden state)",
                        severity="warning",
                    ))

    return results
|
||||
|
||||
|
||||
def _extract_provider_chain(config: dict) -> list[dict]:
|
||||
"""Extract provider list from a config dict (handles multiple formats)."""
|
||||
# Direct providers key
|
||||
if "providers" in config:
|
||||
providers = config["providers"]
|
||||
if isinstance(providers, list):
|
||||
return providers
|
||||
|
||||
# Nested in display or model config
|
||||
for key in ["model", "inference", "llm"]:
|
||||
if key in config and isinstance(config[key], dict):
|
||||
if "providers" in config[key]:
|
||||
return config[key]["providers"]
|
||||
|
||||
# Single provider format
|
||||
if "provider" in config and "model" in config:
|
||||
return [{"name": config["provider"], "model": config["model"]}]
|
||||
|
||||
return []
|
||||
|
||||
|
||||
# ── Auto-Sync ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def auto_sync(drifts: list[DriftResult], nodes: list[NodeConfig]) -> list[str]:
    """Re-apply the golden state playbook to nodes with critical drift.

    For each node that has at least one critical drift (and is present in
    ``nodes``), runs ``ansible-playbook`` with ``GOLDEN_STATE_PLAYBOOK``,
    limited to that node and to the ``golden`` tag, from ``REPO_ROOT``.

    Args:
        drifts: Detected drifts; only ``severity == "critical"`` triggers
            a sync.
        nodes: Collected node configs, used to confirm the node is known.

    Returns:
        Human-readable log lines describing what was done, with nodes
        processed in name order.
    """
    actions = []

    # Sorted so the playbook runs (and the log lines) come in a stable order
    # rather than in set iteration order.
    drifted_nodes = sorted({d.node for d in drifts if d.severity == "critical"})
    if not drifted_nodes:
        actions.append("No critical drift to sync.")
        return actions

    known_nodes = {n.name for n in nodes}
    for node_name in drifted_nodes:
        if node_name not in known_nodes:
            continue

        actions.append(f"[{node_name}] Running golden state sync...")

        # Run ansible-playbook for this node
        cmd = [
            "ansible-playbook",
            str(GOLDEN_STATE_PLAYBOOK),
            "-i", str(ANSIBLE_INVENTORY),
            "-l", node_name,
            "--tags", "golden",
        ]

        try:
            proc = subprocess.run(
                cmd, capture_output=True, text=True, timeout=120,
                cwd=str(REPO_ROOT),
            )
        except subprocess.TimeoutExpired:
            actions.append(f"[{node_name}] Sync timed out after 120s.")
            continue
        except FileNotFoundError:
            actions.append(f"[{node_name}] ansible-playbook not found. Install Ansible or run manually.")
            continue

        if proc.returncode == 0:
            actions.append(f"[{node_name}] Sync completed successfully.")
        else:
            # Fall back to stdout when stderr is empty so the failure
            # message is not blank.
            detail = proc.stderr.strip() or proc.stdout.strip()
            actions.append(f"[{node_name}] Sync FAILED: {detail[:200]}")

    return actions
|
||||
|
||||
|
||||
# ── Reporting ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def print_report(drifts: list[DriftResult], nodes: list[NodeConfig], golden: dict):
    """Print a human-readable drift report to stdout.

    The report has a summary (node counts, golden provider chain, banned
    providers), then the drifts grouped by node in name order, then a
    total with critical and warning counts.

    Args:
        drifts: Detected drifts.
        nodes: Collected node configs (for the reachable count).
        golden: Golden state with ``providers`` and ``banned_providers``.
    """
    def _provider_name(provider) -> str:
        # Tolerate provider entries without a "name" or given as plain values.
        if isinstance(provider, dict):
            return str(provider.get("name", "?"))
        return str(provider)

    print("=" * 70)
    print("CONFIG DRIFT DETECTION REPORT")
    print("=" * 70)
    print()

    # Summary
    reachable = sum(1 for n in nodes if n.reachable)
    provider_chain = " → ".join(_provider_name(p) for p in golden.get("providers") or [])
    banned = ", ".join(str(b) for b in golden.get("banned_providers") or [])
    print(f"Nodes checked: {len(nodes)} (reachable: {reachable})")
    print(f"Golden state providers: {provider_chain}")
    print(f"Banned providers: {banned}")
    print()

    if not drifts:
        print("[OK] No config drift detected. All nodes match golden state.")
        return

    # Group by node
    by_node: dict[str, list[DriftResult]] = {}
    for d in drifts:
        by_node.setdefault(d.node, []).append(d)

    severity_icons = {"critical": "[!!]", "warning": "[!]", "info": "[i]"}
    for node_name, node_drifts in sorted(by_node.items()):
        print(f"--- {node_name} ---")
        for d in node_drifts:
            severity_icon = severity_icons.get(d.severity, "[?]")
            print(f" {severity_icon} {d.diff_type}: {d.key}")
            if d.canonical_value is not None:
                print(f" canonical: {d.canonical_value}")
            if d.node_value is not None:
                print(f" actual: {d.node_value}")
        print()

    # Severity summary
    critical = sum(1 for d in drifts if d.severity == "critical")
    warning = sum(1 for d in drifts if d.severity == "warning")
    print(f"Total: {len(drifts)} drift(s) — {critical} critical, {warning} warning")
|
||||
|
||||
|
||||
def print_json_report(drifts: list[DriftResult], nodes: list[NodeConfig], golden: dict):
    """Print the drift report as a JSON document on stdout.

    The document contains ``nodes_checked``, ``reachable``,
    ``golden_providers`` (names only), ``drift_count``, ``critical_count``
    and a ``drifts`` list with one object per drift. Values that are not
    JSON-serialisable are rendered with ``str``.

    Args:
        drifts: Detected drifts.
        nodes: Collected node configs (for the counts).
        golden: Golden state with ``providers``.
    """
    def _provider_name(provider):
        # Tolerate provider entries without a "name" or given as plain values.
        return provider.get("name") if isinstance(provider, dict) else str(provider)

    drift_entries = [
        {
            "node": d.node,
            "file": d.file_path,
            "type": d.diff_type,
            "key": d.key,
            "canonical": d.canonical_value,
            "actual": d.node_value,
            "severity": d.severity,
        }
        for d in drifts
    ]
    report = {
        "nodes_checked": len(nodes),
        "reachable": sum(1 for n in nodes if n.reachable),
        "golden_providers": [_provider_name(p) for p in golden.get("providers") or []],
        "drift_count": len(drifts),
        "critical_count": sum(1 for d in drifts if d.severity == "critical"),
        "drifts": drift_entries,
    }
    print(json.dumps(report, indent=2, default=str))
|
||||
|
||||
|
||||
# ── CLI ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """CLI entry point: collect node configs, detect drift, report, optionally sync.

    Progress and errors go to stderr; the report goes to stdout. With
    ``--json`` the auto-sync log is also sent to stderr so that stdout
    carries only the JSON document.

    Exit codes:
        0 — no drift detected
        1 — drift detected
        2 — error (unknown ``--node``, or no node could be reached)
    """
    parser = argparse.ArgumentParser(description="Detect config drift across fleet nodes")
    parser.add_argument("--node", help="Check only this node")
    parser.add_argument("--auto-sync", action="store_true", help="Auto-fix critical drift with golden state")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--timeout", type=int, default=15, help="SSH timeout per node (seconds)")
    args = parser.parse_args()

    # Load inventory
    print("Loading inventory...", file=sys.stderr)
    node_defs, global_vars = load_inventory()
    golden = load_golden_state(global_vars)

    # Filter to single node if requested
    if args.node:
        if args.node not in node_defs:
            print(
                f"ERROR: Node '{args.node}' not in inventory. Available: {', '.join(node_defs.keys())}",
                file=sys.stderr,
            )
            sys.exit(2)
        node_defs = {args.node: node_defs[args.node]}

    # Collect configs from each node
    print(f"Collecting configs from {len(node_defs)} node(s)...", file=sys.stderr)
    nodes = []
    for name, info in node_defs.items():
        print(f" {name} ({info['host']})...", file=sys.stderr, end=" ", flush=True)
        node_config = ssh_collect(name, info, timeout=args.timeout)
        if node_config.reachable:
            print(f"OK ({len(node_config.configs)} files)", file=sys.stderr)
        else:
            print("UNREACHABLE", file=sys.stderr)
        nodes.append(node_config)

    # Detect drift
    print("\nAnalyzing drift...", file=sys.stderr)
    drifts = detect_drift(nodes, golden)

    # Output
    if args.json:
        print_json_report(drifts, nodes, golden)
    else:
        print()
        print_report(drifts, nodes, golden)

    # Auto-sync if requested
    if args.auto_sync and drifts:
        # Keep stdout machine-readable in --json mode.
        sync_stream = sys.stderr if args.json else sys.stdout
        print("\n--- AUTO-SYNC ---", file=sync_stream)
        for action in auto_sync(drifts, nodes):
            print(action, file=sync_stream)

    # Exit code
    if drifts:
        sys.exit(1)
    if nodes and not any(n.reachable for n in nodes):
        # Nothing could be checked, so "no drift" would be misleading.
        sys.exit(2)
    sys.exit(0)


if __name__ == "__main__":
    main()
|
||||
@@ -1,191 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Auto-generate scene descriptions from image/video assets.
|
||||
|
||||
Scans a directory for media files, generates scene descriptions using
|
||||
a local vision model (Ollama), and outputs training pairs in JSONL format.
|
||||
|
||||
Usage:
|
||||
python scripts/generate_scene_descriptions.py --input ./assets --output training-data/scene-descriptions-auto.jsonl
|
||||
python scripts/generate_scene_descriptions.py --input ./assets --model gemma3:12b --limit 50
|
||||
python scripts/generate_scene_descriptions.py --dry-run # List files without generating
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Supported media extensions
|
||||
# Lower-case file suffixes (leading dot included) treated as still images.
IMAGE_EXTS = {".bmp", ".gif", ".jpeg", ".jpg", ".png", ".webp"}
# Lower-case file suffixes treated as videos.
VIDEO_EXTS = {".avi", ".mkv", ".mov", ".mp4", ".webm"}
# Every media suffix the scanner accepts.
ALL_EXTS = IMAGE_EXTS.union(VIDEO_EXTS)

# Prompt sent to the vision model; it asks for a JSON object with a fixed
# set of keys so the reply can be parsed.
SCENE_PROMPT = """Describe this image for a visual scene database. Output JSON:
{
"mood": "one of: calm, energetic, dark, warm, cool, chaotic, serene, tense, joyful, melancholic",
"colors": ["dominant color 1", "dominant color 2", "dominant color 3"],
"composition": "one of: close-up, wide-shot, medium-shot, low-angle, high-angle, bird-eye, profile, over-shoulder",
"camera": "one of: static, slow-pan, tracking, handheld, crane, dolly, steady, locked-off",
"lighting": "one of: natural, artificial, mixed, dramatic, soft, harsh, backlit",
"description": "2-3 sentence visual description of the scene"
}

Be specific. Describe what you see, not what you imagine."""
|
||||
|
||||
|
||||
def scan_media(input_dir: str) -> list[Path]:
    """Scan a directory tree for media files.

    A path is kept when it is a regular file whose suffix, compared
    case-insensitively, is in ``ALL_EXTS``. The tree is walked once.

    Args:
        input_dir: Directory to scan recursively.

    Returns:
        Sorted list of matching file paths; an empty list (after printing
        an error to stderr) when ``input_dir`` does not exist.
    """
    input_path = Path(input_dir)
    if not input_path.exists():
        print(f"Error: {input_dir} does not exist", file=sys.stderr)
        return []

    # Lower-casing the suffix also catches mixed-case names such as
    # "photo.Jpg", and is_file() keeps out directories that merely have a
    # media-looking name.
    return sorted(
        candidate
        for candidate in input_path.rglob("*")
        if candidate.suffix.lower() in ALL_EXTS and candidate.is_file()
    )
|
||||
|
||||
|
||||
def extract_video_frame(video_path: Path, output_path: Path) -> bool:
    """Extract the first frame of a video to ``output_path`` using ffmpeg.

    Any file already at ``output_path`` is removed first, so a leftover
    from an earlier run is never reported as a successful extraction.

    Args:
        video_path: Path of the video file.
        output_path: Where to write the frame image.

    Returns:
        ``True`` when ffmpeg exited successfully and wrote a non-empty
        file; ``False`` otherwise (ffmpeg missing, timeout after 30 s,
        non-zero exit, or filesystem error).
    """
    try:
        output_path.unlink(missing_ok=True)
        proc = subprocess.run(
            ["ffmpeg", "-y", "-i", str(video_path), "-vframes", "1",
             "-q:v", "2", str(output_path)],
            capture_output=True, timeout=30,
        )
        return (
            proc.returncode == 0
            and output_path.is_file()
            and output_path.stat().st_size > 0
        )
    except (OSError, subprocess.SubprocessError):
        # OSError covers a missing ffmpeg binary and filesystem errors;
        # SubprocessError covers the timeout.
        return False
|
||||
|
||||
|
||||
def describe_image(image_path: Path, model: str = "gemma3:12b",
                   ollama_url: str = "http://localhost:11434") -> Optional[dict]:
    """Generate a scene description for an image with an Ollama vision model.

    The image is base64-encoded and posted together with ``SCENE_PROMPT``
    to ``{ollama_url}/api/generate``. The first ``{...}`` span in the
    model's reply is parsed as JSON.

    Args:
        image_path: Path of the image file.
        model: Ollama model name.
        ollama_url: Base URL of the Ollama server.

    Returns:
        The parsed JSON object from the reply. When the reply contains no
        parseable JSON object, a dict carrying the first 500 characters of
        the reply as ``description`` with the other fields set to
        ``"unknown"`` / ``[]``. ``None`` when the image cannot be read or
        the request fails (the error is printed to stderr).
    """
    import base64
    import re
    import urllib.request

    try:
        with open(image_path, "rb") as f:
            image_b64 = base64.b64encode(f.read()).decode()

        payload = json.dumps({
            "model": model,
            "prompt": SCENE_PROMPT,
            "images": [image_b64],
            "stream": False,
            "options": {"temperature": 0.3, "num_predict": 512},
        }).encode()
        req = urllib.request.Request(
            f"{ollama_url}/api/generate",
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = json.loads(resp.read())
    except Exception as e:
        # Best-effort per file: report and let the caller move on.
        print(f" Error describing {image_path.name}: {e}", file=sys.stderr)
        return None

    response_text = str(data.get("response") or "") if isinstance(data, dict) else ""

    # Parse JSON from response. Malformed JSON must not discard the reply:
    # fall through to the raw-text fallback below instead.
    json_match = re.search(r"\{[\s\S]*\}", response_text)
    if json_match:
        try:
            parsed = json.loads(json_match.group())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

    return {"description": response_text[:500], "mood": "unknown",
            "colors": [], "composition": "unknown", "camera": "unknown", "lighting": "unknown"}
|
||||
|
||||
|
||||
def generate_training_pairs(media_files: list[Path], model: str, ollama_url: str,
                            limit: int = 0, dry_run: bool = False) -> list[dict]:
    """Generate training pairs from media files.

    Images are described directly; for videos a frame is extracted first
    and described instead. Frames are written to a temporary directory
    that is removed when the function returns, so the input directory is
    never modified. Progress is printed to stderr.

    Args:
        media_files: Media file paths to process.
        model: Ollama model name passed to ``describe_image``.
        ollama_url: Ollama base URL passed to ``describe_image``.
        limit: Maximum number of files to process; ``0`` (or less) means
            all.
        dry_run: When true, nothing is generated and each file yields a
            ``{"source": ..., "status": "dry-run"}`` record.

    Returns:
        One dict per successfully described file with ``source``,
        ``media_type``, ``description``, ``model`` and ``generated_at``
        (UTC, ISO 8601); files whose frame extraction or description
        fails are omitted.
    """
    import tempfile

    files = media_files[:limit] if limit > 0 else media_files
    total = len(files)
    print(f"Processing {total} files...", file=sys.stderr)

    if dry_run:
        pairs = []
        for i, media_path in enumerate(files):
            print(f" [{i+1}/{total}] {media_path.name}...", file=sys.stderr, end=" ")
            print("(dry run)", file=sys.stderr)
            pairs.append({"source": str(media_path), "status": "dry-run"})
        return pairs

    pairs = []
    with tempfile.TemporaryDirectory(prefix="scene-frames-") as frame_dir:
        for i, media_path in enumerate(files):
            print(f" [{i+1}/{total}] {media_path.name}...", file=sys.stderr, end=" ")

            is_video = media_path.suffix.lower() in VIDEO_EXTS
            work_path = media_path

            if is_video:
                # The index prefix keeps frame names unique even when two
                # videos in different folders share a stem.
                frame_path = Path(frame_dir) / f"{i:06d}-{media_path.stem}.frame.jpg"
                if not extract_video_frame(media_path, frame_path):
                    print("SKIP (frame extraction failed)", file=sys.stderr)
                    continue
                work_path = frame_path

            description = describe_image(work_path, model, ollama_url)
            if description:
                pairs.append({
                    "source": str(media_path),
                    "media_type": "video" if is_video else "image",
                    "description": description,
                    "model": model,
                    "generated_at": datetime.now(timezone.utc).isoformat(),
                })
                print("OK", file=sys.stderr)
            else:
                print("FAIL", file=sys.stderr)

    return pairs
|
||||
|
||||
|
||||
def main():
    """CLI entry point: scan media, generate descriptions, write JSONL output.

    Exits with status 1 when no media files are found. With ``--dry-run``
    the files are only listed (on stderr) and the output file is left
    untouched.
    """
    parser = argparse.ArgumentParser(description="Auto-generate scene descriptions from media")
    parser.add_argument("--input", "-i", required=True, help="Input directory with media files")
    parser.add_argument("--output", "-o", default="training-data/scene-descriptions-auto.jsonl")
    parser.add_argument("--model", "-m", default="gemma3:12b", help="Ollama model name")
    parser.add_argument("--ollama-url", default="http://localhost:11434")
    parser.add_argument("--limit", "-l", type=int, default=0, help="Max files to process (0=all)")
    parser.add_argument("--dry-run", action="store_true", help="List files without generating")
    args = parser.parse_args()

    media_files = scan_media(args.input)
    print(f"Found {len(media_files)} media files", file=sys.stderr)

    if not media_files:
        print("No media files found.", file=sys.stderr)
        sys.exit(1)

    pairs = generate_training_pairs(media_files, args.model, args.ollama_url,
                                    args.limit, args.dry_run)

    if args.dry_run:
        # A dry run must not replace existing training data with
        # placeholder records.
        print(f"\nDry run: {len(pairs)} files listed, nothing written", file=sys.stderr)
        return

    # Write output
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII text, so pin the encoding
    # instead of relying on the locale default.
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(pair, ensure_ascii=False) + "\n" for pair in pairs)

    print(f"\nWrote {len(pairs)} pairs to {output_path}", file=sys.stderr)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user