Compare commits
2 Commits
fix/issue-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
| 067a787c60 | |||
| 7445039841 |
@@ -1,129 +0,0 @@
|
||||
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
import tempfile
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from tools.video_pipeline import (
|
||||
VideoSpec, Transition, KenBurnsConfig,
|
||||
FFmpegPipeline, compose_video,
|
||||
)
|
||||
|
||||
|
||||
class TestVideoSpec:
    """VideoSpec dataclass: defaults and field overrides."""

    def test_defaults(self):
        """A bare spec targets 1080p30 H.264 in an MP4 container."""
        spec = VideoSpec()
        assert spec.width == 1920
        assert spec.height == 1080
        assert spec.fps == 30
        assert spec.codec == "libx264"
        assert spec.container == "mp4"
        assert spec.crf == 23

    def test_webm_spec(self):
        """Codec/container/audio fields can be overridden for WebM output."""
        spec = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
        assert spec.container == "webm"
        assert spec.codec == "libvpx-vp9"
|
||||
|
||||
|
||||
class TestTransition:
    """Transition dataclass defaults."""

    def test_defaults(self):
        """The default transition is a one-second fade."""
        transition = Transition()
        assert transition.duration_sec == 1.0
        assert transition.type == "fade"
|
||||
|
||||
|
||||
class TestKenBurnsConfig:
    """KenBurnsConfig dataclass defaults."""

    def test_defaults(self):
        """Default effect is a gentle 1.0 -> 1.15 zoom over five seconds."""
        cfg = KenBurnsConfig()
        assert cfg.zoom_start == 1.0
        assert cfg.zoom_end == 1.15
        assert cfg.duration_sec == 5.0
|
||||
|
||||
|
||||
class TestFFmpegPipelineInit:
    """Constructor behaviour with subprocess mocked out."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_verify_ffmpeg_passes(self, mock_run):
        """A zero exit from `ffmpeg -version` lets construction succeed."""
        mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
        pipe = FFmpegPipeline()
        assert pipe.ffmpeg == "ffmpeg"

    @patch("tools.video_pipeline.subprocess.run")
    def test_verify_ffmpeg_fails(self, mock_run):
        """A missing binary surfaces as RuntimeError at construction."""
        mock_run.side_effect = FileNotFoundError()
        with pytest.raises(RuntimeError, match="FFmpeg not found"):
            FFmpegPipeline()

    @patch("tools.video_pipeline.subprocess.run")
    def test_custom_ffmpeg_path(self, mock_run):
        """An explicit binary path is stored verbatim."""
        mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg")
        pipe = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
        assert pipe.ffmpeg == "/usr/local/bin/ffmpeg"
|
||||
|
||||
|
||||
class TestFFmpegPipelineProbe:
    """probe()/get_duration() with ffprobe mocked."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_probe_returns_metadata(self, mock_run):
        """probe() parses ffprobe's JSON stdout into a dict."""
        mock_run.return_value = MagicMock(
            returncode=0,
            stdout='{"format": {"duration": "10.5"}, "streams": []}'
        )
        # Bypass __init__ so no real ffmpeg binary is required.
        pipe = FFmpegPipeline.__new__(FFmpegPipeline)
        pipe.ffprobe = "ffprobe"
        meta = pipe.probe("/tmp/test.mp4")
        assert meta["format"]["duration"] == "10.5"

    @patch("tools.video_pipeline.subprocess.run")
    def test_get_duration(self, mock_run):
        """get_duration() converts the probed duration string to float seconds."""
        mock_run.return_value = MagicMock(
            returncode=0,
            stdout='{"format": {"duration": "42.3"}, "streams": []}'
        )
        pipe = FFmpegPipeline.__new__(FFmpegPipeline)
        pipe.ffprobe = "ffprobe"
        assert pipe.get_duration("/tmp/test.mp4") == 42.3
|
||||
|
||||
|
||||
class TestFFmpegPipelineImagesToVideo:
    """images_to_video() input validation."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_empty_images_raises(self, mock_run):
        """An empty image list is rejected before ffmpeg is ever invoked."""
        pipe = FFmpegPipeline.__new__(FFmpegPipeline)
        pipe.ffmpeg = "ffmpeg"
        with pytest.raises(ValueError, match="No images"):
            pipe.images_to_video([], "/tmp/out.mp4")
|
||||
|
||||
|
||||
class TestComposeVideo:
    """compose_video() convenience wrapper."""

    @patch("tools.video_pipeline.FFmpegPipeline")
    def test_compose_calls_pipeline(self, MockPipeline):
        """The wrapper delegates to FFmpegPipeline.compose exactly once."""
        fake = MagicMock()
        fake.compose.return_value = "/tmp/output.mp4"
        MockPipeline.return_value = fake

        result = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
        assert result == "/tmp/output.mp4"
        fake.compose.assert_called_once()
|
||||
|
||||
|
||||
class TestRunFFmpeg:
    """_run_ffmpeg error handling."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_success(self, mock_run):
        """A zero exit code completes without raising."""
        mock_run.return_value = MagicMock(returncode=0, stdout="")
        pipe = FFmpegPipeline.__new__(FFmpegPipeline)
        pipe._run_ffmpeg(["ffmpeg", "-version"])

    @patch("tools.video_pipeline.subprocess.run")
    def test_failure_raises(self, mock_run):
        """A nonzero exit code surfaces as RuntimeError."""
        mock_run.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
        pipe = FFmpegPipeline.__new__(FFmpegPipeline)
        with pytest.raises(RuntimeError, match="FFmpeg error"):
            pipe._run_ffmpeg(["ffmpeg", "-bad"])

    @patch("tools.video_pipeline.subprocess.run")
    def test_timeout_raises(self, mock_run):
        """subprocess.TimeoutExpired propagates to the caller unwrapped."""
        import subprocess
        mock_run.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
        pipe = FFmpegPipeline.__new__(FFmpegPipeline)
        with pytest.raises(subprocess.TimeoutExpired):
            pipe._run_ffmpeg(["ffmpeg", "-slow"])
|
||||
256
tests/tools/test_gpu_scheduler.py
Normal file
256
tests/tools/test_gpu_scheduler.py
Normal file
@@ -0,0 +1,256 @@
|
||||
"""
|
||||
Tests for GPU Inference Scheduler.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from tools.gpu_scheduler import (
|
||||
Priority,
|
||||
ModelSpec,
|
||||
InferenceJob,
|
||||
InferenceScheduler,
|
||||
MODEL_REGISTRY,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def scheduler():
    """Yield an InferenceScheduler backed by a throwaway SQLite database."""
    with tempfile.TemporaryDirectory() as tmpdir:
        yield InferenceScheduler(
            vram_budget_mb=32768,
            queue_db=str(Path(tmpdir) / "test_scheduler.db"),
        )
|
||||
|
||||
|
||||
class TestPriority:
    """Priority ordering semantics."""

    def test_priority_ordering(self):
        """Realtime < Interactive < Batch."""
        assert Priority.REALTIME < Priority.INTERACTIVE < Priority.BATCH

    def test_priority_comparison(self):
        """Lower value = higher priority."""
        expected = {
            Priority.REALTIME: 1,
            Priority.INTERACTIVE: 2,
            Priority.BATCH: 3,
        }
        for lane, value in expected.items():
            assert lane.value == value
|
||||
|
||||
|
||||
class TestModelSpec:
    """Model registry contents."""

    def test_model_registry_has_models(self):
        """Registry should contain the well-known model keys."""
        for key in ("llama3_70b", "sd_xl", "mimo_v2_pro"):
            assert key in MODEL_REGISTRY

    def test_model_vram(self):
        """Registered models carry positive VRAM requirements."""
        spec = MODEL_REGISTRY["llama3_70b"]
        assert spec.vram_mb > 0
        assert spec.vram_mb == 40960  # 40GB
|
||||
|
||||
|
||||
class TestInferenceScheduler:
    """Behavioural tests for the scheduler itself."""

    def test_init(self, scheduler):
        """A fresh scheduler starts empty with the configured budget."""
        assert scheduler.vram_budget_mb == 32768
        assert scheduler.gpu_state.total_vram_mb == 32768
        assert len(scheduler.job_queue) == 0

    def test_submit_job(self, scheduler):
        """Submitting queues exactly one job in 'queued' state."""
        submitted = scheduler.submit_job(
            job_id="test-1",
            project="playground",
            model_name="llama3_8b",
            priority=Priority.INTERACTIVE,
        )

        assert submitted.job_id == "test-1"
        assert submitted.status == "queued"
        assert len(scheduler.job_queue) == 1

    def test_submit_unknown_model(self, scheduler):
        """An unregistered model name is rejected with ValueError."""
        with pytest.raises(ValueError, match="Unknown model"):
            scheduler.submit_job(
                job_id="test-1",
                project="playground",
                model_name="nonexistent",
            )

    def test_priority_ordering(self, scheduler):
        """The queue keeps realtime ahead of interactive ahead of batch."""
        scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
        scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)
        scheduler.submit_job("int-1", "playground", "llama3_8b", Priority.INTERACTIVE)

        queued_ids = [job.job_id for job in scheduler.job_queue]
        assert queued_ids == ["rt-1", "int-1", "batch-1"]

    def test_get_next_job(self, scheduler):
        """get_next_job returns the highest-priority queued job."""
        scheduler.submit_job("batch-1", "harvester", "llama3_8b", Priority.BATCH)
        scheduler.submit_job("rt-1", "lpm", "llama3_8b", Priority.REALTIME)

        assert scheduler.get_next_job().job_id == "rt-1"

    def test_start_job(self, scheduler):
        """Starting a job reserves VRAM and stamps a start time."""
        queued = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
        started = scheduler.start_job(queued)

        assert started
        assert queued.status == "loading"
        assert queued.started_at is not None
        assert scheduler.gpu_state.used_vram_mb == 8192  # llama3_8b VRAM

    def test_complete_job(self, scheduler):
        """Completing releases VRAM and moves the job to completed_jobs."""
        running = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
        scheduler.start_job(running)
        scheduler.complete_job(running)

        assert running.status == "completed"
        assert running.completed_at is not None
        assert scheduler.gpu_state.used_vram_mb == 0
        assert len(scheduler.job_queue) == 0
        assert len(scheduler.completed_jobs) == 1

    def test_complete_job_with_error(self, scheduler):
        """An error string flips the terminal status to 'failed'."""
        running = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
        scheduler.start_job(running)
        scheduler.complete_job(running, error="CUDA out of memory")

        assert running.status == "failed"
        assert running.error == "CUDA out of memory"

    def test_vram_tracking(self, scheduler):
        """Usage rises and falls with each start/complete pair."""
        first = scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
        second = scheduler.submit_job("test-2", "playground", "llama3_8b", Priority.INTERACTIVE)

        scheduler.start_job(first)
        assert scheduler.gpu_state.used_vram_mb == 8192

        # Second still fits inside the 32GB budget.
        scheduler.start_job(second)
        assert scheduler.gpu_state.used_vram_mb == 16384

        scheduler.complete_job(first)
        assert scheduler.gpu_state.used_vram_mb == 8192

    def test_cpu_fallback(self, scheduler):
        """When VRAM is exhausted the next job is offered CPU fallback."""
        # Two 16GB models exactly consume the 32GB budget.
        first = scheduler.submit_job("big-1", "lpm", "mimo_v2_pro", Priority.REALTIME)
        scheduler.start_job(first)
        assert scheduler.gpu_state.used_vram_mb == 16384

        second = scheduler.submit_job("big-2", "playground", "mimo_v2_pro", Priority.INTERACTIVE)
        scheduler.start_job(second)
        assert scheduler.gpu_state.used_vram_mb == 32768  # Full

        # A third model cannot fit; it should be offered with CPU fallback.
        scheduler.submit_job("big-3", "harvester", "mimo_v2_pro", Priority.BATCH)
        fallback = scheduler.get_next_job()

        assert fallback.job_id == "big-3"
        assert fallback.use_cpu_fallback

    def test_get_status(self, scheduler):
        """Status report reflects queue depth and priority breakdown."""
        scheduler.submit_job("test-1", "playground", "llama3_8b", Priority.INTERACTIVE)
        scheduler.submit_job("test-2", "harvester", "llama3_8b", Priority.BATCH)

        status = scheduler.get_status()

        assert status["gpu"]["total_vram_mb"] == 32768
        assert status["queue"]["pending"] == 2
        assert status["queue"]["by_priority"]["INTERACTIVE"] == 1
        assert status["queue"]["by_priority"]["BATCH"] == 1

    def test_register_model(self, scheduler):
        """A registered custom model is immediately usable for submission."""
        custom = ModelSpec(name="Custom Model", vram_mb=4096)
        scheduler.register_model("custom_model", custom)

        assert "custom_model" in MODEL_REGISTRY

        submitted = scheduler.submit_job("test-1", "playground", "custom_model")
        assert submitted.model.vram_mb == 4096
|
||||
|
||||
|
||||
class TestCrossProjectScenarios:
    """Cross-project contention scenarios from the issue."""

    def test_video_forge_batch_plus_lpm_live(self, scheduler):
        """Video Forge batch + LPM live: LPM gets priority, batch queues."""
        scheduler.submit_job(
            "vf-batch-1", "video_forge", "sd_xl", Priority.BATCH
        )
        scheduler.submit_job(
            "lpm-live-1", "lpm", "lpm_video", Priority.REALTIME
        )

        chosen = scheduler.get_next_job()
        assert chosen.job_id == "lpm-live-1"
        assert chosen.priority == Priority.REALTIME

    def test_three_video_forge_jobs(self, scheduler):
        """Three Video Forge jobs should queue sequentially."""
        batch = [
            scheduler.submit_job(f"vf-{i}", "video_forge", "sd_xl", Priority.BATCH)
            for i in range(3)
        ]

        scheduler.start_job(batch[0])
        assert scheduler.gpu_state.used_vram_mb == 8192

        # With the first running, the scheduler offers the second next.
        assert scheduler.get_next_job().job_id == "vf-1"

    def test_night_harvester_plus_playground(self, scheduler):
        """Interactive playground work outranks the overnight harvester."""
        scheduler.submit_job(
            "harvest-1", "harvester", "llama3_8b", Priority.BATCH
        )
        scheduler.submit_job(
            "play-1", "playground", "sdxl_turbo", Priority.INTERACTIVE
        )

        assert scheduler.get_next_job().job_id == "play-1"
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Allow running this test module directly (python test_gpu_scheduler.py)
    # without going through the pytest CLI.
    pytest.main([__file__, "-v"])
|
||||
428
tools/gpu_scheduler.py
Normal file
428
tools/gpu_scheduler.py
Normal file
@@ -0,0 +1,428 @@
|
||||
"""
|
||||
GPU Inference Scheduler — Multi-Model Resource Management
|
||||
|
||||
Queue-based model loading with priority lanes and VRAM budget tracking.
|
||||
Prevents GPU OOM crashes when multiple projects compete for VRAM.
|
||||
|
||||
Priority lanes:
|
||||
1. real-time (LPM) — highest priority, interactive
|
||||
2. interactive (playground) — user-facing, medium priority
|
||||
3. batch (harvester) — background, lowest priority
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
import logging
|
||||
from enum import IntEnum
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass, field, asdict
|
||||
|
||||
logger = logging.getLogger("hermes.gpu_scheduler")
|
||||
|
||||
|
||||
class Priority(IntEnum):
    """Job priority levels. Lower value = higher priority.

    IntEnum so members compare/sort as plain integers — the priority-ordered
    queue insertion in InferenceScheduler.submit_job relies on this.
    """
    REALTIME = 1  # LPM, live video, interactive sessions
    INTERACTIVE = 2  # Playground, chat, user-facing
    BATCH = 3  # Harvester, overnight jobs, background
|
||||
|
||||
|
||||
@dataclass
class ModelSpec:
    """Specification for a model and its VRAM requirements.

    Instances live in MODEL_REGISTRY. The scheduler uses vram_mb to decide
    whether a job fits the remaining GPU budget, and cpu_fallback to decide
    whether the job may run without a VRAM reservation.
    """
    name: str  # human-readable display name
    vram_mb: int  # VRAM required in MB
    loader: str = "ollama"  # How to load: ollama, vllm, llama_cpp, custom
    model_id: str = ""  # Model identifier (e.g., "llama3:70b")
    cacheable: bool = True  # Can be cached between jobs
    cpu_fallback: bool = True  # Can fall back to CPU if GPU busy
    estimated_batch_ms: int = 1000  # Estimated time per batch
|
||||
|
||||
|
||||
@dataclass
class InferenceJob:
    """A job requesting GPU inference.

    Mutable lifecycle record: the scheduler updates status/timestamps in
    place and mirrors every change to SQLite via _save_job.
    """
    job_id: str  # unique identifier, also the SQLite primary key
    project: str  # "video_forge", "lpm", "playground", "harvester"
    model: ModelSpec
    priority: Priority
    batch_size: int = 1
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None  # stamped by start_job
    completed_at: Optional[float] = None  # stamped by complete_job
    status: str = "queued"  # queued, loading, running, completed, failed
    error: Optional[str] = None  # populated when the job fails
    use_cpu_fallback: bool = False  # set by get_next_job when VRAM is full
|
||||
|
||||
|
||||
@dataclass
class GPUState:
    """Current GPU state.

    This is budget accounting maintained by the scheduler, not a query of
    real hardware: used_vram_mb is incremented/decremented as jobs start
    and complete.
    """
    total_vram_mb: int = 0  # configured budget
    used_vram_mb: int = 0  # sum of reservations for running jobs
    loaded_models: List[str] = field(default_factory=list)  # model display names
    active_job: Optional[str] = None  # job_id of the most recently started job

    @property
    def available_vram_mb(self) -> int:
        """VRAM (MB) still unreserved within the budget."""
        return self.total_vram_mb - self.used_vram_mb

    def can_fit(self, model: ModelSpec) -> bool:
        """True if *model*'s VRAM requirement fits the remaining budget."""
        return self.available_vram_mb >= model.vram_mb
|
||||
|
||||
|
||||
# Known models and their VRAM requirements, keyed by registry key (the name
# used in submit_job). NOTE(review): VRAM figures are scheduling budgets,
# not measured usage — confirm against actual deployments before relying
# on them for tight packing.
MODEL_REGISTRY: Dict[str, ModelSpec] = {
    # Video Forge models
    "sd_xl": ModelSpec(name="Stable Diffusion XL", vram_mb=8192, loader="comfyui", model_id="sd_xl"),
    "heartmula": ModelSpec(name="HeartMuLa", vram_mb=4096, loader="custom", model_id="heartmula"),
    "wan2.1": ModelSpec(name="Wan2.1", vram_mb=12288, loader="custom", model_id="wan2.1"),

    # LPM models
    "lpm_video": ModelSpec(name="LPM Video Gen", vram_mb=16384, loader="custom", model_id="lpm_video"),
    "lpm_a2a": ModelSpec(name="LPM A2A", vram_mb=8192, loader="custom", model_id="lpm_a2a"),

    # Local inference (hermes)
    "llama3_70b": ModelSpec(name="Llama 3 70B", vram_mb=40960, loader="ollama", model_id="llama3:70b"),
    "llama3_8b": ModelSpec(name="Llama 3 8B", vram_mb=8192, loader="ollama", model_id="llama3:8b"),
    "mimo_v2_pro": ModelSpec(name="MiMo v2 Pro", vram_mb=16384, loader="ollama", model_id="xiaomi/mimo-v2-pro"),

    # Playground
    "sdxl_turbo": ModelSpec(name="SDXL Turbo", vram_mb=6144, loader="comfyui", model_id="sdxl_turbo"),
}
|
||||
|
||||
# Default VRAM budget (can be overridden per-scheduler instance).
DEFAULT_VRAM_MB = 48 * 1024  # 48GB (e.g., L40S, A6000)
|
||||
|
||||
|
||||
class InferenceScheduler:
    """GPU Inference Scheduler.

    Manages a queue of inference jobs with priority scheduling,
    VRAM budget tracking, and CPU fallback. All queue/GPU-state
    mutations happen under a single lock, and every job change is
    mirrored to a SQLite database so pending work survives restarts.
    """

    def __init__(
        self,
        vram_budget_mb: int = DEFAULT_VRAM_MB,
        queue_db: str = "~/.hermes/gpu_scheduler.db",
    ):
        """Initialize the scheduler.

        Args:
            vram_budget_mb: Total VRAM (MB) the scheduler may allocate.
            queue_db: Path of the SQLite file used to persist the queue.
        """
        self.vram_budget_mb = vram_budget_mb
        self.queue_db = Path(queue_db).expanduser()
        self.queue_db.parent.mkdir(parents=True, exist_ok=True)

        # In-memory state; SQLite is a persistence mirror, not the source
        # of truth while the process is running.
        self.gpu_state = GPUState(total_vram_mb=vram_budget_mb)
        self.job_queue: List[InferenceJob] = []
        self.completed_jobs: List[InferenceJob] = []
        self._lock = threading.Lock()
        self._running = False
        self._worker_thread: Optional[threading.Thread] = None

        # Re-hydrate jobs that were still pending when we last exited.
        self._load_state()

        logger.info(
            "GPU Scheduler initialized: %dMB VRAM budget",
            vram_budget_mb,
        )

    @staticmethod
    def _registry_key(model: ModelSpec) -> str:
        """Reverse-map a ModelSpec to its MODEL_REGISTRY key.

        Falls back to the display name for specs that were never
        registered (e.g. placeholders created by _load_state).
        """
        return next(
            (key for key, spec in MODEL_REGISTRY.items() if spec is model),
            model.name,
        )

    def _load_state(self):
        """Create the schema if missing and reload unfinished jobs from SQLite."""
        import sqlite3
        conn = sqlite3.connect(str(self.queue_db))
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    job_id TEXT PRIMARY KEY,
                    project TEXT,
                    model_name TEXT,
                    priority INTEGER,
                    batch_size INTEGER,
                    created_at REAL,
                    started_at REAL,
                    completed_at REAL,
                    status TEXT,
                    error TEXT,
                    use_cpu_fallback INTEGER
                )
            """)
            conn.commit()

            # Anything not in a terminal state gets re-queued.
            rows = conn.execute(
                "SELECT * FROM jobs WHERE status IN ('queued', 'loading', 'running')"
            ).fetchall()

            for row in rows:
                model_name = row[2]
                # Unknown models get a conservative 8GB placeholder spec.
                model = MODEL_REGISTRY.get(model_name, ModelSpec(name=model_name, vram_mb=8192))
                job = InferenceJob(
                    job_id=row[0],
                    project=row[1],
                    model=model,
                    priority=Priority(row[3]),
                    batch_size=row[4],
                    created_at=row[5],
                    started_at=row[6],
                    completed_at=row[7],
                    status=row[8],
                    error=row[9],
                    use_cpu_fallback=bool(row[10]),
                )
                self.job_queue.append(job)
        finally:
            # Close even if schema creation or the SELECT raises.
            conn.close()

        logger.info("Loaded %d pending jobs", len(self.job_queue))

    def _save_job(self, job: InferenceJob):
        """Persist (insert-or-replace) a single job row in SQLite."""
        import sqlite3
        conn = sqlite3.connect(str(self.queue_db))
        try:
            conn.execute("""
                INSERT OR REPLACE INTO jobs
                (job_id, project, model_name, priority, batch_size, created_at,
                 started_at, completed_at, status, error, use_cpu_fallback)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                job.job_id,
                job.project,
                # BUGFIX: store the registry key, not the display name.
                # _load_state resolves models via MODEL_REGISTRY[key], so a
                # display name ("Llama 3 8B") would never round-trip to its
                # spec after a restart.
                self._registry_key(job.model),
                job.priority.value,
                job.batch_size,
                job.created_at,
                job.started_at,
                job.completed_at,
                job.status,
                job.error,
                int(job.use_cpu_fallback),
            ))
            conn.commit()
        finally:
            conn.close()

    def submit_job(
        self,
        job_id: str,
        project: str,
        model_name: str,
        priority: Priority = Priority.BATCH,
        batch_size: int = 1,
    ) -> InferenceJob:
        """Submit an inference job to the queue.

        Args:
            job_id: Unique job identifier
            project: Project name (video_forge, lpm, playground, harvester)
            model_name: Model name from MODEL_REGISTRY
            priority: Job priority
            batch_size: Number of items to process

        Returns:
            The created InferenceJob

        Raises:
            ValueError: If model_name is not in MODEL_REGISTRY.
        """
        model = MODEL_REGISTRY.get(model_name)
        if not model:
            raise ValueError(f"Unknown model: {model_name}. Registered: {list(MODEL_REGISTRY.keys())}")

        job = InferenceJob(
            job_id=job_id,
            project=project,
            model=model,
            priority=priority,
            batch_size=batch_size,
        )

        with self._lock:
            # Insert before the first job of strictly lower priority;
            # equal priorities keep FIFO order.
            inserted = False
            for i, existing in enumerate(self.job_queue):
                if job.priority < existing.priority:
                    self.job_queue.insert(i, job)
                    inserted = True
                    break
            if not inserted:
                self.job_queue.append(job)

            self._save_job(job)

        logger.info(
            "Job submitted: %s (project=%s, model=%s, priority=%s)",
            job_id, project, model_name, priority.name,
        )

        return job

    def get_next_job(self) -> Optional[InferenceJob]:
        """Return the next job to process, or None if nothing is runnable.

        Scans queued jobs in priority order. The first candidate that fits
        the VRAM budget is returned; if a candidate does not fit but its
        model allows CPU fallback, it is returned with use_cpu_fallback
        set (note: this mutates the job as a side effect).
        """
        with self._lock:
            for job in self.job_queue:
                if job.status != "queued":
                    continue

                # GPU path: enough free VRAM for this model.
                if self.gpu_state.can_fit(job.model):
                    return job

                # CPU path: run without a VRAM reservation.
                if job.model.cpu_fallback:
                    job.use_cpu_fallback = True
                    return job

            return None

    def start_job(self, job: InferenceJob) -> bool:
        """Mark a job as started and reserve its VRAM.

        Returns:
            True if successful, False if the job needs GPU VRAM that is
            not currently available.
        """
        with self._lock:
            if not job.use_cpu_fallback:
                if not self.gpu_state.can_fit(job.model):
                    logger.warning(
                        "Insufficient VRAM for %s: need %dMB, have %dMB",
                        job.model.name,
                        job.model.vram_mb,
                        self.gpu_state.available_vram_mb,
                    )
                    return False

                # Reserve VRAM for the lifetime of the job.
                self.gpu_state.used_vram_mb += job.model.vram_mb
                if job.model.name not in self.gpu_state.loaded_models:
                    self.gpu_state.loaded_models.append(job.model.name)

            job.status = "loading"
            job.started_at = time.time()
            self.gpu_state.active_job = job.job_id
            self._save_job(job)

        logger.info(
            "Job started: %s (model=%s, cpu_fallback=%s, vram_used=%dMB)",
            job.job_id,
            job.model.name,
            job.use_cpu_fallback,
            self.gpu_state.used_vram_mb,
        )

        return True

    def complete_job(self, job: InferenceJob, error: Optional[str] = None):
        """Mark a job completed (or failed when *error* is set) and free VRAM."""
        with self._lock:
            job.completed_at = time.time()
            job.status = "completed" if not error else "failed"
            job.error = error

            if not job.use_cpu_fallback:
                # Release the reservation; clamp at 0 so accounting can
                # never go negative.
                self.gpu_state.used_vram_mb = max(
                    0,
                    self.gpu_state.used_vram_mb - job.model.vram_mb,
                )

            if self.gpu_state.active_job == job.job_id:
                self.gpu_state.active_job = None

            # Move to completed. Guarded so that completing the same job
            # twice does not raise ValueError from list.remove.
            if job in self.job_queue:
                self.job_queue.remove(job)
            self.completed_jobs.append(job)
            self._save_job(job)

        duration = (job.completed_at - job.started_at) * 1000 if job.started_at else 0
        logger.info(
            "Job completed: %s (status=%s, duration=%.0fms)",
            job.job_id,
            job.status,
            duration,
        )

    def get_status(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot of GPU, queue, and history."""
        with self._lock:
            total = self.gpu_state.total_vram_mb
            return {
                "gpu": {
                    "total_vram_mb": total,
                    "used_vram_mb": self.gpu_state.used_vram_mb,
                    "available_vram_mb": self.gpu_state.available_vram_mb,
                    # Guard against a zero budget (e.g. CPU-only config).
                    "utilization_pct": round(
                        self.gpu_state.used_vram_mb / total * 100, 1
                    ) if total else 0.0,
                    "loaded_models": self.gpu_state.loaded_models,
                    "active_job": self.gpu_state.active_job,
                },
                "queue": {
                    "pending": len([j for j in self.job_queue if j.status == "queued"]),
                    "loading": len([j for j in self.job_queue if j.status == "loading"]),
                    "running": len([j for j in self.job_queue if j.status == "running"]),
                    "by_priority": {
                        p.name: len([j for j in self.job_queue if j.priority == p and j.status == "queued"])
                        for p in Priority
                    },
                },
                "completed": {
                    "total": len(self.completed_jobs),
                    "success": len([j for j in self.completed_jobs if j.status == "completed"]),
                    "failed": len([j for j in self.completed_jobs if j.status == "failed"]),
                },
            }

    def register_model(self, name: str, spec: ModelSpec):
        """Register a new model in the (process-global) MODEL_REGISTRY."""
        MODEL_REGISTRY[name] = spec
        logger.info("Registered model: %s (%dMB VRAM)", name, spec.vram_mb)

    def clear_completed(self):
        """Clear completed jobs from memory (rows remain in the DB)."""
        with self._lock:
            self.completed_jobs.clear()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI Interface
|
||||
# ============================================================================
|
||||
|
||||
def main():
    """CLI entry point for testing."""
    import argparse

    parser = argparse.ArgumentParser(description="GPU Inference Scheduler")
    parser.add_argument("action", choices=["status", "submit", "list", "clear"])
    parser.add_argument("--job-id", help="Job ID for submit")
    parser.add_argument("--project", help="Project name")
    parser.add_argument("--model", help="Model name")
    parser.add_argument("--priority", choices=["realtime", "interactive", "batch"], default="batch")
    parser.add_argument("--vram", type=int, default=DEFAULT_VRAM_MB, help="VRAM budget in MB")

    args = parser.parse_args()
    scheduler = InferenceScheduler(vram_budget_mb=args.vram)

    def show_status():
        # Snapshot is already JSON-serializable.
        print(json.dumps(scheduler.get_status(), indent=2))

    def do_submit():
        if not all([args.job_id, args.project, args.model]):
            print("Error: --job-id, --project, and --model required for submit")
            return
        lane = Priority[args.priority.upper()]
        job = scheduler.submit_job(args.job_id, args.project, args.model, lane)
        print(f"Submitted: {job.job_id}")

    def list_jobs():
        print(f"Pending jobs: {len(scheduler.job_queue)}")
        for job in scheduler.job_queue:
            print(f"  {job.job_id}: {job.project}/{job.model.name} [{job.priority.name}] {job.status}")

    def clear_done():
        scheduler.clear_completed()
        print("Cleared completed jobs from memory")

    # Dispatch table mirrors the argparse choices one-to-one.
    {
        "status": show_status,
        "submit": do_submit,
        "list": list_jobs,
        "clear": clear_done,
    }[args.action]()


if __name__ == "__main__":
    main()
|
||||
@@ -1,571 +0,0 @@
|
||||
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
|
||||
|
||||
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
|
||||
|
||||
Provides:
|
||||
- Image sequence → video with crossfade transitions
|
||||
- Audio-video sync with beat alignment
|
||||
- Ken Burns effect (slow pan/zoom on stills)
|
||||
- H.264/WebM encoding optimized for web
|
||||
- Streaming output support
|
||||
|
||||
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class VideoSpec:
    """Video output specification.

    Defaults target 1080p30 H.264 in MP4; use codec="libvpx-vp9" with
    container="webm" (and e.g. audio_codec="libopus") for WebM output.
    """
    width: int = 1920
    height: int = 1080
    fps: int = 30
    codec: str = "libx264"  # libx264, libvpx-vp9
    container: str = "mp4"  # mp4, webm
    crf: int = 23  # quality (lower = better, 18-28 typical)
    preset: str = "medium"  # ultrafast, fast, medium, slow
    audio_codec: str = "aac"
    audio_bitrate: str = "192k"
|
||||
|
||||
|
||||
@dataclass
class Transition:
    """Crossfade transition between clips.

    NOTE(review): `type` presumably maps to ffmpeg xfade transition names —
    confirm against the crossfade filter-graph builder.
    """
    duration_sec: float = 1.0
    type: str = "fade"  # fade, dissolve, slideleft, slideright
|
||||
|
||||
|
||||
@dataclass
class KenBurnsConfig:
    """Ken Burns pan/zoom effect configuration."""
    zoom_start: float = 1.0  # zoom factor at clip start
    zoom_end: float = 1.15  # zoom factor at clip end
    pan_x: float = 0.0  # -1 to 1
    pan_y: float = 0.0  # -1 to 1
    duration_sec: float = 5.0
|
||||
|
||||
|
||||
class FFmpegPipeline:
|
||||
"""FFmpeg wrapper for video composition.
|
||||
|
||||
Provides a Python interface to FFmpeg's complex filter graphs
|
||||
for composing videos from images, clips, and audio.
|
||||
"""
|
||||
|
||||
    def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
        """Store the binary paths and fail fast if ffmpeg is unusable.

        Args:
            ffmpeg_path: Path or command name of the ffmpeg binary.
            ffprobe_path: Path or command name of the ffprobe binary.

        Raises:
            RuntimeError: If ffmpeg is missing or `-version` fails.
        """
        self.ffmpeg = ffmpeg_path
        self.ffprobe = ffprobe_path
        self._verify_ffmpeg()  # fail at construction, not first use
|
||||
|
||||
def _verify_ffmpeg(self):
|
||||
"""Verify FFmpeg is available."""
|
||||
try:
|
||||
r = subprocess.run([self.ffmpeg, "-version"], capture_output=True, text=True, timeout=5)
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg returned {r.returncode}")
|
||||
logger.info("FFmpeg verified: %s", r.stdout.split('\n')[0])
|
||||
except FileNotFoundError:
|
||||
raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")
|
||||
|
||||
def probe(self, path: str) -> Dict[str, Any]:
|
||||
"""Probe a media file for metadata."""
|
||||
cmd = [
|
||||
self.ffprobe, "-v", "quiet", "-print_format", "json",
|
||||
"-show_format", "-show_streams", path
|
||||
]
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError(f"ffprobe failed: {r.stderr}")
|
||||
return json.loads(r.stdout)
|
||||
|
||||
def get_duration(self, path: str) -> float:
|
||||
"""Get duration of a media file in seconds."""
|
||||
info = self.probe(path)
|
||||
return float(info["format"]["duration"])
|
||||
|
||||
# =================================================================
|
||||
# Image Sequence → Video
|
||||
# =================================================================
|
||||
|
||||
def images_to_video(
|
||||
self,
|
||||
image_paths: List[str],
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
transition: Optional[Transition] = None,
|
||||
image_duration: float = 3.0,
|
||||
) -> str:
|
||||
"""Convert a list of images to a video with optional crossfade transitions.
|
||||
|
||||
Args:
|
||||
image_paths: List of image file paths
|
||||
output_path: Output video path
|
||||
spec: Video output specification
|
||||
transition: Crossfade config (None = hard cuts)
|
||||
image_duration: Seconds per image
|
||||
|
||||
Returns:
|
||||
Path to output video
|
||||
"""
|
||||
if not image_paths:
|
||||
raise ValueError("No images provided")
|
||||
|
||||
if transition and len(image_paths) > 1:
|
||||
return self._images_with_crossfade(
|
||||
image_paths, output_path, spec, transition, image_duration
|
||||
)
|
||||
else:
|
||||
return self._images_hard_cut(
|
||||
image_paths, output_path, spec, image_duration
|
||||
)
|
||||
|
||||
def _images_hard_cut(
|
||||
self,
|
||||
image_paths: List[str],
|
||||
output_path: str,
|
||||
spec: VideoSpec,
|
||||
duration: float,
|
||||
) -> str:
|
||||
"""Simple image sequence with hard cuts."""
|
||||
# Create concat file
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
||||
concat_path = f.name
|
||||
for img in image_paths:
|
||||
f.write(f"file '{os.path.abspath(img)}'\n")
|
||||
f.write(f"duration {duration}\n")
|
||||
# Last image needs repeating for ffmpeg concat
|
||||
f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
|
||||
|
||||
try:
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||
"-i", concat_path,
|
||||
"-vf", f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2",
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-pix_fmt", "yuv420p", "-r", str(spec.fps),
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
finally:
|
||||
os.unlink(concat_path)
|
||||
|
||||
def _images_with_crossfade(
|
||||
self,
|
||||
image_paths: List[str],
|
||||
output_path: str,
|
||||
spec: VideoSpec,
|
||||
transition: Transition,
|
||||
duration: float,
|
||||
) -> str:
|
||||
"""Image sequence with xfade crossfade transitions."""
|
||||
n = len(image_paths)
|
||||
fade_dur = transition.duration_sec
|
||||
clip_dur = duration
|
||||
|
||||
# Build filter complex
|
||||
inputs = []
|
||||
for img in image_paths:
|
||||
inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
|
||||
|
||||
# Scale all inputs
|
||||
filter_parts = []
|
||||
for i in range(n):
|
||||
filter_parts.append(
|
||||
f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
|
||||
f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
|
||||
)
|
||||
|
||||
# Chain xfade transitions
|
||||
if n == 1:
|
||||
filter_parts.append(f"[v0]copy[outv]")
|
||||
else:
|
||||
offset = clip_dur - fade_dur
|
||||
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
|
||||
for i in range(1, n - 1):
|
||||
offset = (i + 1) * clip_dur - (i + 1) * fade_dur
|
||||
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
|
||||
filter_parts.append(f"[xf{n-2}]copy[outv]")
|
||||
|
||||
filter_complex = ";\n".join(filter_parts)
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y",
|
||||
*inputs,
|
||||
"-filter_complex", filter_complex,
|
||||
"-map", "[outv]",
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Ken Burns Effect
|
||||
# =================================================================
|
||||
|
||||
def apply_ken_burns(
|
||||
self,
|
||||
image_path: str,
|
||||
output_path: str,
|
||||
config: KenBurnsConfig = KenBurnsConfig(),
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
) -> str:
|
||||
"""Apply Ken Burns (pan + zoom) effect to a still image.
|
||||
|
||||
Creates a video from a single image with slow zoom and pan motion.
|
||||
"""
|
||||
# zoompan filter: z=zoom, x/y=pan position
|
||||
# z goes from zoom_start to zoom_end over the duration
|
||||
total_frames = int(config.duration_sec * spec.fps)
|
||||
|
||||
zoom_expr = f"min({config.zoom_start}+on*({config.zoom_zoom_end}-{config.zoom_start})/{total_frames},{config.zoom_end})"
|
||||
|
||||
# Pan expression: move from center ± pan offset
|
||||
x_expr = f"(iw-iw/zoom)/2+{config.pan_x}*iw*0.1*(on/{total_frames})"
|
||||
y_expr = f"(ih-ih/zoom)/2+{config.pan_y}*ih*0.1*(on/{total_frames})"
|
||||
|
||||
filter_str = (
|
||||
f"zoompan=z='min(zoom+0.001,{config.zoom_end})'"
|
||||
f":x='iw/2-(iw/zoom/2)+{config.pan_x}*10'"
|
||||
f":y='ih/2-(ih/zoom/2)+{config.pan_y}*10'"
|
||||
f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
|
||||
)
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y",
|
||||
"-loop", "1", "-i", image_path,
|
||||
"-vf", filter_str,
|
||||
"-t", str(config.duration_sec),
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Audio-Video Sync
|
||||
# =================================================================
|
||||
|
||||
def mux_audio_video(
|
||||
self,
|
||||
video_path: str,
|
||||
audio_path: str,
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
offset_sec: float = 0.0,
|
||||
) -> str:
|
||||
"""Mux audio and video together with optional offset.
|
||||
|
||||
Args:
|
||||
video_path: Video file path
|
||||
audio_path: Audio file path
|
||||
output_path: Output path
|
||||
spec: Output spec
|
||||
offset_sec: Audio offset in seconds (positive = delay audio)
|
||||
"""
|
||||
cmd = [
|
||||
self.ffmpeg, "-y",
|
||||
"-i", video_path,
|
||||
"-i", audio_path,
|
||||
]
|
||||
|
||||
if offset_sec > 0:
|
||||
cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
|
||||
cmd.extend(["-map", "0:v", "-map", "2:a"])
|
||||
elif offset_sec < 0:
|
||||
cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
|
||||
cmd.extend(["-map", "2:v", "-map", "1:a"])
|
||||
else:
|
||||
cmd.extend(["-map", "0:v", "-map", "1:a"])
|
||||
|
||||
cmd.extend([
|
||||
"-c:v", "copy",
|
||||
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
|
||||
"-shortest",
|
||||
output_path
|
||||
])
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
def align_to_beats(
|
||||
self,
|
||||
video_path: str,
|
||||
audio_path: str,
|
||||
beat_times: List[float],
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
) -> str:
|
||||
"""Align video cuts to audio beat times.
|
||||
|
||||
Splits video at beat timestamps and reassembles with audio sync.
|
||||
"""
|
||||
if not beat_times:
|
||||
return self.mux_audio_video(video_path, audio_path, output_path, spec)
|
||||
|
||||
video_dur = self.get_duration(video_path)
|
||||
|
||||
# Build segment list based on beat times
|
||||
segments = []
|
||||
prev = 0.0
|
||||
for beat in beat_times:
|
||||
if beat > prev and beat <= video_dur:
|
||||
segments.append((prev, beat))
|
||||
prev = beat
|
||||
if prev < video_dur:
|
||||
segments.append((prev, video_dur))
|
||||
|
||||
# Extract segments
|
||||
segment_paths = []
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
for i, (start, end) in enumerate(segments):
|
||||
seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
|
||||
"-t", str(end - start), "-c", "copy", seg_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
segment_paths.append(seg_path)
|
||||
|
||||
# Concat segments
|
||||
concat_path = os.path.join(tmpdir, "concat.txt")
|
||||
with open(concat_path, "w") as f:
|
||||
for sp in segment_paths:
|
||||
f.write(f"file '{sp}'\n")
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||
"-i", concat_path, "-c", "copy",
|
||||
os.path.join(tmpdir, "video_aligned.mp4")
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
|
||||
return self.mux_audio_video(
|
||||
os.path.join(tmpdir, "video_aligned.mp4"),
|
||||
audio_path, output_path, spec
|
||||
)
|
||||
|
||||
# =================================================================
|
||||
# Encoding
|
||||
# =================================================================
|
||||
|
||||
def encode_web(
|
||||
self,
|
||||
input_path: str,
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
) -> str:
|
||||
"""Encode for web playback (H.264 MP4 or WebM)."""
|
||||
if spec.container == "webm":
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-i", input_path,
|
||||
"-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
|
||||
"-c:a", "libopus", "-b:a", spec.audio_bitrate,
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
else:
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-i", input_path,
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
|
||||
"-movflags", "+faststart", # Progressive download
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Full Composition Pipeline
|
||||
# =================================================================
|
||||
|
||||
def compose(
|
||||
self,
|
||||
images: List[str],
|
||||
audio: Optional[str] = None,
|
||||
output_path: str = "output.mp4",
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
transition: Optional[Transition] = None,
|
||||
ken_burns: Optional[KenBurnsConfig] = None,
|
||||
beat_times: Optional[List[float]] = None,
|
||||
) -> str:
|
||||
"""Full composition pipeline: images → video → mux audio → encode.
|
||||
|
||||
This is the main entry point. Combines all pipeline components
|
||||
into a single call.
|
||||
|
||||
Args:
|
||||
images: List of image paths
|
||||
audio: Optional audio file path
|
||||
output_path: Final output path
|
||||
spec: Video specification
|
||||
transition: Crossfade config
|
||||
ken_burns: Ken Burns config (applied to each image before composition)
|
||||
beat_times: Beat timestamps for audio-visual sync
|
||||
|
||||
Returns:
|
||||
Path to final video
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Apply Ken Burns to each image if configured
|
||||
processed_images = images
|
||||
if ken_burns:
|
||||
processed_images = []
|
||||
for i, img in enumerate(images):
|
||||
kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
|
||||
self.apply_ken_burns(img, kb_path, ken_burns, spec)
|
||||
processed_images.append(kb_path)
|
||||
|
||||
# Compose images into video
|
||||
video_path = os.path.join(tmpdir, "composed.mp4")
|
||||
if processed_images and all(p.endswith('.mp4') for p in processed_images):
|
||||
# All Ken Burns clips — concat them
|
||||
self._concat_videos(processed_images, video_path, transition, spec)
|
||||
else:
|
||||
self.images_to_video(processed_images, video_path, spec, transition)
|
||||
|
||||
# Mux audio if provided
|
||||
if audio:
|
||||
if beat_times:
|
||||
return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
|
||||
else:
|
||||
return self.mux_audio_video(video_path, audio, output_path, spec)
|
||||
else:
|
||||
# Just copy the video
|
||||
import shutil
|
||||
shutil.copy2(video_path, output_path)
|
||||
return output_path
|
||||
|
||||
def _concat_videos(
|
||||
self,
|
||||
video_paths: List[str],
|
||||
output_path: str,
|
||||
transition: Optional[Transition],
|
||||
spec: VideoSpec,
|
||||
) -> str:
|
||||
"""Concatenate video files with optional transitions."""
|
||||
if not transition:
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
||||
concat_path = f.name
|
||||
for vp in video_paths:
|
||||
f.write(f"file '{os.path.abspath(vp)}'\n")
|
||||
try:
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||
"-i", concat_path, "-c", "copy", output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
finally:
|
||||
os.unlink(concat_path)
|
||||
else:
|
||||
# Use xfade for video clips too
|
||||
n = len(video_paths)
|
||||
inputs = []
|
||||
for vp in video_paths:
|
||||
inputs.extend(["-i", vp])
|
||||
|
||||
filter_parts = []
|
||||
for i in range(n):
|
||||
filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
|
||||
|
||||
fade_dur = transition.duration_sec
|
||||
# Calculate offsets from durations
|
||||
if n >= 2:
|
||||
dur0 = self.get_duration(video_paths[0])
|
||||
offset = dur0 - fade_dur
|
||||
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
|
||||
for i in range(1, n - 1):
|
||||
duri = self.get_duration(video_paths[i])
|
||||
offset += duri - fade_dur
|
||||
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
|
||||
filter_parts.append(f"[xf{n-2}]copy[outv]")
|
||||
else:
|
||||
filter_parts.append(f"[v0]copy[outv]")
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", *inputs,
|
||||
"-filter_complex", ";".join(filter_parts),
|
||||
"-map", "[outv]",
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Streaming
|
||||
# =================================================================
|
||||
|
||||
def create_hls_stream(
|
||||
self,
|
||||
input_path: str,
|
||||
output_dir: str,
|
||||
segment_duration: int = 4,
|
||||
) -> str:
|
||||
"""Create HLS streaming output for progressive playback.
|
||||
|
||||
Returns:
|
||||
Path to the .m3u8 playlist file
|
||||
"""
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
playlist = os.path.join(output_dir, "playlist.m3u8")
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-i", input_path,
|
||||
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
|
||||
"-c:a", "aac", "-b:a", "128k",
|
||||
"-f", "hls", "-hls_time", str(segment_duration),
|
||||
"-hls_list_size", "0", "-hls_segment_filename",
|
||||
os.path.join(output_dir, "seg_%03d.ts"),
|
||||
playlist
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return playlist
|
||||
|
||||
# =================================================================
|
||||
# Internal
|
||||
# =================================================================
|
||||
|
||||
def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
|
||||
"""Run an FFmpeg command with logging and error handling."""
|
||||
logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||
if r.returncode != 0:
|
||||
logger.error("FFmpeg failed: %s", r.stderr[-500:])
|
||||
raise RuntimeError(f"FFmpeg error (exit {r.returncode}): {r.stderr[-500:]}")
|
||||
return r.stdout
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Convenience functions
|
||||
# =========================================================================
|
||||
|
||||
def compose_video(
    images: List[str],
    audio: Optional[str] = None,
    output: str = "output.mp4",
    **kwargs,
) -> str:
    """One-call video composition from images and optional audio.

    Convenience wrapper around FFmpegPipeline.compose().

    Keyword Args:
        spec (VideoSpec): output specification.
        transition (Transition): crossfade between clips.
        ken_burns (KenBurnsConfig): pan/zoom applied to each image.
        beat_times (list[float]): beat timestamps for audio sync.

    Returns:
        Path to the final video.

    Raises:
        TypeError: on unrecognized keyword arguments (previously these
            were silently discarded, hiding typos like ``transitions=``).
    """
    spec = kwargs.pop("spec", VideoSpec())
    transition = kwargs.pop("transition", None)
    ken_burns = kwargs.pop("ken_burns", None)
    beat_times = kwargs.pop("beat_times", None)
    # Fail fast on typos before spawning any ffmpeg subprocess.
    if kwargs:
        raise TypeError(f"compose_video() got unexpected keyword arguments: {sorted(kwargs)}")
    pipeline = FFmpegPipeline()
    return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)
|
||||
Reference in New Issue
Block a user