Files
timmy-home/scripts/twitter_archive/extract_media_manifest.py

164 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""Build a local manifest of Twitter archive media with post metadata preserved."""
from __future__ import annotations
import json
from collections import Counter
from pathlib import Path
from typing import Any
from .common import ARCHIVE_DIR, ensure_layout, resolve_source_dir, stable_sha256, write_json
MEDIA_DIR = ARCHIVE_DIR / "media"
MEDIA_MANIFEST_JSONL = MEDIA_DIR / "manifest.jsonl"
MEDIA_MANIFEST_SUMMARY = MEDIA_DIR / "manifest_summary.json"
MEDIA_FILE_DIR_NAMES = ("tweets_media", "community_tweet_media", "deleted_tweets_media")
def strip_js_prefix(raw_text: str) -> str:
    """Drop the leading `window.YTD...=` JS assignment and return the JSON array text.

    Raises ValueError when no opening bracket is present in *raw_text*.
    """
    bracket_pos = raw_text.find("[")
    if bracket_pos < 0:
        raise ValueError("Could not find JSON array in tweets.js")
    return raw_text[bracket_pos:].strip()
def _tweet_payload(raw_entry: dict[str, Any]) -> dict[str, Any]:
return raw_entry.get("tweet", raw_entry)
def _hashtags(tweet: dict[str, Any]) -> list[str]:
entities = tweet.get("entities", {}) or {}
return [item.get("text", "").strip() for item in entities.get("hashtags", []) if item.get("text")]
def _urls(tweet: dict[str, Any]) -> list[str]:
entities = tweet.get("entities", {}) or {}
return [item.get("expanded_url", "").strip() for item in entities.get("urls", []) if item.get("expanded_url")]
def _media_entries(tweet: dict[str, Any]) -> list[dict[str, Any]]:
return ((tweet.get("extended_entities") or {}).get("media") or (tweet.get("entities") or {}).get("media") or [])
def _find_local_media_path(source_dir: Path, tweet_id: str) -> str | None:
    """Return the first on-disk media file named `<tweet_id>-*`, or None.

    Searches the known archive media directories in their declared priority
    order; within a directory, matches are sorted so the result is stable.
    """
    for subdir_name in MEDIA_FILE_DIR_NAMES:
        candidate_dir = source_dir / subdir_name
        if not candidate_dir.exists():
            continue
        hits = sorted(candidate_dir.glob(f"{tweet_id}-*"))
        if hits:
            return str(hits[0])
    return None
def _media_variants(media: dict[str, Any]) -> list[dict[str, Any]]:
variants = []
for variant in ((media.get("video_info") or {}).get("variants") or []):
variants.append(
{
"content_type": variant.get("content_type"),
"bitrate": int(variant.get("bitrate") or 0),
"url": variant.get("url"),
}
)
return variants
def extract_media_records(raw_entries: list[dict[str, Any]], source_dir: Path, source_file: str) -> list[dict[str, Any]]:
    """Flatten archive entries into one record per media item.

    Each record repeats the parent tweet's metadata (id, text, hashtags, urls,
    local media path) alongside the individual media item's fields. Entries
    without a tweet id or without media are skipped.
    """
    manifest: list[dict[str, Any]] = []
    for entry in raw_entries:
        tweet = _tweet_payload(entry)
        tweet_id = str(tweet.get("id_str") or tweet.get("id") or "").strip()
        if not tweet_id:
            continue  # media cannot be keyed without a tweet id
        media_items = _media_entries(tweet)
        if not media_items:
            continue
        # Tweet-level context shared by every media record of this tweet.
        shared = {
            "tweet_id": tweet_id,
            "created_at": tweet.get("created_at"),
            "full_text": (tweet.get("full_text") or tweet.get("text") or "").strip(),
            "hashtags": _hashtags(tweet),
            "urls": _urls(tweet),
            "source_file": source_file,
            "local_media_path": _find_local_media_path(source_dir, tweet_id),
        }
        for position, media in enumerate(media_items, start=1):
            record = dict(shared)
            record["media_index"] = position
            record["media_id"] = str(media.get("id_str") or media.get("id") or "").strip() or None
            record["media_type"] = media.get("type") or "unknown"
            record["expanded_url"] = media.get("expanded_url")
            record["media_url_https"] = media.get("media_url_https") or media.get("media_url")
            record["display_url"] = media.get("display_url")
            record["variants"] = _media_variants(media)
            manifest.append(record)
    return manifest
def build_media_manifest_summary(records: list[dict[str, Any]], *, source_dir: str, source_file: str) -> dict[str, Any]:
    """Aggregate manifest records into a summary payload.

    Counts media types (missing/None types fold into "unknown") and hashtags
    (lowercased), and documents the per-record field schema.
    """
    type_counter: Counter[str] = Counter()
    tag_counter: Counter[str] = Counter()
    for rec in records:
        type_counter[rec.get("media_type") or "unknown"] += 1
        tag_counter.update(tag.lower() for tag in rec.get("hashtags", []))
    return {
        "schema_version": 1,
        "source_dir": source_dir,
        "source_file": source_file,
        "media_record_count": len(records),
        "media_type_counts": dict(type_counter),
        "hashtag_counts": dict(tag_counter),
        "fields": [
            "tweet_id",
            "created_at",
            "full_text",
            "hashtags",
            "urls",
            "media_index",
            "media_id",
            "media_type",
            "expanded_url",
            "media_url_https",
            "display_url",
            "variants",
            "local_media_path",
            "source_file",
        ],
    }
def write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write one sorted-key JSON object per line to *path*, creating parent dirs.

    The file is written as UTF-8 explicitly: the previous bare `open(path, "w")`
    used the platform locale encoding, which can raise UnicodeEncodeError on
    non-ASCII tweet text (e.g. cp1252 on Windows).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as handle:
        for row in rows:
            handle.write(json.dumps(row, sort_keys=True) + "\n")
def main() -> None:
    """Build the media manifest (JSONL) and its summary (JSON) from tweets.js.

    Exits with a JSON error payload when the source archive file is missing.
    Side effects: creates the media output directory, writes the manifest and
    summary files, and prints the summary (with source checksum/size) to stdout.
    """
    ensure_layout()
    MEDIA_DIR.mkdir(parents=True, exist_ok=True)
    source_dir = resolve_source_dir()
    source_path = source_dir / "tweets.js"
    if not source_path.exists():
        raise SystemExit(json.dumps({"status": "error", "reason": f"missing {source_path}"}))
    # Twitter archive exports are UTF-8; the previous bare read_text() used the
    # platform locale encoding and could fail on non-ASCII tweet text.
    raw_payload = strip_js_prefix(source_path.read_text(encoding="utf-8"))
    raw_entries = json.loads(raw_payload)
    records = extract_media_records(raw_entries, source_dir, source_path.name)
    write_jsonl(MEDIA_MANIFEST_JSONL, records)
    summary = build_media_manifest_summary(records, source_dir=str(source_dir), source_file=source_path.name)
    summary["source_sha256"] = stable_sha256(source_path)
    summary["source_size_bytes"] = source_path.stat().st_size
    write_json(MEDIA_MANIFEST_SUMMARY, summary)
    print(json.dumps({"status": "ok", **summary}, sort_keys=True))


if __name__ == "__main__":
    main()