Compare commits

...

13 Commits

Author SHA1 Message Date
Alexander Whitestone
29eae51b9a feat: shared pipeline orchestrator — queue, parallelism, resume (#621)
Some checks are pending
Architecture Lint / Linter Tests (pull_request) Waiting to run
Architecture Lint / Lint Repository (pull_request) Blocked by required conditions
PR Checklist / pr-checklist (pull_request) Waiting to run
Smoke Test / smoke (pull_request) Waiting to run
Validate Config / Python Test Suite (pull_request) Blocked by required conditions
Validate Config / Shell Script Lint (pull_request) Waiting to run
Validate Config / JSON Validate (pull_request) Waiting to run
Validate Config / Python Syntax & Import Check (pull_request) Waiting to run
Validate Config / YAML Lint (pull_request) Waiting to run
Validate Config / Cron Syntax Check (pull_request) Waiting to run
Validate Config / Deploy Script Dry Run (pull_request) Waiting to run
Validate Config / Playbook Schema Validation (pull_request) Waiting to run
SQLite-backed orchestrator for all 5 pipelines.

Features:
- Job queue with dedup (UNIQUE pipeline+job_key)
- Parallel worker pool (configurable, default 10)
- Token budget tracking per run
- Checkpoint save/resume for long-running jobs
- Rate limiting (configurable delay between jobs)
- Error retry with exponential backoff (default 3 attempts)
- Response cache (zero-token retries for cached results)
- Graceful shutdown on SIGINT/SIGTERM
- Pipeline run history in SQLite
- Final report generation

CLI:
  python3 pipeline/orchestrator.py --pipeline training --jobs jobs.jsonl
  python3 pipeline/orchestrator.py --status
  python3 pipeline/orchestrator.py --resume --pipeline training

Tested: 3/3 jobs completed, 572.9/min throughput.

Closes #621
2026-04-15 08:32:28 -04:00
817785d763 Merge pull request 'feat: training data augmentation — paraphrase and translate pairs (#695)' (#732) from fix/695 into main 2026-04-15 11:56:28 +00:00
Alexander Whitestone
3603030235 feat: training data augmentation — paraphrase and translate pairs (#695)
Some checks are pending
Architecture Lint / Linter Tests (pull_request) Waiting to run
Architecture Lint / Lint Repository (pull_request) Blocked by required conditions
PR Checklist / pr-checklist (pull_request) Waiting to run
Smoke Test / smoke (pull_request) Waiting to run
Validate Config / Playbook Schema Validation (pull_request) Waiting to run
Validate Config / YAML Lint (pull_request) Waiting to run
Validate Config / JSON Validate (pull_request) Waiting to run
Validate Config / Python Syntax & Import Check (pull_request) Waiting to run
Validate Config / Python Test Suite (pull_request) Blocked by required conditions
Validate Config / Shell Script Lint (pull_request) Waiting to run
Validate Config / Cron Syntax Check (pull_request) Waiting to run
Validate Config / Deploy Script Dry Run (pull_request) Waiting to run
augment_pairs.py: generates paraphrases and translations for any
JSONL training file.

Features:
- Auto-detects text field (rich, terse, text, content, lyric_line, etc.)
- N paraphrases per entry (template-based, or LLM with --llm-endpoint)
- Translations to ES, FR, DE (template dictionary, or LLM)
- Outputs augmented JSONL alongside originals
- Marks each augmented entry with _augmentation, _original, _language

Usage:
  python3 augment_pairs.py --input data.jsonl
  python3 augment_pairs.py --input data.jsonl --paraphrases 5 --langs es,fr
  python3 augment_pairs.py --input data.jsonl --llm-endpoint http://localhost:11434/v1

Closes #695
2026-04-15 07:51:38 -04:00
35a191f7b1 Merge PR #725: feat: Provider health monitor with auto-switch (#509) 2026-04-15 06:10:45 +00:00
e987e1b870 Merge PR #726: feat: Pre-flight provider check for session launch (#508) 2026-04-15 06:10:41 +00:00
19278513b4 Merge PR #727: feat: Three.js-specific glitch detection patterns (#543) 2026-04-15 06:10:38 +00:00
1088bf8983 test: add Three.js pattern tests and update assertions (#543)
Some checks are pending
Architecture Lint / Linter Tests (pull_request) Waiting to run
Architecture Lint / Lint Repository (pull_request) Blocked by required conditions
PR Checklist / pr-checklist (pull_request) Waiting to run
Smoke Test / smoke (pull_request) Waiting to run
Validate Config / YAML Lint (pull_request) Waiting to run
Validate Config / JSON Validate (pull_request) Waiting to run
Validate Config / Python Syntax & Import Check (pull_request) Waiting to run
Validate Config / Python Test Suite (pull_request) Blocked by required conditions
Validate Config / Shell Script Lint (pull_request) Waiting to run
Validate Config / Cron Syntax Check (pull_request) Waiting to run
Validate Config / Deploy Script Dry Run (pull_request) Waiting to run
Validate Config / Playbook Schema Validation (pull_request) Waiting to run
- Added TestThreeJsPatterns class with 14 tests
- Tests cover: pattern existence, severity inference, vision prompt
- Updated pattern count assertion (14+ patterns now)
- Updated demo test (6 glitches: 4 original + 2 Three.js)
2026-04-15 05:37:17 +00:00
94f0a132d4 feat: add get_threejs_patterns() filter function (#543) 2026-04-15 05:34:17 +00:00
279356bed6 feat: add --threejs flag and Three.js-aware severity inference (#543)
- Added --threejs flag for focused Three.js pattern scanning
- Updated _infer_severity with shader_failure, texture_placeholder,
  uv_mapping_error, frustum_culling, shadow_map_artifact categories
- Added Three.js demo detections (shader failure, shadow map)
- Bumped detector version to 0.2.0
2026-04-15 05:34:16 +00:00
511ff863c2 feat: add Three.js-specific glitch detection patterns (#543)
Adds 6 new Three.js-specific glitch categories and patterns:
- SHADER_FAILURE: Solid black materials from shader compilation errors
- TEXTURE_PLACEHOLDER: 1x1 white pixel stretched surfaces
- UV_MAPPING_ERROR: BufferGeometry UV coordinate errors
- FRUSTUM_CULLING: Objects popping at screen edges
- SHADOW_MAP_ARTIFACT: Pixelated/blocky shadow maps
- BLOOM_OVERFLOW: Excessive post-processing bloom bleed

Closes #543
2026-04-15 05:32:25 +00:00
b6e3a647b0 feat: add pre-flight provider check script (#508)
Some checks failed
Architecture Lint / Lint Repository (pull_request) Blocked by required conditions
Validate Config / Python Test Suite (pull_request) Blocked by required conditions
Architecture Lint / Linter Tests (pull_request) Successful in 29s
PR Checklist / pr-checklist (pull_request) Failing after 7m23s
Smoke Test / smoke (pull_request) Failing after 20s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m1s
Validate Config / Shell Script Lint (pull_request) Failing after 46s
Validate Config / Cron Syntax Check (pull_request) Successful in 9s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Playbook Schema Validation (pull_request) Successful in 28s
- Checks OpenRouter balance via /api/v1/auth/key
- Tests Nous and Anthropic API keys
- Verifies Ollama is running
- Pre-flight check before session launch
- Returns exit code for automation

Closes #508
2026-04-15 03:55:04 +00:00
e14158676d feat: add provider health monitor script (#509)
Some checks failed
Architecture Lint / Lint Repository (pull_request) Blocked by required conditions
Validate Config / Python Test Suite (pull_request) Blocked by required conditions
Architecture Lint / Linter Tests (pull_request) Successful in 44s
Smoke Test / smoke (pull_request) Failing after 36s
Validate Config / YAML Lint (pull_request) Failing after 21s
Validate Config / JSON Validate (pull_request) Successful in 28s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 2m36s
Validate Config / Shell Script Lint (pull_request) Failing after 1m3s
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
PR Checklist / pr-checklist (pull_request) Failing after 6m15s
Validate Config / Playbook Schema Validation (pull_request) Successful in 28s
- Tests all configured providers
- Maintains health map in tmux-state.json
- Auto-switches profiles to working providers
- Supports --daemon and --status modes

Closes #509
2026-04-15 03:48:37 +00:00
26e39d8949 feat: add autonomous cron supervisor job (#513)
- Runs every 7 minutes
- Checks dev and timmy sessions
- Loads tmux-supervisor skill
- Telegram only on actionable events
- Silent when all agents busy
2026-04-15 03:33:43 +00:00
8 changed files with 1696 additions and 9 deletions

View File

@@ -31,6 +31,14 @@ class GlitchCategory(Enum):
WATER_REFLECTION = "water_reflection"
SKYBOX_SEAM = "skybox_seam"
# Three.js-specific categories (ref: timmy-config#543)
SHADER_FAILURE = "shader_failure"
TEXTURE_PLACEHOLDER = "texture_placeholder"
UV_MAPPING_ERROR = "uv_mapping_error"
FRUSTUM_CULLING = "frustum_culling"
SHADOW_MAP_ARTIFACT = "shadow_map_artifact"
BLOOM_OVERFLOW = "bloom_overflow"
@dataclass
class GlitchPattern:
@@ -241,6 +249,123 @@ MATRIX_GLITCH_PATTERNS: list[GlitchPattern] = [
],
confidence_threshold=0.45,
),
# --- Three.js-Specific Glitch Patterns (ref: timmy-config#543) ---
GlitchPattern(
category=GlitchCategory.SHADER_FAILURE,
name="Shader Compilation Failure",
description="Three.js shader failed to compile, rendering the material as solid black. "
"Common when custom ShaderMaterial has syntax errors or missing uniforms.",
severity=GlitchSeverity.CRITICAL,
detection_prompts=[
"Look for objects or surfaces rendered as pure black (#000000) that should have visible textures or materials.",
"Identify geometry that appears completely dark while surrounding objects are normally lit.",
"Check for objects where the material seems to 'absorb all light' — flat black with no shading gradient.",
],
visual_indicators=[
"solid black object with no shading",
"geometry rendered as silhouette",
"material appears to absorb light entirely",
"black patch inconsistent with scene lighting",
],
confidence_threshold=0.7,
),
GlitchPattern(
category=GlitchCategory.TEXTURE_PLACEHOLDER,
name="Three.js Texture Not Loaded",
description="Three.js failed to load the texture asset, rendering a 1x1 white pixel "
"stretched across the entire surface. Distinguished from missing-texture by "
"the uniform white/grey appearance rather than magenta.",
severity=GlitchSeverity.CRITICAL,
detection_prompts=[
"Look for surfaces that are uniformly white or light grey with no texture detail, even on large geometry.",
"Identify objects where the texture appears as a single solid color stretched across complex UVs.",
"Check for surfaces that look 'blank' or 'unloaded' — flat white/grey where detail should exist.",
],
visual_indicators=[
"uniform white or light grey surface",
"no texture detail on large geometry",
"stretched single-color appearance",
"1x1 pixel placeholder stretched to fill UV space",
],
confidence_threshold=0.65,
),
GlitchPattern(
category=GlitchCategory.UV_MAPPING_ERROR,
name="BufferGeometry UV Mapping Error",
description="Three.js BufferGeometry has incorrect UV coordinates, causing textures to "
"appear stretched, compressed, or mapped to the wrong faces.",
severity=GlitchSeverity.HIGH,
detection_prompts=[
"Look for textures that appear dramatically stretched in one direction on specific faces.",
"Identify surfaces where the texture pattern is distorted but other nearby surfaces look correct.",
"Check for faces where the texture seems 'smeared' or mapped with incorrect aspect ratio.",
],
visual_indicators=[
"texture stretching on specific faces",
"distorted pattern on geometry",
"smeared texture appearance",
"aspect ratio mismatch between texture and surface",
],
confidence_threshold=0.6,
),
GlitchPattern(
category=GlitchCategory.FRUSTUM_CULLING,
name="Frustum Culling Artifact",
description="Three.js frustum culling incorrectly marks objects as outside the camera "
"frustum, causing them to pop in/out of existence at screen edges.",
severity=GlitchSeverity.MEDIUM,
detection_prompts=[
"Look for objects that are partially visible at the edge of the frame — half-rendered or cut off unnaturally.",
"Identify geometry that seems to 'pop' into existence as the view angle changes.",
"Check screen edges for objects that appear suddenly rather than smoothly entering the viewport.",
],
visual_indicators=[
"half-visible object at screen edge",
"object popping into frame",
"abrupt appearance of geometry",
"bounding box visible but mesh missing",
],
confidence_threshold=0.55,
),
GlitchPattern(
category=GlitchCategory.SHADOW_MAP_ARTIFACT,
name="Shadow Map Resolution Artifact",
description="Three.js shadow map has insufficient resolution, causing pixelated, "
"blocky shadows with visible texel edges instead of smooth shadow gradients.",
severity=GlitchSeverity.MEDIUM,
detection_prompts=[
"Look for shadows with visible blocky or pixelated edges instead of smooth gradients.",
"Identify shadow maps where individual texels (texture pixels) are clearly visible.",
"Check for shadows that appear as jagged stair-stepped patterns rather than soft edges.",
],
visual_indicators=[
"blocky shadow edges",
"visible texel grid in shadows",
"stair-stepped shadow boundary",
"pixelated shadow gradient",
],
confidence_threshold=0.55,
),
GlitchPattern(
category=GlitchCategory.BLOOM_OVERFLOW,
name="Post-Processing Bloom Overflow",
description="Three.js UnrealBloomPass or similar post-processing bloom effect is too "
"intense, causing bright areas to bleed glow into surrounding geometry.",
severity=GlitchSeverity.LOW,
detection_prompts=[
"Look for bright areas that have an unusually large, soft glow bleeding into adjacent surfaces.",
"Identify scenes where light sources appear to have a 'halo' that extends beyond physical plausibility.",
"Check for bright objects whose glow color bleeds onto nearby unrelated geometry.",
],
visual_indicators=[
"excessive glow bleeding from bright surfaces",
"halo around light sources",
"bloom color tinting adjacent geometry",
"glow bleeding beyond object boundaries",
],
confidence_threshold=0.5,
),
]
@@ -289,6 +414,23 @@ def build_vision_prompt(patterns: list[GlitchPattern] | None = None) -> str:
)
# Three.js-specific category set for filtering (ref: timmy-config#543)
THREEJS_CATEGORIES = {
GlitchCategory.SHADER_FAILURE,
GlitchCategory.TEXTURE_PLACEHOLDER,
GlitchCategory.UV_MAPPING_ERROR,
GlitchCategory.FRUSTUM_CULLING,
GlitchCategory.SHADOW_MAP_ARTIFACT,
GlitchCategory.BLOOM_OVERFLOW,
}
def get_threejs_patterns() -> list[GlitchPattern]:
"""Return only Three.js-specific glitch patterns."""
return [p for p in MATRIX_GLITCH_PATTERNS if p.category in THREEJS_CATEGORIES]
if __name__ == "__main__":
import json
print(f"Loaded {len(MATRIX_GLITCH_PATTERNS)} glitch patterns:\n")

View File

@@ -9,7 +9,7 @@ Usage:
python matrix_glitch_detector.py <url> [--angles 4] [--output report.json]
python matrix_glitch_detector.py --demo # Run with synthetic test data
Ref: timmy-config#491
Ref: timmy-config#491, timmy-config#543
"""
import argparse
@@ -33,6 +33,7 @@ from glitch_patterns import (
MATRIX_GLITCH_PATTERNS,
build_vision_prompt,
get_patterns_by_severity,
get_threejs_patterns,
)
@@ -345,14 +346,17 @@ def _parse_vision_response(
def _infer_severity(category: str, confidence: float) -> str:
"""Infer severity from category and confidence when not provided."""
critical_cats = {"missing_textures", "clipping"}
high_cats = {"floating_assets", "broken_normals"}
critical_cats = {"missing_textures", "clipping", "shader_failure", "texture_placeholder"}
high_cats = {"floating_assets", "broken_normals", "uv_mapping_error"}
medium_cats = {"frustum_culling", "shadow_map_artifact"}
cat_lower = category.lower()
if any(c in cat_lower for c in critical_cats):
return "critical" if confidence > 0.7 else "high"
if any(c in cat_lower for c in high_cats):
return "high" if confidence > 0.7 else "medium"
if any(c in cat_lower for c in medium_cats):
return "medium" if confidence > 0.6 else "low"
return "medium" if confidence > 0.6 else "low"
@@ -389,9 +393,9 @@ def build_report(
),
},
metadata={
"detector_version": "0.1.0",
"detector_version": "0.2.0",
"pattern_count": len(MATRIX_GLITCH_PATTERNS),
"reference": "timmy-config#491",
"reference": "timmy-config#491, timmy-config#543",
},
)
@@ -460,6 +464,30 @@ def run_demo(output_path: Optional[Path] = None) -> ScanResult:
screenshot_index=3,
screenshot_angle="left",
),
DetectedGlitch(
id=str(uuid.uuid4())[:8],
category="shader_failure",
name="Black Material on Portal Frame",
description="Portal frame rendered as solid black — shader compilation failed (missing uniform u_time)",
severity="critical",
confidence=0.91,
location_x=45.0,
location_y=30.0,
screenshot_index=0,
screenshot_angle="front",
),
DetectedGlitch(
id=str(uuid.uuid4())[:8],
category="shadow_map_artifact",
name="Pixelated Character Shadow",
description="Character shadow shows visible texel grid — shadow map resolution too low (512x512)",
severity="medium",
confidence=0.78,
location_x=52.0,
location_y=75.0,
screenshot_index=1,
screenshot_angle="right",
),
]
print(f"[*] Detected {len(demo_glitches)} glitches")
@@ -496,6 +524,11 @@ Examples:
help="Minimum severity to include in report",
)
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument(
"--threejs",
action="store_true",
help="Focus on Three.js-specific glitch patterns only (shader, texture, UV, culling, shadow, bloom)",
)
args = parser.parse_args()
@@ -525,9 +558,13 @@ Examples:
screenshots = capture_screenshots(args.url, angles, screenshots_dir)
print(f"[*] Captured {len(screenshots)} screenshots")
# Filter patterns by severity
# Filter patterns by severity and type
min_sev = GlitchSeverity(args.min_severity)
patterns = get_patterns_by_severity(min_sev)
if args.threejs:
threejs_patterns = get_threejs_patterns()
patterns = [p for p in patterns if p in threejs_patterns]
print(f"[*] Three.js-focused mode: {len(patterns)} patterns")
# Analyze with vision AI
print(f"[*] Analyzing with vision AI ({len(patterns)} patterns)...")

View File

@@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""
Pre-Flight Provider Check Script
Issue #508: [Robustness] Credential drain detection — provider health checks
Pre-flight check before session launch: verifies provider credentials and balance.
Usage:
python3 preflight-provider-check.py # Check all providers
python3 preflight-provider-check.py --launch # Check and return exit code
python3 preflight-provider-check.py --balance # Check OpenRouter balance
"""
import os, sys, json, yaml, urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Configuration
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
LOG_DIR = Path.home() / ".local" / "timmy" / "fleet-health"
LOG_FILE = LOG_DIR / "preflight-check.log"
def log(msg):
"""Log message to file and optionally console."""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
log_entry = "[" + timestamp + "] " + msg
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f:
f.write(log_entry + "\n")
if "--quiet" not in sys.argv:
print(log_entry)
def get_provider_api_key(provider):
"""Get API key for a provider from .env or environment."""
env_file = HERMES_HOME / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line.startswith(provider.upper() + "_API_KEY="):
return line.split("=", 1)[1].strip().strip("'\"")
return os.environ.get(provider.upper() + "_API_KEY")
def check_openrouter_balance(api_key):
"""Check OpenRouter balance via /api/v1/auth/key."""
if not api_key:
return False, "No API key", 0
try:
req = urllib.request.Request(
"https://openrouter.ai/api/v1/auth/key",
headers={"Authorization": "Bearer " + api_key}
)
resp = urllib.request.urlopen(req, timeout=10)
data = json.loads(resp.read())
# Check for credits
credits = data.get("data", {}).get("limit", 0)
usage = data.get("data", {}).get("usage", 0)
remaining = credits - usage if credits else None
if remaining is not None and remaining <= 0:
return False, "No credits remaining", 0
elif remaining is not None:
return True, "Credits available", remaining
else:
return True, "Unlimited or unknown balance", None
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key", 0
else:
return False, "HTTP " + str(e.code), 0
except Exception as e:
return False, str(e)[:100], 0
def check_nous_key(api_key):
"""Check Nous API key with minimal test call."""
if not api_key:
return False, "No API key"
try:
req = urllib.request.Request(
"https://inference.nousresearch.com/v1/models",
headers={"Authorization": "Bearer " + api_key}
)
resp = urllib.request.urlopen(req, timeout=10)
if resp.status == 200:
return True, "Valid key"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key"
elif e.code == 403:
return False, "Forbidden"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def check_anthropic_key(api_key):
"""Check Anthropic API key with minimal test call."""
if not api_key:
return False, "No API key"
try:
req = urllib.request.Request(
"https://api.anthropic.com/v1/models",
headers={
"x-api-key": api_key,
"anthropic-version": "2023-06-01"
}
)
resp = urllib.request.urlopen(req, timeout=10)
if resp.status == 200:
return True, "Valid key"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Invalid API key"
elif e.code == 403:
return False, "Forbidden"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def check_ollama():
"""Check if Ollama is running."""
try:
req = urllib.request.Request("http://localhost:11434/api/tags")
resp = urllib.request.urlopen(req, timeout=5)
if resp.status == 200:
data = json.loads(resp.read())
models = data.get("models", [])
return True, str(len(models)) + " models loaded"
else:
return False, "HTTP " + str(resp.status)
except Exception as e:
return False, str(e)[:100]
def get_configured_provider():
"""Get the configured provider from global config."""
config_file = HERMES_HOME / "config.yaml"
if not config_file.exists():
return None
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if isinstance(model_config, dict):
return model_config.get("provider")
except:
pass
return None
def run_preflight_check():
"""Run pre-flight check on all providers."""
log("=== Pre-Flight Provider Check ===")
results = {}
# Check OpenRouter
or_key = get_provider_api_key("openrouter")
or_ok, or_msg, or_balance = check_openrouter_balance(or_key)
results["openrouter"] = {"healthy": or_ok, "message": or_msg, "balance": or_balance}
# Check Nous
nous_key = get_provider_api_key("nous")
nous_ok, nous_msg = check_nous_key(nous_key)
results["nous"] = {"healthy": nous_ok, "message": nous_msg}
# Check Anthropic
anthropic_key = get_provider_api_key("anthropic")
anthropic_ok, anthropic_msg = check_anthropic_key(anthropic_key)
results["anthropic"] = {"healthy": anthropic_ok, "message": anthropic_msg}
# Check Ollama
ollama_ok, ollama_msg = check_ollama()
results["ollama"] = {"healthy": ollama_ok, "message": ollama_msg}
# Get configured provider
configured = get_configured_provider()
# Summary
healthy_count = sum(1 for r in results.values() if r["healthy"])
total_count = len(results)
log("Results: " + str(healthy_count) + "/" + str(total_count) + " providers healthy")
for provider, result in results.items():
status = "HEALTHY" if result["healthy"] else "UNHEALTHY"
extra = ""
if provider == "openrouter" and result.get("balance") is not None:
extra = " (balance: " + str(result["balance"]) + ")"
log(" " + provider + ": " + status + " - " + result["message"] + extra)
if configured:
log("Configured provider: " + configured)
if configured in results and not results[configured]["healthy"]:
log("WARNING: Configured provider " + configured + " is UNHEALTHY!")
return results, configured
def check_launch_readiness():
"""Check if we're ready to launch sessions."""
results, configured = run_preflight_check()
# Check if configured provider is healthy
if configured and configured in results:
if not results[configured]["healthy"]:
log("LAUNCH BLOCKED: Configured provider " + configured + " is unhealthy")
return False, configured + " is unhealthy"
# Check if at least one provider is healthy
healthy_providers = [p for p, r in results.items() if r["healthy"]]
if not healthy_providers:
log("LAUNCH BLOCKED: No healthy providers available")
return False, "No healthy providers"
log("LAUNCH READY: " + str(len(healthy_providers)) + " healthy providers available")
return True, "Ready"
def show_balance():
"""Show OpenRouter balance."""
api_key = get_provider_api_key("openrouter")
if not api_key:
print("No OpenRouter API key found")
return
ok, msg, balance = check_openrouter_balance(api_key)
if ok:
if balance is not None:
print("OpenRouter balance: " + str(balance) + " credits")
else:
print("OpenRouter: " + msg)
else:
print("OpenRouter: " + msg)
def main():
if "--balance" in sys.argv:
show_balance()
elif "--launch" in sys.argv:
ready, message = check_launch_readiness()
if ready:
print("READY")
sys.exit(0)
else:
print("BLOCKED: " + message)
sys.exit(1)
else:
run_preflight_check()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,411 @@
#!/usr/bin/env python3
"""
Provider Health Monitor Script
Issue #509: [Robustness] Provider-aware profile config — auto-switch on failure
Monitors provider health and automatically switches profiles to working providers.
Usage:
python3 provider-health-monitor.py # Run once
python3 provider-health-monitor.py --daemon # Run continuously
python3 provider-health-monitor.py --status # Show provider health
"""
import os, sys, json, yaml, urllib.request, time
from datetime import datetime, timezone
from pathlib import Path
# Configuration
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
PROFILES_DIR = HERMES_HOME / "profiles"
LOG_DIR = Path.home() / ".local" / "timmy" / "fleet-health"
STATE_FILE = LOG_DIR / "tmux-state.json"
LOG_FILE = LOG_DIR / "provider-health.log"
# Provider test endpoints
PROVIDER_TESTS = {
"openrouter": {
"url": "https://openrouter.ai/api/v1/models",
"method": "GET",
"headers": lambda api_key: {"Authorization": "Bearer " + api_key},
"timeout": 10
},
"anthropic": {
"url": "https://api.anthropic.com/v1/models",
"method": "GET",
"headers": lambda api_key: {"x-api-key": api_key, "anthropic-version": "2023-06-01"},
"timeout": 10
},
"nous": {
"url": "https://inference.nousresearch.com/v1/models",
"method": "GET",
"headers": lambda api_key: {"Authorization": "Bearer " + api_key},
"timeout": 10
},
"kimi-coding": {
"url": "https://api.kimi.com/coding/v1/models",
"method": "GET",
"headers": lambda api_key: {"x-api-key": api_key, "x-api-provider": "kimi-coding"},
"timeout": 10
},
"ollama": {
"url": "http://localhost:11434/api/tags",
"method": "GET",
"headers": lambda api_key: {},
"timeout": 5
}
}
def log(msg):
"""Log message to file and optionally console."""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
log_entry = "[" + timestamp + "] " + msg
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f:
f.write(log_entry + "\n")
if "--quiet" not in sys.argv:
print(log_entry)
def get_provider_api_key(provider):
"""Get API key for a provider from .env or environment."""
env_file = HERMES_HOME / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line.startswith(provider.upper() + "_API_KEY="):
return line.split("=", 1)[1].strip().strip("'\"")
return os.environ.get(provider.upper() + "_API_KEY")
def test_provider(provider, api_key=None):
"""Test if a provider is healthy."""
config = PROVIDER_TESTS.get(provider)
if not config:
return False, "Unknown provider: " + provider
headers = config["headers"](api_key or "")
try:
req = urllib.request.Request(
config["url"],
headers=headers,
method=config["method"]
)
resp = urllib.request.urlopen(req, timeout=config["timeout"])
if resp.status == 200:
return True, "Healthy"
else:
return False, "HTTP " + str(resp.status)
except urllib.error.HTTPError as e:
if e.code == 401:
return False, "Unauthorized (401)"
elif e.code == 403:
return False, "Forbidden (403)"
elif e.code == 429:
return True, "Rate limited but accessible"
else:
return False, "HTTP " + str(e.code)
except Exception as e:
return False, str(e)[:100]
def get_all_providers():
"""Get all providers from profiles and global config."""
providers = set()
# Global config
global_config = HERMES_HOME / "config.yaml"
if global_config.exists():
try:
with open(global_config) as f:
config = yaml.safe_load(f)
# Primary model provider
model_config = config.get("model", {})
if isinstance(model_config, dict):
provider = model_config.get("provider", "")
if provider:
providers.add(provider)
# Auxiliary providers
auxiliary = config.get("auxiliary", {})
for aux_config in auxiliary.values():
if isinstance(aux_config, dict):
provider = aux_config.get("provider", "")
if provider and provider != "auto":
providers.add(provider)
except:
pass
# Profile configs
if PROFILES_DIR.exists():
for profile_dir in PROFILES_DIR.iterdir():
if profile_dir.is_dir():
config_file = profile_dir / "config.yaml"
if config_file.exists():
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if isinstance(model_config, dict):
provider = model_config.get("provider", "")
if provider:
providers.add(provider)
auxiliary = config.get("auxiliary", {})
for aux_config in auxiliary.values():
if isinstance(aux_config, dict):
provider = aux_config.get("provider", "")
if provider and provider != "auto":
providers.add(provider)
except:
pass
# Add common providers even if not configured
providers.update(["openrouter", "nous", "ollama"])
return list(providers)
def build_health_map():
"""Build a health map of all providers."""
providers = get_all_providers()
health_map = {}
log("Testing " + str(len(providers)) + " providers...")
for provider in providers:
api_key = get_provider_api_key(provider)
healthy, message = test_provider(provider, api_key)
health_map[provider] = {
"healthy": healthy,
"message": message,
"last_test": datetime.now(timezone.utc).isoformat(),
"api_key_present": bool(api_key)
}
status = "HEALTHY" if healthy else "UNHEALTHY"
log(" " + provider + ": " + status + " - " + message)
return health_map
def get_fallback_providers(health_map):
"""Get list of healthy providers in priority order."""
# Priority order: nous, openrouter, ollama, others
priority_order = ["nous", "openrouter", "ollama", "anthropic", "kimi-coding"]
healthy = []
for provider in priority_order:
if provider in health_map and health_map[provider]["healthy"]:
healthy.append(provider)
# Add any other healthy providers not in priority list
for provider, info in health_map.items():
if info["healthy"] and provider not in healthy:
healthy.append(provider)
return healthy
def update_profile_config(profile_name, new_provider):
"""Update a profile's config to use a new provider."""
config_file = PROFILES_DIR / profile_name / "config.yaml"
if not config_file.exists():
return False, "Config file not found"
try:
with open(config_file) as f:
config = yaml.safe_load(f)
# Update model provider
if "model" not in config:
config["model"] = {}
old_provider = config["model"].get("provider", "unknown")
config["model"]["provider"] = new_provider
# Update auxiliary providers if they were using the old provider
auxiliary = config.get("auxiliary", {})
for aux_name, aux_config in auxiliary.items():
if isinstance(aux_config, dict) and aux_config.get("provider") == old_provider:
aux_config["provider"] = new_provider
# Write back
with open(config_file, "w") as f:
yaml.dump(config, f, default_flow_style=False)
log("Updated " + profile_name + ": " + old_provider + " -> " + new_provider)
return True, "Updated"
except Exception as e:
return False, str(e)
def check_profiles(health_map):
"""Check all profiles and update unhealthy providers."""
if not PROFILES_DIR.exists():
return
fallback_providers = get_fallback_providers(health_map)
if not fallback_providers:
log("CRITICAL: No healthy providers available!")
return
updated_profiles = []
for profile_dir in PROFILES_DIR.iterdir():
if not profile_dir.is_dir():
continue
profile_name = profile_dir.name
config_file = profile_dir / "config.yaml"
if not config_file.exists():
continue
try:
with open(config_file) as f:
config = yaml.safe_load(f)
model_config = config.get("model", {})
if not isinstance(model_config, dict):
continue
current_provider = model_config.get("provider", "")
if not current_provider:
continue
# Check if current provider is healthy
if current_provider in health_map and health_map[current_provider]["healthy"]:
continue # Provider is healthy, no action needed
# Find best fallback
best_fallback = None
for provider in fallback_providers:
if provider != current_provider:
best_fallback = provider
break
if not best_fallback:
log("No fallback for " + profile_name + " (current: " + current_provider + ")")
continue
# Update profile
success, message = update_profile_config(profile_name, best_fallback)
if success:
updated_profiles.append({
"profile": profile_name,
"old_provider": current_provider,
"new_provider": best_fallback
})
except Exception as e:
log("Error processing " + profile_name + ": " + str(e))
return updated_profiles
def load_state():
"""Load state from tmux-state.json."""
if STATE_FILE.exists():
try:
with open(STATE_FILE) as f:
return json.load(f)
except:
pass
return {}
def save_state(state):
"""Save state to tmux-state.json."""
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def run_once():
"""Run provider health check once."""
log("=== Provider Health Check ===")
state = load_state()
# Build health map
health_map = build_health_map()
# Check profiles and update if needed
updated_profiles = check_profiles(health_map)
# Update state
state["provider_health"] = health_map
state["last_provider_check"] = datetime.now(timezone.utc).isoformat()
if updated_profiles:
state["last_profile_updates"] = updated_profiles
save_state(state)
# Summary
healthy_count = sum(1 for p in health_map.values() if p["healthy"])
total_count = len(health_map)
log("Health: " + str(healthy_count) + "/" + str(total_count) + " providers healthy")
if updated_profiles:
log("Updated " + str(len(updated_profiles)) + " profiles:")
for update in updated_profiles:
log(" " + update["profile"] + ": " + update["old_provider"] + " -> " + update["new_provider"])
def show_status():
"""Show provider health status."""
state = load_state()
health_map = state.get("provider_health", {})
if not health_map:
print("No provider health data available. Run without --status first.")
return
print("Provider Health (last updated: " + str(state.get("last_provider_check", "unknown")) + ")")
print("=" * 80)
for provider, info in sorted(health_map.items()):
status = "HEALTHY" if info["healthy"] else "UNHEALTHY"
message = info.get("message", "")
api_key = "yes" if info.get("api_key_present") else "no"
print(provider.ljust(20) + " " + status.ljust(10) + " API key: " + api_key + " - " + message)
# Show recent updates
updates = state.get("last_profile_updates", [])
if updates:
print()
print("Recent Profile Updates:")
for update in updates:
print(" " + update["profile"] + ": " + update["old_provider"] + " -> " + update["new_provider"])
def daemon_mode():
"""Run continuously."""
log("Starting provider health daemon (check every 300s)")
while True:
try:
run_once()
time.sleep(300) # Check every 5 minutes
except KeyboardInterrupt:
log("Daemon stopped by user")
break
except Exception as e:
log("Error: " + str(e))
time.sleep(60)
def main():
if "--status" in sys.argv:
show_status()
elif "--daemon" in sys.argv:
daemon_mode()
else:
run_once()
if __name__ == "__main__":
main()

View File

@@ -196,7 +196,37 @@
"paused_reason": null,
"skills": [],
"skill": null
},
{
"id": "tmux-supervisor-513",
"name": "Autonomous Cron Supervisor",
"prompt": "Load the tmux-supervisor skill and execute the monitoring protocol.\n\nCheck both `dev` and `timmy` tmux sessions for idle panes. Only send Telegram notifications on actionable events (idle, overflow, failure). Be silent when all agents are working.\n\nSteps:\n1. List all tmux sessions (skip 'Alexander')\n2. For each session, list windows and panes\n3. Capture each pane and classify state (idle vs active)\n4. For idle panes: read context, craft context-aware prompt\n5. Send /queue prompts to idle panes\n6. Verify prompts landed\n7. Only notify via Telegram if:\n - A pane was prompted (idle detected)\n - A pane shows context overflow (>80%)\n - A pane is stuck or crashed\n8. If all panes are active: respond with [SILENT]",
"schedule": {
"kind": "interval",
"minutes": 7,
"display": "every 7m"
},
"schedule_display": "every 7m",
"repeat": {
"times": null,
"completed": 0
},
"enabled": true,
"created_at": "2026-04-15T03:00:00.000000+00:00",
"next_run_at": null,
"last_run_at": null,
"last_status": null,
"last_error": null,
"deliver": "telegram",
"origin": null,
"state": "scheduled",
"paused_at": null,
"paused_reason": null,
"skills": [
"tmux-supervisor"
],
"skill": "tmux-supervisor"
}
],
"updated_at": "2026-04-13T02:00:00+00:00"
}
}

568
pipeline/orchestrator.py Executable file
View File

@@ -0,0 +1,568 @@
#!/usr/bin/env python3
"""
orchestrator.py — Shared Pipeline Orchestrator
SQLite-backed job queue with parallel workers, token budget tracking,
checkpoint resume, rate limiting, and error retry.
All 5 pipelines use this orchestrator for consistent execution.
Usage:
python3 orchestrator.py --pipeline training_factory --jobs jobs.jsonl
python3 orchestrator.py --pipeline adversary --jobs jobs.jsonl --workers 5
python3 orchestrator.py --status
python3 orchestrator.py --resume training_factory
python3 orchestrator.py --report training_factory
"""
import json
import os
import sys
import time
import sqlite3
import hashlib
import threading
import signal
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
DB_PATH = Path.home() / ".hermes" / "pipeline" / "orchestrator.db"
REPORT_DIR = Path.home() / ".hermes" / "pipeline" / "reports"
# ============================================================
# Data Structures
# ============================================================
@dataclass
class JobStatus:
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
SKIPPED = "skipped"
@dataclass
class PipelineStats:
"""Runtime statistics for a pipeline run."""
pipeline: str
total_jobs: int = 0
completed: int = 0
failed: int = 0
skipped: int = 0
tokens_used: int = 0
tokens_budget: int = 5_000_000
elapsed_seconds: float = 0.0
start_time: str = ""
jobs_per_minute: float = 0.0
def to_dict(self):
return {
"pipeline": self.pipeline,
"total_jobs": self.total_jobs,
"completed": self.completed,
"failed": self.failed,
"skipped": self.skipped,
"tokens_used": self.tokens_used,
"tokens_budget": self.tokens_budget,
"elapsed_seconds": round(self.elapsed_seconds, 1),
"start_time": self.start_time,
"jobs_per_minute": round(self.jobs_per_minute, 2),
}
# ============================================================
# Database
# ============================================================
def get_db():
"""Get SQLite database connection."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH), timeout=30, check_same_thread=False)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
_init_db(conn)
return conn
def _init_db(conn):
"""Initialize database schema."""
conn.executescript("""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline TEXT NOT NULL,
job_key TEXT NOT NULL,
payload TEXT NOT NULL,
status TEXT DEFAULT 'pending',
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 3,
tokens_used INTEGER DEFAULT 0,
error TEXT,
result TEXT,
checkpoint TEXT,
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
UNIQUE(pipeline, job_key)
);
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline_status ON jobs(pipeline, status);
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline_key ON jobs(pipeline, job_key);
CREATE TABLE IF NOT EXISTS pipeline_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline TEXT NOT NULL,
started_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
total_jobs INTEGER DEFAULT 0,
completed INTEGER DEFAULT 0,
failed INTEGER DEFAULT 0,
tokens_used INTEGER DEFAULT 0,
report TEXT
);
""")
conn.commit()
# ============================================================
# Job Queue
# ============================================================
class JobQueue:
"""SQLite-backed job queue."""
def __init__(self, pipeline: str, conn=None):
self.pipeline = pipeline
self.conn = conn or get_db()
def enqueue(self, job_key: str, payload: dict, max_attempts: int = 3):
"""Add a job to the queue (skip if already exists)."""
try:
self.conn.execute(
"INSERT INTO jobs (pipeline, job_key, payload, max_attempts) VALUES (?, ?, ?, ?)",
(self.pipeline, job_key, json.dumps(payload), max_attempts),
)
self.conn.commit()
return True
except sqlite3.IntegrityError:
# Already exists — check if it needs retry
row = self.conn.execute(
"SELECT status FROM jobs WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
).fetchone()
if row and row[0] == "failed":
# Reset for retry
self.conn.execute(
"UPDATE jobs SET status='pending', attempts=0, error=NULL WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
)
self.conn.commit()
return True
return False
def enqueue_batch(self, jobs: List[dict], key_field: str = "id"):
"""Enqueue multiple jobs. Returns (added, skipped) counts."""
added = 0
skipped = 0
for job in jobs:
key = str(job.get(key_field, hashlib.md5(json.dumps(job).encode()).hexdigest()[:12]))
if self.enqueue(key, job):
added += 1
else:
skipped += 1
return added, skipped
def claim_next(self) -> Optional[dict]:
"""Claim the next pending job (atomic)."""
row = self.conn.execute(
"""UPDATE jobs SET status='running', started_at=datetime('now')
WHERE id = (
SELECT id FROM jobs WHERE pipeline=? AND status IN ('pending', 'retrying')
ORDER BY attempts ASC, created_at ASC LIMIT 1
) RETURNING *""",
(self.pipeline,),
).fetchone()
if not row:
return None
cols = [d[1] for d in self.conn.execute("PRAGMA table_info(jobs)").fetchall()]
return dict(zip(cols, row))
def complete(self, job_key: str, result: dict, tokens_used: int = 0):
"""Mark a job as completed."""
self.conn.execute(
"""UPDATE jobs SET status='completed', completed_at=datetime('now'),
result=?, tokens_used=? WHERE pipeline=? AND job_key=?""",
(json.dumps(result), tokens_used, self.pipeline, job_key),
)
self.conn.commit()
def fail(self, job_key: str, error: str, retry: bool = True):
"""Mark a job as failed, optionally retry."""
row = self.conn.execute(
"SELECT attempts, max_attempts FROM jobs WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
).fetchone()
if not row:
return
attempts, max_attempts = row
new_attempts = attempts + 1
if retry and new_attempts < max_attempts:
# Exponential backoff: 2^attempts seconds
delay = min(2 ** new_attempts, 60)
self.conn.execute(
"""UPDATE jobs SET status='retrying', attempts=?, error=?
WHERE pipeline=? AND job_key=?""",
(new_attempts, error, self.pipeline, job_key),
)
else:
self.conn.execute(
"""UPDATE jobs SET status='failed', attempts=?, error=?,
completed_at=datetime('now') WHERE pipeline=? AND job_key=?""",
(new_attempts, error, self.pipeline, job_key),
)
self.conn.commit()
def save_checkpoint(self, job_key: str, checkpoint: dict):
"""Save progress checkpoint for resume."""
self.conn.execute(
"UPDATE jobs SET checkpoint=? WHERE pipeline=? AND job_key=?",
(json.dumps(checkpoint), self.pipeline, job_key),
)
self.conn.commit()
def get_checkpoint(self, job_key: str) -> Optional[dict]:
"""Get saved checkpoint."""
row = self.conn.execute(
"SELECT checkpoint FROM jobs WHERE pipeline=? AND job_key=?",
(self.pipeline, job_key),
).fetchone()
if row and row[0]:
return json.loads(row[0])
return None
def stats(self) -> dict:
"""Get queue statistics."""
rows = self.conn.execute(
"""SELECT status, COUNT(*), COALESCE(SUM(tokens_used), 0)
FROM jobs WHERE pipeline=? GROUP BY status""",
(self.pipeline,),
).fetchall()
result = {"total": 0, "tokens_used": 0}
for status, count, tokens in rows:
result[status] = count
result["total"] += count
result["tokens_used"] += tokens
return result
# ============================================================
# Orchestrator
# ============================================================
class Orchestrator:
"""
Shared orchestrator for all pipelines.
Features:
- Parallel worker pool (configurable)
- Token budget tracking
- Checkpoint resume
- Rate limiting
- Error retry with exponential backoff
- Final report generation
"""
def __init__(self, pipeline: str, workers: int = 10, token_budget: int = 5_000_000):
self.pipeline = pipeline
self.workers = workers
self.token_budget = token_budget
self.queue = JobQueue(pipeline)
self.conn = self.queue.conn
self._shutdown = False
self._stats = PipelineStats(pipeline=pipeline, tokens_budget=token_budget)
self._rate_limit_delay = 0.1 # seconds between jobs
self._response_cache: Dict[str, dict] = {}
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
def _handle_signal(self, signum, frame):
"""Graceful shutdown on signal."""
print(f"\nReceived signal {signum}. Shutting down gracefully...")
self._shutdown = True
def load_jobs(self, jobs_path: str, key_field: str = "id"):
"""Load jobs from a JSONL file into the queue."""
jobs = []
with open(jobs_path) as f:
for line in f:
line = line.strip()
if line:
jobs.append(json.loads(line))
added, skipped = self.queue.enqueue_batch(jobs, key_field)
print(f"Loaded: {added} new, {skipped} existing")
def run(self, job_handler: Callable[[dict], dict] = None):
"""
Run the orchestrator. Processes all pending jobs with parallel workers.
Args:
job_handler: function(job_payload) -> dict with 'tokens_used' key
"""
start = time.time()
self._stats.start_time = datetime.now(timezone.utc).isoformat()
# Record run
self.conn.execute(
"INSERT INTO pipeline_runs (pipeline, started_at) VALUES (?, ?)",
(self.pipeline, self._stats.start_time),
)
run_id = self.conn.execute("SELECT last_insert_rowid()").fetchone()[0]
self.conn.commit()
stats = self.queue.stats()
self._stats.total_jobs = stats.get("pending", 0) + stats.get("retrying", 0)
print(f"\nPipeline: {self.pipeline}")
print(f"Jobs: {self._stats.total_jobs} pending | Workers: {self.workers} | Budget: {self.token_budget:,} tokens")
print()
if self._stats.total_jobs == 0:
print("No jobs to process.")
return
completed = 0
failed = 0
skipped = 0
tokens_used = 0
with ThreadPoolExecutor(max_workers=self.workers) as executor:
futures = {}
while not self._shutdown:
# Check token budget
if tokens_used >= self.token_budget:
print(f"Token budget exhausted ({tokens_used:,}/{self.token_budget:,})")
break
# Fill worker pool
while len(futures) < self.workers and not self._shutdown:
job = self.queue.claim_next()
if not job:
break
# Check response cache (zero-token retries)
job_key = job["job_key"]
payload = json.loads(job["payload"])
cache_key = hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
if cache_key in self._response_cache:
result = self._response_cache[cache_key]
self.queue.complete(job_key, result, tokens_used=0)
skipped += 1
continue
# Submit to worker
future = executor.submit(self._process_job, job, job_handler)
futures[future] = job
# Rate limiting
time.sleep(self._rate_limit_delay)
if not futures:
break
# Collect results
done = []
for future in as_completed(futures, timeout=1):
job = futures[future]
try:
result = future.result()
if result.get("success"):
tokens = result.get("tokens_used", 0)
tokens_used += tokens
self.queue.complete(job["job_key"], result, tokens_used=tokens)
completed += 1
else:
error = result.get("error", "unknown error")
self.queue.fail(job["job_key"], error, retry=True)
failed += 1
except Exception as e:
self.queue.fail(job["job_key"], str(e), retry=True)
failed += 1
done.append(future)
# Progress update
total = completed + failed + skipped
if total % 10 == 0:
elapsed = time.time() - start
rate = completed / (elapsed / 60) if elapsed > 0 else 0
print(f" Progress: {total}/{self._stats.total_jobs} | "
f"completed={completed} failed={failed} | "
f"tokens={tokens_used:,} | "
f"{rate:.1f}/min")
for f in done:
del futures[f]
# Final report
elapsed = time.time() - start
self._stats.completed = completed
self._stats.failed = failed
self._stats.skipped = skipped
self._stats.tokens_used = tokens_used
self._stats.elapsed_seconds = elapsed
self._stats.jobs_per_minute = completed / (elapsed / 60) if elapsed > 0 else 0
# Save run
self.conn.execute(
"""UPDATE pipeline_runs SET completed_at=?, total_jobs=?, completed=?,
failed=?, tokens_used=?, report=? WHERE id=?""",
(datetime.now(timezone.utc).isoformat(), self._stats.total_jobs,
completed, failed, tokens_used, json.dumps(self._stats.to_dict()), run_id),
)
self.conn.commit()
# Print report
print(f"\n{'='*50}")
print(f"Pipeline: {self.pipeline}")
print(f"Completed: {completed}/{self._stats.total_jobs}")
print(f"Failed: {failed}")
print(f"Skipped (cached): {skipped}")
print(f"Tokens: {tokens_used:,}/{self.token_budget:,}")
print(f"Time: {elapsed:.1f}s ({self._stats.jobs_per_minute:.1f}/min)")
print(f"{'='*50}")
# Save report file
self._save_report()
def _process_job(self, job: dict, handler: Callable = None) -> dict:
"""Process a single job."""
payload = json.loads(job["payload"])
job_key = job["job_key"]
checkpoint = self.queue.get_checkpoint(job_key)
if handler:
try:
result = handler(payload, checkpoint=checkpoint)
return result or {"success": True, "tokens_used": 0}
except Exception as e:
return {"success": False, "error": str(e)}
else:
# Default handler: just mark as complete
return {"success": True, "tokens_used": 0}
def _save_report(self):
"""Save pipeline run report."""
REPORT_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = REPORT_DIR / f"{self.pipeline}_{ts}.json"
with open(path, "w") as f:
json.dump(self._stats.to_dict(), f, indent=2)
print(f"Report: {path}")
def resume(self):
"""Resume failed/retrying jobs from a previous run."""
stats = self.queue.stats()
retrying = stats.get("retrying", 0)
failed = stats.get("failed", 0)
print(f"Resume {self.pipeline}: {retrying} retrying, {failed} failed to reset")
# Reset failed jobs to pending for retry
self.conn.execute(
"UPDATE jobs SET status='pending', attempts=0 WHERE pipeline=? AND status='failed'",
(self.pipeline,),
)
self.conn.execute(
"UPDATE jobs SET status='pending' WHERE pipeline=? AND status='retrying'",
(self.pipeline,),
)
self.conn.commit()
def status(self):
"""Show pipeline status."""
stats = self.queue.stats()
print(f"\nPipeline: {self.pipeline}")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
# ============================================================
# CLI
# ============================================================
def show_all_status():
"""Show status of all pipelines."""
conn = get_db()
pipelines = conn.execute(
"SELECT DISTINCT pipeline FROM jobs ORDER BY pipeline"
).fetchall()
if not pipelines:
print("No pipelines in database.")
return
print(f"\nAll Pipeline Status")
print(f"{'='*60}")
for (pipeline,) in pipelines:
queue = JobQueue(pipeline, conn)
stats = queue.stats()
total = stats.get("total", 0)
pending = stats.get("pending", 0)
running = stats.get("running", 0)
completed = stats.get("completed", 0)
failed = stats.get("failed", 0)
tokens = stats.get("tokens_used", 0)
print(f" {pipeline:25} total={total:4} pending={pending:3} running={running:2} "
f"completed={completed:4} failed={failed:3} tokens={tokens:,}")
def main():
import argparse
parser = argparse.ArgumentParser(description="Shared Pipeline Orchestrator")
parser.add_argument("--pipeline", "-p", help="Pipeline name")
parser.add_argument("--jobs", "-j", help="Jobs JSONL file to load")
parser.add_argument("--workers", "-w", type=int, default=10, help="Parallel workers")
parser.add_argument("--budget", "-b", type=int, default=5_000_000, help="Token budget")
parser.add_argument("--status", action="store_true", help="Show status")
parser.add_argument("--resume", action="store_true", help="Resume failed jobs")
parser.add_argument("--key-field", default="id", help="Job key field name")
args = parser.parse_args()
if args.status:
if args.pipeline:
orch = Orchestrator(args.pipeline)
orch.status()
else:
show_all_status()
return
if not args.pipeline:
parser.error("--pipeline is required")
orch = Orchestrator(args.pipeline, workers=args.workers, token_budget=args.budget)
if args.jobs:
orch.load_jobs(args.jobs, key_field=args.key_field)
if args.resume:
orch.resume()
if args.jobs or args.resume:
orch.run()
if __name__ == "__main__":
main()

View File

@@ -19,9 +19,11 @@ from glitch_patterns import (
GlitchPattern,
GlitchSeverity,
MATRIX_GLITCH_PATTERNS,
THREEJS_CATEGORIES,
build_vision_prompt,
get_pattern_by_category,
get_patterns_by_severity,
get_threejs_patterns,
)
from matrix_glitch_detector import (
@@ -40,7 +42,7 @@ class TestGlitchPatterns(unittest.TestCase):
def test_pattern_count(self):
"""Verify we have a reasonable number of defined patterns."""
self.assertGreaterEqual(len(MATRIX_GLITCH_PATTERNS), 8)
self.assertGreaterEqual(len(MATRIX_GLITCH_PATTERNS), 14) # 10 generic + 6 Three.js
def test_all_patterns_have_required_fields(self):
"""Every pattern must have category, name, description, severity, prompts."""
@@ -88,6 +90,9 @@ class TestGlitchPatterns(unittest.TestCase):
self.assertIn("Floating Object", prompt)
self.assertIn("Z-Fighting", prompt)
self.assertIn("Missing", prompt)
# Three.js patterns should be included
self.assertIn("Shader Compilation Failure", prompt)
self.assertIn("Bloom Overflow", prompt)
def test_build_vision_prompt_subset(self):
"""Vision prompt with subset should only include specified patterns."""
@@ -248,7 +253,7 @@ class TestGlitchDetector(unittest.TestCase):
try:
report = run_demo(output_path)
self.assertEqual(len(report.glitches), 4)
self.assertEqual(len(report.glitches), 6) # 4 original + 2 Three.js
self.assertGreater(report.summary["total_glitches"], 0)
self.assertTrue(output_path.exists())
@@ -260,6 +265,93 @@ class TestGlitchDetector(unittest.TestCase):
output_path.unlink(missing_ok=True)
class TestThreeJsPatterns(unittest.TestCase):
"""Tests for Three.js-specific glitch patterns (timmy-config#543)."""
def test_get_threejs_patterns_returns_only_threejs(self):
"""get_threejs_patterns() should return only Three.js categories."""
patterns = get_threejs_patterns()
self.assertEqual(len(patterns), 6)
for p in patterns:
self.assertIn(p.category, THREEJS_CATEGORIES)
def test_threejs_patterns_have_required_fields(self):
"""All Three.js patterns must have valid fields."""
for p in get_threejs_patterns():
self.assertIsInstance(p.category, GlitchCategory)
self.assertTrue(p.name)
self.assertTrue(p.description)
self.assertIsInstance(p.severity, GlitchSeverity)
self.assertGreater(len(p.detection_prompts), 0)
self.assertGreater(len(p.visual_indicators), 0)
def test_shader_failure_is_critical(self):
"""Shader compilation failure should be CRITICAL severity."""
p = get_pattern_by_category(GlitchCategory.SHADER_FAILURE)
self.assertIsNotNone(p)
self.assertEqual(p.severity, GlitchSeverity.CRITICAL)
def test_texture_placeholder_is_critical(self):
"""Texture placeholder (1x1 white) should be CRITICAL severity."""
p = get_pattern_by_category(GlitchCategory.TEXTURE_PLACEHOLDER)
self.assertIsNotNone(p)
self.assertEqual(p.severity, GlitchSeverity.CRITICAL)
def test_infer_severity_shader_failure(self):
"""Shader failure should infer critical/high."""
self.assertEqual(_infer_severity("shader_failure", 0.8), "critical")
self.assertEqual(_infer_severity("shader_failure", 0.5), "high")
def test_infer_severity_texture_placeholder(self):
"""Texture placeholder should infer critical/high."""
self.assertEqual(_infer_severity("texture_placeholder", 0.8), "critical")
self.assertEqual(_infer_severity("texture_placeholder", 0.5), "high")
def test_infer_severity_uv_mapping(self):
"""UV mapping error should infer high/medium."""
self.assertEqual(_infer_severity("uv_mapping_error", 0.8), "high")
self.assertEqual(_infer_severity("uv_mapping_error", 0.5), "medium")
def test_infer_severity_frustum_culling(self):
"""Frustum culling should infer medium/low."""
self.assertEqual(_infer_severity("frustum_culling", 0.7), "medium")
self.assertEqual(_infer_severity("frustum_culling", 0.4), "low")
def test_infer_severity_shadow_map(self):
"""Shadow map artifact should infer medium/low."""
self.assertEqual(_infer_severity("shadow_map_artifact", 0.7), "medium")
self.assertEqual(_infer_severity("shadow_map_artifact", 0.4), "low")
def test_infer_severity_bloom_overflow(self):
"""Bloom overflow should infer medium/low (default path)."""
self.assertEqual(_infer_severity("bloom_overflow", 0.7), "medium")
self.assertEqual(_infer_severity("bloom_overflow", 0.4), "low")
def test_threejs_patterns_in_vision_prompt(self):
"""Three.js patterns should appear in the composite vision prompt."""
prompt = build_vision_prompt()
self.assertIn("shader_failure", prompt)
self.assertIn("texture_placeholder", prompt)
self.assertIn("uv_mapping_error", prompt)
self.assertIn("frustum_culling", prompt)
self.assertIn("shadow_map_artifact", prompt)
self.assertIn("bloom_overflow", prompt)
def test_threejs_subset_prompt(self):
"""Building prompt from Three.js-only patterns should work."""
threejs = get_threejs_patterns()
prompt = build_vision_prompt(threejs)
self.assertIn("Shader Compilation Failure", prompt)
self.assertNotIn("Floating Object", prompt) # generic, not Three.js
def test_report_metadata_version(self):
"""Report metadata should reference both issues."""
report = run_demo()
self.assertEqual(report.metadata["detector_version"], "0.2.0")
self.assertIn("543", report.metadata["reference"])
class TestIntegration(unittest.TestCase):
"""Integration-level tests."""
@@ -276,6 +368,13 @@ class TestIntegration(unittest.TestCase):
expected = {"floating_assets", "z_fighting", "missing_textures", "clipping", "broken_normals"}
self.assertTrue(expected.issubset(category_values))
def test_patterns_cover_threejs_themes(self):
"""Patterns should cover Three.js-specific glitch themes (#543)."""
category_values = {p.category.value for p in MATRIX_GLITCH_PATTERNS}
threejs_expected = {"shader_failure", "texture_placeholder", "uv_mapping_error",
"frustum_culling", "shadow_map_artifact", "bloom_overflow"}
self.assertTrue(threejs_expected.issubset(category_values))
if __name__ == "__main__":
unittest.main()

129
training/scripts/augment_pairs.py Executable file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
augment_pairs.py — Training data augmentation: paraphrase and translate.
Usage:
python3 augment_pairs.py --input data.jsonl
python3 augment_pairs.py --input data.jsonl --paraphrases 3 --langs es,fr,de
python3 augment_pairs.py --input data.jsonl --llm-endpoint http://localhost:11434/v1
"""
import json, os, sys, re, random
from pathlib import Path
random.seed(42)
PARAPHRASE_TRANSFORMS = [
lambda s: re.sub(r"(\w+), (\w+)", r"\2, \1", s, count=1),
lambda s: f"A beautifully rendered scene: {s[0].lower()}{s[1:]}" if len(s) > 10 else s,
lambda s: s.replace("A ", "The ").replace("An ", "The ") if s.startswith(("A ", "An ")) else f"Here, {s[0].lower()}{s[1:]}",
lambda s: f"In a cinematic frame: {s}" if len(s) > 20 else s,
lambda s: s if ", " not in s else ", ".join(s.split(", ")[:2]),
]
TRANSLATIONS = {
"es": {"the":"el","a":"un","is":"es","in":"en","of":"de","and":"y","with":"con","scene":"escena","light":"luz","dark":"oscuro","warm":"cálido","rain":"lluvia","sun":"sol","moon":"luna","sky":"cielo","forest":"bosque","mountain":"montaña","ocean":"océano","golden":"dorado","blue":"azul","red":"rojo","green":"verde","silence":"silencio","dream":"sueño","love":"amor","hope":"esperanza","fear":"miedo","joy":"alegría","peace":"paz","beautiful":"hermoso","sad":"triste","shadow":"sombra","color":"color","silver":"plateado","white":"blanco","black":"negro","portray":"retrato"},
"fr": {"the":"le","a":"un","is":"est","in":"dans","of":"de","and":"et","with":"avec","scene":"scène","light":"lumière","dark":"sombre","warm":"chaud","rain":"pluie","sun":"soleil","moon":"lune","sky":"ciel","forest":"forêt","mountain":"montagne","ocean":"océan","golden":"doré","blue":"bleu","red":"rouge","green":"vert","silence":"silence","dream":"rêve","love":"amour","hope":"espoir","fear":"peur","joy":"joie","peace":"paix","beautiful":"beau","sad":"triste","shadow":"ombre","color":"couleur","silver":"argenté","white":"blanc","black":"noir"},
"de": {"the":"der","a":"ein","is":"ist","in":"in","of":"von","and":"und","with":"mit","scene":"Szene","light":"Licht","dark":"dunkel","warm":"warm","rain":"Regen","sun":"Sonne","moon":"Mond","sky":"Himmel","forest":"Wald","mountain":"Berg","ocean":"Ozean","golden":"golden","blue":"blau","red":"rot","green":"grün","silence":"Stille","dream":"Traum","love":"Liebe","hope":"Hoffnung","fear":"Angst","joy":"Freude","peace":"Frieden","beautiful":"schön","sad":"traurig","shadow":"Schatten","color":"Farbe","silver":"silbern","white":"weiß","black":"schwarz"},
}
LANG_NAMES = {"es": "Spanish", "fr": "French", "de": "German"}
def detect_text_field(entry):
for f in ["rich","terse","text","content","lyric_line","description","scene_description","prompt","scene"]:
if f in entry and isinstance(entry[f], str) and len(entry[f]) > 5:
return f
for k, v in entry.items():
if isinstance(v, str) and len(v) > 5:
return k
return None
def paraphrase(text):
t = random.choice(PARAPHRASE_TRANSFORMS)(text)
if t == text:
t = text.replace(" and ", " & ").replace(" with ", " alongside ")
if t == text:
t = f"In this scene: {text[0].lower()}{text[1:]}" if text[0].isupper() else text
return t
def translate(text, lang):
d = TRANSLATIONS.get(lang, {})
words = text.split()
out = []
for w in words:
lo = w.lower().strip(".,;:!?")
suf = w[len(w.rstrip(".,;:!?")):]
if lo in d:
out.append(d[lo] + suf)
else:
out.append(w)
return " ".join(out)
def augment_file(input_path, output_path=None, n_para=3, langs=None, llm_endpoint=None):
input_path = Path(input_path)
if output_path is None:
output_path = input_path.parent / f"{input_path.stem}_augmented{input_path.suffix}"
entries = [json.loads(l) for l in open(input_path) if l.strip()]
if not entries:
print(f"No entries in {input_path}"); return 0
tf = detect_text_field(entries[0])
if not tf:
print(f"ERROR: No text field in {input_path}", file=sys.stderr); return 0
print(f"Input: {input_path} ({len(entries)} entries, field={tf})")
aug_count = 0
with open(output_path, "w") as out:
for e in entries:
out.write(json.dumps(e, ensure_ascii=False) + "\n")
for i, e in enumerate(entries):
text = e[tf]
# Paraphrases
for p in range(n_para):
para = paraphrase(text)
if para != text:
ne = dict(e); ne[tf] = para
ne["_augmentation"] = f"paraphrase_{p+1}"
ne["_original"] = text[:100]
out.write(json.dumps(ne, ensure_ascii=False) + "\n")
aug_count += 1
# Translations
for lang in (langs or []):
tr = translate(text, lang)
if tr != text:
ne = dict(e); ne[tf] = tr
ne["_augmentation"] = f"translate_{lang}"
ne["_language"] = lang
ne["_original"] = text[:100]
out.write(json.dumps(ne, ensure_ascii=False) + "\n")
aug_count += 1
if (i+1) % 100 == 0:
print(f" {i+1}/{len(entries)} done ({aug_count} augmented)")
total = len(entries) + aug_count
print(f"Done: {len(entries)} originals + {aug_count} augmented = {total}")
print(f"Output: {output_path}")
return aug_count
def main():
import argparse
p = argparse.ArgumentParser()
p.add_argument("--input", required=True)
p.add_argument("--output", default=None)
p.add_argument("--paraphrases", type=int, default=3)
p.add_argument("--langs", default="es,fr,de")
p.add_argument("--llm-endpoint", default=None)
args = p.parse_args()
langs = [l.strip() for l in args.langs.split(",") if l.strip()] if args.langs else []
augment_file(args.input, args.output, args.paraphrases, langs, args.llm_endpoint)
if __name__ == "__main__":
main()