229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
"""Unit tests for content.extraction.clipper."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
from content.extraction.clipper import (
|
|
ClipResult,
|
|
_build_ffmpeg_cmd,
|
|
_ffmpeg_available,
|
|
extract_clip,
|
|
extract_clips,
|
|
)
|
|
|
|
# ── _ffmpeg_available ─────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestFfmpegAvailable:
    """Tests for ``_ffmpeg_available``, driven through a patched ``shutil.which``."""

    # Patch target: ``shutil.which`` as reached through the clipper module.
    _WHICH = "content.extraction.clipper.shutil.which"

    def test_returns_bool(self):
        """Without any patching, the probe still yields a genuine ``bool``."""
        assert isinstance(_ffmpeg_available(), bool)

    def test_false_when_shutil_finds_nothing(self):
        """A ``None`` lookup result must be reported as ``False``."""
        with patch(self._WHICH, return_value=None):
            available = _ffmpeg_available()
        assert available is False

    def test_true_when_shutil_finds_ffmpeg(self):
        """A resolved executable path must be reported as ``True``."""
        with patch(self._WHICH, return_value="/usr/bin/ffmpeg"):
            available = _ffmpeg_available()
        assert available is True
|
|
|
|
|
|
# ── _build_ffmpeg_cmd ─────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestBuildFfmpegCmd:
    """Tests for the argument list returned by ``_build_ffmpeg_cmd``."""

    _SRC = "/src.mp4"
    _OUT = "/out.mp4"

    @classmethod
    def _command(cls, start=10.0, end=30.0):
        """Build a command for the fixed source/output pair and the given times."""
        return _build_ffmpeg_cmd(cls._SRC, start, end, cls._OUT)

    @staticmethod
    def _number_after(argv, flag):
        """Return, as a float, the argument that immediately follows *flag*."""
        return float(argv[argv.index(flag) + 1])

    def test_returns_list(self):
        """The command is a ``list``."""
        assert isinstance(self._command(), list)

    def test_starts_with_ffmpeg(self):
        """The first element is the literal ``ffmpeg``."""
        assert self._command()[0] == "ffmpeg"

    def test_contains_source_path(self):
        """The source path appears as its own argument."""
        assert self._SRC in self._command()

    def test_contains_output_path(self):
        """The output path appears as its own argument."""
        assert self._OUT in self._command()

    def test_duration_is_end_minus_start(self):
        """The value after ``-t`` equals end minus start (30 - 10 = 20)."""
        assert self._number_after(self._command(), "-t") == pytest.approx(20.0)

    def test_start_time_in_seek(self):
        """The value after ``-ss`` equals the requested start time."""
        argv = self._command(5.5, 15.5)
        assert self._number_after(argv, "-ss") == pytest.approx(5.5)

    def test_overwrite_flag_present(self):
        """The ``-y`` flag is part of the command."""
        # Integer times on purpose, exactly as the original test passed them.
        assert "-y" in self._command(0, 10)
|
|
|
|
|
|
# ── extract_clip ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestExtractClip:
    """Tests for ``extract_clip``: each failure path plus one successful run."""

    # Patch targets shared by the tests below.
    _FFMPEG_PROBE = "content.extraction.clipper._ffmpeg_available"
    _SPAWN = "asyncio.create_subprocess_exec"

    @staticmethod
    def _highlight(highlight_id, source_path, start_time, end_time):
        """Assemble a highlight dict with the four keys used throughout these tests."""
        return {
            "highlight_id": highlight_id,
            "source_path": source_path,
            "start_time": start_time,
            "end_time": end_time,
        }

    @staticmethod
    def _source_file(directory, payload=b"fake"):
        """Write a placeholder ``src.mp4`` into *directory* and return its path."""
        path = directory / "src.mp4"
        path.write_bytes(payload)
        return path

    @staticmethod
    def _process(returncode, communicate):
        """Return a mock subprocess with the given exit code and ``communicate``."""
        proc = AsyncMock()
        proc.returncode = returncode
        proc.communicate = communicate
        return proc

    @pytest.mark.asyncio
    async def test_returns_failure_when_ffmpeg_missing(self):
        """With the ffmpeg probe reporting False, the error mentions ffmpeg."""
        with patch(self._FFMPEG_PROBE, return_value=False):
            result = await extract_clip(self._highlight("h1", "/a.mp4", 0, 5))
        assert result.success is False
        assert "ffmpeg" in result.error.lower()

    @pytest.mark.asyncio
    async def test_returns_failure_when_source_missing(self, tmp_path):
        """A source path that does not exist yields an error naming ``source_path``."""
        absent = str(tmp_path / "nonexistent.mp4")
        with patch(self._FFMPEG_PROBE, return_value=True):
            result = await extract_clip(self._highlight("h1", absent, 0, 5))
        assert result.success is False
        assert "source_path" in result.error

    @pytest.mark.asyncio
    async def test_returns_failure_when_invalid_time_range(self, tmp_path):
        """An end time earlier than the start time is rejected."""
        src = self._source_file(tmp_path)
        # end (10) < start (30)
        highlight = self._highlight("h1", str(src), 30, 10)
        with patch(self._FFMPEG_PROBE, return_value=True):
            result = await extract_clip(highlight)
        assert result.success is False
        assert "invalid time range" in result.error

    @pytest.mark.asyncio
    async def test_successful_extraction(self, tmp_path):
        """A zero exit code yields a successful result carrying id and duration."""
        src = self._source_file(tmp_path, b"fake video")
        proc = self._process(0, AsyncMock(return_value=(b"", b"")))

        ffmpeg_ok = patch(self._FFMPEG_PROBE, return_value=True)
        spawn = patch(self._SPAWN, return_value=proc)
        with ffmpeg_ok, spawn:
            result = await extract_clip(
                self._highlight("h1", str(src), 0, 10),
                output_dir=str(tmp_path),
            )

        assert result.success is True
        assert result.highlight_id == "h1"
        assert result.duration == pytest.approx(10.0)

    @pytest.mark.asyncio
    async def test_ffmpeg_nonzero_exit_returns_failure(self, tmp_path):
        """Exit code 1 from the mocked process yields a failure with an error set."""
        src = self._source_file(tmp_path)
        proc = self._process(1, AsyncMock(return_value=(b"", b"encoding error")))

        ffmpeg_ok = patch(self._FFMPEG_PROBE, return_value=True)
        spawn = patch(self._SPAWN, return_value=proc)
        with ffmpeg_ok, spawn:
            result = await extract_clip(
                self._highlight("h2", str(src), 0, 5),
                output_dir=str(tmp_path),
            )

        assert result.success is False
        assert result.error

    @pytest.mark.asyncio
    async def test_timeout_returns_failure(self, tmp_path):
        """A ``TimeoutError`` from ``asyncio.wait_for`` is reported as 'timed out'."""
        src = self._source_file(tmp_path)

        async def _hang():
            await asyncio.sleep(1000)

        proc = self._process(None, _hang)

        ffmpeg_ok = patch(self._FFMPEG_PROBE, return_value=True)
        spawn = patch(self._SPAWN, return_value=proc)
        # wait_for is replaced outright, so the 1000 s sleep above never runs.
        expire = patch("asyncio.wait_for", side_effect=TimeoutError)
        with ffmpeg_ok, spawn, expire:
            result = await extract_clip(
                self._highlight("h3", str(src), 0, 5),
                output_dir=str(tmp_path),
            )

        assert result.success is False
        assert "timed out" in result.error

    @pytest.mark.asyncio
    async def test_uses_default_highlight_id_when_missing(self):
        """A highlight without ``highlight_id`` is reported under ``"unknown"``."""
        anonymous = {"source_path": "/a.mp4", "start_time": 0, "end_time": 5}
        with patch(self._FFMPEG_PROBE, return_value=False):
            result = await extract_clip(anonymous)
        assert result.highlight_id == "unknown"
|
|
|
|
|
|
# ── extract_clips ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestExtractClips:
    """Tests for the batch entry point ``extract_clips``."""

    @pytest.mark.asyncio
    async def test_returns_list_of_results(self):
        """Two highlights produce two results, in the same order as the input."""

        async def _fake_extract(h, output_dir=None):
            # Stand-in for extract_clip: always succeeds and echoes the id back.
            return ClipResult(highlight_id=h["highlight_id"], success=True, duration=5.0)

        highlights = [
            {"highlight_id": "a", "source_path": "", "start_time": 0, "end_time": 5},
            {"highlight_id": "b", "source_path": "", "start_time": 5, "end_time": 10},
        ]

        stub = patch("content.extraction.clipper.extract_clip", side_effect=_fake_extract)
        with stub:
            results = await extract_clips(highlights)

        assert len(results) == 2
        first, second = results[0], results[1]
        assert first.highlight_id == "a"
        assert second.highlight_id == "b"

    @pytest.mark.asyncio
    async def test_empty_list_returns_empty(self):
        """An empty input list yields an empty result list."""
        assert await extract_clips([]) == []
|