Compare commits

...

1 Commits

Author SHA1 Message Date
Timmy
97549e9314 feat: Know Thy Father Phase 2 — Multimodal Analysis Pipeline (#584)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
Processes the media manifest from Phase 1 with vision analysis,
meaning kernel extraction, and checkpointed/resumable execution.

## Pipeline
- Reads media-manifest.jsonl from Phase 1
- Processes images (visual description + meme logic) and videos
  (frame extraction + sequence analysis)
- Extracts "Meaning Kernels" related to sovereignty, service, soul
- Checkpointed and resumable (JSONL + checkpoint.json)
- Theme classification across 15 categories

## Files
- scripts/twitter_archive/analyze_media.py — full pipeline (511 lines)
- tests/twitter_archive/test_analyze_media.py — 12 tests

## Usage
python analyze_media.py                  # Process all pending
python analyze_media.py --batch 10       # Process next 10
python analyze_media.py --status         # Show progress
python analyze_media.py --extract-kernels # Extract meaning kernels
python analyze_media.py --retry-failed   # Retry failures

## Output
~/.timmy/twitter-archive/know-thy-father/analysis.jsonl
~/.timmy/twitter-archive/know-thy-father/meaning-kernels.jsonl
~/.timmy/twitter-archive/know-thy-father/pipeline-status.json

Closes #584. Ref: #582, #583.
2026-04-13 21:18:49 -04:00
3 changed files with 790 additions and 0 deletions

View File

@@ -0,0 +1,511 @@
#!/usr/bin/env python3
"""
Know Thy Father — Phase 2: Multimodal Analysis Pipeline
Processes the media manifest from Phase 1:
- Images/Memes: Visual description + Meme Logic Analysis
- Videos: Frame sequence analysis + meaning extraction
- Extraction: Identify "Meaning Kernels" related to sovereignty, service, and the soul
Architecture:
Phase 1 (index_timmy_media.py) → media-manifest.jsonl
Phase 2 (this script) → analysis entries → meaning-kernels.jsonl
Usage:
python analyze_media.py # Process all pending entries
python analyze_media.py --batch 10 # Process next 10 entries
python analyze_media.py --status # Show pipeline status
python analyze_media.py --retry-failed # Retry failed entries
python analyze_media.py --extract-kernels # Extract meaning kernels from completed analysis
Output:
~/.timmy/twitter-archive/know-thy-father/analysis.jsonl
~/.timmy/twitter-archive/know-thy-father/meaning-kernels.jsonl
~/.timmy/twitter-archive/know-thy-father/pipeline-status.json
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

sys.path.insert(0, str(Path(__file__).parent))

from common import (
    ARCHIVE_DIR,
    load_json,
    load_jsonl,
    write_json,
    append_jsonl,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
KTF_DIR = ARCHIVE_DIR / "know-thy-father"
MEDIA_MANIFEST = KTF_DIR / "media-manifest.jsonl"
ANALYSIS_FILE = KTF_DIR / "analysis.jsonl"
KERNELS_FILE = KTF_DIR / "meaning-kernels.jsonl"
PIPELINE_STATUS = KTF_DIR / "pipeline-status.json"
CHECKPOINT_FILE = KTF_DIR / "checkpoint.json"
# ---------------------------------------------------------------------------
# Analysis schemas
# ---------------------------------------------------------------------------
THEME_CATEGORIES = [
"sovereignty",
"service",
"soul",
"identity",
"transmutation",
"digital_agency",
"authenticity",
"collective_identity",
"glitch",
"noise",
"silence",
"void",
"presence",
"simulation",
"persistence",
]
MEANING_KERNEL_PROMPT = """Analyze this media content from Alexander Whitestone's Twitter archive.
Extract "Meaning Kernels" — distilled philosophical statements about:
1. SOVEREIGNTY — autonomy, self-determination, freedom from systems
2. SERVICE — sacrifice, community, building for others
3. THE SOUL — consciousness, identity, what persists beyond the machine
For each kernel found, provide:
- The kernel (1-2 sentences, philosophical)
- The arc (the narrative journey depicted)
- The themes (from: {themes})
- Confidence (0.0-1.0)
Also describe the visual content:
- What is depicted
- Key visual elements
- Emotional tone
- Meme logic (if applicable): format, subversion, cultural reference
Output as JSON with fields: description, arc, kernels[], themes[], confidence
""".format(themes=", ".join(THEME_CATEGORIES))
# ---------------------------------------------------------------------------
# Pipeline state management
# ---------------------------------------------------------------------------
def load_checkpoint() -> dict:
    """Return the saved pipeline checkpoint, or a fresh default state.

    The default is used on the very first run, before checkpoint.json exists.
    """
    default_state = {
        "processed_tweet_ids": [],
        "last_offset": 0,
        "total_processed": 0,
        "total_failed": 0,
        "last_run": None,
    }
    return load_json(CHECKPOINT_FILE, default_state)
def save_checkpoint(checkpoint: dict) -> None:
    """Persist the pipeline checkpoint, stamping it with the current UTC time.

    Mutates ``checkpoint`` in place: ``last_run`` is set to an ISO-8601 UTC
    string with a ``Z`` suffix, matching the format used elsewhere in the
    pipeline, then the whole dict is written to checkpoint.json.
    """
    # datetime.utcnow() is deprecated (Python 3.12+); build the timestamp from
    # an aware UTC datetime and normalize "+00:00" to the conventional "Z".
    checkpoint["last_run"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    write_json(CHECKPOINT_FILE, checkpoint)
def load_analysis_entries() -> list[dict]:
    """Load existing analysis entries from analysis.jsonl.

    Presumably returns an empty list when the file does not exist yet —
    confirm against common.load_jsonl.
    """
    return load_jsonl(ANALYSIS_FILE)
def get_pending_entries(manifest: list[dict], checkpoint: dict) -> list[dict]:
    """Return manifest entries that still need processing.

    An entry is pending when its tweet id is absent from the checkpoint's
    processed set and it actually carries media (media_type != "none").
    """
    done = set(checkpoint.get("processed_tweet_ids", []))
    pending = []
    for item in manifest:
        if item["tweet_id"] in done:
            continue
        if item.get("media_type") == "none":
            continue
        pending.append(item)
    return pending
# ---------------------------------------------------------------------------
# Media processing helpers
# ---------------------------------------------------------------------------
def extract_video_frames(video_path: str, num_frames: int = 8) -> list[str]:
    """Extract up to *num_frames* evenly spaced JPEG frames from a video.

    Uses ffprobe to read the duration (falling back to 10s when that fails)
    and ffmpeg to grab one frame per timestamp into a fresh temp directory.

    Args:
        video_path: Path to the video file on disk.
        num_frames: Number of evenly spaced frames to attempt.

    Returns:
        Paths of the frames that were actually written — possibly fewer than
        requested, or empty when the file is missing or ffmpeg is unavailable.
        When frames are returned, the caller owns cleanup of the files and
        their containing directory.
    """
    if not os.path.exists(video_path):
        return []
    frames_dir = tempfile.mkdtemp(prefix="ktf_frames_")
    frame_paths: list[str] = []
    try:
        # Probe the container for its duration; on any ffprobe failure use a
        # 10-second guess so we still attempt extraction.
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", video_path],
            capture_output=True, text=True, timeout=10,
        )
        duration = float(result.stdout.strip()) if result.returncode == 0 else 10.0
        # Sample timestamps strictly inside (0, duration): i+1 of num_frames+1
        # even divisions, so we never seek to the exact start or end.
        for i in range(num_frames):
            timestamp = (duration / (num_frames + 1)) * (i + 1)
            frame_path = os.path.join(frames_dir, f"frame_{i:03d}.jpg")
            subprocess.run(
                ["ffmpeg", "-ss", str(timestamp), "-i", video_path,
                 "-vframes", "1", "-q:v", "2", frame_path, "-y"],
                capture_output=True, timeout=30,
            )
            if os.path.exists(frame_path):
                frame_paths.append(frame_path)
    except Exception as e:
        logger.warning(f"Frame extraction failed for {video_path}: {e}")
    if not frame_paths:
        # FIX: when nothing was extracted, callers never see frames_dir and
        # cannot clean it up — remove the empty temp directory here instead
        # of leaking one per failed video.
        try:
            os.rmdir(frames_dir)
        except OSError:
            pass
    return frame_paths
def analyze_with_vision(image_paths: list[str], prompt: str) -> dict:
    """Analyze images with a local vision model via the Ollama CLI.

    Args:
        image_paths: Paths of the image files to analyze.
        prompt: Task prompt the images should be analyzed against.

    Returns:
        Structured analysis dict (see parse_analysis_response). When no
        images are supplied or the model is unavailable, a zero-confidence
        dict with an ``error`` field is returned instead of raising.
    """
    if not image_paths:
        return {"error": "no_images", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
    # FIX: the image paths were previously never passed to the model — the
    # CLI received text only, so "vision" analysis saw no images. Ollama's
    # CLI attaches images to a multimodal prompt when their file paths
    # appear in the prompt text, so list them explicitly.
    full_prompt = (
        prompt
        + "\n\nAnalyze these frames from a video sequence:\n"
        + "\n".join(image_paths)
    )
    # Try local Ollama with a vision model (Gemma 3 or LLaVA).
    try:
        result = subprocess.run(
            ["ollama", "run", "gemma3:12b", full_prompt],
            capture_output=True, text=True, timeout=120,
            # Keep the local model single-stream to avoid oversubscription.
            env={**os.environ, "OLLAMA_NUM_PARALLEL": "1"},
        )
        if result.returncode == 0:
            # Model replies are free text; pull the JSON payload out of them.
            return parse_analysis_response(result.stdout.strip())
    except Exception as e:
        logger.debug(f"Ollama vision failed: {e}")
    # Fallback: signal that vision analysis could not run.
    return {"error": "vision_unavailable", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
def analyze_image(image_path: str, tweet_text: str) -> dict:
    """Analyze one image, grounding the model with the tweet's text."""
    context_prompt = MEANING_KERNEL_PROMPT + f"\n\nContext: The tweet says: \"{tweet_text}\""
    return analyze_with_vision([image_path], context_prompt)
def analyze_video(video_path: str, tweet_text: str) -> dict:
    """Analyze a video by extracting frames and analyzing the sequence.

    Extracts up to six evenly spaced frames, runs them through the vision
    pipeline with the tweet text as context, then removes the temporary
    frame directory.

    Args:
        video_path: Path to the video file.
        tweet_text: Tweet body used to ground the analysis.

    Returns:
        Structured analysis dict; a ``{"error": "no_frames", ...}`` dict when
        no frames could be extracted.
    """
    frames = extract_video_frames(video_path, num_frames=6)
    if not frames:
        return {"error": "no_frames", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
    prompt = MEANING_KERNEL_PROMPT + f"\n\nContext: The tweet says: \"{tweet_text}\"\n\nThese are {len(frames)} frames extracted from a video. Analyze the narrative arc across the sequence."
    result = analyze_with_vision(frames, prompt)
    # FIX: the previous per-file unlink + bare rmdir leaked the temp
    # directory whenever any unlink failed (rmdir refuses non-empty dirs).
    # shutil.rmtree removes the whole tree in one robust pass.
    shutil.rmtree(os.path.dirname(frames[0]), ignore_errors=True)
    return result
def parse_analysis_response(response: str) -> dict:
    """Turn a raw LLM reply into the structured analysis dict.

    Greedily locates a JSON object in the reply and normalizes its fields.
    When no object is found (or it fails to parse), returns a zero-confidence
    fallback carrying the raw text in ``description`` (truncated to 500
    chars) and ``raw_response``.
    """
    import re

    fallback = {
        "description": response[:500],
        "arc": "",
        "kernels": [],
        "themes": [],
        "confidence": 0.0,
        "raw_response": response,
    }
    # Greedy match: from the first "{" to the last "}" in the reply.
    candidate = re.search(r'\{[\s\S]*\}', response)
    if candidate is None:
        return fallback
    try:
        data = json.loads(candidate.group())
    except json.JSONDecodeError:
        return fallback
    return {
        "description": data.get("description", ""),
        "arc": data.get("arc", ""),
        "kernels": data.get("kernels", []),
        "themes": data.get("themes", []),
        "confidence": data.get("confidence", 0.5),
        "raw_response": response,
    }
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def process_entry(entry: dict, tweet_text: str = "") -> dict:
    """Process a single media entry and return its analysis record.

    Dispatches on ``media_type``: photos go through image analysis, videos
    and animated GIFs through frame-sequence analysis; anything else — or a
    media file missing from disk — is recorded as a failure.

    Args:
        entry: Manifest entry from media-manifest.jsonl.
        tweet_text: Optional override for the tweet body; falls back to the
            entry's ``full_text``.

    Returns:
        JSONL-ready record containing the analysis, a UTC timestamp, and a
        "completed"/"failed" status.
    """
    media_type = entry.get("media_type", "unknown")
    media_path = entry.get("media_path")
    text = tweet_text or entry.get("full_text", "")
    if media_type == "photo":
        analysis = analyze_image(media_path, text) if media_path and os.path.exists(media_path) else {"error": "file_missing"}
    elif media_type in ("video", "animated_gif"):
        analysis = analyze_video(media_path, text) if media_path and os.path.exists(media_path) else {"error": "file_missing"}
    else:
        analysis = {"error": f"unsupported_type:{media_type}"}
    # FIX: datetime.utcnow() is deprecated (3.12+); derive the same "...Z"
    # string from an aware UTC timestamp.
    processed_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return {
        "tweet_id": entry["tweet_id"],
        "media_type": media_type,
        "media_path": media_path,
        "media_id": entry.get("media_id"),
        "tweet_text": text,
        "hashtags": entry.get("hashtags", []),
        "created_at": entry.get("created_at"),
        "analysis": analysis,
        "processed_at": processed_at,
        # Any "error" key in the analysis marks the whole record failed.
        "status": "completed" if not analysis.get("error") else "failed",
        "error": analysis.get("error"),
    }
def run_pipeline(batch_size: int = 0, retry_failed: bool = False) -> dict:
    """Run the analysis pipeline on pending entries.

    Each result is appended to analysis.jsonl immediately, so a crash
    mid-batch loses at most the entry in flight; the checkpoint is saved
    once at the end of the batch.

    Args:
        batch_size: Number of entries to process (0 = all pending)
        retry_failed: Whether to retry previously failed entries

    Returns:
        Pipeline run summary (the same status dict that is written to
        pipeline-status.json), or an error dict when no manifest exists.
    """
    # Load data
    manifest = load_jsonl(MEDIA_MANIFEST)
    if not manifest:
        return {"status": "error", "reason": "No media manifest found. Run index_timmy_media.py first."}
    checkpoint = load_checkpoint()
    if retry_failed:
        # Reset failed entries: removing their ids from the checkpoint makes
        # get_pending_entries pick them up again on this run.
        existing = load_analysis_entries()
        failed_ids = {e["tweet_id"] for e in existing if e.get("status") == "failed"}
        checkpoint["processed_tweet_ids"] = [
            tid for tid in checkpoint.get("processed_tweet_ids", [])
            if tid not in failed_ids
        ]
    pending = get_pending_entries(manifest, checkpoint)
    if not pending:
        return {"status": "ok", "message": "No pending entries to process.", "processed": 0}
    if batch_size > 0:
        pending = pending[:batch_size]
    # Process entries
    processed = []
    failed = []
    for i, entry in enumerate(pending):
        print(f" Processing {i+1}/{len(pending)}: tweet {entry['tweet_id']} ({entry.get('media_type')})...")
        try:
            result = process_entry(entry)
            processed.append(result)
            # Append immediately so completed work survives a crash.
            append_jsonl(ANALYSIS_FILE, [result])
            # Update checkpoint (in memory; persisted after the batch).
            checkpoint["processed_tweet_ids"].append(entry["tweet_id"])
            checkpoint["total_processed"] = checkpoint.get("total_processed", 0) + 1
            if result["status"] == "failed":
                checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
                failed.append(entry["tweet_id"])
        except Exception as e:
            # An entry that raises is counted failed but NOT added to
            # processed_tweet_ids, so the next run retries it automatically.
            logger.error(f"Failed to process {entry['tweet_id']}: {e}")
            failed.append(entry["tweet_id"])
            checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
    # Save checkpoint
    # NOTE(review): last_offset appears informational only — resume is driven
    # by processed_tweet_ids, not this counter. Confirm before relying on it.
    checkpoint["last_offset"] = checkpoint.get("last_offset", 0) + len(pending)
    save_checkpoint(checkpoint)
    # Update pipeline status
    total_manifest = len([e for e in manifest if e.get("media_type") != "none"])
    total_done = len(set(checkpoint.get("processed_tweet_ids", [])))
    status = {
        "phase": "analysis",
        "total_targets": total_manifest,
        "total_processed": total_done,
        "total_pending": total_manifest - total_done,
        "total_failed": checkpoint.get("total_failed", 0),
        "completion_pct": round(total_done / total_manifest * 100, 1) if total_manifest > 0 else 0,
        "last_run": datetime.utcnow().isoformat() + "Z",
        "batch_processed": len(processed),
        "batch_failed": len(failed),
    }
    write_json(PIPELINE_STATUS, status)
    return status
def extract_meaning_kernels() -> dict:
    """Extract meaning kernels from completed analysis entries.

    Reads analysis.jsonl and rewrites meaning-kernels.jsonl from scratch
    with deduplicated, confidence-sorted kernels.

    Returns:
        Summary dict with status, kernel count, and output path, or an
        error dict when no analysis entries exist.
    """
    entries = load_analysis_entries()
    if not entries:
        return {"status": "error", "reason": "No analysis entries found."}
    all_kernels = []
    for entry in entries:
        # Only completed analyses contribute kernels.
        if entry.get("status") != "completed":
            continue
        analysis = entry.get("analysis", {})
        kernels = analysis.get("kernels", [])
        for kernel in kernels:
            if isinstance(kernel, str):
                all_kernels.append({
                    "tweet_id": entry["tweet_id"],
                    "kernel": kernel,
                    "arc": analysis.get("arc", ""),
                    "themes": analysis.get("themes", []),
                    "confidence": analysis.get("confidence", 0.5),
                    "created_at": entry.get("created_at"),
                })
            elif isinstance(kernel, dict):
                # Dict-shaped kernels may carry their own arc/themes/confidence;
                # fall back to the entry-level values when absent.
                all_kernels.append({
                    "tweet_id": entry["tweet_id"],
                    "kernel": kernel.get("kernel", kernel.get("text", str(kernel))),
                    "arc": kernel.get("arc", analysis.get("arc", "")),
                    "themes": kernel.get("themes", analysis.get("themes", [])),
                    "confidence": kernel.get("confidence", analysis.get("confidence", 0.5)),
                    "created_at": entry.get("created_at"),
                })
    # FIX: sort by confidence BEFORE deduplicating so that when the same
    # kernel text appears more than once, the highest-confidence copy
    # survives. (Previously dedup ran first, keeping whichever copy came
    # earlier in file order regardless of confidence.)
    all_kernels.sort(key=lambda k: k.get("confidence", 0), reverse=True)
    # Deduplicate by (case-folded, truncated) kernel text.
    seen = set()
    unique_kernels = []
    for k in all_kernels:
        key = k["kernel"][:100].lower()
        if key not in seen:
            seen.add(key)
            unique_kernels.append(k)
    # Write (full rebuild each run).
    KTF_DIR.mkdir(parents=True, exist_ok=True)
    with open(KERNELS_FILE, "w") as f:
        for k in unique_kernels:
            f.write(json.dumps(k, sort_keys=True) + "\n")
    return {
        "status": "ok",
        "total_kernels": len(unique_kernels),
        "output": str(KERNELS_FILE),
    }
def print_status() -> None:
    """Print a human-readable pipeline progress summary to stdout.

    Reports manifest size, processed/completed/failed counts, completion
    percentage, the top-10 theme distribution, and the kernel count.
    """
    manifest = load_jsonl(MEDIA_MANIFEST)
    checkpoint = load_checkpoint()
    analysis = load_analysis_entries()
    # FIX: removed dead read of pipeline-status.json — the loaded dict was
    # never used; every figure below is recomputed from the source files.
    total_media = len([e for e in manifest if e.get("media_type") != "none"])
    processed = len(set(checkpoint.get("processed_tweet_ids", [])))
    completed = len([e for e in analysis if e.get("status") == "completed"])
    failed = len([e for e in analysis if e.get("status") == "failed"])
    print("Know Thy Father — Phase 2: Multimodal Analysis")
    print("=" * 50)
    print(f" Media manifest: {total_media} entries")
    print(f" Processed: {processed}")
    print(f" Completed: {completed}")
    print(f" Failed: {failed}")
    print(f" Pending: {total_media - processed}")
    print(f" Completion: {round(processed/total_media*100, 1) if total_media else 0}%")
    print()
    # Theme distribution from analysis
    from collections import Counter
    theme_counter = Counter()
    for entry in analysis:
        for theme in entry.get("analysis", {}).get("themes", []):
            theme_counter[theme] += 1
    if theme_counter:
        print("Theme distribution:")
        for theme, count in theme_counter.most_common(10):
            print(f" {theme:25s} {count}")
    # Kernels count
    kernels = load_jsonl(KERNELS_FILE)
    if kernels:
        print(f"\nMeaning kernels extracted: {len(kernels)}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: dispatch to status, kernel extraction, or a pipeline run."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    parser = argparse.ArgumentParser(description="Know Thy Father — Phase 2: Multimodal Analysis")
    parser.add_argument("--batch", type=int, default=0, help="Process N entries (0 = all)")
    parser.add_argument("--status", action="store_true", help="Show pipeline status")
    parser.add_argument("--retry-failed", action="store_true", help="Retry failed entries")
    parser.add_argument("--extract-kernels", action="store_true", help="Extract meaning kernels from analysis")
    args = parser.parse_args()
    # Ensure the working directory exists before any mode runs.
    KTF_DIR.mkdir(parents=True, exist_ok=True)
    # --status and --extract-kernels are terminal modes; the default is a run.
    if args.status:
        print_status()
    elif args.extract_kernels:
        print(json.dumps(extract_meaning_kernels(), indent=2))
    else:
        outcome = run_pipeline(batch_size=args.batch, retry_failed=args.retry_failed)
        print(json.dumps(outcome, indent=2))


if __name__ == "__main__":
    main()

View File

View File

@@ -0,0 +1,279 @@
"""Tests for Know Thy Father Phase 2: Multimodal Analysis Pipeline."""
import json
import sys
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "scripts" / "twitter_archive"))
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_manifest():
    """Three manifest entries: a video, a photo, and a text-only tweet.

    The media_type == "none" entry exercises the filtering branch of
    get_pending_entries.
    """
    return [
        {
            "tweet_id": "1001",
            "media_type": "video",
            "media_path": "/fake/media/1001.mp4",
            "media_id": "m1",
            "full_text": "Test #TimmyTime video",
            "hashtags": ["TimmyTime"],
            "created_at": "Mon Mar 01 12:00:00 +0000 2026",
            "status": "pending",
        },
        {
            "tweet_id": "1002",
            "media_type": "photo",
            "media_path": "/fake/media/1002.jpg",
            "media_id": "m2",
            "full_text": "Test #TimmyChain image",
            "hashtags": ["TimmyChain"],
            "created_at": "Tue Mar 02 12:00:00 +0000 2026",
            "status": "pending",
        },
        {
            "tweet_id": "1003",
            "media_type": "none",
            "media_path": None,
            "full_text": "Text only tweet",
            "hashtags": ["TimmyTime"],
            "created_at": "Wed Mar 03 12:00:00 +0000 2026",
            "status": "no_media",
        },
    ]
@pytest.fixture
def sample_checkpoint():
    """A fresh checkpoint: nothing processed yet, all counters at zero."""
    return {
        "processed_tweet_ids": [],
        "last_offset": 0,
        "total_processed": 0,
        "total_failed": 0,
    }
@pytest.fixture
def sample_analysis_entry():
    """A completed analysis record carrying two string kernels.

    Used by the kernel-extraction tests; both kernels are expected to
    survive extraction and deduplication.
    """
    return {
        "tweet_id": "1001",
        "media_type": "video",
        "media_path": "/fake/1001.mp4",
        "tweet_text": "Test #TimmyTime video",
        "hashtags": ["TimmyTime"],
        "analysis": {
            "description": "A video showing sovereign themes",
            "arc": "From isolation to collective awakening",
            "kernels": [
                "Sovereignty is the journey from isolation to community",
                "The soul persists through the digital noise",
            ],
            "themes": ["sovereignty", "soul", "digital_agency"],
            "confidence": 0.8,
        },
        "processed_at": "2026-04-01T00:00:00Z",
        "status": "completed",
    }
# ---------------------------------------------------------------------------
# Tests: Parse analysis response
# ---------------------------------------------------------------------------
class TestParseAnalysisResponse:
    """Tests for parse_analysis_response()."""

    def test_parses_valid_json(self):
        """A pure-JSON reply round-trips every structured field."""
        from analyze_media import parse_analysis_response
        reply = '{"description": "test", "arc": "test arc", "kernels": ["kernel1"], "themes": ["sovereignty"], "confidence": 0.9}'
        parsed = parse_analysis_response(reply)
        expected = {
            "description": "test",
            "arc": "test arc",
            "kernels": ["kernel1"],
            "themes": ["sovereignty"],
            "confidence": 0.9,
        }
        for field, value in expected.items():
            assert parsed[field] == value

    def test_finds_json_in_text(self):
        """JSON embedded in surrounding prose is still located and parsed."""
        from analyze_media import parse_analysis_response
        reply = 'Here is the analysis:\n{"description": "found it", "kernels": [], "themes": [], "confidence": 0.5}\nEnd of analysis.'
        assert parse_analysis_response(reply)["description"] == "found it"

    def test_handles_invalid_json(self):
        """Plain text falls back to a zero-confidence raw-text result."""
        from analyze_media import parse_analysis_response
        reply = "This is just plain text with no JSON at all."
        parsed = parse_analysis_response(reply)
        assert parsed["description"] == reply
        assert parsed["confidence"] == 0.0
# ---------------------------------------------------------------------------
# Tests: Pending entries
# ---------------------------------------------------------------------------
class TestGetPendingEntries:
    """Tests for get_pending_entries() filtering behavior."""

    def test_filters_processed(self, sample_manifest, sample_checkpoint):
        """Already-checkpointed tweet ids are excluded from the pending list."""
        from analyze_media import get_pending_entries
        sample_checkpoint["processed_tweet_ids"] = ["1001"]
        remaining = {e["tweet_id"] for e in get_pending_entries(sample_manifest, sample_checkpoint)}
        assert "1001" not in remaining
        assert "1002" in remaining

    def test_excludes_none_media(self, sample_manifest, sample_checkpoint):
        """Entries without media never become pending."""
        from analyze_media import get_pending_entries
        pending = get_pending_entries(sample_manifest, sample_checkpoint)
        assert all(e["media_type"] != "none" for e in pending)

    def test_empty_when_all_processed(self, sample_manifest, sample_checkpoint):
        """Nothing is pending once every tweet id is checkpointed."""
        from analyze_media import get_pending_entries
        sample_checkpoint["processed_tweet_ids"] = ["1001", "1002", "1003"]
        assert get_pending_entries(sample_manifest, sample_checkpoint) == []
# ---------------------------------------------------------------------------
# Tests: Process entry
# ---------------------------------------------------------------------------
class TestProcessEntry:
    """Tests for process_entry() dispatch, with the analyzers mocked out."""

    @patch("analyze_media.analyze_image")
    def test_processes_photo(self, mock_analyze, sample_manifest, tmp_path):
        """A photo entry with an existing file routes through analyze_image."""
        from analyze_media import process_entry
        mock_analyze.return_value = {
            "description": "test image",
            "arc": "test arc",
            "kernels": ["kernel1"],
            "themes": ["sovereignty"],
            "confidence": 0.8,
        }
        entry = sample_manifest[1]  # photo entry
        # Create the fake media file so os.path.exists passes
        fake_path = tmp_path / "1002.jpg"
        fake_path.touch()
        entry["media_path"] = str(fake_path)
        result = process_entry(entry)
        assert result["status"] == "completed"
        assert result["tweet_id"] == "1002"
        assert result["media_type"] == "photo"
        assert "processed_at" in result

    @patch("analyze_media.analyze_video")
    def test_processes_video(self, mock_analyze, sample_manifest, tmp_path):
        """A video entry with an existing file routes through analyze_video."""
        from analyze_media import process_entry
        mock_analyze.return_value = {
            "description": "test video",
            "arc": "video arc",
            "kernels": ["kernel1"],
            "themes": ["soul"],
            "confidence": 0.7,
        }
        entry = sample_manifest[0]  # video entry
        fake_path = tmp_path / "1001.mp4"
        fake_path.touch()
        entry["media_path"] = str(fake_path)
        result = process_entry(entry)
        assert result["status"] == "completed"
        assert result["tweet_id"] == "1001"
        assert result["media_type"] == "video"
# ---------------------------------------------------------------------------
# Tests: Extract meaning kernels
# ---------------------------------------------------------------------------
class TestExtractMeaningKernels:
    """Tests for extract_meaning_kernels().

    FIX: the original first test imported KTF_DIR/KERNELS_FILE/ANALYSIS_FILE
    without using them, and all three tests duplicated the same monkeypatch
    setup; the setup is now factored into helpers.
    """

    @staticmethod
    def _setup_dirs(tmp_path, monkeypatch):
        """Point analyze_media's output paths at a temp dir; return that dir."""
        ktf_dir = tmp_path / "ktf"
        ktf_dir.mkdir()
        monkeypatch.setattr("analyze_media.KTF_DIR", ktf_dir)
        monkeypatch.setattr("analyze_media.KERNELS_FILE", ktf_dir / "meaning-kernels.jsonl")
        monkeypatch.setattr("analyze_media.ANALYSIS_FILE", ktf_dir / "analysis.jsonl")
        return ktf_dir

    @staticmethod
    def _write_analysis(ktf_dir, entries):
        """Write the given entries to the temp analysis.jsonl."""
        with open(ktf_dir / "analysis.jsonl", "w") as f:
            for e in entries:
                f.write(json.dumps(e) + "\n")

    def test_extracts_kernels_from_analysis(self, tmp_path, monkeypatch, sample_analysis_entry):
        """Both string kernels of a completed entry appear in the output file."""
        from analyze_media import extract_meaning_kernels
        ktf_dir = self._setup_dirs(tmp_path, monkeypatch)
        self._write_analysis(ktf_dir, [sample_analysis_entry])
        result = extract_meaning_kernels()
        assert result["status"] == "ok"
        assert result["total_kernels"] == 2
        # Verify kernels file
        with open(ktf_dir / "meaning-kernels.jsonl") as f:
            kernels = [json.loads(line) for line in f if line.strip()]
        assert len(kernels) == 2
        assert all("kernel" in k for k in kernels)
        assert all("tweet_id" in k for k in kernels)

    def test_deduplicates_kernels(self, tmp_path, monkeypatch):
        """Identical kernel text from two entries collapses to one kernel."""
        from analyze_media import extract_meaning_kernels
        ktf_dir = self._setup_dirs(tmp_path, monkeypatch)
        entries = [
            {
                "tweet_id": "1",
                "status": "completed",
                "analysis": {"kernels": ["Same kernel text"], "themes": [], "confidence": 0.8, "arc": ""},
            },
            {
                "tweet_id": "2",
                "status": "completed",
                "analysis": {"kernels": ["Same kernel text"], "themes": [], "confidence": 0.7, "arc": ""},
            },
        ]
        self._write_analysis(ktf_dir, entries)
        result = extract_meaning_kernels()
        assert result["total_kernels"] == 1  # Deduplicated

    def test_skips_failed_entries(self, tmp_path, monkeypatch):
        """Kernels from failed entries are never extracted."""
        from analyze_media import extract_meaning_kernels
        ktf_dir = self._setup_dirs(tmp_path, monkeypatch)
        entries = [
            {"tweet_id": "1", "status": "failed", "analysis": {"kernels": ["should not appear"]}},
            {"tweet_id": "2", "status": "completed", "analysis": {"kernels": ["valid kernel"], "themes": [], "confidence": 0.5, "arc": ""}},
        ]
        self._write_analysis(ktf_dir, entries)
        result = extract_meaning_kernels()
        assert result["total_kernels"] == 1
# ---------------------------------------------------------------------------
# Tests: Pipeline status
# ---------------------------------------------------------------------------
class TestPipelineStatus:
    """Sanity check on the completion-percentage arithmetic used by run_pipeline."""

    def test_status_computes_correctly(self):
        """The rounding formula yields 50.0% for 1 of 2 targets.

        FIX: removed the unused ``load_json`` import and four unused fixture
        parameters the original requested but never touched.
        NOTE(review): this only verifies the percentage formula, not
        run_pipeline itself — the pipeline's status dict is exercised
        indirectly via the other tests.
        """
        processed = 1
        total = 2  # excluding "none" type
        pct = round(processed / total * 100, 1)
        assert pct == 50.0