Compare commits
1 Commits
fix/660
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2c587d9d32 |
129
tests/test_video_pipeline.py
Normal file
129
tests/test_video_pipeline.py
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import pytest
|
||||||
|
import tempfile
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
|
from tools.video_pipeline import (
|
||||||
|
VideoSpec, Transition, KenBurnsConfig,
|
||||||
|
FFmpegPipeline, compose_video,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestVideoSpec:
|
||||||
|
def test_defaults(self):
|
||||||
|
s = VideoSpec()
|
||||||
|
assert s.width == 1920
|
||||||
|
assert s.height == 1080
|
||||||
|
assert s.fps == 30
|
||||||
|
assert s.codec == "libx264"
|
||||||
|
assert s.container == "mp4"
|
||||||
|
assert s.crf == 23
|
||||||
|
|
||||||
|
def test_webm_spec(self):
|
||||||
|
s = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
|
||||||
|
assert s.container == "webm"
|
||||||
|
assert s.codec == "libvpx-vp9"
|
||||||
|
|
||||||
|
|
||||||
|
class TestTransition:
|
||||||
|
def test_defaults(self):
|
||||||
|
t = Transition()
|
||||||
|
assert t.duration_sec == 1.0
|
||||||
|
assert t.type == "fade"
|
||||||
|
|
||||||
|
|
||||||
|
class TestKenBurnsConfig:
|
||||||
|
def test_defaults(self):
|
||||||
|
k = KenBurnsConfig()
|
||||||
|
assert k.zoom_start == 1.0
|
||||||
|
assert k.zoom_end == 1.15
|
||||||
|
assert k.duration_sec == 5.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestFFmpegPipelineInit:
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_verify_ffmpeg_passes(self, mock_run):
|
||||||
|
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
|
||||||
|
pipeline = FFmpegPipeline()
|
||||||
|
assert pipeline.ffmpeg == "ffmpeg"
|
||||||
|
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_verify_ffmpeg_fails(self, mock_run):
|
||||||
|
mock_run.side_effect = FileNotFoundError()
|
||||||
|
with pytest.raises(RuntimeError, match="FFmpeg not found"):
|
||||||
|
FFmpegPipeline()
|
||||||
|
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_custom_ffmpeg_path(self, mock_run):
|
||||||
|
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg")
|
||||||
|
pipeline = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
|
||||||
|
assert pipeline.ffmpeg == "/usr/local/bin/ffmpeg"
|
||||||
|
|
||||||
|
|
||||||
|
class TestFFmpegPipelineProbe:
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_probe_returns_metadata(self, mock_run):
|
||||||
|
mock_run.return_value = MagicMock(
|
||||||
|
returncode=0,
|
||||||
|
stdout='{"format": {"duration": "10.5"}, "streams": []}'
|
||||||
|
)
|
||||||
|
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||||
|
pipeline.ffprobe = "ffprobe"
|
||||||
|
result = pipeline.probe("/tmp/test.mp4")
|
||||||
|
assert result["format"]["duration"] == "10.5"
|
||||||
|
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_get_duration(self, mock_run):
|
||||||
|
mock_run.return_value = MagicMock(
|
||||||
|
returncode=0,
|
||||||
|
stdout='{"format": {"duration": "42.3"}, "streams": []}'
|
||||||
|
)
|
||||||
|
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||||
|
pipeline.ffprobe = "ffprobe"
|
||||||
|
assert pipeline.get_duration("/tmp/test.mp4") == 42.3
|
||||||
|
|
||||||
|
|
||||||
|
class TestFFmpegPipelineImagesToVideo:
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_empty_images_raises(self, mock_run):
|
||||||
|
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||||
|
pipeline.ffmpeg = "ffmpeg"
|
||||||
|
with pytest.raises(ValueError, match="No images"):
|
||||||
|
pipeline.images_to_video([], "/tmp/out.mp4")
|
||||||
|
|
||||||
|
|
||||||
|
class TestComposeVideo:
|
||||||
|
@patch("tools.video_pipeline.FFmpegPipeline")
|
||||||
|
def test_compose_calls_pipeline(self, MockPipeline):
|
||||||
|
mock_instance = MagicMock()
|
||||||
|
mock_instance.compose.return_value = "/tmp/output.mp4"
|
||||||
|
MockPipeline.return_value = mock_instance
|
||||||
|
|
||||||
|
result = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
|
||||||
|
assert result == "/tmp/output.mp4"
|
||||||
|
mock_instance.compose.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
class TestRunFFmpeg:
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_success(self, mock_run):
|
||||||
|
mock_run.return_value = MagicMock(returncode=0, stdout="")
|
||||||
|
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||||
|
pipeline._run_ffmpeg(["ffmpeg", "-version"])
|
||||||
|
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_failure_raises(self, mock_run):
|
||||||
|
mock_run.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
|
||||||
|
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||||
|
with pytest.raises(RuntimeError, match="FFmpeg error"):
|
||||||
|
pipeline._run_ffmpeg(["ffmpeg", "-bad"])
|
||||||
|
|
||||||
|
@patch("tools.video_pipeline.subprocess.run")
|
||||||
|
def test_timeout_raises(self, mock_run):
|
||||||
|
import subprocess
|
||||||
|
mock_run.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
|
||||||
|
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||||
|
with pytest.raises(subprocess.TimeoutExpired):
|
||||||
|
pipeline._run_ffmpeg(["ffmpeg", "-slow"])
|
||||||
571
tools/video_pipeline.py
Normal file
571
tools/video_pipeline.py
Normal file
@@ -0,0 +1,571 @@
|
|||||||
|
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
|
||||||
|
|
||||||
|
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
|
||||||
|
|
||||||
|
Provides:
|
||||||
|
- Image sequence → video with crossfade transitions
|
||||||
|
- Audio-video sync with beat alignment
|
||||||
|
- Ken Burns effect (slow pan/zoom on stills)
|
||||||
|
- H.264/WebM encoding optimized for web
|
||||||
|
- Streaming output support
|
||||||
|
|
||||||
|
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class VideoSpec:
|
||||||
|
"""Video output specification."""
|
||||||
|
width: int = 1920
|
||||||
|
height: int = 1080
|
||||||
|
fps: int = 30
|
||||||
|
codec: str = "libx264" # libx264, libvpx-vp9
|
||||||
|
container: str = "mp4" # mp4, webm
|
||||||
|
crf: int = 23 # quality (lower = better, 18-28 typical)
|
||||||
|
preset: str = "medium" # ultrafast, fast, medium, slow
|
||||||
|
audio_codec: str = "aac"
|
||||||
|
audio_bitrate: str = "192k"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Transition:
|
||||||
|
"""Crossfade transition between clips."""
|
||||||
|
duration_sec: float = 1.0
|
||||||
|
type: str = "fade" # fade, dissolve, slideleft, slideright
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class KenBurnsConfig:
|
||||||
|
"""Ken Burns pan/zoom effect configuration."""
|
||||||
|
zoom_start: float = 1.0
|
||||||
|
zoom_end: float = 1.15
|
||||||
|
pan_x: float = 0.0 # -1 to 1
|
||||||
|
pan_y: float = 0.0 # -1 to 1
|
||||||
|
duration_sec: float = 5.0
|
||||||
|
|
||||||
|
|
||||||
|
class FFmpegPipeline:
|
||||||
|
"""FFmpeg wrapper for video composition.
|
||||||
|
|
||||||
|
Provides a Python interface to FFmpeg's complex filter graphs
|
||||||
|
for composing videos from images, clips, and audio.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
|
||||||
|
self.ffmpeg = ffmpeg_path
|
||||||
|
self.ffprobe = ffprobe_path
|
||||||
|
self._verify_ffmpeg()
|
||||||
|
|
||||||
|
def _verify_ffmpeg(self):
|
||||||
|
"""Verify FFmpeg is available."""
|
||||||
|
try:
|
||||||
|
r = subprocess.run([self.ffmpeg, "-version"], capture_output=True, text=True, timeout=5)
|
||||||
|
if r.returncode != 0:
|
||||||
|
raise RuntimeError(f"FFmpeg returned {r.returncode}")
|
||||||
|
logger.info("FFmpeg verified: %s", r.stdout.split('\n')[0])
|
||||||
|
except FileNotFoundError:
|
||||||
|
raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")
|
||||||
|
|
||||||
|
def probe(self, path: str) -> Dict[str, Any]:
|
||||||
|
"""Probe a media file for metadata."""
|
||||||
|
cmd = [
|
||||||
|
self.ffprobe, "-v", "quiet", "-print_format", "json",
|
||||||
|
"-show_format", "-show_streams", path
|
||||||
|
]
|
||||||
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||||
|
if r.returncode != 0:
|
||||||
|
raise RuntimeError(f"ffprobe failed: {r.stderr}")
|
||||||
|
return json.loads(r.stdout)
|
||||||
|
|
||||||
|
def get_duration(self, path: str) -> float:
|
||||||
|
"""Get duration of a media file in seconds."""
|
||||||
|
info = self.probe(path)
|
||||||
|
return float(info["format"]["duration"])
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Image Sequence → Video
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def images_to_video(
|
||||||
|
self,
|
||||||
|
image_paths: List[str],
|
||||||
|
output_path: str,
|
||||||
|
spec: VideoSpec = VideoSpec(),
|
||||||
|
transition: Optional[Transition] = None,
|
||||||
|
image_duration: float = 3.0,
|
||||||
|
) -> str:
|
||||||
|
"""Convert a list of images to a video with optional crossfade transitions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
image_paths: List of image file paths
|
||||||
|
output_path: Output video path
|
||||||
|
spec: Video output specification
|
||||||
|
transition: Crossfade config (None = hard cuts)
|
||||||
|
image_duration: Seconds per image
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to output video
|
||||||
|
"""
|
||||||
|
if not image_paths:
|
||||||
|
raise ValueError("No images provided")
|
||||||
|
|
||||||
|
if transition and len(image_paths) > 1:
|
||||||
|
return self._images_with_crossfade(
|
||||||
|
image_paths, output_path, spec, transition, image_duration
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return self._images_hard_cut(
|
||||||
|
image_paths, output_path, spec, image_duration
|
||||||
|
)
|
||||||
|
|
||||||
|
def _images_hard_cut(
|
||||||
|
self,
|
||||||
|
image_paths: List[str],
|
||||||
|
output_path: str,
|
||||||
|
spec: VideoSpec,
|
||||||
|
duration: float,
|
||||||
|
) -> str:
|
||||||
|
"""Simple image sequence with hard cuts."""
|
||||||
|
# Create concat file
|
||||||
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
||||||
|
concat_path = f.name
|
||||||
|
for img in image_paths:
|
||||||
|
f.write(f"file '{os.path.abspath(img)}'\n")
|
||||||
|
f.write(f"duration {duration}\n")
|
||||||
|
# Last image needs repeating for ffmpeg concat
|
||||||
|
f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||||
|
"-i", concat_path,
|
||||||
|
"-vf", f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2",
|
||||||
|
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||||
|
"-pix_fmt", "yuv420p", "-r", str(spec.fps),
|
||||||
|
output_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return output_path
|
||||||
|
finally:
|
||||||
|
os.unlink(concat_path)
|
||||||
|
|
||||||
|
def _images_with_crossfade(
|
||||||
|
self,
|
||||||
|
image_paths: List[str],
|
||||||
|
output_path: str,
|
||||||
|
spec: VideoSpec,
|
||||||
|
transition: Transition,
|
||||||
|
duration: float,
|
||||||
|
) -> str:
|
||||||
|
"""Image sequence with xfade crossfade transitions."""
|
||||||
|
n = len(image_paths)
|
||||||
|
fade_dur = transition.duration_sec
|
||||||
|
clip_dur = duration
|
||||||
|
|
||||||
|
# Build filter complex
|
||||||
|
inputs = []
|
||||||
|
for img in image_paths:
|
||||||
|
inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
|
||||||
|
|
||||||
|
# Scale all inputs
|
||||||
|
filter_parts = []
|
||||||
|
for i in range(n):
|
||||||
|
filter_parts.append(
|
||||||
|
f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
|
||||||
|
f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Chain xfade transitions
|
||||||
|
if n == 1:
|
||||||
|
filter_parts.append(f"[v0]copy[outv]")
|
||||||
|
else:
|
||||||
|
offset = clip_dur - fade_dur
|
||||||
|
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
|
||||||
|
for i in range(1, n - 1):
|
||||||
|
offset = (i + 1) * clip_dur - (i + 1) * fade_dur
|
||||||
|
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
|
||||||
|
filter_parts.append(f"[xf{n-2}]copy[outv]")
|
||||||
|
|
||||||
|
filter_complex = ";\n".join(filter_parts)
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y",
|
||||||
|
*inputs,
|
||||||
|
"-filter_complex", filter_complex,
|
||||||
|
"-map", "[outv]",
|
||||||
|
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||||
|
"-pix_fmt", "yuv420p",
|
||||||
|
output_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Ken Burns Effect
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def apply_ken_burns(
|
||||||
|
self,
|
||||||
|
image_path: str,
|
||||||
|
output_path: str,
|
||||||
|
config: KenBurnsConfig = KenBurnsConfig(),
|
||||||
|
spec: VideoSpec = VideoSpec(),
|
||||||
|
) -> str:
|
||||||
|
"""Apply Ken Burns (pan + zoom) effect to a still image.
|
||||||
|
|
||||||
|
Creates a video from a single image with slow zoom and pan motion.
|
||||||
|
"""
|
||||||
|
# zoompan filter: z=zoom, x/y=pan position
|
||||||
|
# z goes from zoom_start to zoom_end over the duration
|
||||||
|
total_frames = int(config.duration_sec * spec.fps)
|
||||||
|
|
||||||
|
zoom_expr = f"min({config.zoom_start}+on*({config.zoom_zoom_end}-{config.zoom_start})/{total_frames},{config.zoom_end})"
|
||||||
|
|
||||||
|
# Pan expression: move from center ± pan offset
|
||||||
|
x_expr = f"(iw-iw/zoom)/2+{config.pan_x}*iw*0.1*(on/{total_frames})"
|
||||||
|
y_expr = f"(ih-ih/zoom)/2+{config.pan_y}*ih*0.1*(on/{total_frames})"
|
||||||
|
|
||||||
|
filter_str = (
|
||||||
|
f"zoompan=z='min(zoom+0.001,{config.zoom_end})'"
|
||||||
|
f":x='iw/2-(iw/zoom/2)+{config.pan_x}*10'"
|
||||||
|
f":y='ih/2-(ih/zoom/2)+{config.pan_y}*10'"
|
||||||
|
f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
|
||||||
|
)
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y",
|
||||||
|
"-loop", "1", "-i", image_path,
|
||||||
|
"-vf", filter_str,
|
||||||
|
"-t", str(config.duration_sec),
|
||||||
|
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||||
|
"-pix_fmt", "yuv420p",
|
||||||
|
output_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Audio-Video Sync
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def mux_audio_video(
|
||||||
|
self,
|
||||||
|
video_path: str,
|
||||||
|
audio_path: str,
|
||||||
|
output_path: str,
|
||||||
|
spec: VideoSpec = VideoSpec(),
|
||||||
|
offset_sec: float = 0.0,
|
||||||
|
) -> str:
|
||||||
|
"""Mux audio and video together with optional offset.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
video_path: Video file path
|
||||||
|
audio_path: Audio file path
|
||||||
|
output_path: Output path
|
||||||
|
spec: Output spec
|
||||||
|
offset_sec: Audio offset in seconds (positive = delay audio)
|
||||||
|
"""
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y",
|
||||||
|
"-i", video_path,
|
||||||
|
"-i", audio_path,
|
||||||
|
]
|
||||||
|
|
||||||
|
if offset_sec > 0:
|
||||||
|
cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
|
||||||
|
cmd.extend(["-map", "0:v", "-map", "2:a"])
|
||||||
|
elif offset_sec < 0:
|
||||||
|
cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
|
||||||
|
cmd.extend(["-map", "2:v", "-map", "1:a"])
|
||||||
|
else:
|
||||||
|
cmd.extend(["-map", "0:v", "-map", "1:a"])
|
||||||
|
|
||||||
|
cmd.extend([
|
||||||
|
"-c:v", "copy",
|
||||||
|
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
|
||||||
|
"-shortest",
|
||||||
|
output_path
|
||||||
|
])
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
def align_to_beats(
|
||||||
|
self,
|
||||||
|
video_path: str,
|
||||||
|
audio_path: str,
|
||||||
|
beat_times: List[float],
|
||||||
|
output_path: str,
|
||||||
|
spec: VideoSpec = VideoSpec(),
|
||||||
|
) -> str:
|
||||||
|
"""Align video cuts to audio beat times.
|
||||||
|
|
||||||
|
Splits video at beat timestamps and reassembles with audio sync.
|
||||||
|
"""
|
||||||
|
if not beat_times:
|
||||||
|
return self.mux_audio_video(video_path, audio_path, output_path, spec)
|
||||||
|
|
||||||
|
video_dur = self.get_duration(video_path)
|
||||||
|
|
||||||
|
# Build segment list based on beat times
|
||||||
|
segments = []
|
||||||
|
prev = 0.0
|
||||||
|
for beat in beat_times:
|
||||||
|
if beat > prev and beat <= video_dur:
|
||||||
|
segments.append((prev, beat))
|
||||||
|
prev = beat
|
||||||
|
if prev < video_dur:
|
||||||
|
segments.append((prev, video_dur))
|
||||||
|
|
||||||
|
# Extract segments
|
||||||
|
segment_paths = []
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
for i, (start, end) in enumerate(segments):
|
||||||
|
seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
|
||||||
|
"-t", str(end - start), "-c", "copy", seg_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
segment_paths.append(seg_path)
|
||||||
|
|
||||||
|
# Concat segments
|
||||||
|
concat_path = os.path.join(tmpdir, "concat.txt")
|
||||||
|
with open(concat_path, "w") as f:
|
||||||
|
for sp in segment_paths:
|
||||||
|
f.write(f"file '{sp}'\n")
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||||
|
"-i", concat_path, "-c", "copy",
|
||||||
|
os.path.join(tmpdir, "video_aligned.mp4")
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
|
||||||
|
return self.mux_audio_video(
|
||||||
|
os.path.join(tmpdir, "video_aligned.mp4"),
|
||||||
|
audio_path, output_path, spec
|
||||||
|
)
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Encoding
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def encode_web(
|
||||||
|
self,
|
||||||
|
input_path: str,
|
||||||
|
output_path: str,
|
||||||
|
spec: VideoSpec = VideoSpec(),
|
||||||
|
) -> str:
|
||||||
|
"""Encode for web playback (H.264 MP4 or WebM)."""
|
||||||
|
if spec.container == "webm":
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-i", input_path,
|
||||||
|
"-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
|
||||||
|
"-c:a", "libopus", "-b:a", spec.audio_bitrate,
|
||||||
|
"-pix_fmt", "yuv420p",
|
||||||
|
output_path
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-i", input_path,
|
||||||
|
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||||
|
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
|
||||||
|
"-movflags", "+faststart", # Progressive download
|
||||||
|
"-pix_fmt", "yuv420p",
|
||||||
|
output_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Full Composition Pipeline
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def compose(
|
||||||
|
self,
|
||||||
|
images: List[str],
|
||||||
|
audio: Optional[str] = None,
|
||||||
|
output_path: str = "output.mp4",
|
||||||
|
spec: VideoSpec = VideoSpec(),
|
||||||
|
transition: Optional[Transition] = None,
|
||||||
|
ken_burns: Optional[KenBurnsConfig] = None,
|
||||||
|
beat_times: Optional[List[float]] = None,
|
||||||
|
) -> str:
|
||||||
|
"""Full composition pipeline: images → video → mux audio → encode.
|
||||||
|
|
||||||
|
This is the main entry point. Combines all pipeline components
|
||||||
|
into a single call.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
images: List of image paths
|
||||||
|
audio: Optional audio file path
|
||||||
|
output_path: Final output path
|
||||||
|
spec: Video specification
|
||||||
|
transition: Crossfade config
|
||||||
|
ken_burns: Ken Burns config (applied to each image before composition)
|
||||||
|
beat_times: Beat timestamps for audio-visual sync
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to final video
|
||||||
|
"""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
# Apply Ken Burns to each image if configured
|
||||||
|
processed_images = images
|
||||||
|
if ken_burns:
|
||||||
|
processed_images = []
|
||||||
|
for i, img in enumerate(images):
|
||||||
|
kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
|
||||||
|
self.apply_ken_burns(img, kb_path, ken_burns, spec)
|
||||||
|
processed_images.append(kb_path)
|
||||||
|
|
||||||
|
# Compose images into video
|
||||||
|
video_path = os.path.join(tmpdir, "composed.mp4")
|
||||||
|
if processed_images and all(p.endswith('.mp4') for p in processed_images):
|
||||||
|
# All Ken Burns clips — concat them
|
||||||
|
self._concat_videos(processed_images, video_path, transition, spec)
|
||||||
|
else:
|
||||||
|
self.images_to_video(processed_images, video_path, spec, transition)
|
||||||
|
|
||||||
|
# Mux audio if provided
|
||||||
|
if audio:
|
||||||
|
if beat_times:
|
||||||
|
return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
|
||||||
|
else:
|
||||||
|
return self.mux_audio_video(video_path, audio, output_path, spec)
|
||||||
|
else:
|
||||||
|
# Just copy the video
|
||||||
|
import shutil
|
||||||
|
shutil.copy2(video_path, output_path)
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
def _concat_videos(
|
||||||
|
self,
|
||||||
|
video_paths: List[str],
|
||||||
|
output_path: str,
|
||||||
|
transition: Optional[Transition],
|
||||||
|
spec: VideoSpec,
|
||||||
|
) -> str:
|
||||||
|
"""Concatenate video files with optional transitions."""
|
||||||
|
if not transition:
|
||||||
|
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
||||||
|
concat_path = f.name
|
||||||
|
for vp in video_paths:
|
||||||
|
f.write(f"file '{os.path.abspath(vp)}'\n")
|
||||||
|
try:
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||||
|
"-i", concat_path, "-c", "copy", output_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
finally:
|
||||||
|
os.unlink(concat_path)
|
||||||
|
else:
|
||||||
|
# Use xfade for video clips too
|
||||||
|
n = len(video_paths)
|
||||||
|
inputs = []
|
||||||
|
for vp in video_paths:
|
||||||
|
inputs.extend(["-i", vp])
|
||||||
|
|
||||||
|
filter_parts = []
|
||||||
|
for i in range(n):
|
||||||
|
filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
|
||||||
|
|
||||||
|
fade_dur = transition.duration_sec
|
||||||
|
# Calculate offsets from durations
|
||||||
|
if n >= 2:
|
||||||
|
dur0 = self.get_duration(video_paths[0])
|
||||||
|
offset = dur0 - fade_dur
|
||||||
|
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
|
||||||
|
for i in range(1, n - 1):
|
||||||
|
duri = self.get_duration(video_paths[i])
|
||||||
|
offset += duri - fade_dur
|
||||||
|
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
|
||||||
|
filter_parts.append(f"[xf{n-2}]copy[outv]")
|
||||||
|
else:
|
||||||
|
filter_parts.append(f"[v0]copy[outv]")
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", *inputs,
|
||||||
|
"-filter_complex", ";".join(filter_parts),
|
||||||
|
"-map", "[outv]",
|
||||||
|
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||||
|
output_path
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return output_path
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Streaming
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def create_hls_stream(
|
||||||
|
self,
|
||||||
|
input_path: str,
|
||||||
|
output_dir: str,
|
||||||
|
segment_duration: int = 4,
|
||||||
|
) -> str:
|
||||||
|
"""Create HLS streaming output for progressive playback.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to the .m3u8 playlist file
|
||||||
|
"""
|
||||||
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
playlist = os.path.join(output_dir, "playlist.m3u8")
|
||||||
|
|
||||||
|
cmd = [
|
||||||
|
self.ffmpeg, "-y", "-i", input_path,
|
||||||
|
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
|
||||||
|
"-c:a", "aac", "-b:a", "128k",
|
||||||
|
"-f", "hls", "-hls_time", str(segment_duration),
|
||||||
|
"-hls_list_size", "0", "-hls_segment_filename",
|
||||||
|
os.path.join(output_dir, "seg_%03d.ts"),
|
||||||
|
playlist
|
||||||
|
]
|
||||||
|
self._run_ffmpeg(cmd)
|
||||||
|
return playlist
|
||||||
|
|
||||||
|
# =================================================================
|
||||||
|
# Internal
|
||||||
|
# =================================================================
|
||||||
|
|
||||||
|
def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
|
||||||
|
"""Run an FFmpeg command with logging and error handling."""
|
||||||
|
logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
|
||||||
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||||
|
if r.returncode != 0:
|
||||||
|
logger.error("FFmpeg failed: %s", r.stderr[-500:])
|
||||||
|
raise RuntimeError(f"FFmpeg error (exit {r.returncode}): {r.stderr[-500:]}")
|
||||||
|
return r.stdout
|
||||||
|
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Convenience functions
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def compose_video(
|
||||||
|
images: List[str],
|
||||||
|
audio: Optional[str] = None,
|
||||||
|
output: str = "output.mp4",
|
||||||
|
**kwargs,
|
||||||
|
) -> str:
|
||||||
|
"""One-call video composition from images and optional audio.
|
||||||
|
|
||||||
|
Convenience wrapper around FFmpegPipeline.compose().
|
||||||
|
"""
|
||||||
|
pipeline = FFmpegPipeline()
|
||||||
|
spec = kwargs.pop("spec", VideoSpec())
|
||||||
|
transition = kwargs.pop("transition", None)
|
||||||
|
ken_burns = kwargs.pop("ken_burns", None)
|
||||||
|
beat_times = kwargs.pop("beat_times", None)
|
||||||
|
return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)
|
||||||
Reference in New Issue
Block a user