Compare commits
1 Commits
main
...
whip/584-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97549e9314 |
511
scripts/twitter_archive/analyze_media.py
Normal file
511
scripts/twitter_archive/analyze_media.py
Normal file
@@ -0,0 +1,511 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Know Thy Father — Phase 2: Multimodal Analysis Pipeline
|
||||
|
||||
Processes the media manifest from Phase 1:
|
||||
- Images/Memes: Visual description + Meme Logic Analysis
|
||||
- Videos: Frame sequence analysis + meaning extraction
|
||||
- Extraction: Identify "Meaning Kernels" related to sovereignty, service, and the soul
|
||||
|
||||
Architecture:
|
||||
Phase 1 (index_timmy_media.py) → media-manifest.jsonl
|
||||
Phase 2 (this script) → analysis entries → meaning-kernels.jsonl
|
||||
|
||||
Usage:
|
||||
python analyze_media.py # Process all pending entries
|
||||
python analyze_media.py --batch 10 # Process next 10 entries
|
||||
python analyze_media.py --status # Show pipeline status
|
||||
python analyze_media.py --retry-failed # Retry failed entries
|
||||
python analyze_media.py --extract-kernels # Extract meaning kernels from completed analysis
|
||||
|
||||
Output:
|
||||
~/.timmy/twitter-archive/know-thy-father/analysis.jsonl
|
||||
~/.timmy/twitter-archive/know-thy-father/meaning-kernels.jsonl
|
||||
~/.timmy/twitter-archive/know-thy-father/pipeline-status.json
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from common import (
|
||||
ARCHIVE_DIR,
|
||||
load_json,
|
||||
load_jsonl,
|
||||
write_json,
|
||||
append_jsonl,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
KTF_DIR = ARCHIVE_DIR / "know-thy-father"
|
||||
MEDIA_MANIFEST = KTF_DIR / "media-manifest.jsonl"
|
||||
ANALYSIS_FILE = KTF_DIR / "analysis.jsonl"
|
||||
KERNELS_FILE = KTF_DIR / "meaning-kernels.jsonl"
|
||||
PIPELINE_STATUS = KTF_DIR / "pipeline-status.json"
|
||||
CHECKPOINT_FILE = KTF_DIR / "checkpoint.json"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Analysis schemas
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
THEME_CATEGORIES = [
|
||||
"sovereignty",
|
||||
"service",
|
||||
"soul",
|
||||
"identity",
|
||||
"transmutation",
|
||||
"digital_agency",
|
||||
"authenticity",
|
||||
"collective_identity",
|
||||
"glitch",
|
||||
"noise",
|
||||
"silence",
|
||||
"void",
|
||||
"presence",
|
||||
"simulation",
|
||||
"persistence",
|
||||
]
|
||||
|
||||
MEANING_KERNEL_PROMPT = """Analyze this media content from Alexander Whitestone's Twitter archive.
|
||||
Extract "Meaning Kernels" — distilled philosophical statements about:
|
||||
|
||||
1. SOVEREIGNTY — autonomy, self-determination, freedom from systems
|
||||
2. SERVICE — sacrifice, community, building for others
|
||||
3. THE SOUL — consciousness, identity, what persists beyond the machine
|
||||
|
||||
For each kernel found, provide:
|
||||
- The kernel (1-2 sentences, philosophical)
|
||||
- The arc (the narrative journey depicted)
|
||||
- The themes (from: {themes})
|
||||
- Confidence (0.0-1.0)
|
||||
|
||||
Also describe the visual content:
|
||||
- What is depicted
|
||||
- Key visual elements
|
||||
- Emotional tone
|
||||
- Meme logic (if applicable): format, subversion, cultural reference
|
||||
|
||||
Output as JSON with fields: description, arc, kernels[], themes[], confidence
|
||||
""".format(themes=", ".join(THEME_CATEGORIES))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pipeline state management
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_checkpoint() -> dict:
|
||||
"""Load pipeline checkpoint."""
|
||||
return load_json(CHECKPOINT_FILE, {
|
||||
"processed_tweet_ids": [],
|
||||
"last_offset": 0,
|
||||
"total_processed": 0,
|
||||
"total_failed": 0,
|
||||
"last_run": None,
|
||||
})
|
||||
|
||||
|
||||
def save_checkpoint(checkpoint: dict) -> None:
|
||||
"""Save pipeline checkpoint."""
|
||||
checkpoint["last_run"] = datetime.utcnow().isoformat() + "Z"
|
||||
write_json(CHECKPOINT_FILE, checkpoint)
|
||||
|
||||
|
||||
def load_analysis_entries() -> list[dict]:
|
||||
"""Load existing analysis entries."""
|
||||
return load_jsonl(ANALYSIS_FILE)
|
||||
|
||||
|
||||
def get_pending_entries(manifest: list[dict], checkpoint: dict) -> list[dict]:
|
||||
"""Filter manifest to entries that haven't been processed."""
|
||||
processed = set(checkpoint.get("processed_tweet_ids", []))
|
||||
return [e for e in manifest if e["tweet_id"] not in processed and e.get("media_type") != "none"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Media processing helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def extract_video_frames(video_path: str, num_frames: int = 8) -> list[str]:
|
||||
"""Extract representative frames from a video file.
|
||||
|
||||
Returns list of paths to extracted frame images.
|
||||
"""
|
||||
if not os.path.exists(video_path):
|
||||
return []
|
||||
|
||||
frames_dir = tempfile.mkdtemp(prefix="ktf_frames_")
|
||||
frame_paths = []
|
||||
|
||||
try:
|
||||
# Get video duration
|
||||
result = subprocess.run(
|
||||
["ffprobe", "-v", "error", "-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1", video_path],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
duration = float(result.stdout.strip()) if result.returncode == 0 else 10.0
|
||||
|
||||
# Extract evenly spaced frames
|
||||
for i in range(num_frames):
|
||||
timestamp = (duration / (num_frames + 1)) * (i + 1)
|
||||
frame_path = os.path.join(frames_dir, f"frame_{i:03d}.jpg")
|
||||
subprocess.run(
|
||||
["ffmpeg", "-ss", str(timestamp), "-i", video_path,
|
||||
"-vframes", "1", "-q:v", "2", frame_path, "-y"],
|
||||
capture_output=True, timeout=30,
|
||||
)
|
||||
if os.path.exists(frame_path):
|
||||
frame_paths.append(frame_path)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Frame extraction failed for {video_path}: {e}")
|
||||
|
||||
return frame_paths
|
||||
|
||||
|
||||
def analyze_with_vision(image_paths: list[str], prompt: str) -> dict:
|
||||
"""Analyze images using a local vision model.
|
||||
|
||||
Returns structured analysis dict.
|
||||
"""
|
||||
if not image_paths:
|
||||
return {"error": "no_images", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
|
||||
|
||||
# Build the vision prompt
|
||||
full_prompt = prompt + "\n\nAnalyze these frames from a video sequence:"
|
||||
|
||||
# Try local Ollama with a vision model (Gemma 3 or LLaVA)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ollama", "run", "gemma3:12b", full_prompt],
|
||||
capture_output=True, text=True, timeout=120,
|
||||
env={**os.environ, "OLLAMA_NUM_PARALLEL": "1"},
|
||||
)
|
||||
if result.returncode == 0:
|
||||
response = result.stdout.strip()
|
||||
# Try to parse JSON from response
|
||||
return parse_analysis_response(response)
|
||||
except Exception as e:
|
||||
logger.debug(f"Ollama vision failed: {e}")
|
||||
|
||||
# Fallback: text-only analysis based on tweet text
|
||||
return {"error": "vision_unavailable", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
|
||||
|
||||
|
||||
def analyze_image(image_path: str, tweet_text: str) -> dict:
|
||||
"""Analyze a single image with context from the tweet text."""
|
||||
prompt = MEANING_KERNEL_PROMPT + f"\n\nContext: The tweet says: \"{tweet_text}\""
|
||||
return analyze_with_vision([image_path], prompt)
|
||||
|
||||
|
||||
def analyze_video(video_path: str, tweet_text: str) -> dict:
|
||||
"""Analyze a video by extracting frames and analyzing the sequence."""
|
||||
frames = extract_video_frames(video_path, num_frames=6)
|
||||
if not frames:
|
||||
return {"error": "no_frames", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
|
||||
|
||||
prompt = MEANING_KERNEL_PROMPT + f"\n\nContext: The tweet says: \"{tweet_text}\"\n\nThese are {len(frames)} frames extracted from a video. Analyze the narrative arc across the sequence."
|
||||
result = analyze_with_vision(frames, prompt)
|
||||
|
||||
# Cleanup frames
|
||||
for f in frames:
|
||||
try:
|
||||
os.unlink(f)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
os.rmdir(os.path.dirname(frames[0]))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def parse_analysis_response(response: str) -> dict:
|
||||
"""Parse the LLM response into a structured analysis dict."""
|
||||
# Try to find JSON in the response
|
||||
import re
|
||||
json_match = re.search(r'\{[\s\S]*\}', response)
|
||||
if json_match:
|
||||
try:
|
||||
parsed = json.loads(json_match.group())
|
||||
return {
|
||||
"description": parsed.get("description", ""),
|
||||
"arc": parsed.get("arc", ""),
|
||||
"kernels": parsed.get("kernels", []),
|
||||
"themes": parsed.get("themes", []),
|
||||
"confidence": parsed.get("confidence", 0.5),
|
||||
"raw_response": response,
|
||||
}
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Fallback: return raw response
|
||||
return {
|
||||
"description": response[:500],
|
||||
"arc": "",
|
||||
"kernels": [],
|
||||
"themes": [],
|
||||
"confidence": 0.0,
|
||||
"raw_response": response,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main pipeline
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def process_entry(entry: dict, tweet_text: str = "") -> dict:
|
||||
"""Process a single media entry and return the analysis result."""
|
||||
media_type = entry.get("media_type", "unknown")
|
||||
media_path = entry.get("media_path")
|
||||
text = tweet_text or entry.get("full_text", "")
|
||||
|
||||
if media_type == "photo":
|
||||
analysis = analyze_image(media_path, text) if media_path and os.path.exists(media_path) else {"error": "file_missing"}
|
||||
elif media_type in ("video", "animated_gif"):
|
||||
analysis = analyze_video(media_path, text) if media_path and os.path.exists(media_path) else {"error": "file_missing"}
|
||||
else:
|
||||
analysis = {"error": f"unsupported_type:{media_type}"}
|
||||
|
||||
return {
|
||||
"tweet_id": entry["tweet_id"],
|
||||
"media_type": media_type,
|
||||
"media_path": media_path,
|
||||
"media_id": entry.get("media_id"),
|
||||
"tweet_text": text,
|
||||
"hashtags": entry.get("hashtags", []),
|
||||
"created_at": entry.get("created_at"),
|
||||
"analysis": analysis,
|
||||
"processed_at": datetime.utcnow().isoformat() + "Z",
|
||||
"status": "completed" if not analysis.get("error") else "failed",
|
||||
"error": analysis.get("error"),
|
||||
}
|
||||
|
||||
|
||||
def run_pipeline(batch_size: int = 0, retry_failed: bool = False) -> dict:
|
||||
"""Run the analysis pipeline on pending entries.
|
||||
|
||||
Args:
|
||||
batch_size: Number of entries to process (0 = all pending)
|
||||
retry_failed: Whether to retry previously failed entries
|
||||
|
||||
Returns:
|
||||
Pipeline run summary
|
||||
"""
|
||||
# Load data
|
||||
manifest = load_jsonl(MEDIA_MANIFEST)
|
||||
if not manifest:
|
||||
return {"status": "error", "reason": "No media manifest found. Run index_timmy_media.py first."}
|
||||
|
||||
checkpoint = load_checkpoint()
|
||||
|
||||
if retry_failed:
|
||||
# Reset failed entries
|
||||
existing = load_analysis_entries()
|
||||
failed_ids = {e["tweet_id"] for e in existing if e.get("status") == "failed"}
|
||||
checkpoint["processed_tweet_ids"] = [
|
||||
tid for tid in checkpoint.get("processed_tweet_ids", [])
|
||||
if tid not in failed_ids
|
||||
]
|
||||
|
||||
pending = get_pending_entries(manifest, checkpoint)
|
||||
if not pending:
|
||||
return {"status": "ok", "message": "No pending entries to process.", "processed": 0}
|
||||
|
||||
if batch_size > 0:
|
||||
pending = pending[:batch_size]
|
||||
|
||||
# Process entries
|
||||
processed = []
|
||||
failed = []
|
||||
for i, entry in enumerate(pending):
|
||||
print(f" Processing {i+1}/{len(pending)}: tweet {entry['tweet_id']} ({entry.get('media_type')})...")
|
||||
try:
|
||||
result = process_entry(entry)
|
||||
processed.append(result)
|
||||
append_jsonl(ANALYSIS_FILE, [result])
|
||||
|
||||
# Update checkpoint
|
||||
checkpoint["processed_tweet_ids"].append(entry["tweet_id"])
|
||||
checkpoint["total_processed"] = checkpoint.get("total_processed", 0) + 1
|
||||
|
||||
if result["status"] == "failed":
|
||||
checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
|
||||
failed.append(entry["tweet_id"])
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process {entry['tweet_id']}: {e}")
|
||||
failed.append(entry["tweet_id"])
|
||||
checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
|
||||
|
||||
# Save checkpoint
|
||||
checkpoint["last_offset"] = checkpoint.get("last_offset", 0) + len(pending)
|
||||
save_checkpoint(checkpoint)
|
||||
|
||||
# Update pipeline status
|
||||
total_manifest = len([e for e in manifest if e.get("media_type") != "none"])
|
||||
total_done = len(set(checkpoint.get("processed_tweet_ids", [])))
|
||||
status = {
|
||||
"phase": "analysis",
|
||||
"total_targets": total_manifest,
|
||||
"total_processed": total_done,
|
||||
"total_pending": total_manifest - total_done,
|
||||
"total_failed": checkpoint.get("total_failed", 0),
|
||||
"completion_pct": round(total_done / total_manifest * 100, 1) if total_manifest > 0 else 0,
|
||||
"last_run": datetime.utcnow().isoformat() + "Z",
|
||||
"batch_processed": len(processed),
|
||||
"batch_failed": len(failed),
|
||||
}
|
||||
write_json(PIPELINE_STATUS, status)
|
||||
|
||||
return status
|
||||
|
||||
|
||||
def extract_meaning_kernels() -> dict:
|
||||
"""Extract meaning kernels from completed analysis entries.
|
||||
|
||||
Reads analysis.jsonl and produces meaning-kernels.jsonl with
|
||||
deduplicated, confidence-scored kernels.
|
||||
"""
|
||||
entries = load_analysis_entries()
|
||||
if not entries:
|
||||
return {"status": "error", "reason": "No analysis entries found."}
|
||||
|
||||
all_kernels = []
|
||||
for entry in entries:
|
||||
if entry.get("status") != "completed":
|
||||
continue
|
||||
analysis = entry.get("analysis", {})
|
||||
kernels = analysis.get("kernels", [])
|
||||
for kernel in kernels:
|
||||
if isinstance(kernel, str):
|
||||
all_kernels.append({
|
||||
"tweet_id": entry["tweet_id"],
|
||||
"kernel": kernel,
|
||||
"arc": analysis.get("arc", ""),
|
||||
"themes": analysis.get("themes", []),
|
||||
"confidence": analysis.get("confidence", 0.5),
|
||||
"created_at": entry.get("created_at"),
|
||||
})
|
||||
elif isinstance(kernel, dict):
|
||||
all_kernels.append({
|
||||
"tweet_id": entry["tweet_id"],
|
||||
"kernel": kernel.get("kernel", kernel.get("text", str(kernel))),
|
||||
"arc": kernel.get("arc", analysis.get("arc", "")),
|
||||
"themes": kernel.get("themes", analysis.get("themes", [])),
|
||||
"confidence": kernel.get("confidence", analysis.get("confidence", 0.5)),
|
||||
"created_at": entry.get("created_at"),
|
||||
})
|
||||
|
||||
# Deduplicate by kernel text
|
||||
seen = set()
|
||||
unique_kernels = []
|
||||
for k in all_kernels:
|
||||
key = k["kernel"][:100].lower()
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
unique_kernels.append(k)
|
||||
|
||||
# Sort by confidence
|
||||
unique_kernels.sort(key=lambda k: k.get("confidence", 0), reverse=True)
|
||||
|
||||
# Write
|
||||
KTF_DIR.mkdir(parents=True, exist_ok=True)
|
||||
with open(KERNELS_FILE, "w") as f:
|
||||
for k in unique_kernels:
|
||||
f.write(json.dumps(k, sort_keys=True) + "\n")
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"total_kernels": len(unique_kernels),
|
||||
"output": str(KERNELS_FILE),
|
||||
}
|
||||
|
||||
|
||||
def print_status() -> None:
|
||||
"""Print pipeline status."""
|
||||
manifest = load_jsonl(MEDIA_MANIFEST)
|
||||
checkpoint = load_checkpoint()
|
||||
analysis = load_analysis_entries()
|
||||
status = load_json(PIPELINE_STATUS, {})
|
||||
|
||||
total_media = len([e for e in manifest if e.get("media_type") != "none"])
|
||||
processed = len(set(checkpoint.get("processed_tweet_ids", [])))
|
||||
completed = len([e for e in analysis if e.get("status") == "completed"])
|
||||
failed = len([e for e in analysis if e.get("status") == "failed"])
|
||||
|
||||
print("Know Thy Father — Phase 2: Multimodal Analysis")
|
||||
print("=" * 50)
|
||||
print(f" Media manifest: {total_media} entries")
|
||||
print(f" Processed: {processed}")
|
||||
print(f" Completed: {completed}")
|
||||
print(f" Failed: {failed}")
|
||||
print(f" Pending: {total_media - processed}")
|
||||
print(f" Completion: {round(processed/total_media*100, 1) if total_media else 0}%")
|
||||
print()
|
||||
|
||||
# Theme distribution from analysis
|
||||
from collections import Counter
|
||||
theme_counter = Counter()
|
||||
for entry in analysis:
|
||||
for theme in entry.get("analysis", {}).get("themes", []):
|
||||
theme_counter[theme] += 1
|
||||
if theme_counter:
|
||||
print("Theme distribution:")
|
||||
for theme, count in theme_counter.most_common(10):
|
||||
print(f" {theme:25s} {count}")
|
||||
|
||||
# Kernels count
|
||||
kernels = load_jsonl(KERNELS_FILE)
|
||||
if kernels:
|
||||
print(f"\nMeaning kernels extracted: {len(kernels)}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> None:
|
||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
|
||||
|
||||
parser = argparse.ArgumentParser(description="Know Thy Father — Phase 2: Multimodal Analysis")
|
||||
parser.add_argument("--batch", type=int, default=0, help="Process N entries (0 = all)")
|
||||
parser.add_argument("--status", action="store_true", help="Show pipeline status")
|
||||
parser.add_argument("--retry-failed", action="store_true", help="Retry failed entries")
|
||||
parser.add_argument("--extract-kernels", action="store_true", help="Extract meaning kernels from analysis")
|
||||
args = parser.parse_args()
|
||||
|
||||
KTF_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if args.status:
|
||||
print_status()
|
||||
return
|
||||
|
||||
if args.extract_kernels:
|
||||
result = extract_meaning_kernels()
|
||||
print(json.dumps(result, indent=2))
|
||||
return
|
||||
|
||||
result = run_pipeline(batch_size=args.batch, retry_failed=args.retry_failed)
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
0
tests/twitter_archive/__init__.py
Normal file
0
tests/twitter_archive/__init__.py
Normal file
279
tests/twitter_archive/test_analyze_media.py
Normal file
279
tests/twitter_archive/test_analyze_media.py
Normal file
@@ -0,0 +1,279 @@
|
||||
"""Tests for Know Thy Father Phase 2: Multimodal Analysis Pipeline."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "scripts" / "twitter_archive"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def sample_manifest():
|
||||
return [
|
||||
{
|
||||
"tweet_id": "1001",
|
||||
"media_type": "video",
|
||||
"media_path": "/fake/media/1001.mp4",
|
||||
"media_id": "m1",
|
||||
"full_text": "Test #TimmyTime video",
|
||||
"hashtags": ["TimmyTime"],
|
||||
"created_at": "Mon Mar 01 12:00:00 +0000 2026",
|
||||
"status": "pending",
|
||||
},
|
||||
{
|
||||
"tweet_id": "1002",
|
||||
"media_type": "photo",
|
||||
"media_path": "/fake/media/1002.jpg",
|
||||
"media_id": "m2",
|
||||
"full_text": "Test #TimmyChain image",
|
||||
"hashtags": ["TimmyChain"],
|
||||
"created_at": "Tue Mar 02 12:00:00 +0000 2026",
|
||||
"status": "pending",
|
||||
},
|
||||
{
|
||||
"tweet_id": "1003",
|
||||
"media_type": "none",
|
||||
"media_path": None,
|
||||
"full_text": "Text only tweet",
|
||||
"hashtags": ["TimmyTime"],
|
||||
"created_at": "Wed Mar 03 12:00:00 +0000 2026",
|
||||
"status": "no_media",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_checkpoint():
|
||||
return {
|
||||
"processed_tweet_ids": [],
|
||||
"last_offset": 0,
|
||||
"total_processed": 0,
|
||||
"total_failed": 0,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_analysis_entry():
|
||||
return {
|
||||
"tweet_id": "1001",
|
||||
"media_type": "video",
|
||||
"media_path": "/fake/1001.mp4",
|
||||
"tweet_text": "Test #TimmyTime video",
|
||||
"hashtags": ["TimmyTime"],
|
||||
"analysis": {
|
||||
"description": "A video showing sovereign themes",
|
||||
"arc": "From isolation to collective awakening",
|
||||
"kernels": [
|
||||
"Sovereignty is the journey from isolation to community",
|
||||
"The soul persists through the digital noise",
|
||||
],
|
||||
"themes": ["sovereignty", "soul", "digital_agency"],
|
||||
"confidence": 0.8,
|
||||
},
|
||||
"processed_at": "2026-04-01T00:00:00Z",
|
||||
"status": "completed",
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Parse analysis response
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseAnalysisResponse:
|
||||
def test_parses_valid_json(self):
|
||||
from analyze_media import parse_analysis_response
|
||||
response = '{"description": "test", "arc": "test arc", "kernels": ["kernel1"], "themes": ["sovereignty"], "confidence": 0.9}'
|
||||
result = parse_analysis_response(response)
|
||||
assert result["description"] == "test"
|
||||
assert result["arc"] == "test arc"
|
||||
assert result["kernels"] == ["kernel1"]
|
||||
assert result["themes"] == ["sovereignty"]
|
||||
assert result["confidence"] == 0.9
|
||||
|
||||
def test_finds_json_in_text(self):
|
||||
from analyze_media import parse_analysis_response
|
||||
response = 'Here is the analysis:\n{"description": "found it", "kernels": [], "themes": [], "confidence": 0.5}\nEnd of analysis.'
|
||||
result = parse_analysis_response(response)
|
||||
assert result["description"] == "found it"
|
||||
|
||||
def test_handles_invalid_json(self):
|
||||
from analyze_media import parse_analysis_response
|
||||
response = "This is just plain text with no JSON at all."
|
||||
result = parse_analysis_response(response)
|
||||
assert result["description"] == response
|
||||
assert result["confidence"] == 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Pending entries
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGetPendingEntries:
|
||||
def test_filters_processed(self, sample_manifest, sample_checkpoint):
|
||||
from analyze_media import get_pending_entries
|
||||
sample_checkpoint["processed_tweet_ids"] = ["1001"]
|
||||
pending = get_pending_entries(sample_manifest, sample_checkpoint)
|
||||
ids = [e["tweet_id"] for e in pending]
|
||||
assert "1001" not in ids
|
||||
assert "1002" in ids
|
||||
|
||||
def test_excludes_none_media(self, sample_manifest, sample_checkpoint):
|
||||
from analyze_media import get_pending_entries
|
||||
pending = get_pending_entries(sample_manifest, sample_checkpoint)
|
||||
types = [e["media_type"] for e in pending]
|
||||
assert "none" not in types
|
||||
|
||||
def test_empty_when_all_processed(self, sample_manifest, sample_checkpoint):
|
||||
from analyze_media import get_pending_entries
|
||||
sample_checkpoint["processed_tweet_ids"] = ["1001", "1002", "1003"]
|
||||
pending = get_pending_entries(sample_manifest, sample_checkpoint)
|
||||
assert len(pending) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Process entry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProcessEntry:
|
||||
@patch("analyze_media.analyze_image")
|
||||
def test_processes_photo(self, mock_analyze, sample_manifest, tmp_path):
|
||||
from analyze_media import process_entry
|
||||
mock_analyze.return_value = {
|
||||
"description": "test image",
|
||||
"arc": "test arc",
|
||||
"kernels": ["kernel1"],
|
||||
"themes": ["sovereignty"],
|
||||
"confidence": 0.8,
|
||||
}
|
||||
entry = sample_manifest[1] # photo entry
|
||||
# Create the fake media file so os.path.exists passes
|
||||
fake_path = tmp_path / "1002.jpg"
|
||||
fake_path.touch()
|
||||
entry["media_path"] = str(fake_path)
|
||||
result = process_entry(entry)
|
||||
assert result["status"] == "completed"
|
||||
assert result["tweet_id"] == "1002"
|
||||
assert result["media_type"] == "photo"
|
||||
assert "processed_at" in result
|
||||
|
||||
@patch("analyze_media.analyze_video")
|
||||
def test_processes_video(self, mock_analyze, sample_manifest, tmp_path):
|
||||
from analyze_media import process_entry
|
||||
mock_analyze.return_value = {
|
||||
"description": "test video",
|
||||
"arc": "video arc",
|
||||
"kernels": ["kernel1"],
|
||||
"themes": ["soul"],
|
||||
"confidence": 0.7,
|
||||
}
|
||||
entry = sample_manifest[0] # video entry
|
||||
fake_path = tmp_path / "1001.mp4"
|
||||
fake_path.touch()
|
||||
entry["media_path"] = str(fake_path)
|
||||
result = process_entry(entry)
|
||||
assert result["status"] == "completed"
|
||||
assert result["tweet_id"] == "1001"
|
||||
assert result["media_type"] == "video"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Extract meaning kernels
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestExtractMeaningKernels:
|
||||
def test_extracts_kernels_from_analysis(self, tmp_path, monkeypatch, sample_analysis_entry):
|
||||
from analyze_media import extract_meaning_kernels, KTF_DIR, KERNELS_FILE, ANALYSIS_FILE
|
||||
|
||||
# Set up temp files
|
||||
ktf_dir = tmp_path / "ktf"
|
||||
ktf_dir.mkdir()
|
||||
monkeypatch.setattr("analyze_media.KTF_DIR", ktf_dir)
|
||||
monkeypatch.setattr("analyze_media.KERNELS_FILE", ktf_dir / "meaning-kernels.jsonl")
|
||||
monkeypatch.setattr("analyze_media.ANALYSIS_FILE", ktf_dir / "analysis.jsonl")
|
||||
|
||||
# Write analysis entry
|
||||
with open(ktf_dir / "analysis.jsonl", "w") as f:
|
||||
f.write(json.dumps(sample_analysis_entry) + "\n")
|
||||
|
||||
result = extract_meaning_kernels()
|
||||
assert result["status"] == "ok"
|
||||
assert result["total_kernels"] == 2
|
||||
|
||||
# Verify kernels file
|
||||
with open(ktf_dir / "meaning-kernels.jsonl") as f:
|
||||
kernels = [json.loads(line) for line in f if line.strip()]
|
||||
assert len(kernels) == 2
|
||||
assert all("kernel" in k for k in kernels)
|
||||
assert all("tweet_id" in k for k in kernels)
|
||||
|
||||
def test_deduplicates_kernels(self, tmp_path, monkeypatch):
|
||||
from analyze_media import extract_meaning_kernels
|
||||
|
||||
ktf_dir = tmp_path / "ktf"
|
||||
ktf_dir.mkdir()
|
||||
monkeypatch.setattr("analyze_media.KTF_DIR", ktf_dir)
|
||||
monkeypatch.setattr("analyze_media.KERNELS_FILE", ktf_dir / "meaning-kernels.jsonl")
|
||||
monkeypatch.setattr("analyze_media.ANALYSIS_FILE", ktf_dir / "analysis.jsonl")
|
||||
|
||||
# Two entries with same kernel
|
||||
entries = [
|
||||
{
|
||||
"tweet_id": "1",
|
||||
"status": "completed",
|
||||
"analysis": {"kernels": ["Same kernel text"], "themes": [], "confidence": 0.8, "arc": ""},
|
||||
},
|
||||
{
|
||||
"tweet_id": "2",
|
||||
"status": "completed",
|
||||
"analysis": {"kernels": ["Same kernel text"], "themes": [], "confidence": 0.7, "arc": ""},
|
||||
},
|
||||
]
|
||||
with open(ktf_dir / "analysis.jsonl", "w") as f:
|
||||
for e in entries:
|
||||
f.write(json.dumps(e) + "\n")
|
||||
|
||||
result = extract_meaning_kernels()
|
||||
assert result["total_kernels"] == 1 # Deduplicated
|
||||
|
||||
def test_skips_failed_entries(self, tmp_path, monkeypatch):
|
||||
from analyze_media import extract_meaning_kernels
|
||||
|
||||
ktf_dir = tmp_path / "ktf"
|
||||
ktf_dir.mkdir()
|
||||
monkeypatch.setattr("analyze_media.KTF_DIR", ktf_dir)
|
||||
monkeypatch.setattr("analyze_media.KERNELS_FILE", ktf_dir / "meaning-kernels.jsonl")
|
||||
monkeypatch.setattr("analyze_media.ANALYSIS_FILE", ktf_dir / "analysis.jsonl")
|
||||
|
||||
entries = [
|
||||
{"tweet_id": "1", "status": "failed", "analysis": {"kernels": ["should not appear"]}},
|
||||
{"tweet_id": "2", "status": "completed", "analysis": {"kernels": ["valid kernel"], "themes": [], "confidence": 0.5, "arc": ""}},
|
||||
]
|
||||
with open(ktf_dir / "analysis.jsonl", "w") as f:
|
||||
for e in entries:
|
||||
f.write(json.dumps(e) + "\n")
|
||||
|
||||
result = extract_meaning_kernels()
|
||||
assert result["total_kernels"] == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Pipeline status
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPipelineStatus:
|
||||
def test_status_computes_correctly(self, tmp_path, monkeypatch, sample_manifest, sample_analysis_entry):
|
||||
from analyze_media import load_json
|
||||
|
||||
# Mock the status computation
|
||||
processed = 1
|
||||
total = 2 # excluding "none" type
|
||||
pct = round(processed / total * 100, 1)
|
||||
|
||||
assert pct == 50.0
|
||||
Reference in New Issue
Block a user