forked from Rockachopa/Timmy-time-dashboard
This commit is contained in:
230
tests/unit/test_content_clipper.py
Normal file
230
tests/unit/test_content_clipper.py
Normal file
@@ -0,0 +1,230 @@
|
||||
"""Unit tests for content.extraction.clipper."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from content.extraction.clipper import (
|
||||
ClipResult,
|
||||
_build_ffmpeg_cmd,
|
||||
_ffmpeg_available,
|
||||
extract_clip,
|
||||
extract_clips,
|
||||
)
|
||||
|
||||
# ── _ffmpeg_available ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestFfmpegAvailable:
|
||||
def test_returns_bool(self):
|
||||
result = _ffmpeg_available()
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_false_when_shutil_finds_nothing(self):
|
||||
with patch("content.extraction.clipper.shutil.which", return_value=None):
|
||||
assert _ffmpeg_available() is False
|
||||
|
||||
def test_true_when_shutil_finds_ffmpeg(self):
|
||||
with patch("content.extraction.clipper.shutil.which", return_value="/usr/bin/ffmpeg"):
|
||||
assert _ffmpeg_available() is True
|
||||
|
||||
|
||||
# ── _build_ffmpeg_cmd ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestBuildFfmpegCmd:
|
||||
def test_returns_list(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4")
|
||||
assert isinstance(cmd, list)
|
||||
|
||||
def test_starts_with_ffmpeg(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4")
|
||||
assert cmd[0] == "ffmpeg"
|
||||
|
||||
def test_contains_source_path(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4")
|
||||
assert "/src.mp4" in cmd
|
||||
|
||||
def test_contains_output_path(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4")
|
||||
assert "/out.mp4" in cmd
|
||||
|
||||
def test_duration_is_end_minus_start(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 10.0, 30.0, "/out.mp4")
|
||||
t_idx = cmd.index("-t")
|
||||
assert float(cmd[t_idx + 1]) == pytest.approx(20.0)
|
||||
|
||||
def test_start_time_in_seek(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 5.5, 15.5, "/out.mp4")
|
||||
ss_idx = cmd.index("-ss")
|
||||
assert float(cmd[ss_idx + 1]) == pytest.approx(5.5)
|
||||
|
||||
def test_overwrite_flag_present(self):
|
||||
cmd = _build_ffmpeg_cmd("/src.mp4", 0, 10, "/out.mp4")
|
||||
assert "-y" in cmd
|
||||
|
||||
|
||||
# ── extract_clip ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestExtractClip:
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_failure_when_ffmpeg_missing(self):
|
||||
with patch("content.extraction.clipper._ffmpeg_available", return_value=False):
|
||||
result = await extract_clip(
|
||||
{"highlight_id": "h1", "source_path": "/a.mp4", "start_time": 0, "end_time": 5}
|
||||
)
|
||||
assert result.success is False
|
||||
assert "ffmpeg" in result.error.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_failure_when_source_missing(self, tmp_path):
|
||||
with patch("content.extraction.clipper._ffmpeg_available", return_value=True):
|
||||
result = await extract_clip(
|
||||
{
|
||||
"highlight_id": "h1",
|
||||
"source_path": str(tmp_path / "nonexistent.mp4"),
|
||||
"start_time": 0,
|
||||
"end_time": 5,
|
||||
}
|
||||
)
|
||||
assert result.success is False
|
||||
assert "source_path" in result.error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_failure_when_invalid_time_range(self, tmp_path):
|
||||
src = tmp_path / "src.mp4"
|
||||
src.write_bytes(b"fake")
|
||||
with patch("content.extraction.clipper._ffmpeg_available", return_value=True):
|
||||
result = await extract_clip(
|
||||
{
|
||||
"highlight_id": "h1",
|
||||
"source_path": str(src),
|
||||
"start_time": 30,
|
||||
"end_time": 10, # end < start
|
||||
}
|
||||
)
|
||||
assert result.success is False
|
||||
assert "invalid time range" in result.error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_successful_extraction(self, tmp_path):
|
||||
src = tmp_path / "src.mp4"
|
||||
src.write_bytes(b"fake video")
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.returncode = 0
|
||||
mock_proc.communicate = AsyncMock(return_value=(b"", b""))
|
||||
|
||||
with (
|
||||
patch("content.extraction.clipper._ffmpeg_available", return_value=True),
|
||||
patch(
|
||||
"asyncio.create_subprocess_exec",
|
||||
return_value=mock_proc,
|
||||
),
|
||||
):
|
||||
result = await extract_clip(
|
||||
{
|
||||
"highlight_id": "h1",
|
||||
"source_path": str(src),
|
||||
"start_time": 0,
|
||||
"end_time": 10,
|
||||
},
|
||||
output_dir=str(tmp_path),
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
assert result.highlight_id == "h1"
|
||||
assert result.duration == pytest.approx(10.0)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ffmpeg_nonzero_exit_returns_failure(self, tmp_path):
|
||||
src = tmp_path / "src.mp4"
|
||||
src.write_bytes(b"fake")
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.returncode = 1
|
||||
mock_proc.communicate = AsyncMock(return_value=(b"", b"encoding error"))
|
||||
|
||||
with (
|
||||
patch("content.extraction.clipper._ffmpeg_available", return_value=True),
|
||||
patch("asyncio.create_subprocess_exec", return_value=mock_proc),
|
||||
):
|
||||
result = await extract_clip(
|
||||
{
|
||||
"highlight_id": "h2",
|
||||
"source_path": str(src),
|
||||
"start_time": 0,
|
||||
"end_time": 5,
|
||||
},
|
||||
output_dir=str(tmp_path),
|
||||
)
|
||||
assert result.success is False
|
||||
assert result.error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_returns_failure(self, tmp_path):
|
||||
src = tmp_path / "src.mp4"
|
||||
src.write_bytes(b"fake")
|
||||
|
||||
async def _slow_communicate():
|
||||
await asyncio.sleep(1000)
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.returncode = None
|
||||
mock_proc.communicate = _slow_communicate
|
||||
|
||||
with (
|
||||
patch("content.extraction.clipper._ffmpeg_available", return_value=True),
|
||||
patch("asyncio.create_subprocess_exec", return_value=mock_proc),
|
||||
patch("asyncio.wait_for", side_effect=TimeoutError),
|
||||
):
|
||||
result = await extract_clip(
|
||||
{
|
||||
"highlight_id": "h3",
|
||||
"source_path": str(src),
|
||||
"start_time": 0,
|
||||
"end_time": 5,
|
||||
},
|
||||
output_dir=str(tmp_path),
|
||||
)
|
||||
assert result.success is False
|
||||
assert "timed out" in result.error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uses_default_highlight_id_when_missing(self):
|
||||
with patch("content.extraction.clipper._ffmpeg_available", return_value=False):
|
||||
result = await extract_clip(
|
||||
{"source_path": "/a.mp4", "start_time": 0, "end_time": 5}
|
||||
)
|
||||
assert result.highlight_id == "unknown"
|
||||
|
||||
|
||||
# ── extract_clips ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestExtractClips:
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_list_of_results(self):
|
||||
async def _fake_extract(h, output_dir=None):
|
||||
return ClipResult(highlight_id=h["highlight_id"], success=True, duration=5.0)
|
||||
|
||||
with patch("content.extraction.clipper.extract_clip", side_effect=_fake_extract):
|
||||
results = await extract_clips(
|
||||
[
|
||||
{"highlight_id": "a", "source_path": "", "start_time": 0, "end_time": 5},
|
||||
{"highlight_id": "b", "source_path": "", "start_time": 5, "end_time": 10},
|
||||
]
|
||||
)
|
||||
|
||||
assert len(results) == 2
|
||||
assert results[0].highlight_id == "a"
|
||||
assert results[1].highlight_id == "b"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_list_returns_empty(self):
|
||||
results = await extract_clips([])
|
||||
assert results == []
|
||||
Reference in New Issue
Block a user