Compare commits


2 Commits

SHA1        Message                                    Date
067a787c60  test: Add GPU Scheduler tests (#645)       2026-04-15 01:16:19 +00:00
7445039841  feat: Add GPU Inference Scheduler (#645)   2026-04-15 01:16:17 +00:00

Checks on 067a787c60 (some checks failed):
  Contributor Attribution Check / check-attribution (pull_request): failing after 41s
  Docker Build and Publish / build-and-push (pull_request): skipped
  Supply Chain Audit / Scan PR for supply chain risks (pull_request): successful in 43s
  Tests / e2e (pull_request): successful in 2m52s
  Tests / test (pull_request): failing after 42m4s
4 changed files with 684 additions and 700 deletions

Deleted file: tests for the FFmpeg video composition pipeline (129 lines)

@@ -1,129 +0,0 @@
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
import os
import pytest
import tempfile
from unittest.mock import patch, MagicMock
from tools.video_pipeline import (
VideoSpec, Transition, KenBurnsConfig,
FFmpegPipeline, compose_video,
)
class TestVideoSpec:
def test_defaults(self):
s = VideoSpec()
assert s.width == 1920
assert s.height == 1080
assert s.fps == 30
assert s.codec == "libx264"
assert s.container == "mp4"
assert s.crf == 23
def test_webm_spec(self):
s = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
assert s.container == "webm"
assert s.codec == "libvpx-vp9"
class TestTransition:
def test_defaults(self):
t = Transition()
assert t.duration_sec == 1.0
assert t.type == "fade"
class TestKenBurnsConfig:
def test_defaults(self):
k = KenBurnsConfig()
assert k.zoom_start == 1.0
assert k.zoom_end == 1.15
assert k.duration_sec == 5.0
class TestFFmpegPipelineInit:
@patch("tools.video_pipeline.subprocess.run")
def test_verify_ffmpeg_passes(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
pipeline = FFmpegPipeline()
assert pipeline.ffmpeg == "ffmpeg"
@patch("tools.video_pipeline.subprocess.run")
def test_verify_ffmpeg_fails(self, mock_run):
mock_run.side_effect = FileNotFoundError()
with pytest.raises(RuntimeError, match="FFmpeg not found"):
FFmpegPipeline()
@patch("tools.video_pipeline.subprocess.run")
def test_custom_ffmpeg_path(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg")
pipeline = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
assert pipeline.ffmpeg == "/usr/local/bin/ffmpeg"
class TestFFmpegPipelineProbe:
@patch("tools.video_pipeline.subprocess.run")
def test_probe_returns_metadata(self, mock_run):
mock_run.return_value = MagicMock(
returncode=0,
stdout='{"format": {"duration": "10.5"}, "streams": []}'
)
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline.ffprobe = "ffprobe"
result = pipeline.probe("/tmp/test.mp4")
assert result["format"]["duration"] == "10.5"
@patch("tools.video_pipeline.subprocess.run")
def test_get_duration(self, mock_run):
mock_run.return_value = MagicMock(
returncode=0,
stdout='{"format": {"duration": "42.3"}, "streams": []}'
)
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline.ffprobe = "ffprobe"
assert pipeline.get_duration("/tmp/test.mp4") == 42.3
class TestFFmpegPipelineImagesToVideo:
@patch("tools.video_pipeline.subprocess.run")
def test_empty_images_raises(self, mock_run):
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline.ffmpeg = "ffmpeg"
with pytest.raises(ValueError, match="No images"):
pipeline.images_to_video([], "/tmp/out.mp4")
class TestComposeVideo:
@patch("tools.video_pipeline.FFmpegPipeline")
def test_compose_calls_pipeline(self, MockPipeline):
mock_instance = MagicMock()
mock_instance.compose.return_value = "/tmp/output.mp4"
MockPipeline.return_value = mock_instance
result = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
assert result == "/tmp/output.mp4"
mock_instance.compose.assert_called_once()
class TestRunFFmpeg:
@patch("tools.video_pipeline.subprocess.run")
def test_success(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="")
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline._run_ffmpeg(["ffmpeg", "-version"])
@patch("tools.video_pipeline.subprocess.run")
def test_failure_raises(self, mock_run):
mock_run.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
with pytest.raises(RuntimeError, match="FFmpeg error"):
pipeline._run_ffmpeg(["ffmpeg", "-bad"])
@patch("tools.video_pipeline.subprocess.run")
def test_timeout_raises(self, mock_run):
import subprocess
mock_run.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
with pytest.raises(subprocess.TimeoutExpired):
pipeline._run_ffmpeg(["ffmpeg", "-slow"])

New file: tests for the GPU inference scheduler (256 lines)

@@ -0,0 +1,256 @@
"""
Tests for GPU Inference Scheduler.
"""
import pytest
import tempfile
import os
from pathlib import Path
from tools.gpu_scheduler import (
Priority,
ModelSpec,
InferenceJob,
InferenceScheduler,
MODEL_REGISTRY,
)
@pytest.fixture
def scheduler():
"""Create a scheduler with a temp database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_scheduler.db"
sched = InferenceScheduler(vram_budget_mb=32768, queue_db=str(db_path))
yield sched
class TestPriority:
"""Test priority ordering."""
def test_priority_ordering(self):
"""Realtime < Interactive < Batch."""
assert Priority.REALTIME < Priority.INTERACTIVE
assert Priority.INTERACTIVE < Priority.BATCH
def test_priority_comparison(self):
"""Lower value = higher priority."""
assert Priority.REALTIME.value == 1
assert Priority.INTERACTIVE.value == 2
assert Priority.BATCH.value == 3
class TestModelSpec:
"""Test model specifications."""
def test_model_registry_has_models(self):
"""Registry should have known models."""
assert "llama3_70b" in MODEL_REGISTRY
assert "sd_xl" in MODEL_REGISTRY
assert "mimo_v2_pro" in MODEL_REGISTRY
def test_model_vram(self):
"""Models should have VRAM requirements."""
llama = MODEL_REGISTRY["llama3_70b"]
assert llama.vram_mb > 0
assert llama.vram_mb == 40960 # 40GB
class TestInferenceScheduler:
"""Test the scheduler."""
def test_init(self, scheduler):
"""Scheduler should initialize."""
assert scheduler.vram_budget_mb == 32768
assert scheduler.gpu_state.total_vram_mb == 32768
assert len(scheduler.job_queue) == 0
def test_submit_job(self, scheduler):
"""Submit a job."""
job = scheduler.submit_job(
job_id="test-1",
project="playground",
model_name="llama3_8b",
priority=Priority.INTERACTIVE,
)
assert job.job_id == "test-1"
assert job.status == "queued"
assert len(scheduler.job_queue) == 1
def test_submit_unknown_model(self, scheduler):
"""Submit with unknown model should raise."""
with pytest.raises(ValueError, match="Unknown model"):
scheduler.submit_job(
job_id="test-1",
project="playground",
model_name="nonexistent",
)
def test_priority_ordering(self, scheduler):
"""Jobs should be ordered by priority."""
scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
scheduler.submit_job("int-1", "playground", "llama3_8b", Priority.INTERACTIVE)
# RT should be first
assert scheduler.job_queue[0].job_id == "rt-1"
assert scheduler.job_queue[1].job_id == "int-1"
assert scheduler.job_queue[2].job_id == "batch-1"
def test_get_next_job(self, scheduler):
"""Get next job should return highest priority."""
scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
next_job = scheduler.get_next_job()
assert next_job.job_id == "rt-1"
def test_start_job(self, scheduler):
"""Start a job."""
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
success = scheduler.start_job(job)
assert success
assert job.status == "loading"
assert job.started_at is not None
assert scheduler.gpu_state.used_vram_mb == 8192 # llama3_8b VRAM
def test_complete_job(self, scheduler):
"""Complete a job."""
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
scheduler.start_job(job)
scheduler.complete_job(job)
assert job.status == "completed"
assert job.completed_at is not None
assert scheduler.gpu_state.used_vram_mb == 0
assert len(scheduler.job_queue) == 0
assert len(scheduler.completed_jobs) == 1
def test_complete_job_with_error(self, scheduler):
"""Complete a job with error."""
job = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
scheduler.start_job(job)
scheduler.complete_job(job, error="CUDA out of memory")
assert job.status == "failed"
assert job.error == "CUDA out of memory"
def test_vram_tracking(self, scheduler):
"""VRAM should be tracked correctly."""
# Submit two small jobs
job1 = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
job2 = scheduler.submit_job("test-2", "playground", "llama3_8b", Priority.INTERACTIVE)
# Start first
scheduler.start_job(job1)
assert scheduler.gpu_state.used_vram_mb == 8192
# Start second (should work, still have room)
scheduler.start_job(job2)
assert scheduler.gpu_state.used_vram_mb == 16384
# Complete first
scheduler.complete_job(job1)
assert scheduler.gpu_state.used_vram_mb == 8192
def test_cpu_fallback(self, scheduler):
"""CPU fallback when VRAM full."""
# Fill VRAM with two 16GB models (32GB total = our budget)
job1 = scheduler.submit_job("big-1", "lpm", "mimo_v2_pro", Priority.REALTIME)
scheduler.start_job(job1)
assert scheduler.gpu_state.used_vram_mb == 16384
# Start another 16GB model (should work, exactly fills VRAM)
job2 = scheduler.submit_job("big-2", "playground", "mimo_v2_pro", Priority.INTERACTIVE)
scheduler.start_job(job2)
assert scheduler.gpu_state.used_vram_mb == 32768 # Full
# Now try a third model - should get CPU fallback
job3 = scheduler.submit_job("big-3", "harvester", "mimo_v2_pro", Priority.BATCH)
next_job = scheduler.get_next_job()
# Should get job3 with CPU fallback since VRAM is full
assert next_job.job_id == "big-3"
assert next_job.use_cpu_fallback
def test_get_status(self, scheduler):
"""Get scheduler status."""
scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
scheduler.submit_job("test-2", "harvester", "llama3_8b", Priority.BATCH)
status = scheduler.get_status()
assert status["gpu"]["total_vram_mb"] == 32768
assert status["queue"]["pending"] == 2
assert status["queue"]["by_priority"]["INTERACTIVE"] == 1
assert status["queue"]["by_priority"]["BATCH"] == 1
def test_register_model(self, scheduler):
"""Register a custom model."""
custom = ModelSpec(name="Custom Model", vram_mb=4096)
scheduler.register_model("custom_model", custom)
assert "custom_model" in MODEL_REGISTRY
job = scheduler.submit_job("test-1", "playground", "custom_model")
assert job.model.vram_mb == 4096
class TestCrossProjectScenarios:
"""Test cross-project scenarios from the issue."""
def test_video_forge_batch_plus_lpm_live(self, scheduler):
"""
Video Forge batch + LPM live.
LPM should get priority, batch should queue.
"""
# Video Forge batch job
vf_job = scheduler.submit_job(
"vf-batch-1", "video_forge", "sd_xl", Priority.BATCH
)
# LPM live job (higher priority)
lpm_job = scheduler.submit_job(
"lpm-live-1", "lpm", "lpm_video", Priority.REALTIME
)
# Next job should be LPM
next_job = scheduler.get_next_job()
assert next_job.job_id == "lpm-live-1"
assert next_job.priority == Priority.REALTIME
def test_three_video_forge_jobs(self, scheduler):
"""Three Video Forge jobs should queue sequentially."""
jobs = []
for i in range(3):
job = scheduler.submit_job(
f"vf-{i}", "video_forge", "sd_xl", Priority.BATCH
)
jobs.append(job)
# Start first
scheduler.start_job(jobs[0])
assert scheduler.gpu_state.used_vram_mb == 8192
# vf-0 is loading, so the next dispatchable job is vf-1 (VRAM still has room)
next_job = scheduler.get_next_job()
assert next_job.job_id == "vf-1"
def test_night_harvester_plus_playground(self, scheduler):
"""Night harvester runs on idle cycles."""
harvester = scheduler.submit_job(
"harvest-1", "harvester", "llama3_8b", Priority.BATCH
)
playground = scheduler.submit_job(
"play-1", "playground", "sdxl_turbo", Priority.INTERACTIVE
)
# Playground should get priority
next_job = scheduler.get_next_job()
assert next_job.job_id == "play-1"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

New file: tools/gpu_scheduler.py (428 lines)

@@ -0,0 +1,428 @@
"""
GPU Inference Scheduler — Multi-Model Resource Management
Queue-based model loading with priority lanes and VRAM budget tracking.
Prevents GPU OOM crashes when multiple projects compete for VRAM.
Priority lanes:
1. real-time (LPM) — highest priority, interactive
2. interactive (playground) — user-facing, medium priority
3. batch (harvester) — background, lowest priority
"""
import json
import time
import threading
import logging
from enum import IntEnum
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
logger = logging.getLogger("hermes.gpu_scheduler")
class Priority(IntEnum):
"""Job priority levels. Lower value = higher priority."""
REALTIME = 1 # LPM, live video, interactive sessions
INTERACTIVE = 2 # Playground, chat, user-facing
BATCH = 3 # Harvester, overnight jobs, background
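# Because Priority is an IntEnum, jobs can be ordered directly, e.g.
# sorted([Priority.BATCH, Priority.REALTIME]) == [Priority.REALTIME, Priority.BATCH]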
@dataclass
class ModelSpec:
"""Specification for a model and its VRAM requirements."""
name: str
vram_mb: int # VRAM required in MB
loader: str = "ollama" # How to load: ollama, vllm, llama_cpp, custom
model_id: str = "" # Model identifier (e.g., "llama3:70b")
cacheable: bool = True # Can be cached between jobs
cpu_fallback: bool = True # Can fall back to CPU if GPU busy
estimated_batch_ms: int = 1000 # Estimated time per batch
@dataclass
class InferenceJob:
"""A job requesting GPU inference."""
job_id: str
project: str # "video_forge", "lpm", "playground", "harvester"
model: ModelSpec
priority: Priority
batch_size: int = 1
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
completed_at: Optional[float] = None
status: str = "queued" # queued, loading, running, completed, failed
error: Optional[str] = None
use_cpu_fallback: bool = False
@dataclass
class GPUState:
"""Current GPU state."""
total_vram_mb: int = 0
used_vram_mb: int = 0
loaded_models: List[str] = field(default_factory=list)
active_job: Optional[str] = None
@property
def available_vram_mb(self) -> int:
return self.total_vram_mb - self.used_vram_mb
def can_fit(self, model: ModelSpec) -> bool:
return self.available_vram_mb >= model.vram_mb
# Known models and their VRAM requirements
MODEL_REGISTRY: Dict[str, ModelSpec] = {
# Video Forge models
"sd_xl": ModelSpec(name="Stable Diffusion XL", vram_mb=8192, loader="comfyui", model_id="sd_xl"),
"heartmula": ModelSpec(name="HeartMuLa", vram_mb=4096, loader="custom", model_id="heartmula"),
"wan2.1": ModelSpec(name="Wan2.1", vram_mb=12288, loader="custom", model_id="wan2.1"),
# LPM models
"lpm_video": ModelSpec(name="LPM Video Gen", vram_mb=16384, loader="custom", model_id="lpm_video"),
"lpm_a2a": ModelSpec(name="LPM A2A", vram_mb=8192, loader="custom", model_id="lpm_a2a"),
# Local inference (hermes)
"llama3_70b": ModelSpec(name="Llama 3 70B", vram_mb=40960, loader="ollama", model_id="llama3:70b"),
"llama3_8b": ModelSpec(name="Llama 3 8B", vram_mb=8192, loader="ollama", model_id="llama3:8b"),
"mimo_v2_pro": ModelSpec(name="MiMo v2 Pro", vram_mb=16384, loader="ollama", model_id="xiaomi/mimo-v2-pro"),
# Playground
"sdxl_turbo": ModelSpec(name="SDXL Turbo", vram_mb=6144, loader="comfyui", model_id="sdxl_turbo"),
}
# Default VRAM budget (can be overridden)
DEFAULT_VRAM_MB = 49152 # 48GB (e.g., L40S, A6000)
class InferenceScheduler:
"""
GPU Inference Scheduler.
Manages a queue of inference jobs with priority scheduling,
VRAM budget tracking, and CPU fallback.
"""
def __init__(
self,
vram_budget_mb: int = DEFAULT_VRAM_MB,
queue_db: str = "~/.hermes/gpu_scheduler.db",
):
self.vram_budget_mb = vram_budget_mb
self.queue_db = Path(queue_db).expanduser()
self.queue_db.parent.mkdir(parents=True, exist_ok=True)
# State
self.gpu_state = GPUState(total_vram_mb=vram_budget_mb)
self.job_queue: List[InferenceJob] = []
self.completed_jobs: List[InferenceJob] = []
self._lock = threading.Lock()
self._running = False
self._worker_thread: Optional[threading.Thread] = None
# Load persisted state
self._load_state()
logger.info(
"GPU Scheduler initialized: %dMB VRAM budget",
vram_budget_mb,
)
def _load_state(self):
"""Load state from SQLite."""
import sqlite3
conn = sqlite3.connect(str(self.queue_db))
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
project TEXT,
model_name TEXT,
priority INTEGER,
batch_size INTEGER,
created_at REAL,
started_at REAL,
completed_at REAL,
status TEXT,
error TEXT,
use_cpu_fallback INTEGER
)
""")
conn.commit()
# Load pending jobs
rows = conn.execute(
"SELECT * FROM jobs WHERE status IN ('queued', 'loading', 'running')"
).fetchall()
for row in rows:
model_name = row[2]
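# Unknown models (e.g., ones registered via register_model in a previous
# run) fall back to a generic 8GB spec rather than failing the reload.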
model = MODEL_REGISTRY.get(model_name, ModelSpec(name=model_name, vram_mb=8192))
job = InferenceJob(
job_id=row[0],
project=row[1],
model=model,
priority=Priority(row[3]),
batch_size=row[4],
created_at=row[5],
started_at=row[6],
completed_at=row[7],
status=row[8],
error=row[9],
use_cpu_fallback=bool(row[10]),
)
self.job_queue.append(job)
conn.close()
logger.info("Loaded %d pending jobs", len(self.job_queue))
def _save_job(self, job: InferenceJob):
"""Persist job to SQLite."""
import sqlite3
conn = sqlite3.connect(str(self.queue_db))
conn.execute("""
INSERT OR REPLACE INTO jobs
(job_id, project, model_name, priority, batch_size, created_at,
started_at, completed_at, status, error, use_cpu_fallback)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job.job_id,
job.project,
job.model.name,
job.priority.value,
job.batch_size,
job.created_at,
job.started_at,
job.completed_at,
job.status,
job.error,
int(job.use_cpu_fallback),
))
conn.commit()
conn.close()
def submit_job(
self,
job_id: str,
project: str,
model_name: str,
priority: Priority = Priority.BATCH,
batch_size: int = 1,
) -> InferenceJob:
"""
Submit an inference job to the queue.
Args:
job_id: Unique job identifier
project: Project name (video_forge, lpm, playground, harvester)
model_name: Model name from MODEL_REGISTRY
priority: Job priority
batch_size: Number of items to process
Returns:
The created InferenceJob
"""
model = MODEL_REGISTRY.get(model_name)
if not model:
raise ValueError(f"Unknown model: {model_name}. Registered: {list(MODEL_REGISTRY.keys())}")
job = InferenceJob(
job_id=job_id,
project=project,
model=model,
priority=priority,
batch_size=batch_size,
)
with self._lock:
# Insert in priority order
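# Strict '<' keeps submission (FIFO) order among equal-priority jobs.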
inserted = False
for i, existing in enumerate(self.job_queue):
if job.priority < existing.priority:
self.job_queue.insert(i, job)
inserted = True
break
if not inserted:
self.job_queue.append(job)
self._save_job(job)
logger.info(
"Job submitted: %s (project=%s, model=%s, priority=%s)",
job_id, project, model_name, priority.name,
)
return job
def get_next_job(self) -> Optional[InferenceJob]:
"""Get the next job to process based on priority and VRAM availability."""
with self._lock:
for job in self.job_queue:
if job.status != "queued":
continue
# Check if model fits in VRAM
if self.gpu_state.can_fit(job.model):
return job
# Check CPU fallback
if job.model.cpu_fallback:
job.use_cpu_fallback = True
return job
return None
def start_job(self, job: InferenceJob) -> bool:
"""
Mark a job as started and load its model.
Returns True if successful, False if insufficient VRAM.
"""
with self._lock:
if not job.use_cpu_fallback:
if not self.gpu_state.can_fit(job.model):
logger.warning(
"Insufficient VRAM for %s: need %dMB, have %dMB",
job.model.name,
job.model.vram_mb,
self.gpu_state.available_vram_mb,
)
return False
# Reserve VRAM
self.gpu_state.used_vram_mb += job.model.vram_mb
if job.model.name not in self.gpu_state.loaded_models:
self.gpu_state.loaded_models.append(job.model.name)
job.status = "loading"
job.started_at = time.time()
self.gpu_state.active_job = job.job_id
self._save_job(job)
logger.info(
"Job started: %s (model=%s, cpu_fallback=%s, vram_used=%dMB)",
job.job_id,
job.model.name,
job.use_cpu_fallback,
self.gpu_state.used_vram_mb,
)
return True
def complete_job(self, job: InferenceJob, error: Optional[str] = None):
"""Mark a job as completed and release its VRAM."""
with self._lock:
job.completed_at = time.time()
job.status = "completed" if not error else "failed"
job.error = error
if not job.use_cpu_fallback:
# Release VRAM
self.gpu_state.used_vram_mb = max(
0,
self.gpu_state.used_vram_mb - job.model.vram_mb,
)
if self.gpu_state.active_job == job.job_id:
self.gpu_state.active_job = None
# Move to completed
self.job_queue.remove(job)
self.completed_jobs.append(job)
self._save_job(job)
duration = (job.completed_at - job.started_at) * 1000 if job.started_at else 0
logger.info(
"Job completed: %s (status=%s, duration=%.0fms)",
job.job_id,
job.status,
duration,
)
def get_status(self) -> Dict[str, Any]:
"""Get scheduler status."""
with self._lock:
return {
"gpu": {
"total_vram_mb": self.gpu_state.total_vram_mb,
"used_vram_mb": self.gpu_state.used_vram_mb,
"available_vram_mb": self.gpu_state.available_vram_mb,
"utilization_pct": round(
self.gpu_state.used_vram_mb / self.gpu_state.total_vram_mb * 100, 1
),
"loaded_models": self.gpu_state.loaded_models,
"active_job": self.gpu_state.active_job,
},
"queue": {
"pending": len([j for j in self.job_queue if j.status == "queued"]),
"loading": len([j for j in self.job_queue if j.status == "loading"]),
"running": len([j for j in self.job_queue if j.status == "running"]),
"by_priority": {
p.name: len([j for j in self.job_queue if j.priority == p and j.status == "queued"])
for p in Priority
},
},
"completed": {
"total": len(self.completed_jobs),
"success": len([j for j in self.completed_jobs if j.status == "completed"]),
"failed": len([j for j in self.completed_jobs if j.status == "failed"]),
},
}
def register_model(self, name: str, spec: ModelSpec):
"""Register a new model."""
MODEL_REGISTRY[name] = spec
logger.info("Registered model: %s (%dMB VRAM)", name, spec.vram_mb)
def clear_completed(self):
"""Clear completed jobs from memory (keep in DB)."""
with self._lock:
self.completed_jobs.clear()
# ============================================================================
# CLI Interface
# ============================================================================
def main():
"""CLI entry point for testing."""
import argparse
parser = argparse.ArgumentParser(description="GPU Inference Scheduler")
parser.add_argument("action", choices=["status", "submit", "list", "clear"])
parser.add_argument("--job-id", help="Job ID for submit")
parser.add_argument("--project", help="Project name")
parser.add_argument("--model", help="Model name")
parser.add_argument("--priority", choices=["realtime", "interactive", "batch"], default="batch")
parser.add_argument("--vram", type=int, default=DEFAULT_VRAM_MB, help="VRAM budget in MB")
args = parser.parse_args()
scheduler = InferenceScheduler(vram_budget_mb=args.vram)
if args.action == "status":
status = scheduler.get_status()
print(json.dumps(status, indent=2))
elif args.action == "submit":
if not all([args.job_id, args.project, args.model]):
print("Error: --job-id, --project, and --model required for submit")
return
priority = Priority[args.priority.upper()]
job = scheduler.submit_job(args.job_id, args.project, args.model, priority)
print(f"Submitted: {job.job_id}")
elif args.action == "list":
print(f"Pending jobs: {len(scheduler.job_queue)}")
for job in scheduler.job_queue:
print(f" {job.job_id}: {job.project}/{job.model.name} [{job.priority.name}] {job.status}")
elif args.action == "clear":
scheduler.clear_completed()
print("Cleared completed jobs from memory")
if __name__ == "__main__":
main()
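
For reference, a minimal sketch of driving the scheduler from Python rather than through the CLI; the VRAM budget, queue path, and job IDs below are illustrative, not part of the module:

from tools.gpu_scheduler import InferenceScheduler, Priority

sched = InferenceScheduler(vram_budget_mb=24576, queue_db="/tmp/gpu_sched.db")

# A background batch job and a realtime job competing for the same GPU.
sched.submit_job("harvest-42", "harvester", "llama3_8b", Priority.BATCH)
sched.submit_job("live-1", "lpm", "lpm_video", Priority.REALTIME)

# The realtime job is dispatched first despite being submitted second.
job = sched.get_next_job()
assert job.job_id == "live-1"
if sched.start_job(job):
    # ... run inference here ...
    sched.complete_job(job)

print(sched.get_status()["queue"])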

Deleted file: tools/video_pipeline.py (571 lines)

@@ -1,571 +0,0 @@
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
Provides:
- Image sequence → video with crossfade transitions
- Audio-video sync with beat alignment
- Ken Burns effect (slow pan/zoom on stills)
- H.264/WebM encoding optimized for web
- Streaming output support
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
"""
import json
import logging
import os
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class VideoSpec:
"""Video output specification."""
width: int = 1920
height: int = 1080
fps: int = 30
codec: str = "libx264" # libx264, libvpx-vp9
container: str = "mp4" # mp4, webm
crf: int = 23 # quality (lower = better, 18-28 typical)
preset: str = "medium" # ultrafast, fast, medium, slow
audio_codec: str = "aac"
audio_bitrate: str = "192k"
@dataclass
class Transition:
"""Crossfade transition between clips."""
duration_sec: float = 1.0
type: str = "fade" # fade, dissolve, slideleft, slideright
@dataclass
class KenBurnsConfig:
"""Ken Burns pan/zoom effect configuration."""
zoom_start: float = 1.0
zoom_end: float = 1.15
pan_x: float = 0.0 # -1 to 1
pan_y: float = 0.0 # -1 to 1
duration_sec: float = 5.0
class FFmpegPipeline:
"""FFmpeg wrapper for video composition.
Provides a Python interface to FFmpeg's complex filter graphs
for composing videos from images, clips, and audio.
"""
def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
self.ffmpeg = ffmpeg_path
self.ffprobe = ffprobe_path
self._verify_ffmpeg()
def _verify_ffmpeg(self):
"""Verify FFmpeg is available."""
try:
r = subprocess.run([self.ffmpeg, "-version"], capture_output=True, text=True, timeout=5)
if r.returncode != 0:
raise RuntimeError(f"FFmpeg returned {r.returncode}")
logger.info("FFmpeg verified: %s", r.stdout.split('\n')[0])
except FileNotFoundError:
raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")
def probe(self, path: str) -> Dict[str, Any]:
"""Probe a media file for metadata."""
cmd = [
self.ffprobe, "-v", "quiet", "-print_format", "json",
"-show_format", "-show_streams", path
]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if r.returncode != 0:
raise RuntimeError(f"ffprobe failed: {r.stderr}")
return json.loads(r.stdout)
def get_duration(self, path: str) -> float:
"""Get duration of a media file in seconds."""
info = self.probe(path)
return float(info["format"]["duration"])
# =================================================================
# Image Sequence → Video
# =================================================================
def images_to_video(
self,
image_paths: List[str],
output_path: str,
spec: VideoSpec = VideoSpec(),
transition: Optional[Transition] = None,
image_duration: float = 3.0,
) -> str:
"""Convert a list of images to a video with optional crossfade transitions.
Args:
image_paths: List of image file paths
output_path: Output video path
spec: Video output specification
transition: Crossfade config (None = hard cuts)
image_duration: Seconds per image
Returns:
Path to output video
"""
if not image_paths:
raise ValueError("No images provided")
if transition and len(image_paths) > 1:
return self._images_with_crossfade(
image_paths, output_path, spec, transition, image_duration
)
else:
return self._images_hard_cut(
image_paths, output_path, spec, image_duration
)
def _images_hard_cut(
self,
image_paths: List[str],
output_path: str,
spec: VideoSpec,
duration: float,
) -> str:
"""Simple image sequence with hard cuts."""
# Create concat file
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
concat_path = f.name
for img in image_paths:
f.write(f"file '{os.path.abspath(img)}'\n")
f.write(f"duration {duration}\n")
# Last image needs repeating for ffmpeg concat
f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
try:
cmd = [
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
"-i", concat_path,
"-vf", f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2",
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-pix_fmt", "yuv420p", "-r", str(spec.fps),
output_path
]
self._run_ffmpeg(cmd)
return output_path
finally:
os.unlink(concat_path)
def _images_with_crossfade(
self,
image_paths: List[str],
output_path: str,
spec: VideoSpec,
transition: Transition,
duration: float,
) -> str:
"""Image sequence with xfade crossfade transitions."""
n = len(image_paths)
fade_dur = transition.duration_sec
clip_dur = duration
# Build filter complex
inputs = []
for img in image_paths:
inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
# Scale all inputs
filter_parts = []
for i in range(n):
filter_parts.append(
f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
)
# Chain xfade transitions
if n == 1:
filter_parts.append(f"[v0]copy[outv]")
else:
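# After k fades the running output lasts (k+1)*clip_dur - k*fade_dur,
# and each new fade starts fade_dur before that end, so
# offset_k = (k+1)*(clip_dur - fade_dur), matching the loop below.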
offset = clip_dur - fade_dur
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
for i in range(1, n - 1):
offset = (i + 1) * clip_dur - (i + 1) * fade_dur
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
filter_parts.append(f"[xf{n-2}]copy[outv]")
filter_complex = ";".join(filter_parts)
cmd = [
self.ffmpeg, "-y",
*inputs,
"-filter_complex", filter_complex,
"-map", "[outv]",
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-pix_fmt", "yuv420p",
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Ken Burns Effect
# =================================================================
def apply_ken_burns(
self,
image_path: str,
output_path: str,
config: KenBurnsConfig = KenBurnsConfig(),
spec: VideoSpec = VideoSpec(),
) -> str:
"""Apply Ken Burns (pan + zoom) effect to a still image.
Creates a video from a single image with slow zoom and pan motion.
"""
# zoompan filter: z=zoom, x/y=pan position.
# z ramps linearly from zoom_start to zoom_end over the clip duration.
total_frames = int(config.duration_sec * spec.fps)
zoom_expr = f"min({config.zoom_start}+on*({config.zoom_end}-{config.zoom_start})/{total_frames},{config.zoom_end})"
# Pan expressions: drift from center by up to 10% of the frame size.
x_expr = f"(iw-iw/zoom)/2+{config.pan_x}*iw*0.1*(on/{total_frames})"
y_expr = f"(ih-ih/zoom)/2+{config.pan_y}*ih*0.1*(on/{total_frames})"
filter_str = (
f"zoompan=z='{zoom_expr}'"
f":x='{x_expr}'"
f":y='{y_expr}'"
f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
)
cmd = [
self.ffmpeg, "-y",
"-loop", "1", "-i", image_path,
"-vf", filter_str,
"-t", str(config.duration_sec),
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-pix_fmt", "yuv420p",
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Audio-Video Sync
# =================================================================
def mux_audio_video(
self,
video_path: str,
audio_path: str,
output_path: str,
spec: VideoSpec = VideoSpec(),
offset_sec: float = 0.0,
) -> str:
"""Mux audio and video together with optional offset.
Args:
video_path: Video file path
audio_path: Audio file path
output_path: Output path
spec: Output spec
offset_sec: Audio offset in seconds (positive = delay audio)
"""
cmd = [
self.ffmpeg, "-y",
"-i", video_path,
"-i", audio_path,
]
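# -itsoffset must precede the -i it applies to, so the shifted stream is
# appended as an extra input (index 2) and mapped instead of the original.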
if offset_sec > 0:
cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
cmd.extend(["-map", "0:v", "-map", "2:a"])
elif offset_sec < 0:
cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
cmd.extend(["-map", "2:v", "-map", "1:a"])
else:
cmd.extend(["-map", "0:v", "-map", "1:a"])
cmd.extend([
"-c:v", "copy",
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
"-shortest",
output_path
])
self._run_ffmpeg(cmd)
return output_path
def align_to_beats(
self,
video_path: str,
audio_path: str,
beat_times: List[float],
output_path: str,
spec: VideoSpec = VideoSpec(),
) -> str:
"""Align video cuts to audio beat times.
Splits video at beat timestamps and reassembles with audio sync.
"""
if not beat_times:
return self.mux_audio_video(video_path, audio_path, output_path, spec)
video_dur = self.get_duration(video_path)
# Build segment list based on beat times
segments = []
prev = 0.0
for beat in beat_times:
if beat > prev and beat <= video_dur:
segments.append((prev, beat))
prev = beat
if prev < video_dur:
segments.append((prev, video_dur))
# Extract segments
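# Caveat: -ss with -c copy snaps cuts to keyframes, so segment boundaries
# (and therefore beat alignment) are approximate without re-encoding.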
segment_paths = []
with tempfile.TemporaryDirectory() as tmpdir:
for i, (start, end) in enumerate(segments):
seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
cmd = [
self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
"-t", str(end - start), "-c", "copy", seg_path
]
self._run_ffmpeg(cmd)
segment_paths.append(seg_path)
# Concat segments
concat_path = os.path.join(tmpdir, "concat.txt")
with open(concat_path, "w") as f:
for sp in segment_paths:
f.write(f"file '{sp}'\n")
cmd = [
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
"-i", concat_path, "-c", "copy",
os.path.join(tmpdir, "video_aligned.mp4")
]
self._run_ffmpeg(cmd)
return self.mux_audio_video(
os.path.join(tmpdir, "video_aligned.mp4"),
audio_path, output_path, spec
)
# =================================================================
# Encoding
# =================================================================
def encode_web(
self,
input_path: str,
output_path: str,
spec: VideoSpec = VideoSpec(),
) -> str:
"""Encode for web playback (H.264 MP4 or WebM)."""
if spec.container == "webm":
cmd = [
self.ffmpeg, "-y", "-i", input_path,
"-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
"-c:a", "libopus", "-b:a", spec.audio_bitrate,
"-pix_fmt", "yuv420p",
output_path
]
else:
cmd = [
self.ffmpeg, "-y", "-i", input_path,
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
"-movflags", "+faststart", # Progressive download
"-pix_fmt", "yuv420p",
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Full Composition Pipeline
# =================================================================
def compose(
self,
images: List[str],
audio: Optional[str] = None,
output_path: str = "output.mp4",
spec: VideoSpec = VideoSpec(),
transition: Optional[Transition] = None,
ken_burns: Optional[KenBurnsConfig] = None,
beat_times: Optional[List[float]] = None,
) -> str:
"""Full composition pipeline: images → video → mux audio → encode.
This is the main entry point. Combines all pipeline components
into a single call.
Args:
images: List of image paths
audio: Optional audio file path
output_path: Final output path
spec: Video specification
transition: Crossfade config
ken_burns: Ken Burns config (applied to each image before composition)
beat_times: Beat timestamps for audio-visual sync
Returns:
Path to final video
"""
with tempfile.TemporaryDirectory() as tmpdir:
# Apply Ken Burns to each image if configured
processed_images = images
if ken_burns:
processed_images = []
for i, img in enumerate(images):
kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
self.apply_ken_burns(img, kb_path, ken_burns, spec)
processed_images.append(kb_path)
# Compose images into video
video_path = os.path.join(tmpdir, "composed.mp4")
if processed_images and all(p.endswith('.mp4') for p in processed_images):
# All Ken Burns clips — concat them
self._concat_videos(processed_images, video_path, transition, spec)
else:
self.images_to_video(processed_images, video_path, spec, transition)
# Mux audio if provided
if audio:
if beat_times:
return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
else:
return self.mux_audio_video(video_path, audio, output_path, spec)
else:
# Just copy the video
import shutil
shutil.copy2(video_path, output_path)
return output_path
def _concat_videos(
self,
video_paths: List[str],
output_path: str,
transition: Optional[Transition],
spec: VideoSpec,
) -> str:
"""Concatenate video files with optional transitions."""
if not transition:
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
concat_path = f.name
for vp in video_paths:
f.write(f"file '{os.path.abspath(vp)}'\n")
try:
cmd = [
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
"-i", concat_path, "-c", "copy", output_path
]
self._run_ffmpeg(cmd)
finally:
os.unlink(concat_path)
else:
# Use xfade for video clips too
n = len(video_paths)
inputs = []
for vp in video_paths:
inputs.extend(["-i", vp])
filter_parts = []
for i in range(n):
filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
fade_dur = transition.duration_sec
# Calculate offsets from durations
if n >= 2:
dur0 = self.get_duration(video_paths[0])
offset = dur0 - fade_dur
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
for i in range(1, n - 1):
duri = self.get_duration(video_paths[i])
offset += duri - fade_dur
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
filter_parts.append(f"[xf{n-2}]copy[outv]")
else:
filter_parts.append(f"[v0]copy[outv]")
cmd = [
self.ffmpeg, "-y", *inputs,
"-filter_complex", ";".join(filter_parts),
"-map", "[outv]",
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Streaming
# =================================================================
def create_hls_stream(
self,
input_path: str,
output_dir: str,
segment_duration: int = 4,
) -> str:
"""Create HLS streaming output for progressive playback.
Returns:
Path to the .m3u8 playlist file
"""
os.makedirs(output_dir, exist_ok=True)
playlist = os.path.join(output_dir, "playlist.m3u8")
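# -hls_time sets the target segment length; -hls_list_size 0 keeps every
# segment in the playlist (VOD output rather than a sliding live window).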
cmd = [
self.ffmpeg, "-y", "-i", input_path,
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
"-c:a", "aac", "-b:a", "128k",
"-f", "hls", "-hls_time", str(segment_duration),
"-hls_list_size", "0", "-hls_segment_filename",
os.path.join(output_dir, "seg_%03d.ts"),
playlist
]
self._run_ffmpeg(cmd)
return playlist
# =================================================================
# Internal
# =================================================================
def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
"""Run an FFmpeg command with logging and error handling."""
logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if r.returncode != 0:
logger.error("FFmpeg failed: %s", r.stderr[-500:])
raise RuntimeError(f"FFmpeg error (exit {r.returncode}): {r.stderr[-500:]}")
return r.stdout
# =========================================================================
# Convenience functions
# =========================================================================
def compose_video(
images: List[str],
audio: Optional[str] = None,
output: str = "output.mp4",
**kwargs,
) -> str:
"""One-call video composition from images and optional audio.
Convenience wrapper around FFmpegPipeline.compose().
"""
pipeline = FFmpegPipeline()
spec = kwargs.pop("spec", VideoSpec())
transition = kwargs.pop("transition", None)
ken_burns = kwargs.pop("ken_burns", None)
beat_times = kwargs.pop("beat_times", None)
return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)
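
And for the record, since this file is removed in this compare: a minimal sketch of how the convenience entry point might be called (file names are placeholders; ffmpeg must be on PATH):

from tools.video_pipeline import compose_video, VideoSpec, Transition

out = compose_video(
    ["shot1.png", "shot2.png", "shot3.png"],  # placeholder stills
    audio="track.mp3",                        # optional soundtrack
    output="slideshow.mp4",
    spec=VideoSpec(width=1280, height=720, fps=24),
    transition=Transition(duration_sec=0.5),  # crossfade between stills
)
print(out)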