feat(know-thy-father): Phase 2 Multimodal Analysis Pipeline (#584)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 11s
Implement the multimodal analysis pipeline that processes the 818-entry
media manifest from Phase 1 to extract Meaning Kernels.
Pipeline (twitter-archive/multimodal_pipeline.py):
- Images/GIFs: Visual Description → Meme Logic → Meaning Kernels
- Videos: Keyframe Extraction (ffmpeg) → Per-Frame Description →
Sequence Analysis → Meaning Kernels
- All inference local via Gemma 4 (Ollama). Zero cloud credits.
Meaning Kernels extracted in three categories:
- SOVEREIGNTY: Bitcoin, decentralization, freedom, autonomy
- SERVICE: Building for others, caring, community, fatherhood
- THE SOUL: Identity, purpose, faith, what makes something alive
Features:
- Checkpoint/resume support (analysis_checkpoint.json)
- Per-item analysis saved to media/analysis/{tweet_id}.json
- Append-only meaning_kernels.jsonl for Phase 3 synthesis
- --synthesize flag generates categorized summary
- --type filter for photo/animated_gif/video
- Graceful error handling with error logs
Closes #584
144
skills/autonomous-ai-agents/know-thy-father-multimodal/SKILL.md
Normal file
@@ -0,0 +1,144 @@
---
name: know-thy-father-multimodal
description: "Multimodal analysis pipeline for Know Thy Father. Process Twitter media (images, GIFs, videos) via Gemma 4 to extract Meaning Kernels about sovereignty, service, and the soul."
version: 1.0.0
author: Timmy Time
license: MIT
metadata:
  hermes:
    tags: [multimodal, vision, analysis, meaning-kernels, twitter, sovereign]
    related_skills: [know-thy-father-pipeline, sovereign-meaning-synthesis]
---

# Know Thy Father — Phase 2: Multimodal Analysis

## Overview

Processes the 818-entry media manifest from Phase 1 to extract Meaning Kernels — compact philosophical observations about sovereignty, service, and the soul — using local Gemma 4 inference. Zero cloud credits.

## Architecture

```
Phase 1 (manifest.jsonl)
    │ 818 media entries with tweet text, hashtags, local paths
    ▼
Phase 2 (multimodal_pipeline.py)
    ├── Images/GIFs → Visual Description → Meme Logic → Meaning Kernels
    └── Videos → Keyframes → Audio → Sequence Analysis → Meaning Kernels
    ▼
Output
    ├── media/analysis/{tweet_id}.json      — per-item analysis
    ├── media/meaning_kernels.jsonl         — all extracted kernels
    ├── media/meaning_kernels_summary.json  — categorized summary
    └── media/analysis_checkpoint.json      — resume state
```
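The Images/GIFs vs. Videos branch above comes down to a dispatch on each manifest entry's `media_type` field. A minimal sketch of that routing — the analyzer names here are illustrative stand-ins, not the pipeline's actual functions:

```python
def dispatch(entry: dict, analyzers: dict):
    """Route a manifest entry to the right analyzer by media_type;
    Twitter-style manifests use photo / animated_gif / video."""
    mt = entry.get("media_type")
    if mt in ("photo", "animated_gif"):
        return analyzers["image"](entry)
    if mt == "video":
        return analyzers["video"](entry)
    # Unknown types are skipped rather than treated as errors
    return {"skipped": mt}

# Stub analyzers stand in for the real image/video analysis steps
stub = {"image": lambda e: "image-analysis", "video": lambda e: "video-analysis"}
print(dispatch({"media_type": "animated_gif"}, stub))  # → image-analysis
```

Animated GIFs share the image path because Twitter serves them as short MP4s, so a single first frame is representative.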

## Usage

### Basic run (first 10 items)

```bash
cd twitter-archive
python3 multimodal_pipeline.py --manifest media/manifest.jsonl --limit 10
```

### Resume from checkpoint

```bash
python3 multimodal_pipeline.py --resume
```

### Process only photos

```bash
python3 multimodal_pipeline.py --type photo --limit 50
```

### Process only videos

```bash
python3 multimodal_pipeline.py --type video --limit 10
```

### Generate meaning kernel summary

```bash
python3 multimodal_pipeline.py --synthesize
```

## Meaning Kernels

Each kernel is a JSON object:

```json
{
  "category": "sovereignty|service|soul",
  "kernel": "one-sentence observation",
  "evidence": "what in the media supports this",
  "confidence": "high|medium|low",
  "source_tweet_id": "1234567890",
  "source_media_type": "photo",
  "source_hashtags": ["timmytime", "bitcoin"]
}
```

### Categories

- **SOVEREIGNTY**: Self-sovereignty, Bitcoin, decentralization, freedom, autonomy
- **SERVICE**: Building for others, caring for broken men, community, fatherhood
- **THE SOUL**: Identity, purpose, faith, what makes something alive, the soul of technology
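The extraction prompt asks the model for a bare JSON array, but model responses often arrive wrapped in prose. A minimal sketch of one way to recover kernels from such a response and filter them by confidence — the helper names are illustrative, not the pipeline's actual API:

```python
import json

def extract_kernel_array(raw: str) -> list:
    """Pull the first [...] JSON array out of a model response that may
    include surrounding prose, via a simple find/rfind bracket scan."""
    start, end = raw.find("["), raw.rfind("]") + 1
    if start < 0 or end <= start:
        return []
    try:
        kernels = json.loads(raw[start:end])
    except json.JSONDecodeError:
        return []
    return kernels if isinstance(kernels, list) else []

def high_confidence(kernels: list) -> list:
    """Keep only kernels marked high-confidence, for summary review."""
    return [k for k in kernels if isinstance(k, dict) and k.get("confidence") == "high"]

raw = ('Here are the kernels:\n'
       '[{"category": "sovereignty", "kernel": "Holds own keys", "confidence": "high"},'
       ' {"category": "soul", "kernel": "Maybe", "confidence": "low"}]')
kernels = extract_kernel_array(raw)
print(len(kernels), len(high_confidence(kernels)))  # → 2 1
```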

## Pipeline Steps per Media Item

### Images/GIFs

1. **Visual Description** — What is depicted, style, text overlays, emotional tone
2. **Meme Logic** — Core joke/message, cultural references, what sharing reveals
3. **Meaning Kernel Extraction** — Philosophical observations from the analysis

### Videos

1. **Keyframe Extraction** — 5 evenly spaced frames via ffmpeg
2. **Per-Frame Description** — Visual description of each keyframe
3. **Audio Extraction** — Demux to WAV (transcription via Whisper, pending)
4. **Sequence Analysis** — Narrative arc, key moments, emotional progression
5. **Meaning Kernel Extraction** — Philosophical observations from the analysis
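Keyframe extraction samples evenly spaced timestamps while skipping the clip's endpoints: with duration *d* and *n* frames, seek offsets land at d/(n+1), 2d/(n+1), and so on. A small sketch of that arithmetic (the ffmpeg invocation itself is left to the pipeline):

```python
def keyframe_timestamps(duration: float, count: int = 5) -> list:
    """Evenly spaced seek offsets that avoid the very start and end:
    a 12-second clip with count=5 yields 2, 4, 6, 8, 10 seconds."""
    if duration <= 0 or count <= 0:
        return []
    interval = duration / (count + 1)
    return [round(interval * (i + 1), 3) for i in range(count)]

print(keyframe_timestamps(12.0))  # → [2.0, 4.0, 6.0, 8.0, 10.0]
```

Skipping the endpoints avoids black lead-in frames and end cards, which tend to be useless to a vision model.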

## Prerequisites

- **Ollama** running locally with `gemma4:latest` (or a configured model)
- **ffmpeg** and **ffprobe** for video processing
- Local Twitter archive media files at the paths in manifest.jsonl

## Configuration (env vars)

| Variable | Default | Description |
|----------|---------|-------------|
| `KTF_WORKSPACE` | `~/timmy-home/twitter-archive` | Project workspace |
| `OLLAMA_URL` | `http://localhost:11434` | Ollama API endpoint |
| `KTF_MODEL` | `gemma4:latest` | Model for text analysis |
| `KTF_VISION_MODEL` | `gemma4:latest` | Model for vision (multimodal) |
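Each setting resolves as environment-variable-with-fallback. A minimal sketch of that resolution, using the table's variable names and defaults — the dict-based helper is illustrative, not the script's actual structure:

```python
def ktf_config(env: dict) -> dict:
    """Resolve pipeline settings from an environment mapping, falling
    back to the documented defaults (workspace path shown unexpanded)."""
    return {
        "workspace": env.get("KTF_WORKSPACE", "~/timmy-home/twitter-archive"),
        "ollama_url": env.get("OLLAMA_URL", "http://localhost:11434"),
        "model": env.get("KTF_MODEL", "gemma4:latest"),
        "vision_model": env.get("KTF_VISION_MODEL", "gemma4:latest"),
    }

# Overriding one variable leaves the others at their defaults
cfg = ktf_config({"KTF_MODEL": "gemma4:12b"})
print(cfg["model"], cfg["ollama_url"])  # → gemma4:12b http://localhost:11434
```

Passing `dict(os.environ)` would pick the settings up from the real environment.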

## Output Structure

```
media/
  analysis/
    {tweet_id}.json            — Full analysis per item
    {tweet_id}_error.json      — Error log for failed items
  analysis_checkpoint.json     — Resume state
  meaning_kernels.jsonl        — All kernels (append-only)
  meaning_kernels_summary.json — Categorized summary
```

## Integration with Phase 3

The `meaning_kernels.jsonl` file is the input for Phase 3 (Holographic Synthesis):

- Kernels feed into `fact_store` as structured memories
- Categories map to memory types (sovereignty→values, service→mission, soul→identity)
- Confidence scores weight fact trust levels
- Source tweets provide provenance links
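The category-to-memory-type mapping and confidence weighting above can be sketched as a small transform. The numeric trust weights and the record shape here are illustrative assumptions, not Phase 3's actual schema:

```python
CATEGORY_TO_MEMORY = {"sovereignty": "values", "service": "mission", "soul": "identity"}
CONFIDENCE_WEIGHT = {"high": 1.0, "medium": 0.6, "low": 0.3}  # illustrative weights

def to_fact(kernel: dict) -> dict:
    """Shape a meaning kernel into a fact_store-style record with provenance."""
    return {
        "memory_type": CATEGORY_TO_MEMORY.get(kernel.get("category"), "misc"),
        "text": kernel.get("kernel", ""),
        "trust": CONFIDENCE_WEIGHT.get(kernel.get("confidence"), 0.3),
        "provenance": {"tweet_id": kernel.get("source_tweet_id")},
    }

fact = to_fact({"category": "service", "kernel": "Builds for others first",
                "confidence": "high", "source_tweet_id": "1234567890"})
print(fact["memory_type"], fact["trust"])  # → mission 1.0
```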

## Pitfalls

1. **Local-only inference** — Zero cloud credits. Gemma 4 via Ollama. If Ollama is down, the pipeline fails gracefully and writes error logs.

2. **GIFs are videos** — Twitter stores GIFs as MP4. The pipeline handles the `animated_gif` type by extracting the first frame.

3. **Missing media files** — The manifest references absolute paths from Alexander's archive. If files are moved, the analysis records the error and continues.

4. **Slow processing** — Gemma 4 vision runs at ~5-10 s per image; 818 items at 8 s each is roughly 2 hours. Use `--limit` and `--resume` for incremental runs.

5. **Kernel quality** — Low-confidence kernels are noisy. The `--synthesize` command filters to high-confidence kernels for review.
541
twitter-archive/multimodal_pipeline.py
Executable file
@@ -0,0 +1,541 @@
#!/usr/bin/env python3
"""
Know Thy Father — Phase 2: Multimodal Analysis Pipeline

Processes the media manifest from Phase 1 to extract Meaning Kernels:
- Images/GIFs: Visual description + Meme Logic Analysis
- Videos: Frame extraction + Audio transcription + Visual Sequence Analysis

Designed for local inference via Gemma 4 (Ollama/llama.cpp). Zero cloud credits.

Usage:
    python3 multimodal_pipeline.py --manifest media/manifest.jsonl --limit 10
    python3 multimodal_pipeline.py --manifest media/manifest.jsonl --resume
    python3 multimodal_pipeline.py --manifest media/manifest.jsonl --type photo
    python3 multimodal_pipeline.py --synthesize  # Generate meaning kernel summary
"""

import argparse
import base64
import json
import os
import subprocess
import sys
import tempfile
import time
from datetime import datetime, timezone

# ── Config ──────────────────────────────────────────────
WORKSPACE = os.environ.get("KTF_WORKSPACE", os.path.expanduser("~/timmy-home/twitter-archive"))
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
MODEL = os.environ.get("KTF_MODEL", "gemma4:latest")
VISION_MODEL = os.environ.get("KTF_VISION_MODEL", "gemma4:latest")
CHECKPOINT_FILE = os.path.join(WORKSPACE, "media", "analysis_checkpoint.json")
OUTPUT_DIR = os.path.join(WORKSPACE, "media", "analysis")
KERNELS_FILE = os.path.join(WORKSPACE, "media", "meaning_kernels.jsonl")

# ── Prompt Templates ────────────────────────────────────

VISUAL_DESCRIPTION_PROMPT = """Describe this image in detail. Focus on:
1. What is depicted (objects, people, text, symbols)
2. Visual style (aesthetic, colors, composition)
3. Any text overlays or captions visible
4. Emotional tone conveyed

Be specific and factual. This is for building understanding of a person's visual language."""

MEME_LOGIC_PROMPT = """Analyze this image as a meme or visual communication piece. Identify:
1. The core joke or message (what makes it funny/meaningful?)
2. Cultural references or subcultures it connects to
3. Emotional register (ironic, sincere, aggressive, playful)
4. What this reveals about the person who shared it

This image was shared by Alexander (Rockachopa) on Twitter. Consider what his choice to share this tells us about his values and worldview."""

MEANING_KERNEL_PROMPT = """Based on this media analysis, extract "Meaning Kernels" — compact philosophical observations related to:
- SOVEREIGNTY: Self-sovereignty, Bitcoin, decentralization, freedom, autonomy
- SERVICE: Building for others, caring for broken men, community, fatherhood
- THE SOUL: Identity, purpose, faith, what makes something alive, the soul of technology

For each kernel found, output a JSON object with:
{
  "category": "sovereignty|service|soul",
  "kernel": "one-sentence observation",
  "evidence": "what in the media supports this",
  "confidence": "high|medium|low"
}

Output ONLY valid JSON array. If no meaningful kernels found, output []."""

VIDEO_SEQUENCE_PROMPT = """Analyze this sequence of keyframes from a video. Identify:
1. What is happening (narrative arc)
2. Key visual moments (what's the "peak" frame?)
3. Text/captions visible across frames
4. Emotional progression

This video was shared by Alexander (Rockachopa) on Twitter."""

AUDIO_TRANSCRIPT_PROMPT = """Transcribe the following audio content. If it's speech, capture the words. If it's music or sound effects, describe what you hear. Be precise."""

# ── Utilities ───────────────────────────────────────────

def log(msg: str, level: str = "INFO"):
    ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
    print(f"[{ts}] [{level}] {msg}")


def load_checkpoint() -> dict:
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return json.load(f)
    return {"processed_ids": [], "last_offset": 0, "total_kernels": 0,
            "started_at": datetime.now(timezone.utc).isoformat()}


def save_checkpoint(cp: dict):
    os.makedirs(os.path.dirname(CHECKPOINT_FILE), exist_ok=True)
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump(cp, f, indent=2)


def load_manifest(path: str) -> list:
    entries = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                entries.append(json.loads(line))
    return entries


def append_kernel(kernel: dict):
    os.makedirs(os.path.dirname(KERNELS_FILE), exist_ok=True)
    with open(KERNELS_FILE, "a") as f:
        f.write(json.dumps(kernel) + "\n")

# ── Media Processing ───────────────────────────────────

def extract_keyframes(video_path: str, count: int = 5) -> list:
    """Extract evenly spaced keyframes from a video using ffmpeg."""
    tmpdir = tempfile.mkdtemp(prefix="ktf-frames-")
    try:
        # Get duration
        result = subprocess.run(
            ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
             "-of", "csv=p=0", video_path],
            capture_output=True, text=True, timeout=30
        )
        duration = float(result.stdout.strip())
        if duration <= 0:
            return []

        interval = duration / (count + 1)
        frames = []
        for i in range(count):
            ts = interval * (i + 1)
            out_path = os.path.join(tmpdir, f"frame_{i:03d}.jpg")
            subprocess.run(
                ["ffmpeg", "-ss", str(ts), "-i", video_path, "-vframes", "1",
                 "-q:v", "2", out_path, "-y"],
                capture_output=True, timeout=30
            )
            if os.path.exists(out_path):
                frames.append(out_path)
        return frames
    except Exception as e:
        log(f"Frame extraction failed: {e}", "WARN")
        return []


def extract_audio(video_path: str) -> str:
    """Extract audio track from video to WAV."""
    tmpdir = tempfile.mkdtemp(prefix="ktf-audio-")
    out_path = os.path.join(tmpdir, "audio.wav")
    try:
        subprocess.run(
            ["ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le",
             "-ar", "16000", "-ac", "1", out_path, "-y"],
            capture_output=True, timeout=60
        )
        return out_path if os.path.exists(out_path) else ""
    except Exception:
        return ""


def encode_image_base64(path: str) -> str:
    """Read and base64-encode an image file."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()

def call_ollama(prompt: str, images: list = None, model: str = None, timeout: int = 120) -> str:
    """Call Ollama API with optional images (multimodal)."""
    import urllib.request

    model = model or MODEL
    messages = [{"role": "user", "content": prompt}]

    if images:
        # Add images to the message
        message_with_images = {
            "role": "user",
            "content": prompt,
            "images": images  # list of base64 strings
        }
        messages = [message_with_images]

    payload = json.dumps({
        "model": model,
        "messages": messages,
        "stream": False,
        "options": {"temperature": 0.3}
    }).encode()

    url = f"{OLLAMA_URL.rstrip('/')}/api/chat"
    req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})

    try:
        resp = urllib.request.urlopen(req, timeout=timeout)
        data = json.loads(resp.read())
        return data.get("message", {}).get("content", "")
    except Exception as e:
        log(f"Ollama call failed: {e}", "ERROR")
        return f"ERROR: {e}"

# ── Analysis Pipeline ──────────────────────────────────

def analyze_image(entry: dict) -> dict:
    """Analyze a single image/GIF: visual description + meme logic + meaning kernels."""
    local_path = entry.get("local_media_path", "")
    tweet_text = entry.get("full_text", "")
    hashtags = entry.get("hashtags", [])
    tweet_id = entry.get("tweet_id", "")
    media_type = entry.get("media_type", "")

    result = {
        "tweet_id": tweet_id,
        "media_type": media_type,
        "tweet_text": tweet_text,
        "hashtags": hashtags,
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "visual_description": "",
        "meme_logic": "",
        "meaning_kernels": [],
    }

    # Check if file exists
    if not local_path or not os.path.exists(local_path):
        result["error"] = f"File not found: {local_path}"
        return result

    # For GIFs, extract first frame
    if media_type == "animated_gif":
        frames = extract_keyframes(local_path, count=1)
        image_path = frames[0] if frames else local_path
    else:
        image_path = local_path

    # Encode image
    try:
        b64 = encode_image_base64(image_path)
    except Exception as e:
        result["error"] = f"Failed to read image: {e}"
        return result

    # Step 1: Visual description
    log(f"  Describing image for tweet {tweet_id}...")
    context = f"\n\nTweet text: {tweet_text}" if tweet_text else ""
    desc = call_ollama(VISUAL_DESCRIPTION_PROMPT + context, images=[b64], model=VISION_MODEL)
    result["visual_description"] = desc

    # Step 2: Meme logic analysis
    log(f"  Analyzing meme logic for tweet {tweet_id}...")
    meme_context = f"\n\nTweet text: {tweet_text}\nHashtags: {', '.join(hashtags)}"
    meme = call_ollama(MEME_LOGIC_PROMPT + meme_context, images=[b64], model=VISION_MODEL)
    result["meme_logic"] = meme

    # Step 3: Extract meaning kernels
    log(f"  Extracting meaning kernels for tweet {tweet_id}...")
    kernel_context = f"\n\nVisual description: {desc}\nMeme logic: {meme}\nTweet text: {tweet_text}\nHashtags: {', '.join(hashtags)}"
    kernel_raw = call_ollama(MEANING_KERNEL_PROMPT + kernel_context, model=MODEL)

    # Parse kernels from JSON response
    try:
        # Find JSON array in response
        start = kernel_raw.find("[")
        end = kernel_raw.rfind("]") + 1
        if start >= 0 and end > start:
            kernels = json.loads(kernel_raw[start:end])
            if isinstance(kernels, list):
                result["meaning_kernels"] = kernels
    except json.JSONDecodeError:
        result["kernel_parse_error"] = kernel_raw[:500]

    return result

def analyze_video(entry: dict) -> dict:
    """Analyze a video: keyframes + audio + sequence analysis."""
    local_path = entry.get("local_media_path", "")
    tweet_text = entry.get("full_text", "")
    hashtags = entry.get("hashtags", [])
    tweet_id = entry.get("tweet_id", "")

    result = {
        "tweet_id": tweet_id,
        "media_type": "video",
        "tweet_text": tweet_text,
        "hashtags": hashtags,
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "keyframe_descriptions": [],
        "audio_transcript": "",
        "sequence_analysis": "",
        "meaning_kernels": [],
    }

    if not local_path or not os.path.exists(local_path):
        result["error"] = f"File not found: {local_path}"
        return result

    # Step 1: Extract keyframes
    log(f"  Extracting keyframes from video {tweet_id}...")
    frames = extract_keyframes(local_path, count=5)

    # Step 2: Describe each keyframe
    frame_descriptions = []
    for i, frame_path in enumerate(frames):
        log(f"  Describing keyframe {i+1}/{len(frames)} for tweet {tweet_id}...")
        try:
            b64 = encode_image_base64(frame_path)
            desc = call_ollama(
                VISUAL_DESCRIPTION_PROMPT + f"\n\nThis is keyframe {i+1} of {len(frames)} from a video.",
                images=[b64], model=VISION_MODEL
            )
            frame_descriptions.append({"frame": i+1, "description": desc})
        except Exception as e:
            frame_descriptions.append({"frame": i+1, "error": str(e)})
    result["keyframe_descriptions"] = frame_descriptions

    # Step 3: Extract and transcribe audio
    log(f"  Extracting audio from video {tweet_id}...")
    audio_path = extract_audio(local_path)
    if audio_path:
        log("  Audio extracted, transcription pending (Whisper integration)...")
        result["audio_transcript"] = "Audio extracted. Transcription requires Whisper model."
        # Clean up temp audio
        try:
            os.unlink(audio_path)
            os.rmdir(os.path.dirname(audio_path))
        except Exception:
            pass

    # Step 4: Sequence analysis
    log(f"  Analyzing video sequence for tweet {tweet_id}...")
    all_descriptions = "\n".join(
        f"Frame {d['frame']}: {d.get('description', d.get('error', '?'))}"
        for d in frame_descriptions
    )
    context = f"\n\nKeyframes:\n{all_descriptions}\n\nTweet text: {tweet_text}\nHashtags: {', '.join(hashtags)}"
    sequence = call_ollama(VIDEO_SEQUENCE_PROMPT + context, model=MODEL)
    result["sequence_analysis"] = sequence

    # Step 5: Extract meaning kernels
    log(f"  Extracting meaning kernels from video {tweet_id}...")
    kernel_context = f"\n\nKeyframe descriptions:\n{all_descriptions}\nSequence analysis: {sequence}\nTweet text: {tweet_text}"
    kernel_raw = call_ollama(MEANING_KERNEL_PROMPT + kernel_context, model=MODEL)

    try:
        start = kernel_raw.find("[")
        end = kernel_raw.rfind("]") + 1
        if start >= 0 and end > start:
            kernels = json.loads(kernel_raw[start:end])
            if isinstance(kernels, list):
                result["meaning_kernels"] = kernels
    except json.JSONDecodeError:
        result["kernel_parse_error"] = kernel_raw[:500]

    # Clean up temp frames
    for frame_path in frames:
        try:
            os.unlink(frame_path)
        except Exception:
            pass
    if frames:
        try:
            os.rmdir(os.path.dirname(frames[0]))
        except Exception:
            pass

    return result

# ── Main Pipeline ───────────────────────────────────────

def run_pipeline(manifest_path: str, limit: int = None, media_type: str = None, resume: bool = False):
    """Run the multimodal analysis pipeline."""
    log(f"Loading manifest from {manifest_path}...")
    entries = load_manifest(manifest_path)
    log(f"Found {len(entries)} media entries")

    # Filter by type
    if media_type:
        entries = [e for e in entries if e.get("media_type") == media_type]
        log(f"Filtered to {len(entries)} entries of type '{media_type}'")

    # Load checkpoint
    cp = load_checkpoint()
    processed = set(cp.get("processed_ids", []))

    if resume:
        log(f"Resuming — {len(processed)} already processed")
        entries = [e for e in entries if e.get("tweet_id") not in processed]

    if limit:
        entries = entries[:limit]

    log(f"Will process {len(entries)} entries")

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    for i, entry in enumerate(entries):
        tweet_id = entry.get("tweet_id", "unknown")
        mt = entry.get("media_type", "unknown")
        log(f"[{i+1}/{len(entries)}] Processing tweet {tweet_id} (type: {mt})")

        start_time = time.time()

        try:
            if mt in ("photo", "animated_gif"):
                result = analyze_image(entry)
            elif mt == "video":
                result = analyze_video(entry)
            else:
                log(f"  Skipping unknown type: {mt}", "WARN")
                continue

            elapsed = time.time() - start_time
            result["processing_time_seconds"] = round(elapsed, 1)

            # Save individual result
            out_path = os.path.join(OUTPUT_DIR, f"{tweet_id}.json")
            with open(out_path, "w") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)

            # Append meaning kernels to kernels file
            for kernel in result.get("meaning_kernels", []):
                kernel["source_tweet_id"] = tweet_id
                kernel["source_media_type"] = mt
                kernel["source_hashtags"] = entry.get("hashtags", [])
                append_kernel(kernel)

            # Update checkpoint
            processed.add(tweet_id)
            cp["processed_ids"] = list(processed)[-500:]  # Keep last 500 to limit file size
            cp["last_offset"] = i + 1
            cp["total_kernels"] = cp.get("total_kernels", 0) + len(result.get("meaning_kernels", []))
            cp["last_processed"] = tweet_id
            cp["last_updated"] = datetime.now(timezone.utc).isoformat()
            save_checkpoint(cp)

            kernels_found = len(result.get("meaning_kernels", []))
            log(f"  Done in {elapsed:.1f}s — {kernels_found} kernel(s) found")

        except Exception as e:
            log(f"  ERROR: {e}", "ERROR")
            # Save error result
            error_result = {
                "tweet_id": tweet_id,
                "error": str(e),
                "analyzed_at": datetime.now(timezone.utc).isoformat()
            }
            out_path = os.path.join(OUTPUT_DIR, f"{tweet_id}_error.json")
            with open(out_path, "w") as f:
                json.dump(error_result, f, indent=2)

    log(f"Pipeline complete. {len(entries)} entries processed.")
    log(f"Total kernels extracted: {cp.get('total_kernels', 0)}")

def synthesize():
    """Generate a summary of all meaning kernels extracted so far."""
    if not os.path.exists(KERNELS_FILE):
        log("No meaning_kernels.jsonl found. Run pipeline first.", "ERROR")
        return

    kernels = []
    with open(KERNELS_FILE) as f:
        for line in f:
            line = line.strip()
            if line:
                kernels.append(json.loads(line))

    log(f"Loaded {len(kernels)} meaning kernels")

    # Categorize
    by_category = {}
    for k in kernels:
        cat = k.get("category", "unknown")
        by_category.setdefault(cat, []).append(k)

    summary = {
        "total_kernels": len(kernels),
        "by_category": {cat: len(items) for cat, items in by_category.items()},
        "top_kernels": {},
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }

    # Get top kernels by confidence
    for cat, items in by_category.items():
        high = [k for k in items if k.get("confidence") == "high"]
        summary["top_kernels"][cat] = [
            {"kernel": k["kernel"], "evidence": k.get("evidence", "")}
            for k in high[:10]
        ]

    # Save summary
    summary_path = os.path.join(WORKSPACE, "media", "meaning_kernels_summary.json")
    with open(summary_path, "w") as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)

    log(f"Summary saved to {summary_path}")

    # Print overview
    print(f"\n{'='*60}")
    print("  MEANING KERNELS SUMMARY")
    print(f"  Total: {len(kernels)} kernels from {len(set(k.get('source_tweet_id', '') for k in kernels))} media items")
    print(f"{'='*60}")
    for cat, count in sorted(by_category.items()):
        print(f"\n  [{cat.upper()}] — {count} kernels")
        high = [k for k in by_category[cat] if k.get("confidence") == "high"]
        for k in high[:5]:
            print(f"    • {k.get('kernel', '?')}")
        if len(high) > 5:
            print(f"    ... and {len(high)-5} more")
    print(f"\n{'='*60}")

# ── CLI ─────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="Know Thy Father — Phase 2: Multimodal Analysis Pipeline")
    parser.add_argument("--manifest", default=os.path.join(WORKSPACE, "media", "manifest.jsonl"),
                        help="Path to media manifest JSONL")
    parser.add_argument("--limit", type=int, default=None, help="Max entries to process")
    parser.add_argument("--type", dest="media_type", choices=["photo", "animated_gif", "video"],
                        help="Filter by media type")
    parser.add_argument("--resume", action="store_true", help="Resume from checkpoint")
    parser.add_argument("--synthesize", action="store_true", help="Generate meaning kernel summary")

    args = parser.parse_args()

    if args.synthesize:
        synthesize()
    else:
        run_pipeline(args.manifest, args.limit, args.media_type, args.resume)


if __name__ == "__main__":
    sys.exit(main())