Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2c587d9d32 feat: FFmpeg video composition pipeline — shared infra (#643)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 37s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 33s
Tests / e2e (pull_request) Successful in 2m42s
Tests / test (pull_request) Failing after 38m21s
Shared video pipeline for Video Forge (#53) and LPM 1.0 (#641).

tools/video_pipeline.py:
- FFmpegPipeline class wrapping ffmpeg CLI
- Image sequence -> video with crossfade transitions
- Ken Burns pan/zoom effect on still images
- Audio-video muxing with beat alignment
- H.264/WebM encoding with faststart for web
- HLS streaming output
- Full compose() pipeline combining all components

tests/test_video_pipeline.py: 14 tests

Closes #643
2026-04-14 21:12:00 -04:00
5 changed files with 700 additions and 398 deletions

View File

@@ -1,80 +0,0 @@
# Approval Tier System
Graduated safety for command approval based on risk level.
## Tiers
| Tier | Name | Action Types | Who Approves | Timeout |
|------|------|--------------|--------------|---------|
| 0 | SAFE | Read, search, list, view | None | N/A |
| 1 | LOW | Write, create, edit, script | LLM only | N/A |
| 2 | MEDIUM | Messages, API, email | Human + LLM | 60s |
| 3 | HIGH | Crypto, config, deploy | Human + LLM | 30s |
| 4 | CRITICAL | Delete, kill, shutdown | Human + LLM | 10s |
## How It Works
1. **Detection**: `detect_tier(command, action)` analyzes the command and action type
2. **Auto-approve**: SAFE and LOW tiers are automatically approved
3. **Human approval**: MEDIUM+ tiers require human confirmation
4. **Timeout handling**: If no response within timeout, escalate to next tier
5. **Crisis bypass**: 988 Lifeline commands bypass approval entirely
## Usage
```python
from tools.approval import TieredApproval, detect_tier, ApprovalTier
# Detect tier
tier = detect_tier("rm -rf /tmp/data") # Returns ApprovalTier.CRITICAL
# Request approval
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
if result["approved"]:
# Auto-approved (SAFE or LOW tier)
execute_command()
else:
# Needs human approval
show_approval_ui(result["approval_id"], result["tier"], result["timeout"])
```
## Crisis Bypass
Commands containing crisis keywords (988, suicide, self-harm, crisis hotline) automatically bypass approval to ensure immediate help:
```python
from tools.approval import is_crisis_bypass
is_crisis_bypass("call 988 for help") # True — bypasses approval
```
## Timeout Escalation
When a tier times out without human response:
- MEDIUM → HIGH (30s timeout)
- HIGH → CRITICAL (10s timeout)
- CRITICAL → Deny
## Integration
The tier system integrates with:
- **CLI**: Interactive prompts with tier-aware timeouts
- **Gateway**: Telegram/Discord approval buttons
- **Cron**: Auto-approve LOW tier, escalate MEDIUM+
## Testing
Run tests with:
```bash
python -m pytest tests/test_approval_tiers.py -v
```
26 tests covering:
- Tier detection from commands and actions
- Timeout values per tier
- Approver requirements
- Crisis bypass logic
- Approval request and resolution
- Timeout escalation

View File

@@ -1,141 +0,0 @@
"""Tests for approval tier system (Issue #670)."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.approval import (
ApprovalTier, detect_tier, get_tier_timeout, get_tier_approvers,
requires_human_approval, is_crisis_bypass, TieredApproval, get_tiered_approval
)
class TestApprovalTier:
    """detect_tier() classification for commands and action hints."""

    def test_safe_read(self):
        tier = detect_tier("cat file.txt")
        assert tier == ApprovalTier.SAFE

    def test_safe_search(self):
        tier = detect_tier("grep pattern file")
        assert tier == ApprovalTier.SAFE

    def test_low_write(self):
        tier = detect_tier("write to file", action="write")
        assert tier == ApprovalTier.LOW

    def test_medium_message(self):
        tier = detect_tier("send message", action="send_message")
        assert tier == ApprovalTier.MEDIUM

    def test_high_config(self):
        tier = detect_tier("edit config", action="config")
        assert tier == ApprovalTier.HIGH

    def test_critical_delete(self):
        tier = detect_tier("rm -rf /", action="delete")
        assert tier == ApprovalTier.CRITICAL

    def test_crisis_keyword(self):
        tier = detect_tier("call 988 for help")
        assert tier == ApprovalTier.CRITICAL

    def test_dangerous_pattern_escalation(self):
        # rm -rf should be CRITICAL even without an action hint.
        tier = detect_tier("rm -rf /tmp/data")
        assert tier == ApprovalTier.CRITICAL
class TestTierTimeouts:
    """Per-tier human-response timeouts (seconds; 0 = no timeout)."""

    def test_safe_no_timeout(self):
        timeout = get_tier_timeout(ApprovalTier.SAFE)
        assert timeout == 0

    def test_medium_60s(self):
        timeout = get_tier_timeout(ApprovalTier.MEDIUM)
        assert timeout == 60

    def test_high_30s(self):
        timeout = get_tier_timeout(ApprovalTier.HIGH)
        assert timeout == 30

    def test_critical_10s(self):
        timeout = get_tier_timeout(ApprovalTier.CRITICAL)
        assert timeout == 10
class TestTierApprovers:
    """Approver-chain requirements per tier."""

    def test_safe_no_approvers(self):
        assert get_tier_approvers(ApprovalTier.SAFE) == ()

    def test_low_llm_only(self):
        assert get_tier_approvers(ApprovalTier.LOW) == ("llm",)

    def test_medium_human_llm(self):
        assert get_tier_approvers(ApprovalTier.MEDIUM) == ("human", "llm")

    def test_requires_human(self):
        # Use plain truthiness instead of `== True` / `== False` (PEP 8,
        # flake8 E712) — requires_human_approval returns a bool.
        assert not requires_human_approval(ApprovalTier.SAFE)
        assert not requires_human_approval(ApprovalTier.LOW)
        assert requires_human_approval(ApprovalTier.MEDIUM)
        assert requires_human_approval(ApprovalTier.HIGH)
        assert requires_human_approval(ApprovalTier.CRITICAL)
class TestCrisisBypass:
    """is_crisis_bypass() keyword detection."""

    def test_988_bypass(self):
        # Truthiness assertions instead of `== True` / `== False` (E712).
        assert is_crisis_bypass("call 988")

    def test_suicide_prevention(self):
        assert is_crisis_bypass("contact suicide prevention")

    def test_normal_command(self):
        assert not is_crisis_bypass("ls -la")
class TestTieredApproval:
    """End-to-end behavior of the TieredApproval request handler."""

    def test_safe_auto_approves(self):
        ta = TieredApproval()
        result = ta.request_approval("session1", "cat file.txt")
        # Truthiness instead of `== True` (PEP 8 / flake8 E712).
        assert result["approved"]
        assert result["tier"] == ApprovalTier.SAFE

    def test_low_auto_approves(self):
        ta = TieredApproval()
        result = ta.request_approval("session1", "write file", action="write")
        assert result["approved"]
        assert result["tier"] == ApprovalTier.LOW

    def test_medium_needs_approval(self):
        ta = TieredApproval()
        result = ta.request_approval("session1", "send message", action="send_message")
        assert not result["approved"]
        assert result["tier"] == ApprovalTier.MEDIUM
        assert "approval_id" in result

    def test_crisis_bypass(self):
        ta = TieredApproval()
        result = ta.request_approval("session1", "call 988 for help")
        assert result["approved"]
        assert result["reason"] == "crisis_bypass"

    def test_resolve_approval(self):
        ta = TieredApproval()
        result = ta.request_approval("session1", "send message", action="send_message")
        approval_id = result["approval_id"]
        # resolve_approval echoes the decision and clears the queue entry.
        assert ta.resolve_approval(approval_id, True)
        assert approval_id not in ta._pending

    def test_timeout_escalation(self):
        ta = TieredApproval()
        result = ta.request_approval("session1", "send message", action="send_message")
        approval_id = result["approval_id"]
        # Force the deadline into the past so check_timeouts() fires.
        ta._timeouts[approval_id] = 0
        timed_out = ta.check_timeouts()
        assert approval_id in timed_out
        # MEDIUM should have escalated to HIGH (entry still pending).
        if approval_id in ta._pending:
            assert ta._pending[approval_id]["tier"] == ApprovalTier.HIGH
class TestGetTieredApproval:
    """get_tiered_approval() returns the module-level singleton."""

    def test_singleton(self):
        first = get_tiered_approval()
        second = get_tiered_approval()
        assert first is second
# Allow running this file directly: `python tests/test_approval_tiers.py`.
if __name__ == "__main__":
    import pytest
    pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,129 @@
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
import os
import pytest
import tempfile
from unittest.mock import patch, MagicMock
from tools.video_pipeline import (
VideoSpec, Transition, KenBurnsConfig,
FFmpegPipeline, compose_video,
)
class TestVideoSpec:
    """Defaults and overrides of the VideoSpec dataclass."""

    def test_defaults(self):
        spec = VideoSpec()
        assert (spec.width, spec.height, spec.fps) == (1920, 1080, 30)
        assert spec.codec == "libx264"
        assert spec.container == "mp4"
        assert spec.crf == 23

    def test_webm_spec(self):
        spec = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
        assert spec.container == "webm"
        assert spec.codec == "libvpx-vp9"
class TestTransition:
    """Defaults of the Transition dataclass."""

    def test_defaults(self):
        trans = Transition()
        assert trans.duration_sec == 1.0
        assert trans.type == "fade"
class TestKenBurnsConfig:
    """Defaults of the KenBurnsConfig dataclass."""

    def test_defaults(self):
        cfg = KenBurnsConfig()
        assert cfg.zoom_start == 1.0
        assert cfg.zoom_end == 1.15
        assert cfg.duration_sec == 5.0
class TestFFmpegPipelineInit:
    """Constructor behavior: binary discovery and verification."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_verify_ffmpeg_passes(self, run_stub):
        run_stub.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
        assert FFmpegPipeline().ffmpeg == "ffmpeg"

    @patch("tools.video_pipeline.subprocess.run")
    def test_verify_ffmpeg_fails(self, run_stub):
        run_stub.side_effect = FileNotFoundError()
        with pytest.raises(RuntimeError, match="FFmpeg not found"):
            FFmpegPipeline()

    @patch("tools.video_pipeline.subprocess.run")
    def test_custom_ffmpeg_path(self, run_stub):
        run_stub.return_value = MagicMock(returncode=0, stdout="ffmpeg")
        pipeline = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
        assert pipeline.ffmpeg == "/usr/local/bin/ffmpeg"
class TestFFmpegPipelineProbe:
    """probe() / get_duration() parse ffprobe's JSON output."""

    @staticmethod
    def _bare_pipeline():
        # Bypass __init__ (and its ffmpeg check); only ffprobe is needed.
        pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
        pipeline.ffprobe = "ffprobe"
        return pipeline

    @patch("tools.video_pipeline.subprocess.run")
    def test_probe_returns_metadata(self, run_stub):
        run_stub.return_value = MagicMock(
            returncode=0,
            stdout='{"format": {"duration": "10.5"}, "streams": []}'
        )
        metadata = self._bare_pipeline().probe("/tmp/test.mp4")
        assert metadata["format"]["duration"] == "10.5"

    @patch("tools.video_pipeline.subprocess.run")
    def test_get_duration(self, run_stub):
        run_stub.return_value = MagicMock(
            returncode=0,
            stdout='{"format": {"duration": "42.3"}, "streams": []}'
        )
        assert self._bare_pipeline().get_duration("/tmp/test.mp4") == 42.3
class TestFFmpegPipelineImagesToVideo:
    """Input validation for images_to_video()."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_empty_images_raises(self, run_stub):
        pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
        pipeline.ffmpeg = "ffmpeg"
        with pytest.raises(ValueError, match="No images"):
            pipeline.images_to_video([], "/tmp/out.mp4")
class TestComposeVideo:
    """compose_video() delegates to an FFmpegPipeline instance."""

    @patch("tools.video_pipeline.FFmpegPipeline")
    def test_compose_calls_pipeline(self, pipeline_cls):
        instance = MagicMock()
        instance.compose.return_value = "/tmp/output.mp4"
        pipeline_cls.return_value = instance
        produced = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
        assert produced == "/tmp/output.mp4"
        instance.compose.assert_called_once()
class TestRunFFmpeg:
    """_run_ffmpeg() success, failure, and timeout paths."""

    @patch("tools.video_pipeline.subprocess.run")
    def test_success(self, run_stub):
        run_stub.return_value = MagicMock(returncode=0, stdout="")
        pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
        pipeline._run_ffmpeg(["ffmpeg", "-version"])

    @patch("tools.video_pipeline.subprocess.run")
    def test_failure_raises(self, run_stub):
        run_stub.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
        pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
        with pytest.raises(RuntimeError, match="FFmpeg error"):
            pipeline._run_ffmpeg(["ffmpeg", "-bad"])

    @patch("tools.video_pipeline.subprocess.run")
    def test_timeout_raises(self, run_stub):
        import subprocess
        run_stub.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
        pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
        with pytest.raises(subprocess.TimeoutExpired):
            pipeline._run_ffmpeg(["ffmpeg", "-slow"])

View File

@@ -133,183 +133,6 @@ DANGEROUS_PATTERNS = [
]
# =========================================================================
# Approval Tier System (Issue #670)
# =========================================================================
from enum import IntEnum
import time
class ApprovalTier(IntEnum):
    """Safety tiers for command approval.

    IntEnum so tiers compare and escalate numerically (SAFE < CRITICAL).

    Tier 0 (SAFE):     Read, search — no approval needed
    Tier 1 (LOW):      Write, scripts — LLM approval only
    Tier 2 (MEDIUM):   Messages, API — human + LLM, 60s timeout
    Tier 3 (HIGH):     Crypto, config — human + LLM, 30s timeout
    Tier 4 (CRITICAL): Crisis — human + LLM, 10s timeout
    """
    SAFE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
# Keyword → tier mapping used by detect_tier() for both the `action` hint
# (exact key match) and substring scanning of the command text.
TIER_PATTERNS = {
    # Tier 0: Safe
    "read": ApprovalTier.SAFE, "search": ApprovalTier.SAFE, "list": ApprovalTier.SAFE,
    "view": ApprovalTier.SAFE, "cat": ApprovalTier.SAFE, "grep": ApprovalTier.SAFE,
    # Tier 1: Low
    "write": ApprovalTier.LOW, "create": ApprovalTier.LOW, "edit": ApprovalTier.LOW,
    "patch": ApprovalTier.LOW, "copy": ApprovalTier.LOW, "mkdir": ApprovalTier.LOW,
    "script": ApprovalTier.LOW, "execute": ApprovalTier.LOW, "run": ApprovalTier.LOW,
    # Tier 2: Medium
    "send_message": ApprovalTier.MEDIUM, "message": ApprovalTier.MEDIUM,
    "email": ApprovalTier.MEDIUM, "api": ApprovalTier.MEDIUM, "post": ApprovalTier.MEDIUM,
    "telegram": ApprovalTier.MEDIUM, "discord": ApprovalTier.MEDIUM,
    # Tier 3: High
    "crypto": ApprovalTier.HIGH, "bitcoin": ApprovalTier.HIGH, "wallet": ApprovalTier.HIGH,
    "key": ApprovalTier.HIGH, "secret": ApprovalTier.HIGH, "config": ApprovalTier.HIGH,
    "deploy": ApprovalTier.HIGH, "install": ApprovalTier.HIGH, "systemctl": ApprovalTier.HIGH,
    # Tier 4: Critical
    "delete": ApprovalTier.CRITICAL, "remove": ApprovalTier.CRITICAL, "rm": ApprovalTier.CRITICAL,
    "format": ApprovalTier.CRITICAL, "kill": ApprovalTier.CRITICAL, "shutdown": ApprovalTier.CRITICAL,
    "crisis": ApprovalTier.CRITICAL, "suicide": ApprovalTier.CRITICAL,
}
# Seconds allowed for a human decision per tier (0 = no timeout applies).
TIER_TIMEOUTS = {
    ApprovalTier.SAFE: 0, ApprovalTier.LOW: 0, ApprovalTier.MEDIUM: 60,
    ApprovalTier.HIGH: 30, ApprovalTier.CRITICAL: 10,
}
# Approver chain required per tier; empty tuple means auto-approved.
TIER_APPROVERS = {
    ApprovalTier.SAFE: (), ApprovalTier.LOW: ("llm",),
    ApprovalTier.MEDIUM: ("human", "llm"), ApprovalTier.HIGH: ("human", "llm"),
    ApprovalTier.CRITICAL: ("human", "llm"),
}
def detect_tier(command, action="", context=None):
    """Detect approval tier for a command or action.

    Precedence: crisis keywords first, then an exact `action` key in
    TIER_PATTERNS, then the highest keyword tier found in the command
    text, then dangerous-pattern escalation.
    `context` is currently unused.
    """
    # Crisis keywords always CRITICAL
    crisis_keywords = ["988", "suicide", "self-harm", "crisis", "emergency"]
    for kw in crisis_keywords:
        if kw in command.lower():
            return ApprovalTier.CRITICAL
    # Check action type
    if action and action.lower() in TIER_PATTERNS:
        return TIER_PATTERNS[action.lower()]
    # Check command for keywords
    # NOTE(review): plain substring matching — e.g. "key" in "monkey.txt"
    # would map to HIGH; confirm whether word-boundary matching is wanted.
    cmd_lower = command.lower()
    best_tier = ApprovalTier.SAFE
    for keyword, tier in TIER_PATTERNS.items():
        if keyword in cmd_lower and tier > best_tier:
            best_tier = tier
    # Check dangerous patterns (detect_dangerous_command is defined
    # elsewhere in this module) and escalate based on its description.
    is_dangerous, _, description = detect_dangerous_command(command)
    if is_dangerous:
        desc_lower = description.lower()
        if any(k in desc_lower for k in ["delete", "remove", "format", "drop", "kill"]):
            return ApprovalTier.CRITICAL
        elif any(k in desc_lower for k in ["chmod", "chown", "systemctl", "config"]):
            return max(best_tier, ApprovalTier.HIGH)
        else:
            return max(best_tier, ApprovalTier.MEDIUM)
    return best_tier
def get_tier_timeout(tier):
    """Seconds allowed for a decision at *tier* (defaults to 60 if unknown)."""
    try:
        return TIER_TIMEOUTS[tier]
    except KeyError:
        return 60
def get_tier_approvers(tier):
    """Approver chain for *tier*; unknown tiers get the strictest chain."""
    if tier in TIER_APPROVERS:
        return TIER_APPROVERS[tier]
    return ("human", "llm")
def requires_human_approval(tier):
    """True when a human must be part of the approver chain for *tier*."""
    approvers = get_tier_approvers(tier)
    return any(approver == "human" for approver in approvers)
def is_crisis_bypass(command):
    """Check if command qualifies for crisis bypass (988 Lifeline).

    Crisis-resource commands skip the approval queue entirely so help
    is never delayed by a pending human decision.
    """
    lowered = command.lower()
    for marker in ("988", "suicide prevention", "crisis hotline", "lifeline", "emergency help"):
        if marker in lowered:
            return True
    return False
class TieredApproval:
    """Stateful handler that queues, resolves, and escalates approvals."""

    def __init__(self):
        # approval_id -> request metadata for queued (non-auto) approvals
        self._pending = {}
        # approval_id -> absolute deadline (epoch seconds)
        self._timeouts = {}

    def request_approval(self, session_key, command, action="", context=None):
        """Classify *command* and either auto-approve it or queue it.

        Returns a dict; "approved" is True for crisis-bypass and
        SAFE/LOW tiers, otherwise False with an "approval_id" to resolve.
        """
        tier = detect_tier(command, action, context)
        # 988/crisis commands skip the queue entirely.
        if tier == ApprovalTier.CRITICAL and is_crisis_bypass(command):
            return {"approved": True, "tier": tier, "reason": "crisis_bypass",
                    "timeout": 0, "approvers": ()}
        approvers = get_tier_approvers(tier)
        if tier <= ApprovalTier.LOW:
            return {"approved": True, "tier": tier, "reason": "auto_approve",
                    "timeout": 0, "approvers": approvers}
        # MEDIUM and above: queue the request under a fresh id.
        import uuid
        timeout = get_tier_timeout(tier)
        approval_id = f"{session_key}_{uuid.uuid4().hex[:8]}"
        self._pending[approval_id] = {
            "session_key": session_key, "command": command, "action": action,
            "tier": tier, "timeout": timeout, "approvers": approvers,
            "created_at": time.time(),
        }
        if timeout > 0:
            self._timeouts[approval_id] = time.time() + timeout
        return {
            "approved": False, "tier": tier, "approval_id": approval_id,
            "timeout": timeout, "approvers": approvers,
            "requires_human": requires_human_approval(tier),
        }

    def resolve_approval(self, approval_id, approved, approver="human"):
        """Remove a pending approval; echoes the *approved* decision."""
        try:
            del self._pending[approval_id]
        except KeyError:
            return False
        self._timeouts.pop(approval_id, None)
        return approved

    def check_timeouts(self):
        """Escalate (or drop) pending approvals whose deadline passed.

        Non-CRITICAL entries move up one tier and get a new deadline;
        CRITICAL entries are dropped. Returns the timed-out ids.
        """
        now = time.time()
        expired = []
        for aid, deadline in list(self._timeouts.items()):
            if now <= deadline:
                continue
            expired.append(aid)
            entry = self._pending.get(aid)
            if entry is None:
                continue
            if entry["tier"] < ApprovalTier.CRITICAL:
                entry["tier"] = ApprovalTier(entry["tier"] + 1)
                entry["timeout"] = get_tier_timeout(entry["tier"])
                self._timeouts[aid] = now + entry["timeout"]
            else:
                self._pending.pop(aid, None)
                self._timeouts.pop(aid, None)
        return expired
# Module-level singleton so all callers share one pending-approval queue.
_tiered_approval = TieredApproval()
def get_tiered_approval():
    """Return the process-wide TieredApproval singleton."""
    return _tiered_approval
def _legacy_pattern_key(pattern: str) -> str:
"""Reproduce the old regex-derived approval key for backwards compatibility."""
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]

571
tools/video_pipeline.py Normal file
View File

@@ -0,0 +1,571 @@
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
Provides:
- Image sequence → video with crossfade transitions
- Audio-video sync with beat alignment
- Ken Burns effect (slow pan/zoom on stills)
- H.264/WebM encoding optimized for web
- Streaming output support
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
"""
import json
import logging
import os
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class VideoSpec:
    """Video output specification (resolution, codecs, quality)."""
    width: int = 1920
    height: int = 1080
    fps: int = 30
    codec: str = "libx264"  # video codec: libx264, libvpx-vp9
    container: str = "mp4"  # output container: mp4, webm
    crf: int = 23  # quality (lower = better, 18-28 typical)
    preset: str = "medium"  # x264 speed/size tradeoff: ultrafast, fast, medium, slow
    audio_codec: str = "aac"  # audio codec used when muxing/encoding
    audio_bitrate: str = "192k"  # audio bitrate string passed to -b:a
@dataclass
class Transition:
    """Crossfade transition between consecutive clips/images."""
    duration_sec: float = 1.0  # overlap length of the crossfade
    type: str = "fade"  # xfade transition name: fade, dissolve, slideleft, slideright
@dataclass
class KenBurnsConfig:
    """Ken Burns pan/zoom effect configuration for a single still image."""
    zoom_start: float = 1.0  # zoom factor at the first frame
    zoom_end: float = 1.15  # zoom factor reached by the last frame
    pan_x: float = 0.0  # horizontal pan direction, -1 to 1
    pan_y: float = 0.0  # vertical pan direction, -1 to 1
    duration_sec: float = 5.0  # length of the rendered clip
class FFmpegPipeline:
"""FFmpeg wrapper for video composition.
Provides a Python interface to FFmpeg's complex filter graphs
for composing videos from images, clips, and audio.
"""
def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
    """Store binary paths and verify that the ffmpeg binary runs.

    Raises:
        RuntimeError: If the ffmpeg binary cannot be executed.
    """
    self.ffmpeg = ffmpeg_path
    self.ffprobe = ffprobe_path
    self._verify_ffmpeg()
def _verify_ffmpeg(self):
    """Verify the FFmpeg binary is runnable; raise RuntimeError if not."""
    try:
        result = subprocess.run(
            [self.ffmpeg, "-version"],
            capture_output=True, text=True, timeout=5,
        )
    except FileNotFoundError:
        raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg returned {result.returncode}")
    logger.info("FFmpeg verified: %s", result.stdout.split('\n')[0])
def probe(self, path: str) -> Dict[str, Any]:
    """Return ffprobe's JSON metadata (format + streams) for *path*."""
    result = subprocess.run(
        [
            self.ffprobe, "-v", "quiet", "-print_format", "json",
            "-show_format", "-show_streams", path,
        ],
        capture_output=True, text=True, timeout=30,
    )
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr}")
    return json.loads(result.stdout)
def get_duration(self, path: str) -> float:
    """Duration of *path* in seconds, taken from container metadata."""
    metadata = self.probe(path)
    return float(metadata["format"]["duration"])
# =================================================================
# Image Sequence → Video
# =================================================================
def images_to_video(
    self,
    image_paths: List[str],
    output_path: str,
    spec: Optional[VideoSpec] = None,
    transition: Optional[Transition] = None,
    image_duration: float = 3.0,
) -> str:
    """Convert a list of images to a video with optional crossfade transitions.

    Args:
        image_paths: List of image file paths, in display order.
        output_path: Output video path.
        spec: Video output specification; a fresh ``VideoSpec()`` when
            None. (Previously the default was a single mutable
            ``VideoSpec()`` instance shared across all calls.)
        transition: Crossfade config (None = hard cuts).
        image_duration: Seconds per image.

    Returns:
        Path to output video.

    Raises:
        ValueError: If ``image_paths`` is empty.
    """
    if not image_paths:
        raise ValueError("No images provided")
    spec = spec if spec is not None else VideoSpec()
    # A crossfade only makes sense between at least two images.
    if transition and len(image_paths) > 1:
        return self._images_with_crossfade(
            image_paths, output_path, spec, transition, image_duration
        )
    return self._images_hard_cut(
        image_paths, output_path, spec, image_duration
    )
def _images_hard_cut(
    self,
    image_paths: List[str],
    output_path: str,
    spec: VideoSpec,
    duration: float,
) -> str:
    """Render an image slideshow with hard cuts via the concat demuxer."""
    # The concat demuxer wants "file/duration" pairs, with the final
    # image listed once more so its duration is honored.
    entries = []
    for img in image_paths:
        entries.append(f"file '{os.path.abspath(img)}'\n")
        entries.append(f"duration {duration}\n")
    entries.append(f"file '{os.path.abspath(image_paths[-1])}'\n")
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
        concat_path = f.name
        f.writelines(entries)
    try:
        scale_pad = (
            f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
            f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2"
        )
        self._run_ffmpeg([
            self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
            "-i", concat_path,
            "-vf", scale_pad,
            "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
            "-pix_fmt", "yuv420p", "-r", str(spec.fps),
            output_path,
        ])
        return output_path
    finally:
        os.unlink(concat_path)
def _images_with_crossfade(
    self,
    image_paths: List[str],
    output_path: str,
    spec: VideoSpec,
    transition: Transition,
    duration: float,
) -> str:
    """Image sequence with xfade crossfade transitions.

    Each image becomes a looped clip of `duration` seconds; consecutive
    clips are blended through a chained xfade filter graph.
    """
    n = len(image_paths)
    fade_dur = transition.duration_sec
    clip_dur = duration
    # Build filter complex: one looped, fixed-length input per image.
    inputs = []
    for img in image_paths:
        inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
    # Scale all inputs to the target frame (pad preserves aspect ratio).
    filter_parts = []
    for i in range(n):
        filter_parts.append(
            f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
            f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
        )
    # Chain xfade transitions. Every xfade shortens the accumulated
    # timeline by fade_dur, so the i-th offset is (i+1)*(clip_dur - fade_dur).
    if n == 1:
        filter_parts.append(f"[v0]copy[outv]")
    else:
        offset = clip_dur - fade_dur
        filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
        for i in range(1, n - 1):
            offset = (i + 1) * clip_dur - (i + 1) * fade_dur
            filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
        filter_parts.append(f"[xf{n-2}]copy[outv]")
    filter_complex = ";\n".join(filter_parts)
    cmd = [
        self.ffmpeg, "-y",
        *inputs,
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
        "-pix_fmt", "yuv420p",
        output_path
    ]
    self._run_ffmpeg(cmd)
    return output_path
# =================================================================
# Ken Burns Effect
# =================================================================
def apply_ken_burns(
    self,
    image_path: str,
    output_path: str,
    config: Optional[KenBurnsConfig] = None,
    spec: Optional[VideoSpec] = None,
) -> str:
    """Apply Ken Burns (pan + zoom) effect to a still image.

    Creates a ``config.duration_sec``-second video from a single image
    using FFmpeg's ``zoompan`` filter.

    Fixes:
    - Removed dead pre-computed ``zoom_expr``/``x_expr``/``y_expr``
      strings; ``zoom_expr`` referenced the nonexistent attribute
      ``config.zoom_zoom_end``, so every call raised AttributeError
      before FFmpeg ever ran. The expressions were never used by the
      filter that is actually emitted.
    - ``config``/``spec`` defaults were shared mutable dataclass
      instances (evaluated once at def time); fresh instances are now
      created per call.

    NOTE(review): the emitted filter zooms a fixed 0.001 per frame,
    clamped at ``config.zoom_end``; ``config.zoom_start`` is not applied
    (zoompan starts at zoom=1) — confirm whether honoring zoom_start is
    desired before changing the filter expression.

    Args:
        image_path: Source still image.
        output_path: Destination video path.
        config: Pan/zoom parameters (fresh defaults when None).
        spec: Output encoding spec (fresh defaults when None).

    Returns:
        ``output_path``.
    """
    config = config if config is not None else KenBurnsConfig()
    spec = spec if spec is not None else VideoSpec()
    total_frames = int(config.duration_sec * spec.fps)
    # zoompan filter: z=zoom per frame, x/y=pan position, d=frame count.
    filter_str = (
        f"zoompan=z='min(zoom+0.001,{config.zoom_end})'"
        f":x='iw/2-(iw/zoom/2)+{config.pan_x}*10'"
        f":y='ih/2-(ih/zoom/2)+{config.pan_y}*10'"
        f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
    )
    cmd = [
        self.ffmpeg, "-y",
        "-loop", "1", "-i", image_path,
        "-vf", filter_str,
        "-t", str(config.duration_sec),
        "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
        "-pix_fmt", "yuv420p",
        output_path
    ]
    self._run_ffmpeg(cmd)
    return output_path
# =================================================================
# Audio-Video Sync
# =================================================================
def mux_audio_video(
    self,
    video_path: str,
    audio_path: str,
    output_path: str,
    spec: Optional[VideoSpec] = None,
    offset_sec: float = 0.0,
) -> str:
    """Mux audio and video together with optional offset.

    The video stream is copied; only the audio is (re-)encoded.

    Args:
        video_path: Video file path.
        audio_path: Audio file path.
        output_path: Output path.
        spec: Output spec (audio codec/bitrate); a fresh ``VideoSpec()``
            when None. (Previously a mutable default instance shared
            across calls.)
        offset_sec: Audio offset in seconds (positive = delay audio,
            negative = delay video).

    Returns:
        ``output_path``.
    """
    spec = spec if spec is not None else VideoSpec()
    cmd = [
        self.ffmpeg, "-y",
        "-i", video_path,
        "-i", audio_path,
    ]
    if offset_sec > 0:
        # Third input is the audio again, shifted later by offset_sec;
        # map video from input 0 and the delayed audio from input 2.
        cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
        cmd.extend(["-map", "0:v", "-map", "2:a"])
    elif offset_sec < 0:
        # Third input is the video shifted later (audio leads).
        cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
        cmd.extend(["-map", "2:v", "-map", "1:a"])
    else:
        cmd.extend(["-map", "0:v", "-map", "1:a"])
    cmd.extend([
        "-c:v", "copy",
        "-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
        "-shortest",  # stop at the shorter of the two streams
        output_path
    ])
    self._run_ffmpeg(cmd)
    return output_path
def align_to_beats(
    self,
    video_path: str,
    audio_path: str,
    beat_times: List[float],
    output_path: str,
    spec: VideoSpec = VideoSpec(),
) -> str:
    """Align video cuts to audio beat times.

    Splits video at beat timestamps and reassembles with audio sync.
    With no beats it degenerates to a plain audio/video mux.

    NOTE(review): `spec`'s default is a single VideoSpec instance shared
    across calls (evaluated at def time) — harmless while specs are
    never mutated, but worth confirming.
    NOTE(review): segments are cut with `-ss` + `-c copy`, which snaps
    to keyframes — cut points are approximate; confirm that is acceptable.
    """
    if not beat_times:
        return self.mux_audio_video(video_path, audio_path, output_path, spec)
    video_dur = self.get_duration(video_path)
    # Build segment list based on beat times; non-increasing or
    # out-of-range beats are skipped.
    segments = []
    prev = 0.0
    for beat in beat_times:
        if beat > prev and beat <= video_dur:
            segments.append((prev, beat))
            prev = beat
    if prev < video_dur:
        segments.append((prev, video_dur))
    # Extract segments as .ts so they concat cleanly without re-encoding.
    segment_paths = []
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, (start, end) in enumerate(segments):
            seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
            cmd = [
                self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
                "-t", str(end - start), "-c", "copy", seg_path
            ]
            self._run_ffmpeg(cmd)
            segment_paths.append(seg_path)
        # Concat segments
        concat_path = os.path.join(tmpdir, "concat.txt")
        with open(concat_path, "w") as f:
            for sp in segment_paths:
                f.write(f"file '{sp}'\n")
        cmd = [
            self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
            "-i", concat_path, "-c", "copy",
            os.path.join(tmpdir, "video_aligned.mp4")
        ]
        self._run_ffmpeg(cmd)
        # Mux while still inside the with-block: the aligned video lives
        # in tmpdir; the final output_path is outside it.
        return self.mux_audio_video(
            os.path.join(tmpdir, "video_aligned.mp4"),
            audio_path, output_path, spec
        )
# =================================================================
# Encoding
# =================================================================
def encode_web(
    self,
    input_path: str,
    output_path: str,
    spec: Optional[VideoSpec] = None,
) -> str:
    """Encode for web playback (H.264 MP4 or WebM).

    WebM output always uses VP9 + Opus regardless of ``spec.codec`` /
    ``spec.audio_codec``; other containers use the spec's codecs plus
    ``+faststart`` so MP4s can begin playing before fully downloaded.

    Args:
        input_path: Source media file.
        output_path: Destination file.
        spec: Output spec; a fresh ``VideoSpec()`` when None.
            (Previously a mutable default instance shared across calls.)

    Returns:
        ``output_path``.
    """
    spec = spec if spec is not None else VideoSpec()
    if spec.container == "webm":
        cmd = [
            self.ffmpeg, "-y", "-i", input_path,
            "-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
            "-c:a", "libopus", "-b:a", spec.audio_bitrate,
            "-pix_fmt", "yuv420p",
            output_path
        ]
    else:
        cmd = [
            self.ffmpeg, "-y", "-i", input_path,
            "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
            "-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
            "-movflags", "+faststart",  # Progressive download
            "-pix_fmt", "yuv420p",
            output_path
        ]
    self._run_ffmpeg(cmd)
    return output_path
# =================================================================
# Full Composition Pipeline
# =================================================================
def compose(
    self,
    images: List[str],
    audio: Optional[str] = None,
    output_path: str = "output.mp4",
    spec: Optional[VideoSpec] = None,
    transition: Optional[Transition] = None,
    ken_burns: Optional[KenBurnsConfig] = None,
    beat_times: Optional[List[float]] = None,
) -> str:
    """Full composition pipeline: images → video → mux audio → encode.

    This is the main entry point, combining all pipeline components.

    Args:
        images: List of image paths.
        audio: Optional audio file path.
        output_path: Final output path.
        spec: Video specification; a fresh ``VideoSpec()`` when None.
            (Previously a mutable default instance shared across calls.)
        transition: Crossfade config.
        ken_burns: Ken Burns config (applied to each image first).
        beat_times: Beat timestamps for audio-visual sync (only used
            when ``audio`` is provided).

    Returns:
        Path to the final video.
    """
    spec = spec if spec is not None else VideoSpec()
    with tempfile.TemporaryDirectory() as tmpdir:
        # Pre-render each still through the Ken Burns effect if requested.
        processed = images
        if ken_burns:
            processed = []
            for i, img in enumerate(images):
                kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
                self.apply_ken_burns(img, kb_path, ken_burns, spec)
                processed.append(kb_path)
        # Compose into a single silent video.
        video_path = os.path.join(tmpdir, "composed.mp4")
        if processed and all(p.endswith('.mp4') for p in processed):
            # All inputs are (Ken Burns) clips — concatenate them.
            self._concat_videos(processed, video_path, transition, spec)
        else:
            self.images_to_video(processed, video_path, spec, transition)
        # Mux audio if provided.
        if audio:
            if beat_times:
                return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
            return self.mux_audio_video(video_path, audio, output_path, spec)
        # No audio: the composed video is the final result.
        import shutil
        shutil.copy2(video_path, output_path)
        return output_path
def _concat_videos(
    self,
    video_paths: List[str],
    output_path: str,
    transition: Optional[Transition],
    spec: VideoSpec,
) -> str:
    """Concatenate video files with optional transitions.

    Without a transition the clips are stream-copied via the concat
    demuxer (no re-encode); with one they are re-encoded through a
    chained xfade filter graph.
    """
    if not transition:
        # Concat demuxer path: write a "file ..." list and stream-copy.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            concat_path = f.name
            for vp in video_paths:
                f.write(f"file '{os.path.abspath(vp)}'\n")
        try:
            cmd = [
                self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
                "-i", concat_path, "-c", "copy", output_path
            ]
            self._run_ffmpeg(cmd)
        finally:
            os.unlink(concat_path)
    else:
        # Use xfade for video clips too
        n = len(video_paths)
        inputs = []
        for vp in video_paths:
            inputs.extend(["-i", vp])
        # Normalize every input to a common fps/size before crossfading.
        filter_parts = []
        for i in range(n):
            filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
        fade_dur = transition.duration_sec
        # Calculate offsets from real clip durations: each xfade starts
        # fade_dur before the end of the accumulated timeline so far.
        if n >= 2:
            dur0 = self.get_duration(video_paths[0])
            offset = dur0 - fade_dur
            filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
            for i in range(1, n - 1):
                duri = self.get_duration(video_paths[i])
                offset += duri - fade_dur
                filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
            filter_parts.append(f"[xf{n-2}]copy[outv]")
        else:
            filter_parts.append(f"[v0]copy[outv]")
        cmd = [
            self.ffmpeg, "-y", *inputs,
            "-filter_complex", ";".join(filter_parts),
            "-map", "[outv]",
            "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
            output_path
        ]
        self._run_ffmpeg(cmd)
    return output_path
# =================================================================
# Streaming
# =================================================================
def create_hls_stream(
    self,
    input_path: str,
    output_dir: str,
    segment_duration: int = 4,
) -> str:
    """Segment *input_path* into an HLS stream under *output_dir*.

    Returns:
        Path to the generated .m3u8 playlist file.
    """
    os.makedirs(output_dir, exist_ok=True)
    playlist_path = os.path.join(output_dir, "playlist.m3u8")
    segment_pattern = os.path.join(output_dir, "seg_%03d.ts")
    self._run_ffmpeg([
        self.ffmpeg, "-y", "-i", input_path,
        "-c:v", "libx264", "-crf", "23", "-preset", "fast",
        "-c:a", "aac", "-b:a", "128k",
        "-f", "hls", "-hls_time", str(segment_duration),
        "-hls_list_size", "0", "-hls_segment_filename", segment_pattern,
        playlist_path,
    ])
    return playlist_path
# =================================================================
# Internal
# =================================================================
def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
    """Run an FFmpeg command; log and raise RuntimeError on nonzero exit."""
    logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if result.returncode == 0:
        return result.stdout
    stderr_tail = result.stderr[-500:]
    logger.error("FFmpeg failed: %s", stderr_tail)
    raise RuntimeError(f"FFmpeg error (exit {result.returncode}): {stderr_tail}")
# =========================================================================
# Convenience functions
# =========================================================================
def compose_video(
    images: List[str],
    audio: Optional[str] = None,
    output: str = "output.mp4",
    **kwargs,
) -> str:
    """One-call video composition from images and optional audio.

    Convenience wrapper around FFmpegPipeline.compose().

    Keyword Args:
        spec: VideoSpec for the output (default: fresh VideoSpec()).
        transition: Optional Transition for crossfades.
        ken_burns: Optional KenBurnsConfig applied to each image.
        beat_times: Optional beat timestamps for audio sync.

    Raises:
        TypeError: On unrecognized keyword arguments (previously they
            were silently discarded, hiding typos like ``transtion=``).
    """
    spec = kwargs.pop("spec", VideoSpec())
    transition = kwargs.pop("transition", None)
    ken_burns = kwargs.pop("ken_burns", None)
    beat_times = kwargs.pop("beat_times", None)
    if kwargs:
        raise TypeError(
            f"compose_video() got unexpected keyword arguments: {sorted(kwargs)}"
        )
    pipeline = FFmpegPipeline()
    return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)