#!/usr/bin/env python3
"""
Know Thy Father — Phase 2: Multimodal Analysis Pipeline

Processes the media manifest from Phase 1:
- Images/Memes: Visual description + Meme Logic Analysis
- Videos: Frame sequence analysis + meaning extraction
- Extraction: Identify "Meaning Kernels" related to sovereignty, service, and the soul

Architecture:
    Phase 1 (index_timmy_media.py) → media-manifest.jsonl
    Phase 2 (this script) → analysis entries → meaning-kernels.jsonl

Usage:
    python analyze_media.py                    # Process all pending entries
    python analyze_media.py --batch 10         # Process next 10 entries
    python analyze_media.py --status           # Show pipeline status
    python analyze_media.py --retry-failed     # Retry failed entries
    python analyze_media.py --extract-kernels  # Extract meaning kernels from completed analysis

Output:
    ~/.timmy/twitter-archive/know-thy-father/analysis.jsonl
    ~/.timmy/twitter-archive/know-thy-father/meaning-kernels.jsonl
    ~/.timmy/twitter-archive/know-thy-father/pipeline-status.json
"""
|
|
|
|
from __future__ import annotations

import argparse
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

sys.path.insert(0, str(Path(__file__).parent))

from common import (
    ARCHIVE_DIR,
    load_json,
    load_jsonl,
    write_json,
    append_jsonl,
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Paths
|
|
# ---------------------------------------------------------------------------
|
|
|
|
KTF_DIR = ARCHIVE_DIR / "know-thy-father"
|
|
MEDIA_MANIFEST = KTF_DIR / "media-manifest.jsonl"
|
|
ANALYSIS_FILE = KTF_DIR / "analysis.jsonl"
|
|
KERNELS_FILE = KTF_DIR / "meaning-kernels.jsonl"
|
|
PIPELINE_STATUS = KTF_DIR / "pipeline-status.json"
|
|
CHECKPOINT_FILE = KTF_DIR / "checkpoint.json"
|
|
|
|
# ---------------------------------------------------------------------------
# Analysis schemas
# ---------------------------------------------------------------------------

# Closed vocabulary of theme tags the model is asked to choose from.
# Interpolated into MEANING_KERNEL_PROMPT below via str.format.
THEME_CATEGORIES = [
    "sovereignty",
    "service",
    "soul",
    "identity",
    "transmutation",
    "digital_agency",
    "authenticity",
    "collective_identity",
    "glitch",
    "noise",
    "silence",
    "void",
    "presence",
    "simulation",
    "persistence",
]

# Prompt sent to the vision model for every media entry.  Requests both a
# visual description and distilled "meaning kernels", and asks for JSON
# output so parse_analysis_response() can pull out structured fields.
MEANING_KERNEL_PROMPT = """Analyze this media content from Alexander Whitestone's Twitter archive.
Extract "Meaning Kernels" — distilled philosophical statements about:

1. SOVEREIGNTY — autonomy, self-determination, freedom from systems
2. SERVICE — sacrifice, community, building for others
3. THE SOUL — consciousness, identity, what persists beyond the machine

For each kernel found, provide:
- The kernel (1-2 sentences, philosophical)
- The arc (the narrative journey depicted)
- The themes (from: {themes})
- Confidence (0.0-1.0)

Also describe the visual content:
- What is depicted
- Key visual elements
- Emotional tone
- Meme logic (if applicable): format, subversion, cultural reference

Output as JSON with fields: description, arc, kernels[], themes[], confidence
""".format(themes=", ".join(THEME_CATEGORIES))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pipeline state management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_checkpoint() -> dict:
    """Return the saved pipeline checkpoint, or a fresh default state.

    The default carries an empty processed-id list and zeroed counters so
    callers can mutate it unconditionally.
    """
    defaults = {
        "processed_tweet_ids": [],
        "last_offset": 0,
        "total_processed": 0,
        "total_failed": 0,
        "last_run": None,
    }
    return load_json(CHECKPOINT_FILE, defaults)
|
|
|
|
|
|
def save_checkpoint(checkpoint: dict) -> None:
    """Persist the pipeline checkpoint, stamping the current UTC time.

    Args:
        checkpoint: Checkpoint dict; its "last_run" field is overwritten
            in place before writing.
    """
    # datetime.utcnow() is deprecated since Python 3.12.  Use an aware UTC
    # timestamp and strip tzinfo so the "...Z"-suffixed format written to
    # disk stays byte-identical to earlier checkpoints.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    checkpoint["last_run"] = now.isoformat() + "Z"
    write_json(CHECKPOINT_FILE, checkpoint)
|
|
|
|
|
|
def load_analysis_entries() -> list[dict]:
    """Load existing analysis entries from analysis.jsonl.

    Thin wrapper over the shared JSONL loader; presumably returns an empty
    list when the file does not exist yet — confirm against common.py.
    """
    return load_jsonl(ANALYSIS_FILE)
|
|
|
|
|
|
def get_pending_entries(manifest: list[dict], checkpoint: dict) -> list[dict]:
    """Return manifest entries that still need analysis.

    An entry is pending when its tweet_id is not yet recorded in the
    checkpoint and it actually carries media (media_type != "none").
    """
    done = set(checkpoint.get("processed_tweet_ids", []))
    pending = []
    for item in manifest:
        if item["tweet_id"] in done:
            continue
        if item.get("media_type") == "none":
            continue
        pending.append(item)
    return pending
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Media processing helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def extract_video_frames(video_path: str, num_frames: int = 8) -> list[str]:
    """Extract up to ``num_frames`` evenly spaced frames from a video file.

    Frames are written as JPEGs into a fresh temp directory; the caller is
    responsible for cleanup when frames are returned.

    Args:
        video_path: Path to the source video.
        num_frames: Number of evenly spaced frames to sample.

    Returns:
        Paths to the extracted frame images (empty list on any failure).
    """
    if not os.path.exists(video_path):
        return []

    frames_dir = tempfile.mkdtemp(prefix="ktf_frames_")
    frame_paths: list[str] = []

    try:
        # Probe the duration so frames can be spaced evenly across the clip.
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", video_path],
            capture_output=True, text=True, timeout=10,
        )
        try:
            duration = float(result.stdout.strip()) if result.returncode == 0 else 10.0
        except ValueError:
            # BUG FIX: ffprobe can exit 0 yet print non-numeric output
            # (e.g. "N/A" for some containers).  Previously this ValueError
            # escaped to the outer handler and aborted ALL frame extraction;
            # now we just fall back to the default duration.
            duration = 10.0

        # Sample at t = d/(n+1), 2d/(n+1), ... so frames avoid the very
        # start and end of the clip.
        for i in range(num_frames):
            timestamp = (duration / (num_frames + 1)) * (i + 1)
            frame_path = os.path.join(frames_dir, f"frame_{i:03d}.jpg")
            subprocess.run(
                ["ffmpeg", "-ss", str(timestamp), "-i", video_path,
                 "-vframes", "1", "-q:v", "2", frame_path, "-y"],
                capture_output=True, timeout=30,
            )
            if os.path.exists(frame_path):
                frame_paths.append(frame_path)

    except Exception as e:
        logger.warning(f"Frame extraction failed for {video_path}: {e}")

    if not frame_paths:
        # BUG FIX: don't leak the empty temp directory when extraction
        # fails — the caller only cleans up when frames were returned.
        try:
            os.rmdir(frames_dir)
        except OSError:
            pass

    return frame_paths
|
|
|
|
|
|
def analyze_with_vision(image_paths: list[str], prompt: str) -> dict:
    """Analyze images using a local vision model via the ollama CLI.

    Args:
        image_paths: Paths of the image files to analyze.
        prompt: Instruction prompt; the image paths are appended to it so
            the multimodal model actually receives the images.

    Returns:
        Structured analysis dict (see parse_analysis_response), or an
        error-shaped dict when no images were given or the model is
        unavailable.
    """
    if not image_paths:
        return {"error": "no_images", "description": "", "kernels": [], "themes": [], "confidence": 0.0}

    # BUG FIX: the original built a prompt but never handed the image paths
    # to the model, so every "vision" call was text-only.  The ollama CLI
    # attaches images whose file paths appear in the prompt text for
    # multimodal models, so include them explicitly.
    full_prompt = (
        prompt
        + "\n\nAnalyze these frames from a video sequence:\n"
        + "\n".join(image_paths)
    )

    # Try local Ollama with a vision model (Gemma 3 or LLaVA)
    try:
        result = subprocess.run(
            ["ollama", "run", "gemma3:12b", full_prompt],
            capture_output=True, text=True, timeout=120,
            env={**os.environ, "OLLAMA_NUM_PARALLEL": "1"},
        )
        if result.returncode == 0:
            # Model replied; pull structured fields out of the response.
            return parse_analysis_response(result.stdout.strip())
    except Exception as e:
        # Best-effort: missing binary / timeout falls through to the
        # vision_unavailable sentinel below.
        logger.debug(f"Ollama vision failed: {e}")

    # Fallback: vision model unavailable — caller may retry later.
    return {"error": "vision_unavailable", "description": "", "kernels": [], "themes": [], "confidence": 0.0}
|
|
|
|
|
|
def analyze_image(image_path: str, tweet_text: str) -> dict:
    """Analyze a single image, giving the model the tweet text as context."""
    context = f"\n\nContext: The tweet says: \"{tweet_text}\""
    return analyze_with_vision([image_path], MEANING_KERNEL_PROMPT + context)
|
|
|
|
|
|
def analyze_video(video_path: str, tweet_text: str) -> dict:
    """Analyze a video by extracting frames and analyzing the sequence.

    Args:
        video_path: Path to the video file.
        tweet_text: Tweet text supplied to the model as context.

    Returns:
        Structured analysis dict, or a "no_frames" error dict when frame
        extraction produced nothing.
    """
    frames = extract_video_frames(video_path, num_frames=6)
    if not frames:
        return {"error": "no_frames", "description": "", "kernels": [], "themes": [], "confidence": 0.0}

    prompt = MEANING_KERNEL_PROMPT + f"\n\nContext: The tweet says: \"{tweet_text}\"\n\nThese are {len(frames)} frames extracted from a video. Analyze the narrative arc across the sequence."
    result = analyze_with_vision(frames, prompt)

    # Remove the whole temp frame directory in one pass.  The original
    # per-file unlink loop + os.rmdir leaked the directory whenever rmdir
    # failed (e.g. a stray file left behind); rmtree handles that.
    shutil.rmtree(os.path.dirname(frames[0]), ignore_errors=True)

    return result
|
|
|
|
|
|
def parse_analysis_response(response: str) -> dict:
    """Parse an LLM response into a structured analysis dict.

    Locates a JSON object embedded in the (possibly chatty) model output;
    falls back to wrapping the raw text when no valid JSON is present.

    Args:
        response: Raw text returned by the model.

    Returns:
        Dict with description/arc/kernels/themes/confidence plus the raw
        response for debugging.
    """
    # Greedy match from the first "{" to the last "}" — tolerates prose
    # around the JSON.  (The per-call `import re` was hoisted to module
    # level; compiling/looking up the module on every entry was wasteful.)
    json_match = re.search(r'\{[\s\S]*\}', response)
    if json_match:
        try:
            parsed = json.loads(json_match.group())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return {
                "description": parsed.get("description", ""),
                "arc": parsed.get("arc", ""),
                "kernels": parsed.get("kernels", []),
                "themes": parsed.get("themes", []),
                "confidence": parsed.get("confidence", 0.5),
                "raw_response": response,
            }

    # Fallback: no parsable JSON — keep a truncated description plus the
    # raw text so nothing is silently lost.
    return {
        "description": response[:500],
        "arc": "",
        "kernels": [],
        "themes": [],
        "confidence": 0.0,
        "raw_response": response,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def process_entry(entry: dict, tweet_text: str = "") -> dict:
    """Process a single media entry and return the analysis record.

    Dispatches on media_type: photos go through analyze_image, videos and
    GIFs through analyze_video; anything else is recorded as unsupported.

    Args:
        entry: Manifest entry (must carry "tweet_id").
        tweet_text: Optional override for the tweet text context; falls
            back to the entry's own "full_text".
    """
    media_type = entry.get("media_type", "unknown")
    media_path = entry.get("media_path")
    text = tweet_text or entry.get("full_text", "")

    if media_type == "photo":
        if media_path and os.path.exists(media_path):
            analysis = analyze_image(media_path, text)
        else:
            analysis = {"error": "file_missing"}
    elif media_type in ("video", "animated_gif"):
        if media_path and os.path.exists(media_path):
            analysis = analyze_video(media_path, text)
        else:
            analysis = {"error": "file_missing"}
    else:
        analysis = {"error": f"unsupported_type:{media_type}"}

    error = analysis.get("error")
    return {
        "tweet_id": entry["tweet_id"],
        "media_type": media_type,
        "media_path": media_path,
        "media_id": entry.get("media_id"),
        "tweet_text": text,
        "hashtags": entry.get("hashtags", []),
        "created_at": entry.get("created_at"),
        "analysis": analysis,
        "processed_at": datetime.utcnow().isoformat() + "Z",
        "status": "failed" if error else "completed",
        "error": error,
    }
|
|
|
|
|
|
def run_pipeline(batch_size: int = 0, retry_failed: bool = False) -> dict:
    """Run the analysis pipeline on pending entries.

    Each processed entry is appended to analysis.jsonl immediately and the
    checkpoint is updated in memory, so a crash mid-batch loses at most the
    entry currently in flight.  The checkpoint is written once at the end.

    Args:
        batch_size: Number of entries to process (0 = all pending)
        retry_failed: Whether to retry previously failed entries

    Returns:
        Pipeline run summary
    """
    # Load data
    manifest = load_jsonl(MEDIA_MANIFEST)
    if not manifest:
        return {"status": "error", "reason": "No media manifest found. Run index_timmy_media.py first."}

    checkpoint = load_checkpoint()

    if retry_failed:
        # Reset failed entries: dropping their ids from the checkpoint makes
        # get_pending_entries() hand them back for another attempt.
        existing = load_analysis_entries()
        failed_ids = {e["tweet_id"] for e in existing if e.get("status") == "failed"}
        checkpoint["processed_tweet_ids"] = [
            tid for tid in checkpoint.get("processed_tweet_ids", [])
            if tid not in failed_ids
        ]

    pending = get_pending_entries(manifest, checkpoint)
    if not pending:
        return {"status": "ok", "message": "No pending entries to process.", "processed": 0}

    if batch_size > 0:
        pending = pending[:batch_size]

    # Process entries
    processed = []
    failed = []
    for i, entry in enumerate(pending):
        print(f" Processing {i+1}/{len(pending)}: tweet {entry['tweet_id']} ({entry.get('media_type')})...")
        try:
            result = process_entry(entry)
            processed.append(result)
            # Persist each result as soon as it exists (append-only JSONL).
            append_jsonl(ANALYSIS_FILE, [result])

            # Update checkpoint
            checkpoint["processed_tweet_ids"].append(entry["tweet_id"])
            checkpoint["total_processed"] = checkpoint.get("total_processed", 0) + 1

            # "failed" here means the analysis itself reported an error
            # (e.g. file_missing) — the entry is still marked processed so
            # it is not retried unless --retry-failed is used.
            if result["status"] == "failed":
                checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1
                failed.append(entry["tweet_id"])

        except Exception as e:
            # Unexpected crash while processing one entry: count it and keep
            # going.  Note the id is NOT added to processed_tweet_ids, so the
            # entry stays pending for the next run.
            logger.error(f"Failed to process {entry['tweet_id']}: {e}")
            failed.append(entry["tweet_id"])
            checkpoint["total_failed"] = checkpoint.get("total_failed", 0) + 1

    # Save checkpoint
    checkpoint["last_offset"] = checkpoint.get("last_offset", 0) + len(pending)
    save_checkpoint(checkpoint)

    # Update pipeline status
    total_manifest = len([e for e in manifest if e.get("media_type") != "none"])
    # set() dedupes ids that may appear more than once after retries.
    total_done = len(set(checkpoint.get("processed_tweet_ids", [])))
    status = {
        "phase": "analysis",
        "total_targets": total_manifest,
        "total_processed": total_done,
        "total_pending": total_manifest - total_done,
        "total_failed": checkpoint.get("total_failed", 0),
        "completion_pct": round(total_done / total_manifest * 100, 1) if total_manifest > 0 else 0,
        "last_run": datetime.utcnow().isoformat() + "Z",
        "batch_processed": len(processed),
        "batch_failed": len(failed),
    }
    write_json(PIPELINE_STATUS, status)

    return status
|
|
|
|
|
|
def extract_meaning_kernels() -> dict:
    """Extract meaning kernels from completed analysis entries.

    Reads analysis.jsonl and produces meaning-kernels.jsonl with
    deduplicated, confidence-scored kernels (highest confidence first).

    Returns:
        Summary dict with status, kernel count, and output path.
    """
    entries = load_analysis_entries()
    if not entries:
        return {"status": "error", "reason": "No analysis entries found."}

    all_kernels = []
    for entry in entries:
        if entry.get("status") != "completed":
            continue
        analysis = entry.get("analysis", {})
        for kernel in analysis.get("kernels", []):
            record = _kernel_record(entry, analysis, kernel)
            if record is not None:
                all_kernels.append(record)

    # Deduplicate on a case-insensitive prefix of the kernel text — near
    # identical kernels from different tweets collapse to the first seen.
    seen = set()
    unique_kernels = []
    for k in all_kernels:
        key = k["kernel"][:100].lower()
        if key not in seen:
            seen.add(key)
            unique_kernels.append(k)

    # Highest-confidence kernels first.
    unique_kernels.sort(key=lambda k: k.get("confidence", 0), reverse=True)

    KTF_DIR.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the output is identical across platforms
    # (previously it depended on the locale's default encoding).
    with open(KERNELS_FILE, "w", encoding="utf-8") as f:
        for k in unique_kernels:
            f.write(json.dumps(k, sort_keys=True) + "\n")

    return {
        "status": "ok",
        "total_kernels": len(unique_kernels),
        "output": str(KERNELS_FILE),
    }


def _kernel_record(entry: dict, analysis: dict, kernel) -> Optional[dict]:
    """Normalize one raw kernel (str or dict) into a flat output record."""
    if isinstance(kernel, str):
        return {
            "tweet_id": entry["tweet_id"],
            "kernel": kernel,
            "arc": analysis.get("arc", ""),
            "themes": analysis.get("themes", []),
            "confidence": analysis.get("confidence", 0.5),
            "created_at": entry.get("created_at"),
        }
    if isinstance(kernel, dict):
        return {
            "tweet_id": entry["tweet_id"],
            # str() guards against non-string kernel values so the dedup
            # slice above cannot raise TypeError.
            "kernel": str(kernel.get("kernel", kernel.get("text", str(kernel)))),
            "arc": kernel.get("arc", analysis.get("arc", "")),
            "themes": kernel.get("themes", analysis.get("themes", [])),
            "confidence": kernel.get("confidence", analysis.get("confidence", 0.5)),
            "created_at": entry.get("created_at"),
        }
    # Unknown kernel shape (list, number, ...): skip rather than crash,
    # matching the original's silent skip of such values.
    return None
|
|
|
|
|
|
def print_status() -> None:
    """Print a human-readable summary of pipeline progress to stdout."""
    manifest = load_jsonl(MEDIA_MANIFEST)
    checkpoint = load_checkpoint()
    analysis = load_analysis_entries()
    status = load_json(PIPELINE_STATUS, {})

    total_media = sum(1 for e in manifest if e.get("media_type") != "none")
    processed = len(set(checkpoint.get("processed_tweet_ids", [])))
    completed = sum(1 for e in analysis if e.get("status") == "completed")
    failed = sum(1 for e in analysis if e.get("status") == "failed")

    print("Know Thy Father — Phase 2: Multimodal Analysis")
    print("=" * 50)
    print(f" Media manifest: {total_media} entries")
    print(f" Processed: {processed}")
    print(f" Completed: {completed}")
    print(f" Failed: {failed}")
    print(f" Pending: {total_media - processed}")
    print(f" Completion: {round(processed/total_media*100, 1) if total_media else 0}%")
    print()

    # Tally how often each theme appears across all analysis entries.
    from collections import Counter
    theme_counter = Counter(
        theme
        for entry in analysis
        for theme in entry.get("analysis", {}).get("themes", [])
    )
    if theme_counter:
        print("Theme distribution:")
        for theme, count in theme_counter.most_common(10):
            print(f" {theme:25s} {count}")

    # Kernel count, if Phase 2 extraction has been run.
    kernels = load_jsonl(KERNELS_FILE)
    if kernels:
        print(f"\nMeaning kernels extracted: {len(kernels)}")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """CLI entry point for the Phase 2 analysis pipeline."""
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    parser = argparse.ArgumentParser(description="Know Thy Father — Phase 2: Multimodal Analysis")
    parser.add_argument("--batch", type=int, default=0, help="Process N entries (0 = all)")
    parser.add_argument("--status", action="store_true", help="Show pipeline status")
    parser.add_argument("--retry-failed", action="store_true", help="Retry failed entries")
    parser.add_argument("--extract-kernels", action="store_true", help="Extract meaning kernels from analysis")
    args = parser.parse_args()

    KTF_DIR.mkdir(parents=True, exist_ok=True)

    # Informational modes short-circuit before any processing.
    if args.status:
        print_status()
    elif args.extract_kernels:
        print(json.dumps(extract_meaning_kernels(), indent=2))
    else:
        outcome = run_pipeline(batch_size=args.batch, retry_failed=args.retry_failed)
        print(json.dumps(outcome, indent=2))
|
|
|
|
|
|
# Script entry point: run the CLI only when executed directly.
if __name__ == "__main__":
    main()
|