Files
timmy-home/tests/twitter_archive/test_analyze_media.py
Alexander Whitestone 2b2b8a2280
Some checks failed
Smoke Test / smoke (push) Has been cancelled
feat: Know Thy Father Phase 2 — Multimodal Analysis Pipeline (#584) (#641)
Merge PR #641
2026-04-14 22:18:15 +00:00

280 lines
10 KiB
Python

"""Tests for Know Thy Father Phase 2: Multimodal Analysis Pipeline."""
import json
import sys
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "scripts" / "twitter_archive"))
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_manifest():
    """Three manifest entries: a pending video, a pending photo, and a media-less tweet."""
    video_entry = {
        "tweet_id": "1001",
        "media_type": "video",
        "media_path": "/fake/media/1001.mp4",
        "media_id": "m1",
        "full_text": "Test #TimmyTime video",
        "hashtags": ["TimmyTime"],
        "created_at": "Mon Mar 01 12:00:00 +0000 2026",
        "status": "pending",
    }
    photo_entry = {
        "tweet_id": "1002",
        "media_type": "photo",
        "media_path": "/fake/media/1002.jpg",
        "media_id": "m2",
        "full_text": "Test #TimmyChain image",
        "hashtags": ["TimmyChain"],
        "created_at": "Tue Mar 02 12:00:00 +0000 2026",
        "status": "pending",
    }
    text_only_entry = {
        "tweet_id": "1003",
        "media_type": "none",
        "media_path": None,
        "full_text": "Text only tweet",
        "hashtags": ["TimmyTime"],
        "created_at": "Wed Mar 03 12:00:00 +0000 2026",
        "status": "no_media",
    }
    return [video_entry, photo_entry, text_only_entry]
@pytest.fixture
def sample_checkpoint():
    """A fresh checkpoint: nothing processed, all counters zeroed."""
    checkpoint = dict(
        processed_tweet_ids=[],
        last_offset=0,
        total_processed=0,
        total_failed=0,
    )
    return checkpoint
@pytest.fixture
def sample_analysis_entry():
    """A completed analysis record for video tweet 1001, carrying two kernels."""
    analysis = {
        "description": "A video showing sovereign themes",
        "arc": "From isolation to collective awakening",
        "kernels": [
            "Sovereignty is the journey from isolation to community",
            "The soul persists through the digital noise",
        ],
        "themes": ["sovereignty", "soul", "digital_agency"],
        "confidence": 0.8,
    }
    return {
        "tweet_id": "1001",
        "media_type": "video",
        "media_path": "/fake/1001.mp4",
        "tweet_text": "Test #TimmyTime video",
        "hashtags": ["TimmyTime"],
        "analysis": analysis,
        "processed_at": "2026-04-01T00:00:00Z",
        "status": "completed",
    }
# ---------------------------------------------------------------------------
# Tests: Parse analysis response
# ---------------------------------------------------------------------------
class TestParseAnalysisResponse:
    """parse_analysis_response: extracting structured JSON from model output."""

    def test_parses_valid_json(self):
        from analyze_media import parse_analysis_response

        raw = '{"description": "test", "arc": "test arc", "kernels": ["kernel1"], "themes": ["sovereignty"], "confidence": 0.9}'
        parsed = parse_analysis_response(raw)
        expected = {
            "description": "test",
            "arc": "test arc",
            "kernels": ["kernel1"],
            "themes": ["sovereignty"],
            "confidence": 0.9,
        }
        for key, value in expected.items():
            assert parsed[key] == value

    def test_finds_json_in_text(self):
        from analyze_media import parse_analysis_response

        # JSON embedded inside surrounding prose should still be located.
        raw = 'Here is the analysis:\n{"description": "found it", "kernels": [], "themes": [], "confidence": 0.5}\nEnd of analysis.'
        parsed = parse_analysis_response(raw)
        assert parsed["description"] == "found it"

    def test_handles_invalid_json(self):
        from analyze_media import parse_analysis_response

        raw = "This is just plain text with no JSON at all."
        parsed = parse_analysis_response(raw)
        # Fallback path: the whole response becomes the description, confidence drops to zero.
        assert parsed["description"] == raw
        assert parsed["confidence"] == 0.0
# ---------------------------------------------------------------------------
# Tests: Pending entries
# ---------------------------------------------------------------------------
class TestGetPendingEntries:
    """get_pending_entries: checkpoint filtering and media-type exclusion."""

    def test_filters_processed(self, sample_manifest, sample_checkpoint):
        from analyze_media import get_pending_entries

        sample_checkpoint["processed_tweet_ids"] = ["1001"]
        remaining_ids = {e["tweet_id"] for e in get_pending_entries(sample_manifest, sample_checkpoint)}
        assert "1001" not in remaining_ids
        assert "1002" in remaining_ids

    def test_excludes_none_media(self, sample_manifest, sample_checkpoint):
        from analyze_media import get_pending_entries

        pending = get_pending_entries(sample_manifest, sample_checkpoint)
        # Tweets without media cannot be analyzed, so they must never be pending.
        assert all(e["media_type"] != "none" for e in pending)

    def test_empty_when_all_processed(self, sample_manifest, sample_checkpoint):
        from analyze_media import get_pending_entries

        sample_checkpoint["processed_tweet_ids"] = ["1001", "1002", "1003"]
        assert not get_pending_entries(sample_manifest, sample_checkpoint)
# ---------------------------------------------------------------------------
# Tests: Process entry
# ---------------------------------------------------------------------------
class TestProcessEntry:
    """process_entry: per-media-type analyzer dispatch and result envelope."""

    @patch("analyze_media.analyze_image")
    def test_processes_photo(self, mock_analyze, sample_manifest, tmp_path):
        from analyze_media import process_entry

        mock_analyze.return_value = {
            "description": "test image",
            "arc": "test arc",
            "kernels": ["kernel1"],
            "themes": ["sovereignty"],
            "confidence": 0.8,
        }
        photo_entry = sample_manifest[1]  # the photo manifest entry
        # process_entry verifies the media file exists, so create a stub on disk.
        media_file = tmp_path / "1002.jpg"
        media_file.touch()
        photo_entry["media_path"] = str(media_file)
        outcome = process_entry(photo_entry)
        assert outcome["status"] == "completed"
        assert outcome["tweet_id"] == "1002"
        assert outcome["media_type"] == "photo"
        assert "processed_at" in outcome

    @patch("analyze_media.analyze_video")
    def test_processes_video(self, mock_analyze, sample_manifest, tmp_path):
        from analyze_media import process_entry

        mock_analyze.return_value = {
            "description": "test video",
            "arc": "video arc",
            "kernels": ["kernel1"],
            "themes": ["soul"],
            "confidence": 0.7,
        }
        video_entry = sample_manifest[0]  # the video manifest entry
        media_file = tmp_path / "1001.mp4"
        media_file.touch()
        video_entry["media_path"] = str(media_file)
        outcome = process_entry(video_entry)
        assert outcome["status"] == "completed"
        assert outcome["tweet_id"] == "1001"
        assert outcome["media_type"] == "video"
# ---------------------------------------------------------------------------
# Tests: Extract meaning kernels
# ---------------------------------------------------------------------------
class TestExtractMeaningKernels:
    """extract_meaning_kernels: extraction, deduplication, and failed-entry skipping.

    Fixes vs. the previous version: `KTF_DIR`, `KERNELS_FILE`, and `ANALYSIS_FILE`
    were imported but never used (the constants are only ever replaced via
    string-path `monkeypatch.setattr`), and the identical three-line path-redirect
    plus JSONL-writing setup was duplicated in all three tests — both are now
    factored into private helpers.
    """

    @staticmethod
    def _setup_ktf_dir(tmp_path, monkeypatch):
        """Redirect analyze_media's KTF path constants into a temp dir; return the dir."""
        ktf_dir = tmp_path / "ktf"
        ktf_dir.mkdir()
        monkeypatch.setattr("analyze_media.KTF_DIR", ktf_dir)
        monkeypatch.setattr("analyze_media.KERNELS_FILE", ktf_dir / "meaning-kernels.jsonl")
        monkeypatch.setattr("analyze_media.ANALYSIS_FILE", ktf_dir / "analysis.jsonl")
        return ktf_dir

    @staticmethod
    def _write_analysis(ktf_dir, entries):
        """Serialize entries as JSONL into the redirected analysis file."""
        with open(ktf_dir / "analysis.jsonl", "w") as f:
            for entry in entries:
                f.write(json.dumps(entry) + "\n")

    def test_extracts_kernels_from_analysis(self, tmp_path, monkeypatch, sample_analysis_entry):
        from analyze_media import extract_meaning_kernels

        ktf_dir = self._setup_ktf_dir(tmp_path, monkeypatch)
        self._write_analysis(ktf_dir, [sample_analysis_entry])
        result = extract_meaning_kernels()
        assert result["status"] == "ok"
        assert result["total_kernels"] == 2
        # Each emitted record carries the kernel text and its source tweet id.
        with open(ktf_dir / "meaning-kernels.jsonl") as f:
            kernels = [json.loads(line) for line in f if line.strip()]
        assert len(kernels) == 2
        assert all("kernel" in k for k in kernels)
        assert all("tweet_id" in k for k in kernels)

    def test_deduplicates_kernels(self, tmp_path, monkeypatch):
        from analyze_media import extract_meaning_kernels

        ktf_dir = self._setup_ktf_dir(tmp_path, monkeypatch)
        # Two completed entries sharing identical kernel text.
        self._write_analysis(ktf_dir, [
            {
                "tweet_id": "1",
                "status": "completed",
                "analysis": {"kernels": ["Same kernel text"], "themes": [], "confidence": 0.8, "arc": ""},
            },
            {
                "tweet_id": "2",
                "status": "completed",
                "analysis": {"kernels": ["Same kernel text"], "themes": [], "confidence": 0.7, "arc": ""},
            },
        ])
        result = extract_meaning_kernels()
        assert result["total_kernels"] == 1  # Deduplicated

    def test_skips_failed_entries(self, tmp_path, monkeypatch):
        from analyze_media import extract_meaning_kernels

        ktf_dir = self._setup_ktf_dir(tmp_path, monkeypatch)
        self._write_analysis(ktf_dir, [
            {"tweet_id": "1", "status": "failed", "analysis": {"kernels": ["should not appear"]}},
            {"tweet_id": "2", "status": "completed", "analysis": {"kernels": ["valid kernel"], "themes": [], "confidence": 0.5, "arc": ""}},
        ])
        result = extract_meaning_kernels()
        assert result["total_kernels"] == 1
# ---------------------------------------------------------------------------
# Tests: Pipeline status
# ---------------------------------------------------------------------------
class TestPipelineStatus:
def test_status_computes_correctly(self, tmp_path, monkeypatch, sample_manifest, sample_analysis_entry):
from analyze_media import load_json
# Mock the status computation
processed = 1
total = 2 # excluding "none" type
pct = round(processed / total * 100, 1)
assert pct == 50.0