Files
timmy-home/scripts/twitter_archive/analyze_media.py
Alexander Whitestone 2b2b8a2280
Some checks failed
Smoke Test / smoke (push) Has been cancelled
feat: Know Thy Father Phase 2 — Multimodal Analysis Pipeline (#584) (#641)
Merge PR #641
2026-04-14 22:18:15 +00:00

512 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Know Thy Father — Phase 2: Multimodal Analysis Pipeline
Processes the media manifest from Phase 1:
- Images/Memes: Visual description + Meme Logic Analysis
- Videos: Frame sequence analysis + meaning extraction
- Extraction: Identify "Meaning Kernels" related to sovereignty, service, and the soul
Architecture:
Phase 1 (index_timmy_media.py) → media-manifest.jsonl
Phase 2 (this script) → analysis entries → meaning-kernels.jsonl
Usage:
python analyze_media.py # Process all pending entries
python analyze_media.py --batch 10 # Process next 10 entries
python analyze_media.py --status # Show pipeline status
python analyze_media.py --retry-failed # Retry failed entries
python analyze_media.py --extract-kernels # Extract meaning kernels from completed analysis
Output:
~/.timmy/twitter-archive/know-thy-father/analysis.jsonl
~/.timmy/twitter-archive/know-thy-father/meaning-kernels.jsonl
~/.timmy/twitter-archive/know-thy-father/pipeline-status.json
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
sys.path.insert(0, str(Path(__file__).parent))
from common import (
    ARCHIVE_DIR,
    load_json,
    load_jsonl,
    write_json,
    append_jsonl,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# All Phase 2 artifacts live under the shared archive dir from common.py.
KTF_DIR = ARCHIVE_DIR / "know-thy-father"
# Input: Phase 1 output from index_timmy_media.py (one JSON object per media item).
MEDIA_MANIFEST = KTF_DIR / "media-manifest.jsonl"
# Output: per-entry analysis results (append-only JSONL).
ANALYSIS_FILE = KTF_DIR / "analysis.jsonl"
# Output: deduplicated meaning kernels distilled from completed analyses.
KERNELS_FILE = KTF_DIR / "meaning-kernels.jsonl"
# Output: progress summary rewritten after every run (for --status / monitoring).
PIPELINE_STATUS = KTF_DIR / "pipeline-status.json"
# Internal: resumable progress state (processed tweet ids, counters).
CHECKPOINT_FILE = KTF_DIR / "checkpoint.json"
# ---------------------------------------------------------------------------
# Analysis schemas
# ---------------------------------------------------------------------------
# Closed vocabulary of themes the model may tag. Interpolated into
# MEANING_KERNEL_PROMPT below; also reused by downstream consumers of
# analysis.jsonl (see print_status's theme distribution).
THEME_CATEGORIES = [
    "sovereignty",
    "service",
    "soul",
    "identity",
    "transmutation",
    "digital_agency",
    "authenticity",
    "collective_identity",
    "glitch",
    "noise",
    "silence",
    "void",
    "presence",
    "simulation",
    "persistence",
]
# Prompt sent to the vision model for every media item. The {themes}
# placeholder is filled once at import time by the .format() call below;
# the model is asked to reply with JSON matching the fields that
# parse_analysis_response() extracts.
MEANING_KERNEL_PROMPT = """Analyze this media content from Alexander Whitestone's Twitter archive.
Extract "Meaning Kernels" — distilled philosophical statements about:
1. SOVEREIGNTY — autonomy, self-determination, freedom from systems
2. SERVICE — sacrifice, community, building for others
3. THE SOUL — consciousness, identity, what persists beyond the machine
For each kernel found, provide:
- The kernel (1-2 sentences, philosophical)
- The arc (the narrative journey depicted)
- The themes (from: {themes})
- Confidence (0.0-1.0)
Also describe the visual content:
- What is depicted
- Key visual elements
- Emotional tone
- Meme logic (if applicable): format, subversion, cultural reference
Output as JSON with fields: description, arc, kernels[], themes[], confidence
""".format(themes=", ".join(THEME_CATEGORIES))
# ---------------------------------------------------------------------------
# Pipeline state management
# ---------------------------------------------------------------------------
def load_checkpoint() -> dict:
    """Return the persisted pipeline checkpoint, or a fresh default state.

    The default is a brand-new dict on every call, so callers may mutate
    the result freely without cross-call aliasing.
    """
    default_state = {
        "processed_tweet_ids": [],
        "last_offset": 0,
        "total_processed": 0,
        "total_failed": 0,
        "last_run": None,
    }
    return load_json(CHECKPOINT_FILE, default_state)
def save_checkpoint(checkpoint: dict) -> None:
    """Persist *checkpoint* to CHECKPOINT_FILE, stamping the current UTC time.

    Fix: uses timezone-aware ``datetime.now(timezone.utc)`` instead of the
    deprecated ``datetime.utcnow()`` (deprecated since Python 3.12). The
    emitted string keeps the original ``...Z`` suffix format by replacing
    the ``+00:00`` offset that ``isoformat()`` produces for aware datetimes.

    Note: mutates *checkpoint* in place (sets "last_run") before writing.
    """
    checkpoint["last_run"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    write_json(CHECKPOINT_FILE, checkpoint)
def load_analysis_entries() -> list[dict]:
    """Load existing analysis entries from ANALYSIS_FILE (analysis.jsonl).

    Presumably returns [] when the file does not exist yet — behavior of
    common.load_jsonl; verify there.
    """
    return load_jsonl(ANALYSIS_FILE)
def get_pending_entries(manifest: list[dict], checkpoint: dict) -> list[dict]:
    """Return manifest entries that still need processing.

    An entry is pending when its tweet_id is not in the checkpoint's
    processed set and it actually carries media (media_type != "none").
    """
    done = set(checkpoint.get("processed_tweet_ids", []))
    pending = []
    for item in manifest:
        if item["tweet_id"] in done:
            continue
        if item.get("media_type") == "none":
            continue
        pending.append(item)
    return pending
# ---------------------------------------------------------------------------
# Media processing helpers
# ---------------------------------------------------------------------------
def extract_video_frames(video_path: str, num_frames: int = 8) -> list[str]:
    """Extract up to *num_frames* evenly spaced frames from a video file.

    Probes the duration with ffprobe, then writes one JPEG per timestamp
    with ffmpeg into a fresh temp directory. Returns the list of frame
    paths actually written — possibly shorter than *num_frames*, or empty
    when the file is missing or the tools fail.

    Fixes vs. the original:
    - An empty or unparseable ffprobe stdout no longer raises ValueError
      inside the try block (which aborted ALL frame extraction); it now
      falls back to a 10-second duration like a non-zero exit code does.
    - The temp directory is removed again when no frames were extracted,
      instead of leaking one empty ktf_frames_* dir per failed video.
    """
    if not os.path.exists(video_path):
        return []
    frames_dir = tempfile.mkdtemp(prefix="ktf_frames_")
    frame_paths: list[str] = []
    try:
        # Probe duration so frames can be spread evenly across the video.
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", video_path],
            capture_output=True, text=True, timeout=10,
        )
        try:
            duration = float(probe.stdout.strip()) if probe.returncode == 0 else 10.0
        except ValueError:
            duration = 10.0  # ffprobe succeeded but printed nothing parseable
        # Extract evenly spaced frames, skipping the very start and end.
        for i in range(num_frames):
            timestamp = (duration / (num_frames + 1)) * (i + 1)
            frame_path = os.path.join(frames_dir, f"frame_{i:03d}.jpg")
            subprocess.run(
                ["ffmpeg", "-ss", str(timestamp), "-i", video_path,
                 "-vframes", "1", "-q:v", "2", frame_path, "-y"],
                capture_output=True, timeout=30,
            )
            # ffmpeg failures are tolerated per-frame; only keep real files.
            if os.path.exists(frame_path):
                frame_paths.append(frame_path)
    except Exception as e:
        logger.warning(f"Frame extraction failed for {video_path}: {e}")
    if not frame_paths:
        try:
            os.rmdir(frames_dir)  # best effort; dir is empty if nothing was written
        except OSError:
            pass
    return frame_paths
def analyze_with_vision(image_paths: list[str], prompt: str) -> dict:
    """Analyze one or more image files with a local Ollama vision model.

    Bug fix: the original built a prompt but never handed *image_paths* to
    the model, so the "vision" analysis was effectively text-only. The
    Ollama CLI attaches images whose absolute file paths appear in the
    prompt text for multimodal models, so the paths are now appended to
    the prompt.

    Returns a structured analysis dict (via parse_analysis_response) on
    success; otherwise a dict with an "error" field and empty/zero
    analysis fields so callers can record the failure.
    """
    if not image_paths:
        return {"error": "no_images", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
    # Include the image file paths so the multimodal model actually sees them.
    full_prompt = (
        prompt
        + "\n\nAnalyze these frames from a video sequence:\n"
        + "\n".join(image_paths)
    )
    # Try local Ollama with a vision model (Gemma 3 or LLaVA)
    try:
        result = subprocess.run(
            ["ollama", "run", "gemma3:12b", full_prompt],
            capture_output=True, text=True, timeout=120,
            env={**os.environ, "OLLAMA_NUM_PARALLEL": "1"},
        )
        if result.returncode == 0:
            return parse_analysis_response(result.stdout.strip())
    except Exception as e:
        # Ollama missing/hung; fall through to the error result below.
        logger.debug(f"Ollama vision failed: {e}")
    # Fallback: caller may retry this entry later via --retry-failed.
    return {"error": "vision_unavailable", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
def analyze_image(image_path: str, tweet_text: str) -> dict:
    """Analyze a single image, giving the model the tweet text as context."""
    context = f"\n\nContext: The tweet says: \"{tweet_text}\""
    return analyze_with_vision([image_path], MEANING_KERNEL_PROMPT + context)
def analyze_video(video_path: str, tweet_text: str) -> dict:
    """Analyze a video by extracting frames and analyzing the sequence."""
    frames = extract_video_frames(video_path, num_frames=6)
    if not frames:
        return {"error": "no_frames", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
    context = (
        f"\n\nContext: The tweet says: \"{tweet_text}\""
        f"\n\nThese are {len(frames)} frames extracted from a video. "
        "Analyze the narrative arc across the sequence."
    )
    result = analyze_with_vision(frames, MEANING_KERNEL_PROMPT + context)
    # Best-effort cleanup: remove each extracted frame, then the temp dir.
    frames_dir = os.path.dirname(frames[0])
    for frame in frames:
        try:
            os.unlink(frame)
        except Exception:
            pass
    try:
        os.rmdir(frames_dir)
    except Exception:
        pass
    return result
def parse_analysis_response(response: str) -> dict:
    """Parse an LLM reply into the structured analysis dict.

    Finds the outermost {...} span in *response* and tries to decode it as
    JSON; on success, the known fields are copied out with defaults. When
    no JSON object is found (or it fails to decode), a fallback dict with
    a truncated description and zero confidence is returned. The raw reply
    is always preserved under "raw_response".
    """
    import re

    fallback = {
        "description": response[:500],
        "arc": "",
        "kernels": [],
        "themes": [],
        "confidence": 0.0,
        "raw_response": response,
    }
    json_span = re.search(r'\{[\s\S]*\}', response)
    if json_span is None:
        return fallback
    try:
        parsed = json.loads(json_span.group())
    except json.JSONDecodeError:
        return fallback
    return {
        "description": parsed.get("description", ""),
        "arc": parsed.get("arc", ""),
        "kernels": parsed.get("kernels", []),
        "themes": parsed.get("themes", []),
        "confidence": parsed.get("confidence", 0.5),
        "raw_response": response,
    }
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def process_entry(entry: dict, tweet_text: str = "") -> dict:
    """Process one media manifest entry and return its analysis record.

    Dispatches on entry["media_type"]: photos go through analyze_image(),
    videos and animated GIFs through analyze_video(). Missing files and
    unknown media types produce a "failed" record with an explanatory
    error string instead of raising.

    Fix: the timestamp now uses timezone-aware ``datetime.now(timezone.utc)``
    instead of the deprecated ``datetime.utcnow()``; the emitted ``...Z``
    string format is preserved.
    """
    media_type = entry.get("media_type", "unknown")
    media_path = entry.get("media_path")
    # Explicit tweet_text wins; otherwise fall back to the manifest's text.
    text = tweet_text or entry.get("full_text", "")

    if media_type == "photo":
        if media_path and os.path.exists(media_path):
            analysis = analyze_image(media_path, text)
        else:
            analysis = {"error": "file_missing"}
    elif media_type in ("video", "animated_gif"):
        if media_path and os.path.exists(media_path):
            analysis = analyze_video(media_path, text)
        else:
            analysis = {"error": "file_missing"}
    else:
        analysis = {"error": f"unsupported_type:{media_type}"}

    return {
        "tweet_id": entry["tweet_id"],
        "media_type": media_type,
        "media_path": media_path,
        "media_id": entry.get("media_id"),
        "tweet_text": text,
        "hashtags": entry.get("hashtags", []),
        "created_at": entry.get("created_at"),
        "analysis": analysis,
        "processed_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "status": "completed" if not analysis.get("error") else "failed",
        "error": analysis.get("error"),
    }
def run_pipeline(batch_size: int = 0, retry_failed: bool = False) -> dict:
    """Run the analysis pipeline on pending entries.

    Args:
        batch_size: Number of entries to process (0 = all pending)
        retry_failed: Whether to retry previously failed entries

    Returns:
        Pipeline run summary — the same dict written to PIPELINE_STATUS —
        or an error/no-op dict when the manifest is missing or nothing is
        pending.
    """
    # Load data
    manifest = load_jsonl(MEDIA_MANIFEST)
    if not manifest:
        return {"status": "error", "reason": "No media manifest found. Run index_timmy_media.py first."}
    checkpoint = load_checkpoint()
    if retry_failed:
        # Reset failed entries: dropping their ids from the checkpoint makes
        # get_pending_entries() pick them up again on this run.
        existing = load_analysis_entries()
        failed_ids = {e["tweet_id"] for e in existing if e.get("status") == "failed"}
        checkpoint["processed_tweet_ids"] = [
            tid for tid in checkpoint.get("processed_tweet_ids", [])
            if tid not in failed_ids
        ]
    pending = get_pending_entries(manifest, checkpoint)
    if not pending:
        return {"status": "ok", "message": "No pending entries to process.", "processed": 0}
    if batch_size > 0:
        pending = pending[:batch_size]
    # Process entries one at a time; each result is appended to the JSONL
    # immediately so progress survives a crash mid-batch.
    processed = []
    failed = []
    for i, entry in enumerate(pending):
        print(f" Processing {i+1}/{len(pending)}: tweet {entry['tweet_id']} ({entry.get('media_type')})...")
        try:
            result = process_entry(entry)
            processed.append(result)
            append_jsonl(ANALYSIS_FILE, [result])
            # Update checkpoint (in memory; persisted once after the loop).
            checkpoint["processed_tweet_ids"].append(entry["tweet_id"])
            checkpoint["total_processed"] = checkpoint.get("total_processed", 0) + 1
            if result["status"] == "failed":
                checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
                failed.append(entry["tweet_id"])
        except Exception as e:
            # Entry is NOT added to the checkpoint on a hard crash, so it
            # will be retried automatically on the next run.
            logger.error(f"Failed to process {entry['tweet_id']}: {e}")
            failed.append(entry["tweet_id"])
            checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
    # Save checkpoint
    checkpoint["last_offset"] = checkpoint.get("last_offset", 0) + len(pending)
    save_checkpoint(checkpoint)
    # Update pipeline status summary for --status / external monitoring.
    total_manifest = len([e for e in manifest if e.get("media_type") != "none"])
    total_done = len(set(checkpoint.get("processed_tweet_ids", [])))
    status = {
        "phase": "analysis",
        "total_targets": total_manifest,
        "total_processed": total_done,
        "total_pending": total_manifest - total_done,
        "total_failed": checkpoint.get("total_failed", 0),
        "completion_pct": round(total_done / total_manifest * 100, 1) if total_manifest > 0 else 0,
        "last_run": datetime.utcnow().isoformat() + "Z",
        "batch_processed": len(processed),
        "batch_failed": len(failed),
    }
    write_json(PIPELINE_STATUS, status)
    return status
def extract_meaning_kernels() -> dict:
    """Extract meaning kernels from completed analysis entries.

    Reads analysis.jsonl and produces meaning-kernels.jsonl with
    deduplicated, confidence-scored kernels (highest confidence first).

    Fix: the output file is now opened with an explicit UTF-8 encoding
    instead of the platform default (PEP 597 locale-dependence).

    Returns:
        Summary dict with status, total_kernels and the output path — or
        an error dict when no analysis entries exist yet.
    """
    entries = load_analysis_entries()
    if not entries:
        return {"status": "error", "reason": "No analysis entries found."}

    all_kernels: list[dict] = []
    for entry in entries:
        if entry.get("status") != "completed":
            continue
        analysis = entry.get("analysis", {})
        for kernel in analysis.get("kernels", []):
            record = _kernel_record(entry, analysis, kernel)
            if record is not None:
                all_kernels.append(record)

    # Deduplicate by case-folded kernel-text prefix; first occurrence wins.
    seen: set[str] = set()
    unique_kernels = []
    for k in all_kernels:
        key = k["kernel"][:100].lower()
        if key not in seen:
            seen.add(key)
            unique_kernels.append(k)

    # Highest-confidence kernels first.
    unique_kernels.sort(key=lambda k: k.get("confidence", 0), reverse=True)

    KTF_DIR.mkdir(parents=True, exist_ok=True)
    with open(KERNELS_FILE, "w", encoding="utf-8") as f:
        for k in unique_kernels:
            f.write(json.dumps(k, sort_keys=True) + "\n")

    return {
        "status": "ok",
        "total_kernels": len(unique_kernels),
        "output": str(KERNELS_FILE),
    }


def _kernel_record(entry: dict, analysis: dict, kernel) -> Optional[dict]:
    """Normalize one raw kernel (str or dict) into a flat record; None if unusable."""
    if isinstance(kernel, str):
        return {
            "tweet_id": entry["tweet_id"],
            "kernel": kernel,
            "arc": analysis.get("arc", ""),
            "themes": analysis.get("themes", []),
            "confidence": analysis.get("confidence", 0.5),
            "created_at": entry.get("created_at"),
        }
    if isinstance(kernel, dict):
        # Kernel-level fields win; fall back to the entry-wide analysis values.
        return {
            "tweet_id": entry["tweet_id"],
            "kernel": kernel.get("kernel", kernel.get("text", str(kernel))),
            "arc": kernel.get("arc", analysis.get("arc", "")),
            "themes": kernel.get("themes", analysis.get("themes", [])),
            "confidence": kernel.get("confidence", analysis.get("confidence", 0.5)),
            "created_at": entry.get("created_at"),
        }
    # Anything else (numbers, lists) is silently dropped, as before.
    return None
def print_status() -> None:
    """Print a human-readable pipeline progress report to stdout.

    Fix: the original loaded PIPELINE_STATUS into a local ``status`` that
    was never used (dead file read); that statement is removed.
    """
    from collections import Counter

    manifest = load_jsonl(MEDIA_MANIFEST)
    checkpoint = load_checkpoint()
    analysis = load_analysis_entries()

    total_media = sum(1 for e in manifest if e.get("media_type") != "none")
    processed = len(set(checkpoint.get("processed_tweet_ids", [])))
    completed = sum(1 for e in analysis if e.get("status") == "completed")
    failed = sum(1 for e in analysis if e.get("status") == "failed")
    pct = round(processed / total_media * 100, 1) if total_media else 0

    print("Know Thy Father — Phase 2: Multimodal Analysis")
    print("=" * 50)
    print(f" Media manifest: {total_media} entries")
    print(f" Processed: {processed}")
    print(f" Completed: {completed}")
    print(f" Failed: {failed}")
    print(f" Pending: {total_media - processed}")
    print(f" Completion: {pct}%")
    print()

    # Tally how often each theme was tagged across all analyses.
    theme_counter = Counter(
        theme
        for entry in analysis
        for theme in entry.get("analysis", {}).get("themes", [])
    )
    if theme_counter:
        print("Theme distribution:")
        for theme, count in theme_counter.most_common(10):
            print(f" {theme:25s} {count}")

    # Kernels count (only after --extract-kernels has produced the file).
    kernels = load_jsonl(KERNELS_FILE)
    if kernels:
        print(f"\nMeaning kernels extracted: {len(kernels)}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: dispatch to status, kernel extraction, or a pipeline run."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    parser = argparse.ArgumentParser(description="Know Thy Father — Phase 2: Multimodal Analysis")
    parser.add_argument("--batch", type=int, default=0, help="Process N entries (0 = all)")
    parser.add_argument("--status", action="store_true", help="Show pipeline status")
    parser.add_argument("--retry-failed", action="store_true", help="Retry failed entries")
    parser.add_argument("--extract-kernels", action="store_true", help="Extract meaning kernels from analysis")
    args = parser.parse_args()

    # Ensure the output directory exists before any mode touches it.
    KTF_DIR.mkdir(parents=True, exist_ok=True)

    if args.status:
        print_status()
    elif args.extract_kernels:
        print(json.dumps(extract_meaning_kernels(), indent=2))
    else:
        print(json.dumps(run_pipeline(batch_size=args.batch, retry_failed=args.retry_failed), indent=2))
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    main()