#!/usr/bin/env python3
|
|
"""Build a local manifest of Twitter archive media with post metadata preserved."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .common import ARCHIVE_DIR, ensure_layout, resolve_source_dir, stable_sha256, write_json
|
|
|
|
# Output locations for the generated manifest, under the local archive layout.
MEDIA_DIR = ARCHIVE_DIR / "media"
MEDIA_MANIFEST_JSONL = MEDIA_DIR / "manifest.jsonl"
MEDIA_MANIFEST_SUMMARY = MEDIA_DIR / "manifest_summary.json"
# Directory names a Twitter archive export uses for on-disk media files.
MEDIA_FILE_DIR_NAMES = ("tweets_media", "community_tweet_media", "deleted_tweets_media")
|
|
|
|
|
|
def strip_js_prefix(raw_text: str) -> str:
    """Return the JSON array portion of a Twitter ``tweets.js`` payload.

    Archive exports prefix the data with a JS assignment; everything before
    the first ``[`` is discarded and the remainder is whitespace-stripped.

    Raises:
        ValueError: if no ``[`` occurs anywhere in *raw_text*.
    """
    bracket_index = raw_text.find("[")
    if bracket_index < 0:
        raise ValueError("Could not find JSON array in tweets.js")
    return raw_text[bracket_index:].strip()
|
|
|
|
|
|
def _tweet_payload(raw_entry: dict[str, Any]) -> dict[str, Any]:
|
|
return raw_entry.get("tweet", raw_entry)
|
|
|
|
|
|
def _hashtags(tweet: dict[str, Any]) -> list[str]:
|
|
entities = tweet.get("entities", {}) or {}
|
|
return [item.get("text", "").strip() for item in entities.get("hashtags", []) if item.get("text")]
|
|
|
|
|
|
def _urls(tweet: dict[str, Any]) -> list[str]:
|
|
entities = tweet.get("entities", {}) or {}
|
|
return [item.get("expanded_url", "").strip() for item in entities.get("urls", []) if item.get("expanded_url")]
|
|
|
|
|
|
def _media_entries(tweet: dict[str, Any]) -> list[dict[str, Any]]:
|
|
return ((tweet.get("extended_entities") or {}).get("media") or (tweet.get("entities") or {}).get("media") or [])
|
|
|
|
|
|
def _find_local_media_path(source_dir: Path, tweet_id: str) -> str | None:
    """Return the first on-disk media file matching ``<tweet_id>-*`` or None.

    Each known archive media directory is checked in order; glob hits are
    sorted so the choice is deterministic when a tweet has several files.
    """
    for dir_name in MEDIA_FILE_DIR_NAMES:
        media_dir = source_dir / dir_name
        if not media_dir.exists():
            continue
        hits = sorted(media_dir.glob(f"{tweet_id}-*"))
        if hits:
            return str(hits[0])
    return None
|
|
|
|
|
|
def _media_variants(media: dict[str, Any]) -> list[dict[str, Any]]:
|
|
variants = []
|
|
for variant in ((media.get("video_info") or {}).get("variants") or []):
|
|
variants.append(
|
|
{
|
|
"content_type": variant.get("content_type"),
|
|
"bitrate": int(variant.get("bitrate") or 0),
|
|
"url": variant.get("url"),
|
|
}
|
|
)
|
|
return variants
|
|
|
|
|
|
def extract_media_records(raw_entries: list[dict[str, Any]], source_dir: Path, source_file: str) -> list[dict[str, Any]]:
    """Flatten raw tweet entries into one record per media attachment.

    Entries without a usable tweet id or without media are skipped.
    Tweet-level fields (text, hashtags, urls, local path, ...) are copied
    onto every media record so each JSONL row is self-contained.
    """
    manifest: list[dict[str, Any]] = []
    for entry in raw_entries:
        tweet = _tweet_payload(entry)
        tweet_id = str(tweet.get("id_str") or tweet.get("id") or "").strip()
        if not tweet_id:
            continue
        attachments = _media_entries(tweet)
        if not attachments:
            continue
        # Shared tweet-level fields, duplicated onto every media record.
        shared = {
            "tweet_id": tweet_id,
            "created_at": tweet.get("created_at"),
            "full_text": (tweet.get("full_text") or tweet.get("text") or "").strip(),
            "hashtags": _hashtags(tweet),
            "urls": _urls(tweet),
            "source_file": source_file,
            "local_media_path": _find_local_media_path(source_dir, tweet_id),
        }
        for position, media in enumerate(attachments, start=1):
            record = dict(shared)
            record["media_index"] = position
            record["media_id"] = str(media.get("id_str") or media.get("id") or "").strip() or None
            record["media_type"] = media.get("type") or "unknown"
            record["expanded_url"] = media.get("expanded_url")
            record["media_url_https"] = media.get("media_url_https") or media.get("media_url")
            record["display_url"] = media.get("display_url")
            record["variants"] = _media_variants(media)
            manifest.append(record)
    return manifest
|
|
|
|
|
|
def build_media_manifest_summary(records: list[dict[str, Any]], *, source_dir: str, source_file: str) -> dict[str, Any]:
    """Aggregate manifest records into a summary dict.

    Counts media types (missing type → "unknown") and lowercased hashtags,
    and lists the field names each manifest record carries.
    """
    type_tally: Counter[str] = Counter()
    tag_tally: Counter[str] = Counter()
    for record in records:
        type_tally.update([record.get("media_type") or "unknown"])
        tag_tally.update(tag.lower() for tag in record.get("hashtags", []))
    return {
        "schema_version": 1,
        "source_dir": source_dir,
        "source_file": source_file,
        "media_record_count": len(records),
        "media_type_counts": dict(type_tally),
        "hashtag_counts": dict(tag_tally),
        "fields": [
            "tweet_id",
            "created_at",
            "full_text",
            "hashtags",
            "urls",
            "media_index",
            "media_id",
            "media_type",
            "expanded_url",
            "media_url_https",
            "display_url",
            "variants",
            "local_media_path",
            "source_file",
        ],
    }
|
|
|
|
|
|
def write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write *rows* as one sorted-key JSON object per line (JSONL).

    Creates the parent directory if needed. The file is opened with an
    explicit UTF-8 encoding and "\n" line endings so the output bytes are
    identical across platforms; the previous implementation used the locale
    default encoding and platform newline translation (CRLF on Windows).

    Args:
        path: destination file; overwritten if present.
        rows: JSON-serializable dicts, one per output line.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8", newline="\n") as handle:
        for row in rows:
            handle.write(json.dumps(row, sort_keys=True) + "\n")
|
|
|
|
|
|
def main() -> None:
    """Build the media manifest JSONL and summary from the archive's tweets.js.

    Exits via SystemExit with a JSON error payload when tweets.js is missing.
    Otherwise writes MEDIA_MANIFEST_JSONL and MEDIA_MANIFEST_SUMMARY and
    prints the summary (with status) as JSON to stdout.
    """
    ensure_layout()
    MEDIA_DIR.mkdir(parents=True, exist_ok=True)
    source_dir = resolve_source_dir()
    source_path = source_dir / "tweets.js"
    if not source_path.exists():
        raise SystemExit(json.dumps({"status": "error", "reason": f"missing {source_path}"}))

    # Twitter archive exports are UTF-8 JSON; read with an explicit encoding
    # so the locale default (e.g. cp1252 on Windows) cannot mangle the text.
    raw_payload = strip_js_prefix(source_path.read_text(encoding="utf-8"))
    raw_entries = json.loads(raw_payload)
    records = extract_media_records(raw_entries, source_dir, source_path.name)
    write_jsonl(MEDIA_MANIFEST_JSONL, records)
    summary = build_media_manifest_summary(records, source_dir=str(source_dir), source_file=source_path.name)
    # Provenance: fingerprint the exact input file the manifest was built from.
    summary["source_sha256"] = stable_sha256(source_path)
    summary["source_size_bytes"] = source_path.stat().st_size
    write_json(MEDIA_MANIFEST_SUMMARY, summary)
    print(json.dumps({"status": "ok", **summary}, sort_keys=True))
|
|
|
|
|
|
# Script entry point: run the build only when executed directly, not on import.
if __name__ == "__main__":
    main()
|