Compare commits

..

2 Commits

Author SHA1 Message Date
c39b80aec5 docs(pipelines): Add README for pipeline infrastructure
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 20s
Smoke Test / smoke (pull_request) Failing after 18s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 13s
Validate Config / Shell Script Lint (pull_request) Failing after 45s
Validate Config / Cron Syntax Check (pull_request) Successful in 10s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m10s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Playbook Schema Validation (pull_request) Successful in 18s
PR Checklist / pr-checklist (pull_request) Failing after 3m7s
Architecture Lint / Lint Repository (pull_request) Failing after 20s
Documents the shared orchestrator usage.
Refs #621
2026-04-14 22:32:32 +00:00
ee6919c73c feat(pipelines): Add shared orchestrator (#621)
Pipeline orchestrator with:
- SQLite-backed job queue
- Parallel worker pool (default 10)
- Token budget tracking per job
- Progress persistence with checkpoint resume
- Rate limiting (provider-aware)
- Error retry with exponential backoff
- Final report generation

All 5 pipelines can use this shared orchestrator.
Resolves #621
2026-04-14 22:31:53 +00:00
20 changed files with 910 additions and 2275 deletions

View File

@@ -31,14 +31,6 @@ class GlitchCategory(Enum):
WATER_REFLECTION = "water_reflection"
SKYBOX_SEAM = "skybox_seam"
# Three.js-specific categories (ref: timmy-config#543)
SHADER_FAILURE = "shader_failure"
TEXTURE_PLACEHOLDER = "texture_placeholder"
UV_MAPPING_ERROR = "uv_mapping_error"
FRUSTUM_CULLING = "frustum_culling"
SHADOW_MAP_ARTIFACT = "shadow_map_artifact"
BLOOM_OVERFLOW = "bloom_overflow"
@dataclass
class GlitchPattern:
@@ -249,123 +241,6 @@ MATRIX_GLITCH_PATTERNS: list[GlitchPattern] = [
],
confidence_threshold=0.45,
),
# --- Three.js-Specific Glitch Patterns (ref: timmy-config#543) ---
GlitchPattern(
category=GlitchCategory.SHADER_FAILURE,
name="Shader Compilation Failure",
description="Three.js shader failed to compile, rendering the material as solid black. "
"Common when custom ShaderMaterial has syntax errors or missing uniforms.",
severity=GlitchSeverity.CRITICAL,
detection_prompts=[
"Look for objects or surfaces rendered as pure black (#000000) that should have visible textures or materials.",
"Identify geometry that appears completely dark while surrounding objects are normally lit.",
"Check for objects where the material seems to 'absorb all light' — flat black with no shading gradient.",
],
visual_indicators=[
"solid black object with no shading",
"geometry rendered as silhouette",
"material appears to absorb light entirely",
"black patch inconsistent with scene lighting",
],
confidence_threshold=0.7,
),
GlitchPattern(
category=GlitchCategory.TEXTURE_PLACEHOLDER,
name="Three.js Texture Not Loaded",
description="Three.js failed to load the texture asset, rendering a 1x1 white pixel "
"stretched across the entire surface. Distinguished from missing-texture by "
"the uniform white/grey appearance rather than magenta.",
severity=GlitchSeverity.CRITICAL,
detection_prompts=[
"Look for surfaces that are uniformly white or light grey with no texture detail, even on large geometry.",
"Identify objects where the texture appears as a single solid color stretched across complex UVs.",
"Check for surfaces that look 'blank' or 'unloaded' — flat white/grey where detail should exist.",
],
visual_indicators=[
"uniform white or light grey surface",
"no texture detail on large geometry",
"stretched single-color appearance",
"1x1 pixel placeholder stretched to fill UV space",
],
confidence_threshold=0.65,
),
GlitchPattern(
category=GlitchCategory.UV_MAPPING_ERROR,
name="BufferGeometry UV Mapping Error",
description="Three.js BufferGeometry has incorrect UV coordinates, causing textures to "
"appear stretched, compressed, or mapped to the wrong faces.",
severity=GlitchSeverity.HIGH,
detection_prompts=[
"Look for textures that appear dramatically stretched in one direction on specific faces.",
"Identify surfaces where the texture pattern is distorted but other nearby surfaces look correct.",
"Check for faces where the texture seems 'smeared' or mapped with incorrect aspect ratio.",
],
visual_indicators=[
"texture stretching on specific faces",
"distorted pattern on geometry",
"smeared texture appearance",
"aspect ratio mismatch between texture and surface",
],
confidence_threshold=0.6,
),
GlitchPattern(
category=GlitchCategory.FRUSTUM_CULLING,
name="Frustum Culling Artifact",
description="Three.js frustum culling incorrectly marks objects as outside the camera "
"frustum, causing them to pop in/out of existence at screen edges.",
severity=GlitchSeverity.MEDIUM,
detection_prompts=[
"Look for objects that are partially visible at the edge of the frame — half-rendered or cut off unnaturally.",
"Identify geometry that seems to 'pop' into existence as the view angle changes.",
"Check screen edges for objects that appear suddenly rather than smoothly entering the viewport.",
],
visual_indicators=[
"half-visible object at screen edge",
"object popping into frame",
"abrupt appearance of geometry",
"bounding box visible but mesh missing",
],
confidence_threshold=0.55,
),
GlitchPattern(
category=GlitchCategory.SHADOW_MAP_ARTIFACT,
name="Shadow Map Resolution Artifact",
description="Three.js shadow map has insufficient resolution, causing pixelated, "
"blocky shadows with visible texel edges instead of smooth shadow gradients.",
severity=GlitchSeverity.MEDIUM,
detection_prompts=[
"Look for shadows with visible blocky or pixelated edges instead of smooth gradients.",
"Identify shadow maps where individual texels (texture pixels) are clearly visible.",
"Check for shadows that appear as jagged stair-stepped patterns rather than soft edges.",
],
visual_indicators=[
"blocky shadow edges",
"visible texel grid in shadows",
"stair-stepped shadow boundary",
"pixelated shadow gradient",
],
confidence_threshold=0.55,
),
GlitchPattern(
category=GlitchCategory.BLOOM_OVERFLOW,
name="Post-Processing Bloom Overflow",
description="Three.js UnrealBloomPass or similar post-processing bloom effect is too "
"intense, causing bright areas to bleed glow into surrounding geometry.",
severity=GlitchSeverity.LOW,
detection_prompts=[
"Look for bright areas that have an unusually large, soft glow bleeding into adjacent surfaces.",
"Identify scenes where light sources appear to have a 'halo' that extends beyond physical plausibility.",
"Check for bright objects whose glow color bleeds onto nearby unrelated geometry.",
],
visual_indicators=[
"excessive glow bleeding from bright surfaces",
"halo around light sources",
"bloom color tinting adjacent geometry",
"glow bleeding beyond object boundaries",
],
confidence_threshold=0.5,
),
]
@@ -414,23 +289,6 @@ def build_vision_prompt(patterns: list[GlitchPattern] | None = None) -> str:
)
# Three.js-specific category set for filtering (ref: timmy-config#543)
THREEJS_CATEGORIES = {
GlitchCategory.SHADER_FAILURE,
GlitchCategory.TEXTURE_PLACEHOLDER,
GlitchCategory.UV_MAPPING_ERROR,
GlitchCategory.FRUSTUM_CULLING,
GlitchCategory.SHADOW_MAP_ARTIFACT,
GlitchCategory.BLOOM_OVERFLOW,
}
def get_threejs_patterns() -> list[GlitchPattern]:
"""Return only Three.js-specific glitch patterns."""
return [p for p in MATRIX_GLITCH_PATTERNS if p.category in THREEJS_CATEGORIES]
if __name__ == "__main__":
import json
print(f"Loaded {len(MATRIX_GLITCH_PATTERNS)} glitch patterns:\n")

View File

@@ -9,7 +9,7 @@ Usage:
python matrix_glitch_detector.py <url> [--angles 4] [--output report.json]
python matrix_glitch_detector.py --demo # Run with synthetic test data
Ref: timmy-config#491, timmy-config#543
Ref: timmy-config#491
"""
import argparse
@@ -33,7 +33,6 @@ from glitch_patterns import (
MATRIX_GLITCH_PATTERNS,
build_vision_prompt,
get_patterns_by_severity,
get_threejs_patterns,
)
@@ -346,17 +345,14 @@ def _parse_vision_response(
def _infer_severity(category: str, confidence: float) -> str:
"""Infer severity from category and confidence when not provided."""
critical_cats = {"missing_textures", "clipping", "shader_failure", "texture_placeholder"}
high_cats = {"floating_assets", "broken_normals", "uv_mapping_error"}
medium_cats = {"frustum_culling", "shadow_map_artifact"}
critical_cats = {"missing_textures", "clipping"}
high_cats = {"floating_assets", "broken_normals"}
cat_lower = category.lower()
if any(c in cat_lower for c in critical_cats):
return "critical" if confidence > 0.7 else "high"
if any(c in cat_lower for c in high_cats):
return "high" if confidence > 0.7 else "medium"
if any(c in cat_lower for c in medium_cats):
return "medium" if confidence > 0.6 else "low"
return "medium" if confidence > 0.6 else "low"
@@ -393,9 +389,9 @@ def build_report(
),
},
metadata={
"detector_version": "0.2.0",
"detector_version": "0.1.0",
"pattern_count": len(MATRIX_GLITCH_PATTERNS),
"reference": "timmy-config#491, timmy-config#543",
"reference": "timmy-config#491",
},
)
@@ -464,30 +460,6 @@ def run_demo(output_path: Optional[Path] = None) -> ScanResult:
screenshot_index=3,
screenshot_angle="left",
),
DetectedGlitch(
id=str(uuid.uuid4())[:8],
category="shader_failure",
name="Black Material on Portal Frame",
description="Portal frame rendered as solid black — shader compilation failed (missing uniform u_time)",
severity="critical",
confidence=0.91,
location_x=45.0,
location_y=30.0,
screenshot_index=0,
screenshot_angle="front",
),
DetectedGlitch(
id=str(uuid.uuid4())[:8],
category="shadow_map_artifact",
name="Pixelated Character Shadow",
description="Character shadow shows visible texel grid — shadow map resolution too low (512x512)",
severity="medium",
confidence=0.78,
location_x=52.0,
location_y=75.0,
screenshot_index=1,
screenshot_angle="right",
),
]
print(f"[*] Detected {len(demo_glitches)} glitches")
@@ -524,11 +496,6 @@ Examples:
help="Minimum severity to include in report",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument(
"--threejs",
action="store_true",
help="Focus on Three.js-specific glitch patterns only (shader, texture, UV, culling, shadow, bloom)",
)
args = parser.parse_args()
@@ -558,13 +525,9 @@ Examples:
screenshots = capture_screenshots(args.url, angles, screenshots_dir)
print(f"[*] Captured {len(screenshots)} screenshots")
# Filter patterns by severity and type
# Filter patterns by severity
min_sev = GlitchSeverity(args.min_severity)
patterns = get_patterns_by_severity(min_sev)
if args.threejs:
threejs_patterns = get_threejs_patterns()
patterns = [p for p in patterns if p in threejs_patterns]
print(f"[*] Three.js-focused mode: {len(patterns)} patterns")
# Analyze with vision AI
print(f"[*] Analyzing with vision AI ({len(patterns)} patterns)...")

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Full Nostr agent-to-agent communication demo - FINAL WORKING
"""

View File

@@ -1,271 +0,0 @@
#!/usr/bin/env python3
"""
Pre-Flight Provider Check Script
Issue #508: [Robustness] Credential drain detection — provider health checks
Pre-flight check before session launch: verifies provider credentials and balance.
Usage:
python3 preflight-provider-check.py # Check all providers
python3 preflight-provider-check.py --launch # Check and return exit code
python3 preflight-provider-check.py --balance # Check OpenRouter balance
"""
import os, sys, json, yaml, urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Configuration
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
LOG_DIR = Path.home() / ".local" / "timmy" / "fleet-health"
LOG_FILE = LOG_DIR / "preflight-check.log"
def log(msg):
"""Log message to file and optionally console."""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
log_entry = "[" + timestamp + "] " + msg
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f:
f.write(log_entry + "\n")
if "--quiet" not in sys.argv:
print(log_entry)
def get_provider_api_key(provider):
"""Get API key for a provider from .env or environment."""
env_file = HERMES_HOME / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line.startswith(provider.upper() + "_API_KEY="):
return line.split("=", 1)[1].strip().strip("'\"")
return os.environ.get(provider.upper() + "_API_KEY")
def check_openrouter_balance(api_key):
"""Check OpenRouter balance via /api/v1/auth/key."""
if not api_key:
return False, "No API key", 0
try:
req = urllib.request.Request(
"https://openrouter.ai/api/v1/auth/key",
headers={"Authorization": "Bearer " + api_key}
)
resp = urllib.request.urlopen(req, timeout=10)
data = json.loads(resp.read())
# Check for credits
credits = data.get("data", {}).get("limit", 0)
usage = data.get("data", {}).get("usage", 0)
remaining = credits - usage if credits else None
if remaining is not None and remaining <= 0:
return False, "No credits remaining", 0
elif remaining is not None:
return True, "Credits available", remaining
else:
return True, "Unlimited or unknown balance", None
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key", 0
else:
return False, "HTTP " + str(e.code), 0
except Exception as e:
return False, str(e)[:100], 0
def check_nous_key(api_key):
"""Check Nous API key with minimal test call."""
if not api_key:
return False, "No API key"
try:
req = urllib.request.Request(
"https://inference.nousresearch.com/v1/models",
headers={"Authorization": "Bearer " + api_key}
)
resp = urllib.request.urlopen(req, timeout=10)
if resp.status == 200:
return True, "Valid key"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key"
elif e.code == 403:
return False, "Forbidden"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def check_anthropic_key(api_key):
"""Check Anthropic API key with minimal test call."""
if not api_key:
return False, "No API key"
try:
req = urllib.request.Request(
"https://api.anthropic.com/v1/models",
headers={
"x-api-key": api_key,
"anthropic-version": "2023-06-01"
}
)
resp = urllib.request.urlopen(req, timeout=10)
if resp.status == 200:
return True, "Valid key"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key"
elif e.code == 403:
return False, "Forbidden"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def check_ollama():
"""Check if Ollama is running."""
try:
req = urllib.request.Request("http://localhost:11434/api/tags")
resp = urllib.request.urlopen(req, timeout=5)
if resp.status == 200:
data = json.loads(resp.read())
models = data.get("models", [])
return True, str(len(models)) + " models loaded"
else:
return False, "HTTP " + str(resp.status)
except Exception as e:
return False, str(e)[:100]
def get_configured_provider():
"""Get the configured provider from global config."""
config_file = HERMES_HOME / "config.yaml"
if not config_file.exists():
return None
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if isinstance(model_config, dict):
return model_config.get("provider")
except:
pass
return None
def run_preflight_check():
"""Run pre-flight check on all providers."""
log("=== Pre-Flight Provider Check ===")
results = {}
# Check OpenRouter
or_key = get_provider_api_key("openrouter")
or_ok, or_msg, or_balance = check_openrouter_balance(or_key)
results["openrouter"] = {"healthy": or_ok, "message": or_msg, "balance": or_balance}
# Check Nous
nous_key = get_provider_api_key("nous")
nous_ok, nous_msg = check_nous_key(nous_key)
results["nous"] = {"healthy": nous_ok, "message": nous_msg}
# Check Anthropic
anthropic_key = get_provider_api_key("anthropic")
anthropic_ok, anthropic_msg = check_anthropic_key(anthropic_key)
results["anthropic"] = {"healthy": anthropic_ok, "message": anthropic_msg}
# Check Ollama
ollama_ok, ollama_msg = check_ollama()
results["ollama"] = {"healthy": ollama_ok, "message": ollama_msg}
# Get configured provider
configured = get_configured_provider()
# Summary
healthy_count = sum(1 for r in results.values() if r["healthy"])
total_count = len(results)
log("Results: " + str(healthy_count) + "/" + str(total_count) + " providers healthy")
for provider, result in results.items():
status = "HEALTHY" if result["healthy"] else "UNHEALTHY"
extra = ""
if provider == "openrouter" and result.get("balance") is not None:
extra = " (balance: " + str(result["balance"]) + ")"
log(" " + provider + ": " + status + " - " + result["message"] + extra)
if configured:
log("Configured provider: " + configured)
if configured in results and not results[configured]["healthy"]:
log("WARNING: Configured provider " + configured + " is UNHEALTHY!")
return results, configured
def check_launch_readiness():
"""Check if we're ready to launch sessions."""
results, configured = run_preflight_check()
# Check if configured provider is healthy
if configured and configured in results:
if not results[configured]["healthy"]:
log("LAUNCH BLOCKED: Configured provider " + configured + " is unhealthy")
return False, configured + " is unhealthy"
# Check if at least one provider is healthy
healthy_providers = [p for p, r in results.items() if r["healthy"]]
if not healthy_providers:
log("LAUNCH BLOCKED: No healthy providers available")
return False, "No healthy providers"
log("LAUNCH READY: " + str(len(healthy_providers)) + " healthy providers available")
return True, "Ready"
def show_balance():
"""Show OpenRouter balance."""
api_key = get_provider_api_key("openrouter")
if not api_key:
print("No OpenRouter API key found")
return
ok, msg, balance = check_openrouter_balance(api_key)
if ok:
if balance is not None:
print("OpenRouter balance: " + str(balance) + " credits")
else:
print("OpenRouter: " + msg)
else:
print("OpenRouter: " + msg)
def main():
if "--balance" in sys.argv:
show_balance()
elif "--launch" in sys.argv:
ready, message = check_launch_readiness()
if ready:
print("READY")
sys.exit(0)
else:
print("BLOCKED: " + message)
sys.exit(1)
else:
run_preflight_check()
if __name__ == "__main__":
main()

View File

@@ -1,411 +0,0 @@
#!/usr/bin/env python3
"""
Provider Health Monitor Script
Issue #509: [Robustness] Provider-aware profile config — auto-switch on failure
Monitors provider health and automatically switches profiles to working providers.
Usage:
python3 provider-health-monitor.py # Run once
python3 provider-health-monitor.py --daemon # Run continuously
python3 provider-health-monitor.py --status # Show provider health
"""
import os, sys, json, yaml, urllib.request, time
from datetime import datetime, timezone
from pathlib import Path
# Configuration
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
PROFILES_DIR = HERMES_HOME / "profiles"
LOG_DIR = Path.home() / ".local" / "timmy" / "fleet-health"
STATE_FILE = LOG_DIR / "tmux-state.json"
LOG_FILE = LOG_DIR / "provider-health.log"
# Provider test endpoints
PROVIDER_TESTS = {
"openrouter": {
"url": "https://openrouter.ai/api/v1/models",
"method": "GET",
"headers": lambda api_key: {"Authorization": "Bearer " + api_key},
"timeout": 10
},
"anthropic": {
"url": "https://api.anthropic.com/v1/models",
"method": "GET",
"headers": lambda api_key: {"x-api-key": api_key, "anthropic-version": "2023-06-01"},
"timeout": 10
},
"nous": {
"url": "https://inference.nousresearch.com/v1/models",
"method": "GET",
"headers": lambda api_key: {"Authorization": "Bearer " + api_key},
"timeout": 10
},
"kimi-coding": {
"url": "https://api.kimi.com/coding/v1/models",
"method": "GET",
"headers": lambda api_key: {"x-api-key": api_key, "x-api-provider": "kimi-coding"},
"timeout": 10
},
"ollama": {
"url": "http://localhost:11434/api/tags",
"method": "GET",
"headers": lambda api_key: {},
"timeout": 5
}
}
def log(msg):
"""Log message to file and optionally console."""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
log_entry = "[" + timestamp + "] " + msg
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f:
f.write(log_entry + "\n")
if "--quiet" not in sys.argv:
print(log_entry)
def get_provider_api_key(provider):
"""Get API key for a provider from .env or environment."""
env_file = HERMES_HOME / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line.startswith(provider.upper() + "_API_KEY="):
return line.split("=", 1)[1].strip().strip("'\"")
return os.environ.get(provider.upper() + "_API_KEY")
def test_provider(provider, api_key=None):
"""Test if a provider is healthy."""
config = PROVIDER_TESTS.get(provider)
if not config:
return False, "Unknown provider: " + provider
headers = config["headers"](api_key or "")
try:
req = urllib.request.Request(
config["url"],
headers=headers,
method=config["method"]
)
resp = urllib.request.urlopen(req, timeout=config["timeout"])
if resp.status == 200:
return True, "Healthy"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Unauthorized (401)"
elif e.code == 403:
return False, "Forbidden (403)"
elif e.code == 429:
return True, "Rate limited but accessible"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def get_all_providers():
"""Get all providers from profiles and global config."""
providers = set()
# Global config
global_config = HERMES_HOME / "config.yaml"
if global_config.exists():
try:
with open(global_config) as f:
config = yaml.safe_load(f)
# Primary model provider
model_config = config.get("model", {})
if isinstance(model_config, dict):
provider = model_config.get("provider", "")
if provider:
providers.add(provider)
# Auxiliary providers
auxiliary = config.get("auxiliary", {})
for aux_config in auxiliary.values():
if isinstance(aux_config, dict):
provider = aux_config.get("provider", "")
if provider and provider != "auto":
providers.add(provider)
except:
pass
# Profile configs
if PROFILES_DIR.exists():
for profile_dir in PROFILES_DIR.iterdir():
if profile_dir.is_dir():
config_file = profile_dir / "config.yaml"
if config_file.exists():
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if isinstance(model_config, dict):
provider = model_config.get("provider", "")
if provider:
providers.add(provider)
auxiliary = config.get("auxiliary", {})
for aux_config in auxiliary.values():
if isinstance(aux_config, dict):
provider = aux_config.get("provider", "")
if provider and provider != "auto":
providers.add(provider)
except:
pass
# Add common providers even if not configured
providers.update(["openrouter", "nous", "ollama"])
return list(providers)
def build_health_map():
"""Build a health map of all providers."""
providers = get_all_providers()
health_map = {}
log("Testing " + str(len(providers)) + " providers...")
for provider in providers:
api_key = get_provider_api_key(provider)
healthy, message = test_provider(provider, api_key)
health_map[provider] = {
"healthy": healthy,
"message": message,
"last_test": datetime.now(timezone.utc).isoformat(),
"api_key_present": bool(api_key)
}
status = "HEALTHY" if healthy else "UNHEALTHY"
log(" " + provider + ": " + status + " - " + message)
return health_map
def get_fallback_providers(health_map):
"""Get list of healthy providers in priority order."""
# Priority order: nous, openrouter, ollama, others
priority_order = ["nous", "openrouter", "ollama", "anthropic", "kimi-coding"]
healthy = []
for provider in priority_order:
if provider in health_map and health_map[provider]["healthy"]:
healthy.append(provider)
# Add any other healthy providers not in priority list
for provider, info in health_map.items():
if info["healthy"] and provider not in healthy:
healthy.append(provider)
return healthy
def update_profile_config(profile_name, new_provider):
"""Update a profile's config to use a new provider."""
config_file = PROFILES_DIR / profile_name / "config.yaml"
if not config_file.exists():
return False, "Config file not found"
try:
with open(config_file) as f:
config = yaml.safe_load(f)
# Update model provider
if "model" not in config:
config["model"] = {}
old_provider = config["model"].get("provider", "unknown")
config["model"]["provider"] = new_provider
# Update auxiliary providers if they were using the old provider
auxiliary = config.get("auxiliary", {})
for aux_name, aux_config in auxiliary.items():
if isinstance(aux_config, dict) and aux_config.get("provider") == old_provider:
aux_config["provider"] = new_provider
# Write back
with open(config_file, "w") as f:
yaml.dump(config, f, default_flow_style=False)
log("Updated " + profile_name + ": " + old_provider + " -> " + new_provider)
return True, "Updated"
except Exception as e:
return False, str(e)
def check_profiles(health_map):
"""Check all profiles and update unhealthy providers."""
if not PROFILES_DIR.exists():
return
fallback_providers = get_fallback_providers(health_map)
if not fallback_providers:
log("CRITICAL: No healthy providers available!")
return
updated_profiles = []
for profile_dir in PROFILES_DIR.iterdir():
if not profile_dir.is_dir():
continue
profile_name = profile_dir.name
config_file = profile_dir / "config.yaml"
if not config_file.exists():
continue
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if not isinstance(model_config, dict):
continue
current_provider = model_config.get("provider", "")
if not current_provider:
continue
# Check if current provider is healthy
if current_provider in health_map and health_map[current_provider]["healthy"]:
continue # Provider is healthy, no action needed
# Find best fallback
best_fallback = None
for provider in fallback_providers:
if provider != current_provider:
best_fallback = provider
break
if not best_fallback:
log("No fallback for " + profile_name + " (current: " + current_provider + ")")
continue
# Update profile
success, message = update_profile_config(profile_name, best_fallback)
if success:
updated_profiles.append({
"profile": profile_name,
"old_provider": current_provider,
"new_provider": best_fallback
})
except Exception as e:
log("Error processing " + profile_name + ": " + str(e))
return updated_profiles
def load_state():
"""Load state from tmux-state.json."""
if STATE_FILE.exists():
try:
with open(STATE_FILE) as f:
return json.load(f)
except:
pass
return {}
def save_state(state):
"""Save state to tmux-state.json."""
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def run_once():
"""Run provider health check once."""
log("=== Provider Health Check ===")
state = load_state()
# Build health map
health_map = build_health_map()
# Check profiles and update if needed
updated_profiles = check_profiles(health_map)
# Update state
state["provider_health"] = health_map
state["last_provider_check"] = datetime.now(timezone.utc).isoformat()
if updated_profiles:
state["last_profile_updates"] = updated_profiles
save_state(state)
# Summary
healthy_count = sum(1 for p in health_map.values() if p["healthy"])
total_count = len(health_map)
log("Health: " + str(healthy_count) + "/" + str(total_count) + " providers healthy")
if updated_profiles:
log("Updated " + str(len(updated_profiles)) + " profiles:")
for update in updated_profiles:
log(" " + update["profile"] + ": " + update["old_provider"] + " -> " + update["new_provider"])
def show_status():
"""Show provider health status."""
state = load_state()
health_map = state.get("provider_health", {})
if not health_map:
print("No provider health data available. Run without --status first.")
return
print("Provider Health (last updated: " + str(state.get("last_provider_check", "unknown")) + ")")
print("=" * 80)
for provider, info in sorted(health_map.items()):
status = "HEALTHY" if info["healthy"] else "UNHEALTHY"
message = info.get("message", "")
api_key = "yes" if info.get("api_key_present") else "no"
print(provider.ljust(20) + " " + status.ljust(10) + " API key: " + api_key + " - " + message)
# Show recent updates
updates = state.get("last_profile_updates", [])
if updates:
print()
print("Recent Profile Updates:")
for update in updates:
print(" " + update["profile"] + ": " + update["old_provider"] + " -> " + update["new_provider"])
def daemon_mode():
"""Run continuously."""
log("Starting provider health daemon (check every 300s)")
while True:
try:
run_once()
time.sleep(300) # Check every 5 minutes
except KeyboardInterrupt:
log("Daemon stopped by user")
break
except Exception as e:
log("Error: " + str(e))
time.sleep(60)
def main():
if "--status" in sys.argv:
show_status()
elif "--daemon" in sys.argv:
daemon_mode()
else:
run_once()
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Soul Eval Gate — The Conscience of the Training Pipeline

View File

@@ -196,37 +196,7 @@
"paused_reason": null,
"skills": [],
"skill": null
},
{
"id": "tmux-supervisor-513",
"name": "Autonomous Cron Supervisor",
"prompt": "Load the tmux-supervisor skill and execute the monitoring protocol.\n\nCheck both `dev` and `timmy` tmux sessions for idle panes. Only send Telegram notifications on actionable events (idle, overflow, failure). Be silent when all agents are working.\n\nSteps:\n1. List all tmux sessions (skip 'Alexander')\n2. For each session, list windows and panes\n3. Capture each pane and classify state (idle vs active)\n4. For idle panes: read context, craft context-aware prompt\n5. Send /queue prompts to idle panes\n6. Verify prompts landed\n7. Only notify via Telegram if:\n - A pane was prompted (idle detected)\n - A pane shows context overflow (>80%)\n - A pane is stuck or crashed\n8. If all panes are active: respond with [SILENT]",
"schedule": {
"kind": "interval",
"minutes": 7,
"display": "every 7m"
},
"schedule_display": "every 7m",
"repeat": {
"times": null,
"completed": 0
},
"enabled": true,
"created_at": "2026-04-15T03:00:00.000000+00:00",
"next_run_at": null,
"last_run_at": null,
"last_status": null,
"last_error": null,
"deliver": "telegram",
"origin": null,
"state": "scheduled",
"paused_at": null,
"paused_reason": null,
"skills": [
"tmux-supervisor"
],
"skill": "tmux-supervisor"
}
],
"updated_at": "2026-04-13T02:00:00+00:00"
}
}

View File

@@ -1,9 +0,0 @@
- name: Nightly Pipeline Scheduler
schedule: '*/30 18-23,0-8 * * *' # Every 30 min, off-peak hours only
tasks:
- name: Check and start pipelines
shell: "bash scripts/nightly-pipeline-scheduler.sh"
env:
PIPELINE_TOKEN_LIMIT: "500000"
PIPELINE_PEAK_START: "9"
PIPELINE_PEAK_END: "18"

View File

@@ -1,568 +0,0 @@
#!/usr/bin/env python3
"""
orchestrator.py — Shared Pipeline Orchestrator
SQLite-backed job queue with parallel workers, token budget tracking,
checkpoint resume, rate limiting, and error retry.
All 5 pipelines use this orchestrator for consistent execution.
Usage:
python3 orchestrator.py --pipeline training_factory --jobs jobs.jsonl
python3 orchestrator.py --pipeline adversary --jobs jobs.jsonl --workers 5
python3 orchestrator.py --status
python3 orchestrator.py --resume training_factory
python3 orchestrator.py --report training_factory
"""
import json
import os
import sys
import time
import sqlite3
import hashlib
import threading
import signal
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
DB_PATH = Path.home() / ".hermes" / "pipeline" / "orchestrator.db"
REPORT_DIR = Path.home() / ".hermes" / "pipeline" / "reports"
# ============================================================
# Data Structures
# ============================================================
@dataclass
class JobStatus:
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
SKIPPED = "skipped"
@dataclass
class PipelineStats:
"""Runtime statistics for a pipeline run."""
pipeline: str
total_jobs: int = 0
completed: int = 0
failed: int = 0
skipped: int = 0
tokens_used: int = 0
tokens_budget: int = 5_000_000
elapsed_seconds: float = 0.0
start_time: str = ""
jobs_per_minute: float = 0.0
def to_dict(self):
return {
"pipeline": self.pipeline,
"total_jobs": self.total_jobs,
"completed": self.completed,
"failed": self.failed,
"skipped": self.skipped,
"tokens_used": self.tokens_used,
"tokens_budget": self.tokens_budget,
"elapsed_seconds": round(self.elapsed_seconds, 1),
"start_time": self.start_time,
"jobs_per_minute": round(self.jobs_per_minute, 2),
}
# ============================================================
# Database
# ============================================================
def get_db():
"""Get SQLite database connection."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH), timeout=30, check_same_thread=False)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
_init_db(conn)
return conn
def _init_db(conn):
"""Initialize database schema."""
conn.executescript("""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline TEXT NOT NULL,
job_key TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT DEFAULT 'pending',
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 3,
tokens_used INTEGER DEFAULT 0,
error TEXT,
result TEXT,
checkpoint TEXT,
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
UNIQUE(pipeline, job_key)
);
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline_status ON jobs(pipeline, status);
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline_key ON jobs(pipeline, job_key);
CREATE TABLE IF NOT EXISTS pipeline_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline TEXT NOT NULL,
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
total_jobs INTEGER DEFAULT 0,
completed INTEGER DEFAULT 0,
failed INTEGER DEFAULT 0,
tokens_used INTEGER DEFAULT 0,
report TEXT
);
""")
conn.commit()
# ============================================================
# Job Queue
# ============================================================
class JobQueue:
"""SQLite-backed job queue."""
def __init__(self, pipeline: str, conn=None):
self.pipeline = pipeline
self.conn = conn or get_db()
def enqueue(self, job_key: str, payload: dict, max_attempts: int = 3):
"""Add a job to the queue (skip if already exists)."""
try:
self.conn.execute(
"INSERT INTO jobs (pipeline, job_key, payload, max_attempts) VALUES (?, ?, ?, ?)",
(self.pipeline, job_key, json.dumps(payload), max_attempts),
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
# Already exists — check if it needs retry
row = self.conn.execute(
"SELECT status FROM jobs WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
).fetchone()
if row and row[0] == "failed":
# Reset for retry
self.conn.execute(
"UPDATE jobs SET status='pending', attempts=0, error=NULL WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
)
self.conn.commit()
return True
return False
def enqueue_batch(self, jobs: List[dict], key_field: str = "id"):
"""Enqueue multiple jobs. Returns (added, skipped) counts."""
added = 0
skipped = 0
for job in jobs:
key = str(job.get(key_field, hashlib.md5(json.dumps(job).encode()).hexdigest()[:12]))
if self.enqueue(key, job):
added += 1
else:
skipped += 1
return added, skipped
def claim_next(self) -> Optional[dict]:
"""Claim the next pending job (atomic)."""
row = self.conn.execute(
"""UPDATE jobs SET status='running', started_at=datetime('now')
WHERE id = (
SELECT id FROM jobs WHERE pipeline=? AND status IN ('pending', 'retrying')
ORDER BY attempts ASC, created_at ASC LIMIT 1
) RETURNING *""",
(self.pipeline,),
).fetchone()
if not row:
return None
cols = [d[1] for d in self.conn.execute("PRAGMA table_info(jobs)").fetchall()]
return dict(zip(cols, row))
def complete(self, job_key: str, result: dict, tokens_used: int = 0):
"""Mark a job as completed."""
self.conn.execute(
"""UPDATE jobs SET status='completed', completed_at=datetime('now'),
result=?, tokens_used=? WHERE pipeline=? AND job_key=?""",
(json.dumps(result), tokens_used, self.pipeline, job_key),
)
self.conn.commit()
def fail(self, job_key: str, error: str, retry: bool = True):
"""Mark a job as failed, optionally retry."""
row = self.conn.execute(
"SELECT attempts, max_attempts FROM jobs WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
).fetchone()
if not row:
return
attempts, max_attempts = row
new_attempts = attempts + 1
if retry and new_attempts < max_attempts:
# Exponential backoff: 2^attempts seconds
delay = min(2 ** new_attempts, 60)
self.conn.execute(
"""UPDATE jobs SET status='retrying', attempts=?, error=?
WHERE pipeline=? AND job_key=?""",
(new_attempts, error, self.pipeline, job_key),
)
else:
self.conn.execute(
"""UPDATE jobs SET status='failed', attempts=?, error=?,
completed_at=datetime('now') WHERE pipeline=? AND job_key=?""",
(new_attempts, error, self.pipeline, job_key),
)
self.conn.commit()
def save_checkpoint(self, job_key: str, checkpoint: dict):
"""Save progress checkpoint for resume."""
self.conn.execute(
"UPDATE jobs SET checkpoint=? WHERE pipeline=? AND job_key=?",
(json.dumps(checkpoint), self.pipeline, job_key),
)
self.conn.commit()
def get_checkpoint(self, job_key: str) -> Optional[dict]:
"""Get saved checkpoint."""
row = self.conn.execute(
"SELECT checkpoint FROM jobs WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
).fetchone()
if row and row[0]:
return json.loads(row[0])
return None
def stats(self) -> dict:
"""Get queue statistics."""
rows = self.conn.execute(
"""SELECT status, COUNT(*), COALESCE(SUM(tokens_used), 0)
FROM jobs WHERE pipeline=? GROUP BY status""",
(self.pipeline,),
).fetchall()
result = {"total": 0, "tokens_used": 0}
for status, count, tokens in rows:
result[status] = count
result["total"] += count
result["tokens_used"] += tokens
return result
# ============================================================
# Orchestrator
# ============================================================
class Orchestrator:
"""
Shared orchestrator for all pipelines.
Features:
- Parallel worker pool (configurable)
- Token budget tracking
- Checkpoint resume
- Rate limiting
- Error retry with exponential backoff
- Final report generation
"""
def __init__(self, pipeline: str, workers: int = 10, token_budget: int = 5_000_000):
self.pipeline = pipeline
self.workers = workers
self.token_budget = token_budget
self.queue = JobQueue(pipeline)
self.conn = self.queue.conn
self._shutdown = False
self._stats = PipelineStats(pipeline=pipeline, tokens_budget=token_budget)
self._rate_limit_delay = 0.1 # seconds between jobs
self._response_cache: Dict[str, dict] = {}
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
def _handle_signal(self, signum, frame):
"""Graceful shutdown on signal."""
print(f"\nReceived signal {signum}. Shutting down gracefully...")
self._shutdown = True
def load_jobs(self, jobs_path: str, key_field: str = "id"):
"""Load jobs from a JSONL file into the queue."""
jobs = []
with open(jobs_path) as f:
for line in f:
line = line.strip()
if line:
jobs.append(json.loads(line))
added, skipped = self.queue.enqueue_batch(jobs, key_field)
print(f"Loaded: {added} new, {skipped} existing")
def run(self, job_handler: Callable[[dict], dict] = None):
"""
Run the orchestrator. Processes all pending jobs with parallel workers.
Args:
job_handler: function(job_payload) -> dict with 'tokens_used' key
"""
start = time.time()
self._stats.start_time = datetime.now(timezone.utc).isoformat()
# Record run
self.conn.execute(
"INSERT INTO pipeline_runs (pipeline, started_at) VALUES (?, ?)",
(self.pipeline, self._stats.start_time),
)
run_id = self.conn.execute("SELECT last_insert_rowid()").fetchone()[0]
self.conn.commit()
stats = self.queue.stats()
self._stats.total_jobs = stats.get("pending", 0) + stats.get("retrying", 0)
print(f"\nPipeline: {self.pipeline}")
print(f"Jobs: {self._stats.total_jobs} pending | Workers: {self.workers} | Budget: {self.token_budget:,} tokens")
print()
if self._stats.total_jobs == 0:
print("No jobs to process.")
return
completed = 0
failed = 0
skipped = 0
tokens_used = 0
with ThreadPoolExecutor(max_workers=self.workers) as executor:
futures = {}
while not self._shutdown:
# Check token budget
if tokens_used >= self.token_budget:
print(f"Token budget exhausted ({tokens_used:,}/{self.token_budget:,})")
break
# Fill worker pool
while len(futures) < self.workers and not self._shutdown:
job = self.queue.claim_next()
if not job:
break
# Check response cache (zero-token retries)
job_key = job["job_key"]
payload = json.loads(job["payload"])
cache_key = hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
if cache_key in self._response_cache:
result = self._response_cache[cache_key]
self.queue.complete(job_key, result, tokens_used=0)
skipped += 1
continue
# Submit to worker
future = executor.submit(self._process_job, job, job_handler)
futures[future] = job
# Rate limiting
time.sleep(self._rate_limit_delay)
if not futures:
break
# Collect results
done = []
for future in as_completed(futures, timeout=1):
job = futures[future]
try:
result = future.result()
if result.get("success"):
tokens = result.get("tokens_used", 0)
tokens_used += tokens
self.queue.complete(job["job_key"], result, tokens_used=tokens)
completed += 1
else:
error = result.get("error", "unknown error")
self.queue.fail(job["job_key"], error, retry=True)
failed += 1
except Exception as e:
self.queue.fail(job["job_key"], str(e), retry=True)
failed += 1
done.append(future)
# Progress update
total = completed + failed + skipped
if total % 10 == 0:
elapsed = time.time() - start
rate = completed / (elapsed / 60) if elapsed > 0 else 0
print(f" Progress: {total}/{self._stats.total_jobs} | "
f"completed={completed} failed={failed} | "
f"tokens={tokens_used:,} | "
f"{rate:.1f}/min")
for f in done:
del futures[f]
# Final report
elapsed = time.time() - start
self._stats.completed = completed
self._stats.failed = failed
self._stats.skipped = skipped
self._stats.tokens_used = tokens_used
self._stats.elapsed_seconds = elapsed
self._stats.jobs_per_minute = completed / (elapsed / 60) if elapsed > 0 else 0
# Save run
self.conn.execute(
"""UPDATE pipeline_runs SET completed_at=?, total_jobs=?, completed=?,
failed=?, tokens_used=?, report=? WHERE id=?""",
(datetime.now(timezone.utc).isoformat(), self._stats.total_jobs,
completed, failed, tokens_used, json.dumps(self._stats.to_dict()), run_id),
)
self.conn.commit()
# Print report
print(f"\n{'='*50}")
print(f"Pipeline: {self.pipeline}")
print(f"Completed: {completed}/{self._stats.total_jobs}")
print(f"Failed: {failed}")
print(f"Skipped (cached): {skipped}")
print(f"Tokens: {tokens_used:,}/{self.token_budget:,}")
print(f"Time: {elapsed:.1f}s ({self._stats.jobs_per_minute:.1f}/min)")
print(f"{'='*50}")
# Save report file
self._save_report()
def _process_job(self, job: dict, handler: Callable = None) -> dict:
"""Process a single job."""
payload = json.loads(job["payload"])
job_key = job["job_key"]
checkpoint = self.queue.get_checkpoint(job_key)
if handler:
try:
result = handler(payload, checkpoint=checkpoint)
return result or {"success": True, "tokens_used": 0}
except Exception as e:
return {"success": False, "error": str(e)}
else:
# Default handler: just mark as complete
return {"success": True, "tokens_used": 0}
def _save_report(self):
"""Save pipeline run report."""
REPORT_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = REPORT_DIR / f"{self.pipeline}_{ts}.json"
with open(path, "w") as f:
json.dump(self._stats.to_dict(), f, indent=2)
print(f"Report: {path}")
def resume(self):
"""Resume failed/retrying jobs from a previous run."""
stats = self.queue.stats()
retrying = stats.get("retrying", 0)
failed = stats.get("failed", 0)
print(f"Resume {self.pipeline}: {retrying} retrying, {failed} failed to reset")
# Reset failed jobs to pending for retry
self.conn.execute(
"UPDATE jobs SET status='pending', attempts=0 WHERE pipeline=? AND status='failed'",
(self.pipeline,),
)
self.conn.execute(
"UPDATE jobs SET status='pending' WHERE pipeline=? AND status='retrying'",
(self.pipeline,),
)
self.conn.commit()
def status(self):
"""Show pipeline status."""
stats = self.queue.stats()
print(f"\nPipeline: {self.pipeline}")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
# ============================================================
# CLI
# ============================================================
def show_all_status():
"""Show status of all pipelines."""
conn = get_db()
pipelines = conn.execute(
"SELECT DISTINCT pipeline FROM jobs ORDER BY pipeline"
).fetchall()
if not pipelines:
print("No pipelines in database.")
return
print(f"\nAll Pipeline Status")
print(f"{'='*60}")
for (pipeline,) in pipelines:
queue = JobQueue(pipeline, conn)
stats = queue.stats()
total = stats.get("total", 0)
pending = stats.get("pending", 0)
running = stats.get("running", 0)
completed = stats.get("completed", 0)
failed = stats.get("failed", 0)
tokens = stats.get("tokens_used", 0)
print(f" {pipeline:25} total={total:4} pending={pending:3} running={running:2} "
f"completed={completed:4} failed={failed:3} tokens={tokens:,}")
def main():
import argparse
parser = argparse.ArgumentParser(description="Shared Pipeline Orchestrator")
parser.add_argument("--pipeline", "-p", help="Pipeline name")
parser.add_argument("--jobs", "-j", help="Jobs JSONL file to load")
parser.add_argument("--workers", "-w", type=int, default=10, help="Parallel workers")
parser.add_argument("--budget", "-b", type=int, default=5_000_000, help="Token budget")
parser.add_argument("--status", action="store_true", help="Show status")
parser.add_argument("--resume", action="store_true", help="Resume failed jobs")
parser.add_argument("--key-field", default="id", help="Job key field name")
args = parser.parse_args()
if args.status:
if args.pipeline:
orch = Orchestrator(args.pipeline)
orch.status()
else:
show_all_status()
return
if not args.pipeline:
parser.error("--pipeline is required")
orch = Orchestrator(args.pipeline, workers=args.workers, token_budget=args.budget)
if args.jobs:
orch.load_jobs(args.jobs, key_field=args.key_field)
if args.resume:
orch.resume()
if args.jobs or args.resume:
orch.run()
if __name__ == "__main__":
main()

94
pipelines/README.md Normal file
View File

@@ -0,0 +1,94 @@
# Pipeline Infrastructure
Shared orchestrator for all batch pipelines.
## Components
### orchestrator.py
Shared orchestrator providing:
- **Job Queue**: SQLite-backed with priority support
- **Worker Pool**: Configurable parallelism (default 10)
- **Token Budget**: Per-job tracking and limits
- **Checkpointing**: Resume from any point after restart
- **Rate Limiting**: Provider-aware request throttling
- **Retry Logic**: Exponential backoff with configurable retries
- **Reporting**: Generate summary reports
## Usage
### Python API
```python
from pipelines.orchestrator import PipelineOrchestrator, JobPriority
# Create orchestrator
orchestrator = PipelineOrchestrator(max_workers=10)
# Register pipeline handler
def my_handler(job):
# Process job.task
return {"result": "done"}
orchestrator.register_handler("my_pipeline", my_handler)
# Submit jobs
job_id = orchestrator.submit_job(
pipeline="my_pipeline",
task={"action": "process", "data": "..."},
priority=JobPriority.HIGH,
token_budget=100000
)
# Run orchestrator
orchestrator.run()
```
### CLI
```bash
# Submit a job
python -m pipelines.orchestrator submit my_pipeline --task '{"action": "process"}'
# Run orchestrator
python -m pipelines.orchestrator run --workers 10 --max-jobs 100
# Check job status
python -m pipelines.orchestrator status <job_id>
# Resume paused job
python -m pipelines.orchestrator resume <job_id>
# Show stats
python -m pipelines.orchestrator stats
# Generate report
python -m pipelines.orchestrator report
```
## Database
Jobs are stored in `~/.hermes/pipelines/orchestrator.db`:
- `jobs` - Job queue and state
- `checkpoints` - Resume points
- `reports` - Generated reports
## Configuration
### Rate Limits
```python
orchestrator.configure_rate_limit("Nous", rpm=60, tpm=1000000)
orchestrator.configure_rate_limit("Anthropic", rpm=50, tpm=800000)
```
### Token Budgets
Default: 1M tokens per job. Override per-job:
```python
orchestrator.submit_job("pipeline", task, token_budget=500000)
```
## Pipelines
All pipelines share this orchestrator:
1. **batch-runner** - Run prompts across datasets
2. **data-gen** - Generate training data
3. **eval-runner** - Run evaluations
4. **trajectory-compress** - Compress trajectories
5. **web-research** - Research tasks

807
pipelines/orchestrator.py Normal file
View File

@@ -0,0 +1,807 @@
#!/usr/bin/env python3
"""
Pipeline Orchestrator - Shared infrastructure for all pipelines.
Provides:
- Job queue (SQLite-backed)
- Parallel worker pool (configurable, default 10)
- Token budget tracking per job
- Progress persistence (resume from checkpoint)
- Rate limiting (respect provider limits)
- Error retry with exponential backoff
- Final report generation
"""
import asyncio
import json
import logging
import os
import sqlite3
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import hashlib
logger = logging.getLogger(__name__)
# Configuration
HERMES_HOME = Path.home() / ".hermes"
PIPELINES_DIR = HERMES_HOME / "pipelines"
ORCHESTRATOR_DB = PIPELINES_DIR / "orchestrator.db"
DEFAULT_WORKERS = 10
DEFAULT_TOKEN_BUDGET = 1_000_000 # 1M tokens default
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
PAUSED = "paused"
CANCELLED = "cancelled"
class JobPriority(Enum):
LOW = 0
NORMAL = 5
HIGH = 10
CRITICAL = 20
@dataclass
class JobCheckpoint:
"""Checkpoint for resumable job execution."""
job_id: str
step: int
data: Dict[str, Any]
timestamp: float = field(default_factory=time.time)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'JobCheckpoint':
return cls(**data)
@dataclass
class TokenUsage:
"""Token usage tracking."""
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_write_tokens: int = 0
cost_usd: float = 0.0
@property
def total_tokens(self) -> int:
return self.input_tokens + self.output_tokens
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TokenUsage':
return cls(**data)
@dataclass
class Job:
"""A pipeline job."""
id: str
pipeline: str
task: Dict[str, Any]
status: JobStatus = JobStatus.PENDING
priority: JobPriority = JobPriority.NORMAL
token_budget: int = DEFAULT_TOKEN_BUDGET
token_usage: TokenUsage = field(default_factory=TokenUsage)
retry_count: int = 0
max_retries: int = 3
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
completed_at: Optional[float] = None
error: Optional[str] = None
result: Optional[Dict[str, Any]] = None
checkpoint: Optional[JobCheckpoint] = None
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
d = asdict(self)
d['status'] = self.status.value
d['priority'] = self.priority.value
d['token_usage'] = self.token_usage.to_dict()
if self.checkpoint:
d['checkpoint'] = self.checkpoint.to_dict()
return d
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Job':
data['status'] = JobStatus(data['status'])
data['priority'] = JobPriority(data['priority'])
data['token_usage'] = TokenUsage.from_dict(data.get('token_usage', {}))
if data.get('checkpoint'):
data['checkpoint'] = JobCheckpoint.from_dict(data['checkpoint'])
return cls(**data)
class RateLimiter:
"""Rate limiter for API providers."""
def __init__(self):
self.limits: Dict[str, Dict[str, Any]] = {}
self.requests: Dict[str, List[float]] = {}
def configure(self, provider: str, requests_per_minute: int, tokens_per_minute: int):
"""Configure rate limits for a provider."""
self.limits[provider] = {
'rpm': requests_per_minute,
'tpm': tokens_per_minute,
}
if provider not in self.requests:
self.requests[provider] = []
def can_proceed(self, provider: str, tokens: int = 0) -> Tuple[bool, float]:
"""Check if request can proceed. Returns (can_proceed, wait_seconds)."""
if provider not in self.limits:
return True, 0.0
now = time.time()
minute_ago = now - 60
# Clean old requests
self.requests[provider] = [t for t in self.requests[provider] if t > minute_ago]
limit = self.limits[provider]
# Check RPM
if len(self.requests[provider]) >= limit['rpm']:
oldest = min(self.requests[provider])
wait = 60 - (now - oldest)
return False, max(0, wait)
return True, 0.0
def record_request(self, provider: str, tokens: int = 0):
"""Record a request."""
if provider not in self.requests:
self.requests[provider] = []
self.requests[provider].append(time.time())
class OrchestratorDB:
"""SQLite-backed job queue and state management."""
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
pipeline TEXT NOT NULL,
task TEXT NOT NULL,
status TEXT NOT NULL,
priority INTEGER NOT NULL,
token_budget INTEGER NOT NULL,
token_usage TEXT NOT NULL,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
created_at REAL NOT NULL,
started_at REAL,
completed_at REAL,
error TEXT,
result TEXT,
checkpoint TEXT,
metadata TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline ON jobs(pipeline);
CREATE INDEX IF NOT EXISTS idx_jobs_priority ON jobs(priority DESC);
CREATE TABLE IF NOT EXISTS checkpoints (
job_id TEXT PRIMARY KEY,
step INTEGER NOT NULL,
data TEXT NOT NULL,
timestamp REAL NOT NULL,
FOREIGN KEY (job_id) REFERENCES jobs(id)
);
CREATE TABLE IF NOT EXISTS reports (
id TEXT PRIMARY KEY,
pipeline TEXT NOT NULL,
job_ids TEXT NOT NULL,
summary TEXT NOT NULL,
token_usage TEXT NOT NULL,
created_at REAL NOT NULL
);
"""
def __init__(self, db_path: Optional[Path] = None):
self.db_path = db_path or ORCHESTRATOR_DB
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _init_db(self):
"""Initialize database schema."""
conn = sqlite3.connect(str(self.db_path))
conn.executescript(self.SCHEMA)
conn.commit()
conn.close()
def _get_conn(self) -> sqlite3.Connection:
"""Get database connection."""
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn
def save_job(self, job: Job):
"""Save or update a job."""
conn = self._get_conn()
conn.execute("""
INSERT OR REPLACE INTO jobs
(id, pipeline, task, status, priority, token_budget, token_usage,
retry_count, max_retries, created_at, started_at, completed_at,
error, result, checkpoint, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job.id, job.pipeline, json.dumps(job.task), job.status.value,
job.priority.value, job.token_budget, json.dumps(job.token_usage.to_dict()),
job.retry_count, job.max_retries, job.created_at, job.started_at,
job.completed_at, job.error, json.dumps(job.result) if job.result else None,
json.dumps(job.checkpoint.to_dict()) if job.checkpoint else None,
json.dumps(job.metadata)
))
conn.commit()
conn.close()
def get_job(self, job_id: str) -> Optional[Job]:
"""Get a job by ID."""
conn = self._get_conn()
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
conn.close()
if not row:
return None
return Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
def get_next_job(self, pipeline: Optional[str] = None) -> Optional[Job]:
"""Get next pending job (highest priority first)."""
conn = self._get_conn()
query = "SELECT * FROM jobs WHERE status = 'pending'"
params = []
if pipeline:
query += " AND pipeline = ?"
params.append(pipeline)
query += " ORDER BY priority DESC, created_at ASC LIMIT 1"
row = conn.execute(query, params).fetchone()
conn.close()
if not row:
return None
return Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
def get_jobs_by_status(self, status: JobStatus, pipeline: Optional[str] = None) -> List[Job]:
"""Get all jobs with given status."""
conn = self._get_conn()
query = "SELECT * FROM jobs WHERE status = ?"
params = [status.value]
if pipeline:
query += " AND pipeline = ?"
params.append(pipeline)
query += " ORDER BY priority DESC, created_at ASC"
rows = conn.execute(query, params).fetchall()
conn.close()
return [
Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
for row in rows
]
def save_checkpoint(self, job_id: str, checkpoint: JobCheckpoint):
"""Save a checkpoint for a job."""
conn = self._get_conn()
conn.execute("""
INSERT OR REPLACE INTO checkpoints (job_id, step, data, timestamp)
VALUES (?, ?, ?, ?)
""", (job_id, checkpoint.step, json.dumps(checkpoint.data), checkpoint.timestamp))
conn.commit()
conn.close()
def get_checkpoint(self, job_id: str) -> Optional[JobCheckpoint]:
"""Get the latest checkpoint for a job."""
conn = self._get_conn()
row = conn.execute(
"SELECT * FROM checkpoints WHERE job_id = ? ORDER BY step DESC LIMIT 1",
(job_id,)
).fetchone()
conn.close()
if not row:
return None
return JobCheckpoint(
job_id=row['job_id'],
step=row['step'],
data=json.loads(row['data']),
timestamp=row['timestamp']
)
def get_stats(self, pipeline: Optional[str] = None) -> Dict[str, Any]:
"""Get queue statistics."""
conn = self._get_conn()
query = "SELECT status, COUNT(*) as count FROM jobs"
params = []
if pipeline:
query += " WHERE pipeline = ?"
params.append(pipeline)
query += " GROUP BY status"
rows = conn.execute(query, params).fetchall()
conn.close()
stats = {row['status']: row['count'] for row in rows}
# Get token usage
conn = self._get_conn()
query = "SELECT SUM(CAST(json_extract(token_usage, '$.input_tokens') AS INTEGER)) as input, SUM(CAST(json_extract(token_usage, '$.output_tokens') AS INTEGER)) as output FROM jobs"
if pipeline:
query += " WHERE pipeline = ?"
row = conn.execute(query, params if pipeline else []).fetchone()
conn.close()
stats['total_input_tokens'] = row['input'] or 0
stats['total_output_tokens'] = row['output'] or 0
stats['total_tokens'] = stats['total_input_tokens'] + stats['total_output_tokens']
return stats
class PipelineOrchestrator:
"""Main orchestrator for pipeline execution."""
def __init__(
self,
max_workers: int = DEFAULT_WORKERS,
token_budget: int = DEFAULT_TOKEN_BUDGET,
db_path: Optional[Path] = None
):
self.max_workers = max_workers
self.token_budget = token_budget
self.db = OrchestratorDB(db_path)
self.rate_limiter = RateLimiter()
self.executor: Optional[ThreadPoolExecutor] = None
self.running = False
self._handlers: Dict[str, Callable] = {}
def register_handler(self, pipeline: str, handler: Callable):
"""Register a handler for a pipeline type."""
self._handlers[pipeline] = handler
def configure_rate_limit(self, provider: str, rpm: int, tpm: int):
"""Configure rate limits for a provider."""
self.rate_limiter.configure(provider, rpm, tpm)
def submit_job(
self,
pipeline: str,
task: Dict[str, Any],
priority: JobPriority = JobPriority.NORMAL,
token_budget: Optional[int] = None,
max_retries: int = 3,
metadata: Optional[Dict[str, Any]] = None
) -> str:
"""Submit a new job to the queue."""
job_id = str(uuid.uuid4())
job = Job(
id=job_id,
pipeline=pipeline,
task=task,
priority=priority,
token_budget=token_budget or self.token_budget,
max_retries=max_retries,
metadata=metadata or {}
)
self.db.save_job(job)
logger.info(f"Job {job_id} submitted to pipeline {pipeline}")
return job_id
def submit_batch(
self,
pipeline: str,
tasks: List[Dict[str, Any]],
priority: JobPriority = JobPriority.NORMAL,
token_budget: Optional[int] = None
) -> List[str]:
"""Submit multiple jobs at once."""
job_ids = []
for task in tasks:
job_id = self.submit_job(pipeline, task, priority, token_budget)
job_ids.append(job_id)
logger.info(f"Submitted {len(job_ids)} jobs to pipeline {pipeline}")
return job_ids
def _execute_job(self, job: Job) -> Job:
"""Execute a single job with retry logic."""
handler = self._handlers.get(job.pipeline)
if not handler:
job.status = JobStatus.FAILED
job.error = f"No handler registered for pipeline: {job.pipeline}"
job.completed_at = time.time()
self.db.save_job(job)
return job
# Check token budget
if job.token_usage.total_tokens >= job.token_budget:
job.status = JobStatus.FAILED
job.error = "Token budget exceeded"
job.completed_at = time.time()
self.db.save_job(job)
return job
# Update status
job.status = JobStatus.RUNNING
job.started_at = time.time()
self.db.save_job(job)
try:
# Execute with exponential backoff retry
for attempt in range(job.max_retries + 1):
try:
# Check rate limits
provider = job.metadata.get('provider', 'default')
can_proceed, wait_time = self.rate_limiter.can_proceed(provider)
if not can_proceed:
logger.info(f"Rate limited, waiting {wait_time:.1f}s")
time.sleep(wait_time)
# Execute the handler
result = handler(job)
# Record request
self.rate_limiter.record_request(provider)
# Update job with result
job.result = result
job.status = JobStatus.COMPLETED
job.completed_at = time.time()
# Update token usage from result if provided
if 'token_usage' in result:
usage = result['token_usage']
job.token_usage.input_tokens += usage.get('input_tokens', 0)
job.token_usage.output_tokens += usage.get('output_tokens', 0)
job.token_usage.cache_read_tokens += usage.get('cache_read_tokens', 0)
job.token_usage.cache_write_tokens += usage.get('cache_write_tokens', 0)
job.token_usage.cost_usd += usage.get('cost_usd', 0)
self.db.save_job(job)
return job
except Exception as e:
job.retry_count += 1
if job.retry_count >= job.max_retries:
job.status = JobStatus.FAILED
job.error = str(e)
job.completed_at = time.time()
self.db.save_job(job)
return job
# Exponential backoff
wait_time = (2 ** job.retry_count) + (time.time() % 1)
logger.warning(f"Job {job.id} failed (attempt {job.retry_count}), retrying in {wait_time:.1f}s: {e}")
time.sleep(wait_time)
except Exception as e:
job.status = JobStatus.FAILED
job.error = f"Unexpected error: {str(e)}"
job.completed_at = time.time()
self.db.save_job(job)
return job
return job
def run(self, pipeline: Optional[str] = None, max_jobs: Optional[int] = None):
"""Run the orchestrator, processing jobs from the queue."""
self.running = True
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
logger.info(f"Orchestrator starting (workers={self.max_workers})")
try:
jobs_processed = 0
while self.running:
# Get next job
job = self.db.get_next_job(pipeline)
if not job:
# No pending jobs, wait a bit
time.sleep(1)
continue
# Submit to thread pool
future = self.executor.submit(self._execute_job, job)
# Don't wait for completion, get next job
jobs_processed += 1
if max_jobs and jobs_processed >= max_jobs:
logger.info(f"Reached max_jobs limit ({max_jobs})")
break
finally:
self.executor.shutdown(wait=True)
self.running = False
logger.info(f"Orchestrator stopped (processed {jobs_processed} jobs)")
def run_single(self, job_id: str) -> Job:
"""Run a single job by ID (useful for resume)."""
job = self.db.get_job(job_id)
if not job:
raise ValueError(f"Job not found: {job_id}")
if job.status not in (JobStatus.PENDING, JobStatus.FAILED):
raise ValueError(f"Job {job_id} is not pending or failed (status: {job.status})")
# Reset for retry
if job.status == JobStatus.FAILED:
job.status = JobStatus.PENDING
job.retry_count = 0
job.error = None
self.db.save_job(job)
return self._execute_job(job)
def pause_job(self, job_id: str):
"""Pause a job."""
job = self.db.get_job(job_id)
if job and job.status == JobStatus.RUNNING:
job.status = JobStatus.PAUSED
self.db.save_job(job)
logger.info(f"Job {job_id} paused")
def resume_job(self, job_id: str) -> Job:
"""Resume a paused job from checkpoint."""
job = self.db.get_job(job_id)
if not job:
raise ValueError(f"Job not found: {job_id}")
if job.status != JobStatus.PAUSED:
raise ValueError(f"Job {job_id} is not paused (status: {job.status})")
# Load checkpoint if exists
checkpoint = self.db.get_checkpoint(job_id)
if checkpoint:
job.checkpoint = checkpoint
logger.info(f"Resuming job {job_id} from checkpoint step {checkpoint.step}")
job.status = JobStatus.PENDING
self.db.save_job(job)
return self._execute_job(job)
def cancel_job(self, job_id: str):
"""Cancel a job."""
job = self.db.get_job(job_id)
if job and job.status in (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.PAUSED):
job.status = JobStatus.CANCELLED
job.completed_at = time.time()
self.db.save_job(job)
logger.info(f"Job {job_id} cancelled")
def get_progress(self, job_id: str) -> Dict[str, Any]:
"""Get job progress."""
job = self.db.get_job(job_id)
if not job:
return {"error": "Job not found"}
progress = {
"job_id": job.id,
"pipeline": job.pipeline,
"status": job.status.value,
"retry_count": job.retry_count,
"token_usage": job.token_usage.to_dict(),
"token_budget": job.token_budget,
"token_percent": (job.token_usage.total_tokens / job.token_budget * 100) if job.token_budget > 0 else 0,
"created_at": job.created_at,
"started_at": job.started_at,
"completed_at": job.completed_at,
}
if job.checkpoint:
progress["checkpoint"] = {
"step": job.checkpoint.step,
"timestamp": job.checkpoint.timestamp,
}
if job.error:
progress["error"] = job.error
return progress
def generate_report(self, pipeline: Optional[str] = None) -> Dict[str, Any]:
"""Generate a summary report."""
stats = self.db.get_stats(pipeline)
completed_jobs = self.db.get_jobs_by_status(JobStatus.COMPLETED, pipeline)
failed_jobs = self.db.get_jobs_by_status(JobStatus.FAILED, pipeline)
# Calculate timing stats
durations = []
for job in completed_jobs:
if job.started_at and job.completed_at:
durations.append(job.completed_at - job.started_at)
report = {
"timestamp": datetime.now().isoformat(),
"pipeline": pipeline or "all",
"stats": stats,
"completed": len(completed_jobs),
"failed": len(failed_jobs),
"success_rate": len(completed_jobs) / (len(completed_jobs) + len(failed_jobs)) * 100 if (completed_jobs or failed_jobs) else 0,
"avg_duration": sum(durations) / len(durations) if durations else 0,
"total_tokens": stats.get('total_tokens', 0),
}
return report
def stop(self):
"""Stop the orchestrator."""
self.running = False
logger.info("Orchestrator stop requested")
# CLI interface
def main():
"""CLI for orchestrator management."""
import argparse
parser = argparse.ArgumentParser(description="Pipeline Orchestrator")
subparsers = parser.add_subparsers(dest="command")
# Submit job
submit_parser = subparsers.add_parser("submit", help="Submit a job")
submit_parser.add_argument("pipeline", help="Pipeline name")
submit_parser.add_argument("--task", required=True, help="Task JSON")
submit_parser.add_argument("--priority", type=int, default=5, help="Job priority")
submit_parser.add_argument("--budget", type=int, help="Token budget")
# Run orchestrator
run_parser = subparsers.add_parser("run", help="Run orchestrator")
run_parser.add_argument("--pipeline", help="Filter by pipeline")
run_parser.add_argument("--workers", type=int, default=10, help="Max workers")
run_parser.add_argument("--max-jobs", type=int, help="Max jobs to process")
# Job management
status_parser = subparsers.add_parser("status", help="Get job status")
status_parser.add_argument("job_id", help="Job ID")
resume_parser = subparsers.add_parser("resume", help="Resume paused job")
resume_parser.add_argument("job_id", help="Job ID")
cancel_parser = subparsers.add_parser("cancel", help="Cancel job")
cancel_parser.add_argument("job_id", help="Job ID")
# Stats
stats_parser = subparsers.add_parser("stats", help="Show queue stats")
stats_parser.add_argument("--pipeline", help="Filter by pipeline")
# Report
report_parser = subparsers.add_parser("report", help="Generate report")
report_parser.add_argument("--pipeline", help="Filter by pipeline")
args = parser.parse_args()
orchestrator = PipelineOrchestrator(max_workers=args.workers if hasattr(args, 'workers') else 10)
if args.command == "submit":
task = json.loads(args.task)
priority = JobPriority(args.priority)
job_id = orchestrator.submit_job(args.pipeline, task, priority, args.budget)
print(f"Job submitted: {job_id}")
elif args.command == "run":
orchestrator.run(args.pipeline, args.max_jobs)
elif args.command == "status":
progress = orchestrator.get_progress(args.job_id)
print(json.dumps(progress, indent=2))
elif args.command == "resume":
job = orchestrator.resume_job(args.job_id)
print(f"Job {args.job_id} completed with status: {job.status.value}")
elif args.command == "cancel":
orchestrator.cancel_job(args.job_id)
print(f"Job {args.job_id} cancelled")
elif args.command == "stats":
stats = orchestrator.db.get_stats(args.pipeline)
print(json.dumps(stats, indent=2))
elif args.command == "report":
report = orchestrator.generate_report(args.pipeline)
print(json.dumps(report, indent=2))
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,32 +0,0 @@
#!/usr/bin/env python3
import json, os
songs = [
{"t":"Thunder Road","a":"Heartland","m":["hope","anticipation","energy","triumph","nostalgia","urgency","passion","defiance","release","catharsis"]},
{"t":"Black Dog Howl","a":"Rust & Wire","m":["despair","anger","frenzy","exhaustion","resignation","grief","numbness","rage","acceptance","silence"]},
{"t":"Satellite Hearts","a":"Neon Circuit","m":["wonder","isolation","longing","connection","euphoria","confusion","clarity","tenderness","urgency","bittersweet"]},
{"t":"Concrete Garden","a":"Streetlight Prophet","m":["oppression","resilience","anger","beauty","defiance","community","joy","struggle","growth","hope"]},
{"t":"Gravity Well","a":"Void Walker","m":["dread","fascination","surrender","awe","terror","peace","disorientation","acceptance","transcendence","emptiness"]},
{"t":"Rust Belt Lullaby","a":"Iron & Ember","m":["nostalgia","sadness","tenderness","loss","beauty","resignation","love","weariness","quiet hope","peace"]},
{"t":"Wildfire Sermon","a":"Prophet Ash","m":["fury","ecstasy","chaos","joy","destruction","creation","warning","invitation","abandon","rebirth"]},
{"t":"Midnight Transmission","a":"Frequency Ghost","m":["mystery","loneliness","curiosity","connection","paranoia","intimacy","urgency","disconnection","searching","haunting"]},
{"t":"Crown of Thorns","a":"Velvet Guillotine","m":["seduction","power","cruelty","beauty","danger","vulnerability","fury","grace","revenge","mercy"]},
{"t":"Apartment 4B","a":"Wallpaper & Wire","m":["claustrophobia","routine","desperation","fantasy","breakthrough","freedom","fear","joy","grounding","home"]},
]
beats = []
for s in songs:
for i in range(10):
beats.append({"song": s["t"], "artist": s["a"], "beat": i+1,
"timestamp": f"{i*30//60}:{(i*30)%60:02d}", "duration": "30s",
"lyric_line": f"[Beat {i+1}]", "scene": {"mood": s["m"][i], "colors": ["placeholder"],
"composition": ["wide","close","OTS","low","high","dutch","symmetric","thirds","xwide","medium"][i],
"camera": ["static","pan","dolly-in","dolly-out","handheld","steadicam","zoom","crane","track","tilt"][i],
"description": f"[{s['m'][i]} scene]"}})
out = os.path.expanduser("~/.hermes/training-data/scene-descriptions-rock.jsonl")
os.makedirs(os.path.dirname(out), exist_ok=True)
with open(out, "w") as f:
for b in beats:
f.write(json.dumps(b) + "\n")
print(f"Generated {len(beats)} beats")

View File

@@ -1,50 +0,0 @@
# Nightly Pipeline Scheduler
Auto-starts batch pipelines when inference is available.
## What It Does
1. Checks inference provider health (OpenRouter, Ollama, RunPod)
2. Checks if it's off-peak hours (configurable, default: after 6PM)
3. Checks interactive session load (don't fight with live users)
4. Checks daily token budget (configurable limit)
5. Starts the highest-priority incomplete pipeline
## Pipeline Priority Order
| Priority | Pipeline | Deps | Max Tokens |
|----------|----------|------|------------|
| 1 | playground-factory | none | 100,000 |
| 2 | training-factory | none | 150,000 |
| 3 | knowledge-mine | training-factory running | 80,000 |
| 4 | adversary | knowledge-mine running | 50,000 |
| 5 | codebase-genome | none | 120,000 |
## Usage
```bash
# Normal run (used by cron)
./scripts/nightly-pipeline-scheduler.sh
# Dry run (show what would start)
./scripts/nightly-pipeline-scheduler.sh --dry-run
# Status report
./scripts/nightly-pipeline-scheduler.sh --status
# Force start during peak hours
./scripts/nightly-pipeline-scheduler.sh --force
```
## Configuration
Set via environment variables:
- `PIPELINE_TOKEN_LIMIT`: Daily token budget (default: 500,000)
- `PIPELINE_PEAK_START`: Peak hours start (default: 9)
- `PIPELINE_PEAK_END`: Peak hours end (default: 18)
- `HERMES_HOME`: Hermes home directory (default: ~/.hermes)
## Cron
Runs every 30 minutes. Off-peak only (unless --force).
See `cron/pipeline-scheduler.yml`.

View File

@@ -1,383 +0,0 @@
#!/usr/bin/env bash
# nightly-pipeline-scheduler.sh — Auto-start batch pipelines when inference is available.
#
# Checks provider health, pipeline progress, token budget, and interactive load.
# Starts the highest-priority incomplete pipeline that can run.
#
# Usage:
# ./scripts/nightly-pipeline-scheduler.sh # Normal run
# ./scripts/nightly-pipeline-scheduler.sh --dry-run # Show what would start
# ./scripts/nightly-pipeline-scheduler.sh --status # Pipeline status report
set -euo pipefail
# --- Configuration ---
HERMES_HOME="${HERMES_HOME:-$HOME/.hermes}"
BUDGET_FILE="${HERMES_HOME}/pipeline_budget.json"
STATE_FILE="${HERMES_HOME}/pipeline_state.json"
LOG_FILE="${HERMES_HOME}/logs/pipeline-scheduler.log"
TOKEN_DAILY_LIMIT="${PIPELINE_TOKEN_LIMIT:-500000}"
PEAK_HOURS_START="${PIPELINE_PEAK_START:-9}"
PEAK_HOURS_END="${PIPELINE_PEAK_END:-18}"
# Pipeline definitions (priority order)
# Each pipeline: name, script, max_tokens, dependencies
PIPELINES=(
"playground-factory|scripts/pipeline_playground_factory.sh|100000|none"
"training-factory|scripts/pipeline_training_factory.sh|150000|none"
"knowledge-mine|scripts/pipeline_knowledge_mine.sh|80000|training-factory"
"adversary|scripts/pipeline_adversary.sh|50000|knowledge-mine"
"codebase-genome|scripts/pipeline_codebase_genome.sh|120000|none"
)
# --- Colors ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
CYAN='\033[0;36m'
NC='\033[0m'
# --- Helpers ---
now_hour() { date +%-H; }
is_peak_hours() {
local h=$(now_hour)
[[ $h -ge $PEAK_HOURS_START && $h -lt $PEAK_HOURS_END ]]
}
ensure_dirs() {
mkdir -p "$(dirname "$LOG_FILE")" "$(dirname "$BUDGET_FILE")" "$(dirname "$STATE_FILE")"
}
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"; }
get_budget_used_today() {
if [[ -f "$BUDGET_FILE" ]]; then
local today=$(date +%Y-%m-%d)
python3 -c "
import json, sys
with open('$BUDGET_FILE') as f:
d = json.load(f)
print(d.get('daily', {}).get('$today', {}).get('tokens_used', 0))
" 2>/dev/null || echo 0
else
echo 0
fi
}
get_budget_remaining() {
local used=$(get_budget_used_today)
echo $((TOKEN_DAILY_LIMIT - used))
}
update_budget() {
local pipeline="$1"
local tokens="$2"
local today=$(date +%Y-%m-%d)
python3 -c "
import json, os
path = '$BUDGET_FILE'
d = {}
if os.path.exists(path):
with open(path) as f:
d = json.load(f)
daily = d.setdefault('daily', {})
day = daily.setdefault('$today', {'tokens_used': 0, 'pipelines': {}})
day['tokens_used'] = day.get('tokens_used', 0) + $tokens
day['pipelines']['$pipeline'] = day['pipelines'].get('$pipeline', 0) + $tokens
with open(path, 'w') as f:
json.dump(d, f, indent=2)
"
}
get_pipeline_state() {
if [[ -f "$STATE_FILE" ]]; then
cat "$STATE_FILE"
else
echo "{}"
fi
}
set_pipeline_state() {
local pipeline="$1"
local state="$2" # running, complete, failed, skipped
python3 -c "
import json, os
path = '$STATE_FILE'
d = {}
if os.path.exists(path):
with open(path) as f:
d = json.load(f)
d['$pipeline'] = {'state': '$state', 'updated': '$(date -Iseconds)'}
with open(path, 'w') as f:
json.dump(d, f, indent=2)
"
}
is_pipeline_complete() {
local pipeline="$1"
python3 -c "
import json, os
path = '$STATE_FILE'
if not os.path.exists(path):
print('false')
else:
with open(path) as f:
d = json.load(f)
state = d.get('$pipeline', {}).get('state', 'not_started')
print('true' if state == 'complete' else 'false')
" 2>/dev/null || echo false
}
is_pipeline_running() {
local pipeline="$1"
python3 -c "
import json, os
path = '$STATE_FILE'
if not os.path.exists(path):
print('false')
else:
with open(path) as f:
d = json.load(f)
state = d.get('$pipeline', {}).get('state', 'not_started')
print('true' if state == 'running' else 'false')
" 2>/dev/null || echo false
}
check_dependency() {
local dep="$1"
if [[ "$dep" == "none" ]]; then
return 0
fi
# For knowledge-mine: training-factory must be running or complete
if [[ "$dep" == "training-factory" ]]; then
local state=$(python3 -c "
import json, os
path = '$STATE_FILE'
if not os.path.exists(path):
print('not_started')
else:
with open(path) as f:
d = json.load(f)
print(d.get('training-factory', {}).get('state', 'not_started'))
" 2>/dev/null || echo "not_started")
[[ "$state" == "running" || "$state" == "complete" ]]
return $?
fi
# For adversary: knowledge-mine must be at least 50% done
# Simplified: check if it's running (we'd need progress tracking for 50%)
if [[ "$dep" == "knowledge-mine" ]]; then
local state=$(python3 -c "
import json, os
path = '$STATE_FILE'
if not os.path.exists(path):
print('not_started')
else:
with open(path) as f:
d = json.load(f)
print(d.get('knowledge-mine', {}).get('state', 'not_started'))
" 2>/dev/null || echo "not_started")
[[ "$state" == "running" || "$state" == "complete" ]]
return $?
fi
return 0
}
check_inference_available() {
# Check if any inference provider is responding
# 1. Check OpenRouter
local or_ok=$(curl -s -o /dev/null -w "%{http_code}" \
--connect-timeout 5 "https://openrouter.ai/api/v1/models" 2>/dev/null || echo "000")
# 2. Check local Ollama
local ollama_ok=$(curl -s -o /dev/null -w "%{http_code}" \
--connect-timeout 5 "http://localhost:11434/api/tags" 2>/dev/null || echo "000")
# 3. Check RunPod (if configured)
local runpod_ok="000"
if [[ -n "${RUNPOD_ENDPOINT:-}" ]]; then
runpod_ok=$(curl -s -o /dev/null -w "%{http_code}" \
--connect-timeout 5 "$RUNPOD_ENDPOINT/health" 2>/dev/null || echo "000")
fi
if [[ "$or_ok" == "200" || "$ollama_ok" == "200" || "$runpod_ok" == "200" ]]; then
return 0
fi
return 1
}
check_interactive_load() {
# Check if there are active interactive sessions (don't fight with live users)
# Look for tmux panes with active hermes sessions
local active=$(tmux list-panes -a -F '#{pane_pid} #{pane_current_command}' 2>/dev/null \
| grep -c "hermes\|python3" || echo 0)
# If more than 3 interactive sessions, skip pipeline start
if [[ $active -gt 3 ]]; then
return 1
fi
return 0
}
start_pipeline() {
local name="$1"
local script="$2"
local max_tokens="$3"
local budget_remaining="$4"
local mode="${5:-run}"
if [[ "$budget_remaining" -lt "$max_tokens" ]]; then
log "SKIP $name: insufficient budget ($budget_remaining < $max_tokens tokens)"
return 1
fi
if [[ ! -f "$script" ]]; then
log "SKIP $name: script not found ($script)"
return 1
fi
if [[ "$mode" == "dry-run" ]]; then
log "DRY-RUN: Would start $name (budget: $budget_remaining, needs: $max_tokens)"
return 0
fi
log "START $name (budget: $budget_remaining, max_tokens: $max_tokens)"
set_pipeline_state "$name" "running"
# Run in background, capture output
local log_path="${HERMES_HOME}/logs/pipeline-${name}.log"
bash "$script" --max-tokens "$max_tokens" >> "$log_path" 2>&1 &
local pid=$!
# Wait a moment to check if it started OK
sleep 2
if kill -0 $pid 2>/dev/null; then
log "RUNNING $name (PID: $pid, log: $log_path)"
# Record the PID
python3 -c "
import json, os
path = '$STATE_FILE'
d = {}
if os.path.exists(path):
with open(path) as f:
d = json.load(f)
d['$name']['pid'] = $pid
with open(path, 'w') as f:
json.dump(d, f, indent=2)
"
return 0
else
log "FAIL $name: script exited immediately"
set_pipeline_state "$name" "failed"
return 1
fi
}
# --- Main ---
main() {
local mode="${1:-run}"
ensure_dirs
log "=== Pipeline Scheduler ($mode) ==="
# Check 1: Is inference available?
if ! check_inference_available; then
log "No inference provider available. Skipping all pipelines."
exit 0
fi
log "Inference: AVAILABLE"
# Check 2: Is it peak hours?
if is_peak_hours && [[ "$mode" != "--force" ]]; then
local h=$(now_hour)
log "Peak hours ($h:00). Skipping pipeline start. Use --force to override."
exit 0
fi
log "Off-peak: OK"
# Check 3: Interactive load
if ! check_interactive_load && [[ "$mode" != "--force" ]]; then
log "High interactive load. Skipping pipeline start."
exit 0
fi
log "Interactive load: OK"
# Check 4: Token budget
local budget=$(get_budget_remaining)
log "Token budget remaining: $budget / $TOKEN_DAILY_LIMIT"
if [[ $budget -le 0 ]]; then
log "Daily token budget exhausted. Stopping."
exit 0
fi
# Check 5: Pipeline status
if [[ "$mode" == "--status" ]]; then
echo -e "${CYAN}Pipeline Status:${NC}"
echo "────────────────────────────────────────────────────"
for entry in "${PIPELINES[@]}"; do
IFS='|' read -r name script max_tokens dep <<< "$entry"
local state=$(python3 -c "
import json, os
path = '$STATE_FILE'
if not os.path.exists(path):
print('not_started')
else:
with open(path) as f:
d = json.load(f)
print(d.get('$name', {}).get('state', 'not_started'))
" 2>/dev/null || echo "not_started")
local color=$NC
case "$state" in
running) color=$YELLOW ;;
complete) color=$GREEN ;;
failed) color=$RED ;;
esac
printf " %-25s %b%s%b (max: %s tokens, dep: %s)\n" "$name" "$color" "$state" "$NC" "$max_tokens" "$dep"
done
echo "────────────────────────────────────────────────────"
echo " Budget: $budget / $TOKEN_DAILY_LIMIT tokens remaining"
echo " Peak hours: $PEAK_HOURS_START:00 - $PEAK_HOURS_END:00"
exit 0
fi
# Find and start the highest-priority incomplete pipeline
local started=0
for entry in "${PIPELINES[@]}"; do
IFS='|' read -r name script max_tokens dep <<< "$entry"
# Skip if already running or complete
if [[ "$(is_pipeline_running $name)" == "true" ]]; then
log "SKIP $name: already running"
continue
fi
if [[ "$(is_pipeline_complete $name)" == "true" ]]; then
log "SKIP $name: already complete"
continue
fi
# Check dependency
if ! check_dependency "$dep"; then
log "SKIP $name: dependency $dep not met"
continue
fi
# Try to start
if start_pipeline "$name" "$script" "$max_tokens" "$budget" "$mode"; then
started=1
# Only start one pipeline per run (let it claim tokens before next check)
# Exception: playground-factory and training-factory can run in parallel
if [[ "$name" != "playground-factory" && "$name" != "training-factory" ]]; then
break
fi
fi
done
if [[ $started -eq 0 ]]; then
log "No pipelines to start (all complete, running, or blocked)."
fi
log "=== Pipeline Scheduler done ==="
}
main "$@"

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -19,11 +19,9 @@ from glitch_patterns import (
GlitchPattern,
GlitchSeverity,
MATRIX_GLITCH_PATTERNS,
THREEJS_CATEGORIES,
build_vision_prompt,
get_pattern_by_category,
get_patterns_by_severity,
get_threejs_patterns,
)
from matrix_glitch_detector import (
@@ -42,7 +40,7 @@ class TestGlitchPatterns(unittest.TestCase):
def test_pattern_count(self):
"""Verify we have a reasonable number of defined patterns."""
self.assertGreaterEqual(len(MATRIX_GLITCH_PATTERNS), 14) # 10 generic + 6 Three.js
self.assertGreaterEqual(len(MATRIX_GLITCH_PATTERNS), 8)
def test_all_patterns_have_required_fields(self):
"""Every pattern must have category, name, description, severity, prompts."""
@@ -90,9 +88,6 @@ class TestGlitchPatterns(unittest.TestCase):
self.assertIn("Floating Object", prompt)
self.assertIn("Z-Fighting", prompt)
self.assertIn("Missing", prompt)
# Three.js patterns should be included
self.assertIn("Shader Compilation Failure", prompt)
self.assertIn("Bloom Overflow", prompt)
def test_build_vision_prompt_subset(self):
"""Vision prompt with subset should only include specified patterns."""
@@ -253,7 +248,7 @@ class TestGlitchDetector(unittest.TestCase):
try:
report = run_demo(output_path)
self.assertEqual(len(report.glitches), 6) # 4 original + 2 Three.js
self.assertEqual(len(report.glitches), 4)
self.assertGreater(report.summary["total_glitches"], 0)
self.assertTrue(output_path.exists())
@@ -265,93 +260,6 @@ class TestGlitchDetector(unittest.TestCase):
output_path.unlink(missing_ok=True)
class TestThreeJsPatterns(unittest.TestCase):
"""Tests for Three.js-specific glitch patterns (timmy-config#543)."""
def test_get_threejs_patterns_returns_only_threejs(self):
"""get_threejs_patterns() should return only Three.js categories."""
patterns = get_threejs_patterns()
self.assertEqual(len(patterns), 6)
for p in patterns:
self.assertIn(p.category, THREEJS_CATEGORIES)
def test_threejs_patterns_have_required_fields(self):
"""All Three.js patterns must have valid fields."""
for p in get_threejs_patterns():
self.assertIsInstance(p.category, GlitchCategory)
self.assertTrue(p.name)
self.assertTrue(p.description)
self.assertIsInstance(p.severity, GlitchSeverity)
self.assertGreater(len(p.detection_prompts), 0)
self.assertGreater(len(p.visual_indicators), 0)
def test_shader_failure_is_critical(self):
"""Shader compilation failure should be CRITICAL severity."""
p = get_pattern_by_category(GlitchCategory.SHADER_FAILURE)
self.assertIsNotNone(p)
self.assertEqual(p.severity, GlitchSeverity.CRITICAL)
def test_texture_placeholder_is_critical(self):
"""Texture placeholder (1x1 white) should be CRITICAL severity."""
p = get_pattern_by_category(GlitchCategory.TEXTURE_PLACEHOLDER)
self.assertIsNotNone(p)
self.assertEqual(p.severity, GlitchSeverity.CRITICAL)
def test_infer_severity_shader_failure(self):
"""Shader failure should infer critical/high."""
self.assertEqual(_infer_severity("shader_failure", 0.8), "critical")
self.assertEqual(_infer_severity("shader_failure", 0.5), "high")
def test_infer_severity_texture_placeholder(self):
"""Texture placeholder should infer critical/high."""
self.assertEqual(_infer_severity("texture_placeholder", 0.8), "critical")
self.assertEqual(_infer_severity("texture_placeholder", 0.5), "high")
def test_infer_severity_uv_mapping(self):
"""UV mapping error should infer high/medium."""
self.assertEqual(_infer_severity("uv_mapping_error", 0.8), "high")
self.assertEqual(_infer_severity("uv_mapping_error", 0.5), "medium")
def test_infer_severity_frustum_culling(self):
"""Frustum culling should infer medium/low."""
self.assertEqual(_infer_severity("frustum_culling", 0.7), "medium")
self.assertEqual(_infer_severity("frustum_culling", 0.4), "low")
def test_infer_severity_shadow_map(self):
"""Shadow map artifact should infer medium/low."""
self.assertEqual(_infer_severity("shadow_map_artifact", 0.7), "medium")
self.assertEqual(_infer_severity("shadow_map_artifact", 0.4), "low")
def test_infer_severity_bloom_overflow(self):
"""Bloom overflow should infer medium/low (default path)."""
self.assertEqual(_infer_severity("bloom_overflow", 0.7), "medium")
self.assertEqual(_infer_severity("bloom_overflow", 0.4), "low")
def test_threejs_patterns_in_vision_prompt(self):
"""Three.js patterns should appear in the composite vision prompt."""
prompt = build_vision_prompt()
self.assertIn("shader_failure", prompt)
self.assertIn("texture_placeholder", prompt)
self.assertIn("uv_mapping_error", prompt)
self.assertIn("frustum_culling", prompt)
self.assertIn("shadow_map_artifact", prompt)
self.assertIn("bloom_overflow", prompt)
def test_threejs_subset_prompt(self):
"""Building prompt from Three.js-only patterns should work."""
threejs = get_threejs_patterns()
prompt = build_vision_prompt(threejs)
self.assertIn("Shader Compilation Failure", prompt)
self.assertNotIn("Floating Object", prompt) # generic, not Three.js
def test_report_metadata_version(self):
"""Report metadata should reference both issues."""
report = run_demo()
self.assertEqual(report.metadata["detector_version"], "0.2.0")
self.assertIn("543", report.metadata["reference"])
class TestIntegration(unittest.TestCase):
"""Integration-level tests."""
@@ -368,13 +276,6 @@ class TestIntegration(unittest.TestCase):
expected = {"floating_assets", "z_fighting", "missing_textures", "clipping", "broken_normals"}
self.assertTrue(expected.issubset(category_values))
def test_patterns_cover_threejs_themes(self):
"""Patterns should cover Three.js-specific glitch themes (#543)."""
category_values = {p.category.value for p in MATRIX_GLITCH_PATTERNS}
threejs_expected = {"shader_failure", "texture_placeholder", "uv_mapping_error",
"frustum_culling", "shadow_map_artifact", "bloom_overflow"}
self.assertTrue(threejs_expected.issubset(category_values))
if __name__ == "__main__":
unittest.main()

View File

@@ -1,100 +0,0 @@
{"song": "Thunder Road", "artist": "Heartland", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "The screen door slams, Mary's dress waves", "scene": {"mood": "hope", "colors": ["gold", "sky blue", "white"], "composition": "wide shot", "camera": "static", "description": "Open horizon. Golden light breaking through clouds. The figure silhouetted against dawn. The screen door slams, Mary's dress waves"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "Like a vision she dances across the porch as the radio plays", "scene": {"mood": "anticipation", "colors": ["silver", "pale green", "cream"], "composition": "close-up", "camera": "slow pan", "description": "Close on hands gripping a steering wheel. Dashboard lights reflecting in eyes. Road stretching ahead. Like a vision she dances across the porch as the radio plays"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "Roy Orbison singing for the lonely, hey that's me and I want you only", "scene": {"mood": "energy", "colors": ["red", "orange", "electric blue"], "composition": "over the shoulder", "camera": "dolly in", "description": "Rapid cuts. Bodies in motion. Light streaks across the frame. Roy Orbison singing for the lonely, hey that's me and I want you only"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "Don't turn me home out now I'm so young and worthless still", "scene": {"mood": "triumph", "colors": ["gold", "crimson", "white"], "composition": "low angle", "camera": "dolly out", "description": "Wide shot. Figure standing on a hilltop. Arms raised. City lights below. Don't turn me home out now I'm so young and worthless still"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "The night's busting open these two lanes will take us anywhere", "scene": {"mood": "nostalgia", "colors": ["amber", "sepia", "dusty rose"], "composition": "high angle", "camera": "handheld", "description": "Sepia tones. A photograph come to life. Dust motes in afternoon light. The night's busting open these two lanes will take us anywhere"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "We got one last chance to make it real", "scene": {"mood": "urgency", "colors": ["red", "black", "strobe white"], "composition": "dutch angle", "camera": "steadicam", "description": "Handheld camera running. Blurred faces. Traffic. Heartbeat sound design. We got one last chance to make it real"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "To trade in these wings on some wheels", "scene": {"mood": "passion", "colors": ["deep red", "burgundy", "gold"], "composition": "symmetrical", "camera": "slow zoom", "description": "Extreme close-up. Skin. Breath visible in cold air. Eyes locked. To trade in these wings on some wheels"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "Climb in back, heaven's waiting down the tracks", "scene": {"mood": "defiance", "colors": ["black", "neon green", "chrome"], "composition": "rule of thirds", "camera": "crane up", "description": "Low angle. Figure standing against the wind. Debris flying past. Unmoved. Climb in back, heaven's waiting down the tracks"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "Oh oh oh oh oh oh oh", "scene": {"mood": "release", "colors": ["sky blue", "white", "pale gold"], "composition": "extreme wide", "camera": "tracking shot", "description": "Slow motion. Something falling \u2014 a mask, a chain, a weight. Lightness follows. Oh oh oh oh oh oh oh"}}
{"song": "Thunder Road", "artist": "Heartland", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "It's a town full of losers and I'm pulling out of here to win", "scene": {"mood": "catharsis", "colors": ["all white", "silver", "clear"], "composition": "medium shot", "camera": "slow tilt down", "description": "White space expanding. Figure dissolving into light. Peace in the dissolution. It's a town full of losers and I'm pulling out of here to win"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "Woke up on the floor again, whiskey still on my tongue", "scene": {"mood": "despair", "colors": ["navy", "black", "grey"], "composition": "wide shot", "camera": "static", "description": "Empty room. Single light source. Figure curled in corner. Rain on windows. Woke up on the floor again, whiskey still on my tongue"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "The mirror shows a stranger and the damage that I've done", "scene": {"mood": "anger", "colors": ["red", "black", "orange"], "composition": "close-up", "camera": "slow pan", "description": "Shattered glass. Red light. Hands clenched. Jaw tight. The frame vibrates. The mirror shows a stranger and the damage that I've done"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "I scream until my throat bleeds but nobody comes", "scene": {"mood": "frenzy", "colors": ["strobe", "red", "white flash"], "composition": "over the shoulder", "camera": "dolly in", "description": "Strobe lighting. Multiple exposures. Bodies colliding. Chaos as composition. I scream until my throat bleeds but nobody comes"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "The walls are closing in again, the ceiling pressing down", "scene": {"mood": "exhaustion", "colors": ["grey", "brown", "faded"], "composition": "low angle", "camera": "dolly out", "description": "Static shot. Figure slumped. Eyes half-closed. Time passing in shadows. The walls are closing in again, the ceiling pressing down"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "I tried to call your number but you changed it years ago", "scene": {"mood": "resignation", "colors": ["grey", "muted blue", "beige"], "composition": "high angle", "camera": "handheld", "description": "Medium shot. Hands dropping keys on a table. Turning away. Not looking back. I tried to call your number but you changed it years ago"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "Now I'm howling at the moon like some rabid dog I know", "scene": {"mood": "grief", "colors": ["deep purple", "black", "silver"], "composition": "dutch angle", "camera": "steadicam", "description": "Wide shot. Figure alone in vast space. Dark purple sky. No horizon line. Now I'm howling at the moon like some rabid dog I know"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "Every bone remembers what my mind wants to forget", "scene": {"mood": "numbness", "colors": ["white", "grey", "no color"], "composition": "symmetrical", "camera": "slow zoom", "description": "Desaturated. Figure staring at nothing. World moving around them in blur. Every bone remembers what my mind wants to forget"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "I'll tear this whole house down before the sun comes up", "scene": {"mood": "rage", "colors": ["fire red", "black", "ember orange"], "composition": "rule of thirds", "camera": "crane up", "description": "Red wash. Extreme close-up on eyes. Fire reflected in pupils. I'll tear this whole house down before the sun comes up"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "Ash and ruin everywhere, this is all that's left", "scene": {"mood": "acceptance", "colors": ["soft blue", "warm grey", "sage"], "composition": "extreme wide", "camera": "tracking shot", "description": "Soft focus. Gentle light. Figure breathing. The camera doesn't judge. Ash and ruin everywhere, this is all that's left"}}
{"song": "Black Dog Howl", "artist": "Rust & Wire", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Silence. Just the wind through broken glass.", "scene": {"mood": "silence", "colors": ["black", "void", "faint starlight"], "composition": "medium shot", "camera": "slow tilt down", "description": "Black screen. Faint starlight. The sound drops out completely. Silence. Just the wind through broken glass."}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "Ten thousand miles of static between your voice and mine", "scene": {"mood": "wonder", "colors": ["aurora green", "violet", "silver"], "composition": "wide shot", "camera": "static", "description": "Northern lights overhead. Figure looking up. Mouth open. Child's expression. Ten thousand miles of static between your voice and mine"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "I trace your constellation on the dashboard every night", "scene": {"mood": "isolation", "colors": ["cold blue", "black", "distant starlight"], "composition": "close-up", "camera": "slow pan", "description": "Extreme wide. Single figure. Vast empty landscape. Scale crushing. I trace your constellation on the dashboard every night"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "The signal fades to nothing but I keep the frequency", "scene": {"mood": "longing", "colors": ["teal", "silver", "moonlight"], "composition": "over the shoulder", "camera": "dolly in", "description": "Through a window. Figure on the other side. Glass between. Breath on the pane. The signal fades to nothing but I keep the frequency"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "Then suddenly your laughter breaks through like a summer storm", "scene": {"mood": "connection", "colors": ["warm gold", "rose", "blush"], "composition": "low angle", "camera": "dolly out", "description": "Two hands reaching. Fingers almost touching. Warm light between them. Then suddenly your laughter breaks through like a summer storm"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "We're dancing in the data stream, our pixels intertwined", "scene": {"mood": "euphoria", "colors": ["neon", "rainbow", "white flash"], "composition": "high angle", "camera": "handheld", "description": "Overexposed. Everything bright. Dancing. The frame can't contain the joy. We're dancing in the data stream, our pixels intertwined"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "But I can't tell if you're real or just a ghost in the machine", "scene": {"mood": "confusion", "colors": ["swirling", "unsettled", "green-grey"], "composition": "dutch angle", "camera": "steadicam", "description": "Multiple focal points. Nothing sharp. The viewer doesn't know where to look. But I can't tell if you're real or just a ghost in the machine"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "The picture clears and there you are \u2014 imperfect, warm, alive", "scene": {"mood": "clarity", "colors": ["clear blue", "white", "crisp"], "composition": "symmetrical", "camera": "slow zoom", "description": "Rack focus. Background blurs, foreground sharpens. Suddenly everything makes sense. The picture clears and there you are \u2014 imperfect, warm, alive"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "Your hand reaches through the screen, I swear I feel the heat", "scene": {"mood": "tenderness", "colors": ["blush pink", "warm cream", "soft gold"], "composition": "rule of thirds", "camera": "crane up", "description": "Close on a hand touching a face. Soft light. Shallow depth of field. Your hand reaches through the screen, I swear I feel the heat"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "The bandwidth's dying, say it now before the link goes dark", "scene": {"mood": "urgency", "colors": ["red", "black", "strobe white"], "composition": "extreme wide", "camera": "tracking shot", "description": "Handheld camera running. Blurred faces. Traffic. Heartbeat sound design. The bandwidth's dying, say it now before the link goes dark"}}
{"song": "Satellite Hearts", "artist": "Neon Circuit", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Goodnight, satellite heart. I'll find you in the static.", "scene": {"mood": "bittersweet", "colors": ["amber", "lavender", "fading light"], "composition": "medium shot", "camera": "slow tilt down", "description": "Amber light fading. A smile that's also a goodbye. Beautiful and sad at once. Goodnight, satellite heart. I'll find you in the static."}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "They paved over every green thing when the developers came", "scene": {"mood": "oppression", "colors": ["concrete grey", "brown", "exhaust fume yellow"], "composition": "wide shot", "camera": "static", "description": "Concrete. Overpasses. No sky visible. Figures small against infrastructure. They paved over every green thing when the developers came"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "But we planted seeds between the cracks and gave them all a name", "scene": {"mood": "resilience", "colors": ["green", "cracked concrete", "gold"], "composition": "close-up", "camera": "slow pan", "description": "Crack in pavement. Green shoot pushing through. Macro lens. But we planted seeds between the cracks and gave them all a name"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "The mayor says progress looks like demolition and dust", "scene": {"mood": "anger", "colors": ["red", "black", "orange"], "composition": "over the shoulder", "camera": "dolly in", "description": "Shattered glass. Red light. Hands clenched. Jaw tight. The frame vibrates. The mayor says progress looks like demolition and dust"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "But a dandelion broke through the asphalt this morning \u2014 that's us", "scene": {"mood": "beauty", "colors": ["wildflower colors", "green", "sunlight"], "composition": "low angle", "camera": "dolly out", "description": "Wildflowers in unexpected places. Color against grey. Nature reclaiming. But a dandelion broke through the asphalt this morning \u2014 that's us"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "You can't kill what wants to live, can't silence what must sing", "scene": {"mood": "defiance", "colors": ["black", "neon green", "chrome"], "composition": "high angle", "camera": "handheld", "description": "Low angle. Figure standing against the wind. Debris flying past. Unmoved. You can't kill what wants to live, can't silence what must sing"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "We're the roots beneath the road, we're the birds that built on string", "scene": {"mood": "community", "colors": ["warm tones", "string lights", "firelight"], "composition": "dutch angle", "camera": "steadicam", "description": "String lights. People gathered. Laughter out of focus. Warmth as visual language. We're the roots beneath the road, we're the birds that built on string"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "When they tear the next block down we'll be dancing in the rubble", "scene": {"mood": "joy", "colors": ["bright", "multi", "saturated"], "composition": "symmetrical", "camera": "slow zoom", "description": "Saturated color. Wide smiles. Arms open. The world in full bloom. When they tear the next block down we'll be dancing in the rubble"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "Every protest is a garden, every march plants something new", "scene": {"mood": "struggle", "colors": ["dust", "grey", "hard light"], "composition": "rule of thirds", "camera": "crane up", "description": "Close on hands working. Calluses. Dust. Effort visible in every frame. Every protest is a garden, every march plants something new"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "The concrete is a drum and our footsteps keep the beat", "scene": {"mood": "growth", "colors": ["green", "brown", "morning light"], "composition": "extreme wide", "camera": "tracking shot", "description": "Time-lapse. Seed to flower. Sunrise to sunset. Transformation as rhythm. The concrete is a drum and our footsteps keep the beat"}}
{"song": "Concrete Garden", "artist": "Streetlight Prophet", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Tomorrow there'll be flowers where they swore there'd only be defeat", "scene": {"mood": "hope", "colors": ["gold", "sky blue", "white"], "composition": "medium shot", "camera": "slow tilt down", "description": "Open horizon. Golden light breaking through clouds. The figure silhouetted against dawn. Tomorrow there'll be flowers where they swore there'd only be defeat"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "I felt the pull before I saw the edge", "scene": {"mood": "dread", "colors": ["void black", "deep red", "cold white"], "composition": "wide shot", "camera": "static", "description": "Corner of frame. Something in the periphery. Dark. The camera doesn't look directly. I felt the pull before I saw the edge"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "The stars bent sideways, light itself was dead", "scene": {"mood": "fascination", "colors": ["event horizon purple", "gravitational lens blue"], "composition": "close-up", "camera": "slow pan", "description": "Close on eyes. Reflection of something impossible. The pupil expands. The stars bent sideways, light itself was dead"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "I could have turned the ship around but something in me said stay", "scene": {"mood": "surrender", "colors": ["white", "dissolution", "prismatic"], "composition": "over the shoulder", "camera": "dolly in", "description": "Arms opening. Head back. Falling backward into something vast. I could have turned the ship around but something in me said stay"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "The event horizon glows like a halo made of nothing", "scene": {"mood": "awe", "colors": ["starfield", "nebula colors", "infinite dark"], "composition": "low angle", "camera": "dolly out", "description": "Wide shot of cosmos. Nebula. Stars being born. Human figure tiny at bottom. The event horizon glows like a halo made of nothing"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "Time stretches thin as wire, each second takes a year", "scene": {"mood": "terror", "colors": ["black", "red shift", "distortion"], "composition": "high angle", "camera": "handheld", "description": "Shaking camera. Red shift. Something approaching fast. The frame distorts. Time stretches thin as wire, each second takes a year"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "I am both the observer and the thing that disappears", "scene": {"mood": "peace", "colors": ["deep space black", "starlight", "calm"], "composition": "dutch angle", "camera": "steadicam", "description": "Still water. Stars reflected. Perfect mirror. No movement. No sound. I am both the observer and the thing that disappears"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "My body reads the tidal forces like sheet music played on bone", "scene": {"mood": "disorientation", "colors": ["warped", "chromatic aberration", "bent light"], "composition": "symmetrical", "camera": "slow zoom", "description": "Warped lens. Vertigo. Walls becoming floor. Gravity is a suggestion. My body reads the tidal forces like sheet music played on bone"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "I stop fighting, stop reaching, stop calling home", "scene": {"mood": "acceptance", "colors": ["soft blue", "warm grey", "sage"], "composition": "rule of thirds", "camera": "crane up", "description": "Soft focus. Gentle light. Figure breathing. The camera doesn't judge. I stop fighting, stop reaching, stop calling home"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "There is a peace in dissolution I was never meant to know", "scene": {"mood": "transcendence", "colors": ["pure white", "beyond visible", "golden"], "composition": "extreme wide", "camera": "tracking shot", "description": "Pure white expanding. Figure becoming light. Boundaries dissolving. There is a peace in dissolution I was never meant to know"}}
{"song": "Gravity Well", "artist": "Void Walker", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Singularity. Silence. Everything and nothing both at once.", "scene": {"mood": "emptiness", "colors": ["void", "absolute black", "nothing"], "composition": "medium shot", "camera": "slow tilt down", "description": "Absolute black. No stars. No reference point. The void looking back. Singularity. Silence. Everything and nothing both at once."}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "My father's hands smelled like machine oil and prayer", "scene": {"mood": "nostalgia", "colors": ["amber", "sepia", "dusty rose"], "composition": "wide shot", "camera": "static", "description": "Sepia tones. A photograph come to life. Dust motes in afternoon light. My father's hands smelled like machine oil and prayer"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "The factory whistle was our clock, the shift was our calendar", "scene": {"mood": "sadness", "colors": ["grey", "rain", "muted blue"], "composition": "close-up", "camera": "slow pan", "description": "Rain on glass. Grey light. A cup of tea going cold. Still life of loss. The factory whistle was our clock, the shift was our calendar"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "He'd come home at midnight, wake me up to say goodnight", "scene": {"mood": "tenderness", "colors": ["blush pink", "warm cream", "soft gold"], "composition": "over the shoulder", "camera": "dolly in", "description": "Close on a hand touching a face. Soft light. Shallow depth of field. He'd come home at midnight, wake me up to say goodnight"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "Now the mill is just a skeleton and he's been gone ten years", "scene": {"mood": "loss", "colors": ["faded", "dusty", "empty space"], "composition": "low angle", "camera": "dolly out", "description": "Empty chair. Dust settling. A coat still on a hook. Presence of absence. Now the mill is just a skeleton and he's been gone ten years"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "But the river still runs brown with memory and rust", "scene": {"mood": "beauty", "colors": ["wildflower colors", "green", "sunlight"], "composition": "high angle", "camera": "handheld", "description": "Wildflowers in unexpected places. Color against grey. Nature reclaiming. But the river still runs brown with memory and rust"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "I found his lunchbox in the attic, coffee stains still fresh", "scene": {"mood": "resignation", "colors": ["grey", "muted blue", "beige"], "composition": "dutch angle", "camera": "steadicam", "description": "Medium shot. Hands dropping keys on a table. Turning away. Not looking back. I found his lunchbox in the attic, coffee stains still fresh"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "Some things don't decay \u2014 they just learn to hold still", "scene": {"mood": "love", "colors": ["neutral"], "composition": "symmetrical", "camera": "slow zoom", "description": "Visual interpretation of: Some things don't decay \u2014 they just learn to hold still"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "I hum the songs he hummed to me though I've forgotten half the words", "scene": {"mood": "weariness", "colors": ["grey-brown", "faded", "dim"], "composition": "rule of thirds", "camera": "crane up", "description": "Slow movement. Heavy eyelids. The world in faded tones. Everything too much. I hum the songs he hummed to me though I've forgotten half the words"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "The town's half-empty but the porch lights still come on at dusk", "scene": {"mood": "quiet hope", "colors": ["faint warm light", "candle glow", "dawn grey"], "composition": "extreme wide", "camera": "tracking shot", "description": "Faint warm light. Candle in dark room. Just enough to see by. The town's half-empty but the porch lights still come on at dusk"}}
{"song": "Rust Belt Lullaby", "artist": "Iron & Ember", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Sleep now, rust belt baby. The furnace keeps us warm.", "scene": {"mood": "peace", "colors": ["deep space black", "starlight", "calm"], "composition": "medium shot", "camera": "slow tilt down", "description": "Still water. Stars reflected. Perfect mirror. No movement. No sound. Sleep now, rust belt baby. The furnace keeps us warm."}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "I didn't start the fire but I brought the gasoline", "scene": {"mood": "fury", "colors": ["dark red", "black", "flash"], "composition": "wide shot", "camera": "static", "description": "Dark red wash. Hands destroying. Frame shaking with rage. I didn't start the fire but I brought the gasoline"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "Every sermon needs a spark and every spark needs a dream", "scene": {"mood": "ecstasy", "colors": ["fire", "gold", "blinding white"], "composition": "close-up", "camera": "slow pan", "description": "Fire and gold. Bodies arching. Light bursting from every surface. Every sermon needs a spark and every spark needs a dream"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "The forest is a cathedral and the flames are choir boys singing", "scene": {"mood": "chaos", "colors": ["strobe", "fragmented", "clashing"], "composition": "over the shoulder", "camera": "dolly in", "description": "Fragmented frame. Collage. Everything at once. Order is a memory. The forest is a cathedral and the flames are choir boys singing"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "Watch the old world burn \u2014 isn't the light beautiful?", "scene": {"mood": "joy", "colors": ["bright", "multi", "saturated"], "composition": "low angle", "camera": "dolly out", "description": "Saturated color. Wide smiles. Arms open. The world in full bloom. Watch the old world burn \u2014 isn't the light beautiful?"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "We'll dance in the embers, we'll make love in the ash", "scene": {"mood": "destruction", "colors": ["fire", "ash", "smoke orange"], "composition": "high angle", "camera": "handheld", "description": "Fire. Ash falling like snow. Structures collapsing. Beautiful in its terrible way. We'll dance in the embers, we'll make love in the ash"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "From destruction comes the soil where new things grow at last", "scene": {"mood": "creation", "colors": ["green", "light", "warm gold"], "composition": "dutch angle", "camera": "steadicam", "description": "Hands shaping clay. Light emerging from dark. Something new being born. From destruction comes the soil where new things grow at last"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "But don't mistake the warmth for safety, don't mistake the glow for home", "scene": {"mood": "warning", "colors": ["red flash", "amber", "siren"], "composition": "symmetrical", "camera": "slow zoom", "description": "Red flash. Siren light. The calm before. Then: impact. But don't mistake the warmth for safety, don't mistake the glow for home"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "Come closer, come closer \u2014 I promise the burning feels like flying", "scene": {"mood": "invitation", "colors": ["warm", "open", "golden"], "composition": "rule of thirds", "camera": "crane up", "description": "Open door. Warm light spilling out. A hand extended. Come in. Come closer, come closer \u2014 I promise the burning feels like flying"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "We threw everything we owned into the blaze and laughed", "scene": {"mood": "abandon", "colors": ["wild", "free", "untethered"], "composition": "extreme wide", "camera": "tracking shot", "description": "Running through a field. Hair wild. No destination. Just movement. We threw everything we owned into the blaze and laughed"}}
{"song": "Wildfire Sermon", "artist": "Prophet Ash", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Morning. Smoke. Green shoots. Begin again.", "scene": {"mood": "rebirth", "colors": ["green shoots", "dawn", "clear"], "composition": "medium shot", "camera": "slow tilt down", "description": "Dawn. Green shoots in ash. First breath after drowning. Morning. Smoke. Green shoots. Begin again."}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "There's a voice on the radio that shouldn't be there", "scene": {"mood": "mystery", "colors": ["deep blue", "shadow", "candle"], "composition": "wide shot", "camera": "static", "description": "Shadow figure in doorway. Candle. Face half-lit. Eyes knowing. There's a voice on the radio that shouldn't be there"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "Speaking my name in a language I almost understand", "scene": {"mood": "loneliness", "colors": ["single light", "dark", "cold blue"], "composition": "close-up", "camera": "slow pan", "description": "Single light in vast dark. Figure beneath it. Nothing else. Speaking my name in a language I almost understand"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "I turn the dial but it follows like a shadow made of sound", "scene": {"mood": "curiosity", "colors": ["warm yellow", "spotlight", "discovery"], "composition": "over the shoulder", "camera": "dolly in", "description": "Light moving across a surface. Discovery. Eyes widening. I turn the dial but it follows like a shadow made of sound"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "Then it says something only I would know, something buried deep", "scene": {"mood": "connection", "colors": ["warm gold", "rose", "blush"], "composition": "low angle", "camera": "dolly out", "description": "Two hands reaching. Fingers almost touching. Warm light between them. Then it says something only I would know, something buried deep"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "I'm not afraid anymore \u2014 I'm listening", "scene": {"mood": "paranoia", "colors": ["surveillance green", "strobe", "red"], "composition": "high angle", "camera": "handheld", "description": "Surveillance angles. Green tint. Multiple screens. Watching. Being watched. I'm not afraid anymore \u2014 I'm listening"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "The voice knows my dreams, it describes them back to me", "scene": {"mood": "intimacy", "colors": ["candlelight", "warm", "close"], "composition": "dutch angle", "camera": "steadicam", "description": "Candlelight only. Two faces close. Shared breath. The world outside forgotten. The voice knows my dreams, it describes them back to me"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "We're having a conversation across some membrane I can't see", "scene": {"mood": "urgency", "colors": ["red", "black", "strobe white"], "composition": "symmetrical", "camera": "slow zoom", "description": "Handheld camera running. Blurred faces. Traffic. Heartbeat sound design. We're having a conversation across some membrane I can't see"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "Then static. Then nothing. Then a whisper: find me", "scene": {"mood": "disconnection", "colors": ["static", "grey", "broken signal"], "composition": "rule of thirds", "camera": "crane up", "description": "Static. Snow on screen. A voice breaking up. Distance measured in noise. Then static. Then nothing. Then a whisper: find me"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "I search every frequency but the voice is gone", "scene": {"mood": "searching", "colors": ["flashlight beam", "dark", "moving light"], "composition": "extreme wide", "camera": "tracking shot", "description": "Flashlight beam cutting dark. Moving. Looking. Not finding yet. I search every frequency but the voice is gone"}}
{"song": "Midnight Transmission", "artist": "Frequency Ghost", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Some nights I still hear it, faint, like a song in another room", "scene": {"mood": "haunting", "colors": ["faint blue", "echo", "silver"], "composition": "medium shot", "camera": "slow tilt down", "description": "Faint blue light. Echo of a figure. Present and absent simultaneously. Some nights I still hear it, faint, like a song in another room"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "I wore your love like a weapon and you never felt the blade", "scene": {"mood": "seduction", "colors": ["deep red", "velvet", "candlelight"], "composition": "wide shot", "camera": "static", "description": "Deep red. Velvet textures. Slow movement. Eyes that promise. I wore your love like a weapon and you never felt the blade"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "Every kiss was a negotiation, every touch a trade", "scene": {"mood": "power", "colors": ["gold", "black", "crimson"], "composition": "close-up", "camera": "slow pan", "description": "Throne. Gold. Black. The figure doesn't move. Doesn't need to. Every kiss was a negotiation, every touch a trade"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "The throne room smells like jasmine and someone else's fear", "scene": {"mood": "cruelty", "colors": ["cold silver", "black", "sharp white"], "composition": "over the shoulder", "camera": "dolly in", "description": "Silver blade. Cold light. A smile that doesn't reach the eyes. The throne room smells like jasmine and someone else's fear"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "I am beautiful when I'm angry \u2014 haven't you heard?", "scene": {"mood": "beauty", "colors": ["wildflower colors", "green", "sunlight"], "composition": "low angle", "camera": "dolly out", "description": "Wildflowers in unexpected places. Color against grey. Nature reclaiming. I am beautiful when I'm angry \u2014 haven't you heard?"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "Don't mistake my gentleness for weakness, darling", "scene": {"mood": "danger", "colors": ["red", "black", "warning yellow"], "composition": "high angle", "camera": "handheld", "description": "Red and black. Warning signs. The frame contracts. Something approaches. Don't mistake my gentleness for weakness, darling"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "I chose to be kind. I could burn this kingdom down.", "scene": {"mood": "vulnerability", "colors": ["soft", "exposed", "raw"], "composition": "dutch angle", "camera": "steadicam", "description": "Exposed skin. Soft light. Eyes open. Trust visible in every pore. I chose to be kind. I could burn this kingdom down."}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "The roses in my crown have thorns that curve inward", "scene": {"mood": "fury", "colors": ["dark red", "black", "flash"], "composition": "symmetrical", "camera": "slow zoom", "description": "Dark red wash. Hands destroying. Frame shaking with rage. The roses in my crown have thorns that curve inward"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "I bleed for my own sins, not for yours", "scene": {"mood": "grace", "colors": ["white", "silver", "flowing"], "composition": "rule of thirds", "camera": "crane up", "description": "White. Flowing. Movement without effort. The body as art. I bleed for my own sins, not for yours"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "Tonight I lay the crown aside and sleep without armor", "scene": {"mood": "revenge", "colors": ["dark", "steel", "cold blue"], "composition": "extreme wide", "camera": "tracking shot", "description": "Cold blue. Steel. The plan unfolding in shadows. Patience as weapon. Tonight I lay the crown aside and sleep without armor"}}
{"song": "Crown of Thorns and Roses", "artist": "Velvet Guillotine", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Mercy. The hardest word. The only gift worth giving.", "scene": {"mood": "mercy", "colors": ["warm gold", "white", "gentle"], "composition": "medium shot", "camera": "slow tilt down", "description": "Warm gold. Hand lowering a weapon. Choosing not to. The harder path. Mercy. The hardest word. The only gift worth giving."}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 1, "timestamp": "0:00", "duration": "30s", "lyric_line": "Four walls, one window, a view of another wall", "scene": {"mood": "claustrophobia", "colors": ["close walls", "yellow bulb", "cramped"], "composition": "wide shot", "camera": "static", "description": "Walls close. Ceiling low. Yellow bulb. No escape visible. Four walls, one window, a view of another wall"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 2, "timestamp": "0:30", "duration": "30s", "lyric_line": "The radiator clicks like a metronome for the damned", "scene": {"mood": "routine", "colors": ["grey", "institutional", "fluorescent"], "composition": "close-up", "camera": "slow pan", "description": "Fluorescent light. Same motion repeated. Clock on the wall. Time as loop. The radiator clicks like a metronome for the damned"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 3, "timestamp": "1:00", "duration": "30s", "lyric_line": "I've memorized every crack in the ceiling \u2014 they form a map", "scene": {"mood": "desperation", "colors": ["scratching", "clawing", "raw"], "composition": "over the shoulder", "camera": "dolly in", "description": "Hands clawing. Fingernails against surface. Raw need. Nothing held back. I've memorized every crack in the ceiling \u2014 they form a map"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 4, "timestamp": "1:30", "duration": "30s", "lyric_line": "In my mind I've left a hundred times, bought a farm, learned to fly", "scene": {"mood": "fantasy", "colors": ["dreamy", "pastel", "floating"], "composition": "low angle", "camera": "dolly out", "description": "Pastel. Floating. Impossible architecture. Gravity optional. In my mind I've left a hundred times, bought a farm, learned to fly"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 5, "timestamp": "2:00", "duration": "30s", "lyric_line": "Then one morning I open the door and just walk out", "scene": {"mood": "breakthrough", "colors": ["white burst", "open sky", "blinding"], "composition": "high angle", "camera": "handheld", "description": "White burst. Wall shattering. Open sky beyond. Freedom as explosion. Then one morning I open the door and just walk out"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 6, "timestamp": "2:30", "duration": "30s", "lyric_line": "The hallway is an ocean, the stairs are a mountain range", "scene": {"mood": "freedom", "colors": ["open sky", "blue", "green"], "composition": "dutch angle", "camera": "steadicam", "description": "Open road. Blue sky. Green fields. Wind in hair. No walls. The hallway is an ocean, the stairs are a mountain range"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 7, "timestamp": "3:00", "duration": "30s", "lyric_line": "The street hits me like cold water and I almost go back", "scene": {"mood": "fear", "colors": ["cold", "dark", "sharp"], "composition": "symmetrical", "camera": "slow zoom", "description": "Cold. Dark. Sharp edges. The frame contracts. Something unseen. The street hits me like cold water and I almost go back"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 8, "timestamp": "3:30", "duration": "30s", "lyric_line": "But the sky \u2014 have you seen the sky? It goes on forever", "scene": {"mood": "joy", "colors": ["bright", "multi", "saturated"], "composition": "rule of thirds", "camera": "crane up", "description": "Saturated color. Wide smiles. Arms open. The world in full bloom. But the sky \u2014 have you seen the sky? It goes on forever"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 9, "timestamp": "4:00", "duration": "30s", "lyric_line": "I stand on the sidewalk and cry because the world is so big", "scene": {"mood": "grounding", "colors": ["neutral"], "composition": "extreme wide", "camera": "tracking shot", "description": "Visual interpretation of: I stand on the sidewalk and cry because the world is so big"}}
{"song": "Apartment 4B", "artist": "Wallpaper & Wire", "beat": 10, "timestamp": "4:30", "duration": "30s", "lyric_line": "Home is not a place. Home is the moment you stop hiding.", "scene": {"mood": "home", "colors": ["neutral"], "composition": "medium shot", "camera": "slow tilt down", "description": "Visual interpretation of: Home is not a place. Home is the moment you stop hiding."}}

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""
augment_pairs.py — Training data augmentation: paraphrase and translate.
Usage:
python3 augment_pairs.py --input data.jsonl
python3 augment_pairs.py --input data.jsonl --paraphrases 3 --langs es,fr,de
python3 augment_pairs.py --input data.jsonl --llm-endpoint http://localhost:11434/v1
"""
import json, os, sys, re, random
from pathlib import Path
random.seed(42)
PARAPHRASE_TRANSFORMS = [
lambda s: re.sub(r"(\w+), (\w+)", r"\2, \1", s, count=1),
lambda s: f"A beautifully rendered scene: {s[0].lower()}{s[1:]}" if len(s) > 10 else s,
lambda s: s.replace("A ", "The ").replace("An ", "The ") if s.startswith(("A ", "An ")) else f"Here, {s[0].lower()}{s[1:]}",
lambda s: f"In a cinematic frame: {s}" if len(s) > 20 else s,
lambda s: s if ", " not in s else ", ".join(s.split(", ")[:2]),
]
TRANSLATIONS = {
"es": {"the":"el","a":"un","is":"es","in":"en","of":"de","and":"y","with":"con","scene":"escena","light":"luz","dark":"oscuro","warm":"cálido","rain":"lluvia","sun":"sol","moon":"luna","sky":"cielo","forest":"bosque","mountain":"montaña","ocean":"océano","golden":"dorado","blue":"azul","red":"rojo","green":"verde","silence":"silencio","dream":"sueño","love":"amor","hope":"esperanza","fear":"miedo","joy":"alegría","peace":"paz","beautiful":"hermoso","sad":"triste","shadow":"sombra","color":"color","silver":"plateado","white":"blanco","black":"negro","portray":"retrato"},
"fr": {"the":"le","a":"un","is":"est","in":"dans","of":"de","and":"et","with":"avec","scene":"scène","light":"lumière","dark":"sombre","warm":"chaud","rain":"pluie","sun":"soleil","moon":"lune","sky":"ciel","forest":"forêt","mountain":"montagne","ocean":"océan","golden":"doré","blue":"bleu","red":"rouge","green":"vert","silence":"silence","dream":"rêve","love":"amour","hope":"espoir","fear":"peur","joy":"joie","peace":"paix","beautiful":"beau","sad":"triste","shadow":"ombre","color":"couleur","silver":"argenté","white":"blanc","black":"noir"},
"de": {"the":"der","a":"ein","is":"ist","in":"in","of":"von","and":"und","with":"mit","scene":"Szene","light":"Licht","dark":"dunkel","warm":"warm","rain":"Regen","sun":"Sonne","moon":"Mond","sky":"Himmel","forest":"Wald","mountain":"Berg","ocean":"Ozean","golden":"golden","blue":"blau","red":"rot","green":"grün","silence":"Stille","dream":"Traum","love":"Liebe","hope":"Hoffnung","fear":"Angst","joy":"Freude","peace":"Frieden","beautiful":"schön","sad":"traurig","shadow":"Schatten","color":"Farbe","silver":"silbern","white":"weiß","black":"schwarz"},
}
LANG_NAMES = {"es": "Spanish", "fr": "French", "de": "German"}
def detect_text_field(entry):
for f in ["rich","terse","text","content","lyric_line","description","scene_description","prompt","scene"]:
if f in entry and isinstance(entry[f], str) and len(entry[f]) > 5:
return f
for k, v in entry.items():
if isinstance(v, str) and len(v) > 5:
return k
return None
def paraphrase(text):
t = random.choice(PARAPHRASE_TRANSFORMS)(text)
if t == text:
t = text.replace(" and ", " & ").replace(" with ", " alongside ")
if t == text:
t = f"In this scene: {text[0].lower()}{text[1:]}" if text[0].isupper() else text
return t
def translate(text, lang):
d = TRANSLATIONS.get(lang, {})
words = text.split()
out = []
for w in words:
lo = w.lower().strip(".,;:!?")
suf = w[len(w.rstrip(".,;:!?")):]
if lo in d:
out.append(d[lo] + suf)
else:
out.append(w)
return " ".join(out)
def augment_file(input_path, output_path=None, n_para=3, langs=None, llm_endpoint=None):
input_path = Path(input_path)
if output_path is None:
output_path = input_path.parent / f"{input_path.stem}_augmented{input_path.suffix}"
entries = [json.loads(l) for l in open(input_path) if l.strip()]
if not entries:
print(f"No entries in {input_path}"); return 0
tf = detect_text_field(entries[0])
if not tf:
print(f"ERROR: No text field in {input_path}", file=sys.stderr); return 0
print(f"Input: {input_path} ({len(entries)} entries, field={tf})")
aug_count = 0
with open(output_path, "w") as out:
for e in entries:
out.write(json.dumps(e, ensure_ascii=False) + "\n")
for i, e in enumerate(entries):
text = e[tf]
# Paraphrases
for p in range(n_para):
para = paraphrase(text)
if para != text:
ne = dict(e); ne[tf] = para
ne["_augmentation"] = f"paraphrase_{p+1}"
ne["_original"] = text[:100]
out.write(json.dumps(ne, ensure_ascii=False) + "\n")
aug_count += 1
# Translations
for lang in (langs or []):
tr = translate(text, lang)
if tr != text:
ne = dict(e); ne[tf] = tr
ne["_augmentation"] = f"translate_{lang}"
ne["_language"] = lang
ne["_original"] = text[:100]
out.write(json.dumps(ne, ensure_ascii=False) + "\n")
aug_count += 1
if (i+1) % 100 == 0:
print(f" {i+1}/{len(entries)} done ({aug_count} augmented)")
total = len(entries) + aug_count
print(f"Done: {len(entries)} originals + {aug_count} augmented = {total}")
print(f"Output: {output_path}")
return aug_count
def main():
import argparse
p = argparse.ArgumentParser()
p.add_argument("--input", required=True)
p.add_argument("--output", default=None)
p.add_argument("--paraphrases", type=int, default=3)
p.add_argument("--langs", default="es,fr,de")
p.add_argument("--llm-endpoint", default=None)
args = p.parse_args()
langs = [l.strip() for l in args.langs.split(",") if l.strip()] if args.langs else []
augment_file(args.input, args.output, args.paraphrases, langs, args.llm_endpoint)
if __name__ == "__main__":
main()