Addresses review item #5 (test coverage gaps):
- describe_image: oversize skip, valid JSON parse, truncated JSON repair, fallback
- describe_image_sharegpt: oversize skip, natural language return
- generate_training_pairs: JSONL format, ShareGPT format, dry run
334 lines
12 KiB
Python
334 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for generate_scene_descriptions.py

Tests the scene description generation pipeline including:
- Media file scanning
- Model detection
- JSON parsing from vision responses
- Output format validation

Ref: timmy-config#689
|
|
"""
|
|
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
# Add scripts to path for import
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
|
|
|
from generate_scene_descriptions import (
|
|
IMAGE_EXTS,
|
|
VIDEO_EXTS,
|
|
ALL_EXTS,
|
|
VISION_MODELS,
|
|
auto_detect_model,
|
|
check_model_available,
|
|
scan_media,
|
|
extract_video_frame,
|
|
)
|
|
|
|
|
|
class TestMediaScanning(unittest.TestCase):
    """Exercise scan_media() against throwaway directory trees."""

    def test_scan_empty_directory(self):
        """A directory with no files produces an empty list."""
        with tempfile.TemporaryDirectory() as workdir:
            found = scan_media(workdir)
            self.assertEqual(found, [])

    def test_scan_nonexistent_directory(self):
        """A path that does not exist produces an empty list."""
        missing = "/nonexistent/path/that/does/not/exist"
        self.assertEqual(scan_media(missing), [])

    def test_scan_with_images(self):
        """Each of the three image files created is reported."""
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            # One empty file per image suffix.
            for suffix in (".jpg", ".png", ".webp"):
                base.joinpath("test" + suffix).touch()

            found = scan_media(workdir)
            self.assertEqual(len(found), 3)

    def test_scan_recursive(self):
        """Files in nested subdirectories are counted with top-level ones."""
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            nested = base / "sub" / "dir"
            nested.mkdir(parents=True)
            nested.joinpath("deep.jpg").touch()
            base.joinpath("top.png").touch()

            found = scan_media(workdir)
            self.assertEqual(len(found), 2)

    def test_scan_ignores_unsupported(self):
        """Only the .jpg is counted; the .pdf and .py files are not."""
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            for filename in ("image.jpg", "document.pdf", "script.py"):
                base.joinpath(filename).touch()

            found = scan_media(workdir)
            self.assertEqual(len(found), 1)

    def test_scan_sorted_output(self):
        """Returned entries are ordered by file name."""
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            # Created deliberately out of alphabetical order.
            for filename in ("z.jpg", "a.png", "m.webp"):
                base.joinpath(filename).touch()

            found = scan_media(workdir)
            reported = [entry.name for entry in found]
            self.assertEqual(reported, sorted(reported))
|
|
|
|
|
|
class TestModelDetection(unittest.TestCase):
    """Test model availability detection with the HTTP call mocked out."""

    @staticmethod
    def _fake_tags_response(model_names):
        """Build a stand-in for the object returned by ``urlopen``.

        ``read()`` returns a UTF-8 JSON document of the form
        ``{"models": [{"name": ...}, ...]}``. The mock also yields itself
        from ``__enter__`` so the payload is visible whether the code under
        test reads the response directly or via ``with urlopen(...) as r``.

        Args:
            model_names: Iterable of model name strings to advertise.

        Returns:
            A configured ``MagicMock``.
        """
        payload = {"models": [{"name": name} for name in model_names]}
        response = MagicMock()
        response.read.return_value = json.dumps(payload).encode()
        # Without this, ``with response as r`` would bind ``r`` to a fresh
        # MagicMock whose read() does not return the payload.
        response.__enter__.return_value = response
        return response

    @staticmethod
    def _only_available(wanted):
        """Return a ``check_model_available`` stub true only for *wanted*.

        Extra positional/keyword arguments (e.g. the server URL) are accepted
        and ignored so the stub does not depend on how the caller passes them.
        """
        def fake_check(model, *args, **kwargs):
            return model == wanted
        return fake_check

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_check_model_available(self, mock_urlopen):
        """A model listed in the server response is reported available."""
        mock_urlopen.return_value = self._fake_tags_response(["gemma4:latest"])

        result = check_model_available("gemma4:latest")
        self.assertTrue(result)

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_check_model_not_available(self, mock_urlopen):
        """A model absent from the server response is reported unavailable."""
        mock_urlopen.return_value = self._fake_tags_response(["llama2:7b"])

        result = check_model_available("gemma4:latest")
        self.assertFalse(result)

    @patch('generate_scene_descriptions.check_model_available')
    def test_auto_detect_prefers_gemma4(self, mock_check):
        """When only gemma4:latest is available it is the detected model."""
        mock_check.side_effect = self._only_available("gemma4:latest")

        result = auto_detect_model()
        self.assertEqual(result, "gemma4:latest")

    @patch('generate_scene_descriptions.check_model_available')
    def test_auto_detect_falls_back(self, mock_check):
        """When only llava:latest is available it is the detected model."""
        mock_check.side_effect = self._only_available("llava:latest")

        result = auto_detect_model()
        self.assertEqual(result, "llava:latest")

    @patch('generate_scene_descriptions.check_model_available')
    def test_auto_detect_returns_none_when_no_models(self, mock_check):
        """With no model available, detection yields None."""
        mock_check.return_value = False

        result = auto_detect_model()
        self.assertIsNone(result)
|
|
|
|
|
|
class TestConstants(unittest.TestCase):
    """Sanity checks on the imported extension and model constants."""

    def test_image_extensions(self):
        """The common image suffixes are members of IMAGE_EXTS."""
        for suffix in (".jpg", ".png", ".webp"):
            self.assertIn(suffix, IMAGE_EXTS)

    def test_video_extensions(self):
        """The common video suffixes are members of VIDEO_EXTS."""
        for suffix in (".mp4", ".webm"):
            self.assertIn(suffix, VIDEO_EXTS)

    def test_all_extensions_union(self):
        """ALL_EXTS equals the union of the image and video sets."""
        combined = IMAGE_EXTS | VIDEO_EXTS
        self.assertEqual(ALL_EXTS, combined)

    def test_vision_models_ordered(self):
        """gemma4:latest is listed first and llava:latest is present."""
        first_choice = VISION_MODELS[0]
        self.assertEqual(first_choice, "gemma4:latest")
        self.assertIn("llava:latest", VISION_MODELS)
|
|
|
|
|
|
class TestVideoFrameExtraction(unittest.TestCase):
    """Check extract_video_frame() on input that cannot be read."""

    def test_extract_nonexistent_video(self):
        """Extraction from a missing video file returns a falsy result."""
        missing_video = Path("/nonexistent.mp4")
        frame_target = Path("/tmp/frame.jpg")

        extracted = extract_video_frame(missing_video, frame_target)
        self.assertFalse(extracted)
|
|
|
|
|
|
class TestDescribeImage(unittest.TestCase):
    """Test describe_image() with mocked Ollama responses."""

    # Starts with JPEG-style marker bytes and is padded with zeros; the tests
    # only need a small readable file, not a decodable image.
    _JPEG_STUB = b"\xff\xd8\xff\xe0" + b"\x00" * 1000

    # 51 MiB -- assumed to exceed MAX_FILE_SIZE in the module under test
    # (TODO confirm against generate_scene_descriptions).
    _OVERSIZE_BYTES = 51 * 1024 * 1024

    def _make_image(self, payload=b"", size=None):
        """Create a temporary ``.jpg`` file and return its path.

        The file lives in a private temporary directory that is removed by a
        registered cleanup, so nothing is left behind even when the test
        fails. The handle is closed before returning so the code under test
        is the only reader.

        Args:
            payload: Bytes written at the start of the file.
            size: If given, the file is extended to this many bytes with
                ``truncate`` instead of writing that much data.

        Returns:
            ``Path`` of the created file.
        """
        holder = tempfile.TemporaryDirectory()
        self.addCleanup(holder.cleanup)
        image_path = Path(holder.name) / "image.jpg"
        with image_path.open("wb") as handle:
            handle.write(payload)
            if size is not None:
                # Sets the reported size without pushing megabytes of zeros
                # through write().
                handle.truncate(size)
        return image_path

    @staticmethod
    def _stub_reply(text):
        """Return a mocked ``urlopen`` result whose body wraps *text*.

        ``read()`` yields the UTF-8 JSON ``{"response": text}``. The mock
        yields itself from ``__enter__`` so it works whether the code under
        test uses the response directly or as a context manager.
        """
        reply = MagicMock()
        reply.read.return_value = json.dumps({"response": text}).encode()
        reply.__enter__.return_value = reply
        return reply

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_skips_oversized_file(self, mock_urlopen):
        """Files exceeding MAX_FILE_SIZE should be skipped without API call."""
        import generate_scene_descriptions

        image = self._make_image(size=self._OVERSIZE_BYTES)
        result = generate_scene_descriptions.describe_image(image)

        self.assertIsNone(result)
        # urlopen is patched so a regression cannot reach the network; the
        # skip path must not attempt a request at all.
        mock_urlopen.assert_not_called()

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_parses_valid_json_response(self, mock_urlopen):
        """Valid JSON response should be parsed and returned."""
        import generate_scene_descriptions

        mock_urlopen.return_value = self._stub_reply(
            '{"mood": "calm", "colors": ["blue", "white"], '
            '"composition": "wide-shot", "camera": "static", '
            '"lighting": "natural", "description": "A serene ocean scene."}'
        )

        image = self._make_image(self._JPEG_STUB)
        result = generate_scene_descriptions.describe_image(image)

        self.assertIsNotNone(result)
        self.assertEqual(result["mood"], "calm")
        self.assertIn("lighting", result)

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_repair_truncated_json(self, mock_urlopen):
        """Truncated JSON should be repaired with regex extraction."""
        import generate_scene_descriptions

        # The payload stops mid-string: no closing quote and no closing brace.
        mock_urlopen.return_value = self._stub_reply(
            '{"mood": "dark", "colors": ["red"], '
            '"composition": "close-up", "camera": "handheld", '
            '"lighting": "dramatic", "description": "A shadowy figure in a dimly lit alley'
        )

        image = self._make_image(self._JPEG_STUB)
        result = generate_scene_descriptions.describe_image(image)

        self.assertIsNotNone(result)
        self.assertEqual(result["mood"], "dark")
        self.assertEqual(result["lighting"], "dramatic")

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_fallback_on_invalid_json(self, mock_urlopen):
        """Completely invalid JSON response should still return a fallback."""
        import generate_scene_descriptions

        mock_urlopen.return_value = self._stub_reply(
            "This is just plain text describing a beautiful sunset over mountains."
        )

        image = self._make_image(self._JPEG_STUB)
        result = generate_scene_descriptions.describe_image(image)

        self.assertIsNotNone(result)
        self.assertIn("description", result)
        self.assertIn("lighting", result)
|
|
|
|
|
|
class TestDescribeImageSharegpt(unittest.TestCase):
    """Test describe_image_sharegpt() with mocked Ollama responses."""

    # Starts with JPEG-style marker bytes and is padded with zeros; the tests
    # only need a small readable file, not a decodable image.
    _JPEG_STUB = b"\xff\xd8\xff\xe0" + b"\x00" * 1000

    # 51 MiB -- assumed to exceed MAX_FILE_SIZE in the module under test
    # (TODO confirm against generate_scene_descriptions).
    _OVERSIZE_BYTES = 51 * 1024 * 1024

    def _make_image(self, payload=b"", size=None):
        """Create a temporary ``.jpg`` file and return its path.

        The file lives in a private temporary directory that is removed by a
        registered cleanup, so nothing is left behind even when the test
        fails. The handle is closed before returning so the code under test
        is the only reader.

        Args:
            payload: Bytes written at the start of the file.
            size: If given, the file is extended to this many bytes with
                ``truncate`` instead of writing that much data.

        Returns:
            ``Path`` of the created file.
        """
        holder = tempfile.TemporaryDirectory()
        self.addCleanup(holder.cleanup)
        image_path = Path(holder.name) / "image.jpg"
        with image_path.open("wb") as handle:
            handle.write(payload)
            if size is not None:
                # Sets the reported size without pushing megabytes of zeros
                # through write().
                handle.truncate(size)
        return image_path

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_skips_oversized_file(self, mock_urlopen):
        """Files exceeding MAX_FILE_SIZE should be skipped."""
        import generate_scene_descriptions

        image = self._make_image(size=self._OVERSIZE_BYTES)
        result = generate_scene_descriptions.describe_image_sharegpt(image)

        self.assertIsNone(result)
        # urlopen is patched so a regression cannot reach the network; a
        # skipped file must not trigger a request.
        mock_urlopen.assert_not_called()

    @patch('generate_scene_descriptions.urllib.request.urlopen')
    def test_returns_natural_language(self, mock_urlopen):
        """Should return the raw response text."""
        import generate_scene_descriptions

        reply = MagicMock()
        reply.read.return_value = json.dumps(
            {"response": "A warm sunset over rolling hills with golden light."}
        ).encode()
        # Yield the same mock from a ``with`` block so the payload is seen
        # whether or not the code under test uses a context manager.
        reply.__enter__.return_value = reply
        mock_urlopen.return_value = reply

        image = self._make_image(self._JPEG_STUB)
        result = generate_scene_descriptions.describe_image_sharegpt(image)

        self.assertIsNotNone(result)
        self.assertIn("sunset", result)
|
|
|
|
|
|
class TestGenerateTrainingPairs(unittest.TestCase):
    """Test generate_training_pairs() orchestration."""

    _MODEL = "test-model"
    _OLLAMA_URL = "http://localhost:11434"

    def _make_image(self):
        """Create a 1000-byte temporary ``.jpg`` file and return its path.

        The file lives in a private temporary directory that is removed by a
        registered cleanup, so it is deleted even when the test fails. The
        handle is closed before the path is handed to the code under test.
        """
        holder = tempfile.TemporaryDirectory()
        self.addCleanup(holder.cleanup)
        image_path = Path(holder.name) / "image.jpg"
        image_path.write_bytes(b"\x00" * 1000)
        return image_path

    @patch('generate_scene_descriptions.describe_image')
    def test_jsonl_output_format(self, mock_describe):
        """JSONL format should produce structured description objects."""
        import generate_scene_descriptions

        mock_describe.return_value = {"mood": "calm", "description": "Test"}

        pairs = generate_scene_descriptions.generate_training_pairs(
            [self._make_image()], self._MODEL, self._OLLAMA_URL,
            output_format="jsonl"
        )

        self.assertEqual(len(pairs), 1)
        self.assertIn("description", pairs[0])
        self.assertIn("generated_at", pairs[0])

    @patch('generate_scene_descriptions.describe_image_sharegpt')
    def test_sharegpt_output_format(self, mock_describe):
        """ShareGPT format should produce conversation objects."""
        import generate_scene_descriptions

        mock_describe.return_value = "A description of the scene."

        pairs = generate_scene_descriptions.generate_training_pairs(
            [self._make_image()], self._MODEL, self._OLLAMA_URL,
            output_format="sharegpt"
        )

        self.assertEqual(len(pairs), 1)
        self.assertIn("conversations", pairs[0])
        self.assertEqual(len(pairs[0]["conversations"]), 2)

    @patch('generate_scene_descriptions.describe_image')
    def test_dry_run_skips_api_calls(self, mock_describe):
        """Dry run should not call describe_image."""
        import generate_scene_descriptions

        pairs = generate_scene_descriptions.generate_training_pairs(
            [self._make_image()], self._MODEL, self._OLLAMA_URL,
            dry_run=True
        )

        mock_describe.assert_not_called()
        self.assertEqual(len(pairs), 1)
        self.assertEqual(pairs[0]["status"], "dry-run")
|
|
|
|
|
|
# Run the suite when this file is executed directly instead of via a runner.
if __name__ == "__main__":
    unittest.main()
|