From c1e57a9d86b97df50009b187e15c82ff048a0a44 Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Sun, 5 Apr 2026 13:26:48 -0400
Subject: [PATCH] feat: add local video decomposition pipeline

---
 scripts/twitter_archive/decompose_media.py | 210 +++++++++++++++++++++
 tests/test_twitter_video_decomposition.py  | 118 ++++++++++++
 2 files changed, 328 insertions(+)
 create mode 100644 scripts/twitter_archive/decompose_media.py
 create mode 100644 tests/test_twitter_video_decomposition.py

diff --git a/scripts/twitter_archive/decompose_media.py b/scripts/twitter_archive/decompose_media.py
new file mode 100644
index 0000000..fa4ee33
--- /dev/null
+++ b/scripts/twitter_archive/decompose_media.py
@@ -0,0 +1,210 @@
+#!/usr/bin/env python3
+"""Local-first decomposition of Twitter archive video clips."""
+
+from __future__ import annotations
+
+import argparse
+import json
+import subprocess
+from pathlib import Path
+from typing import Any
+
+from .common import ARCHIVE_DIR, write_json
+
+DEFAULT_OUTPUT_ROOT = ARCHIVE_DIR / "media" / "decomposed"
+
+
+def build_output_paths(tweet_id: str, media_index: int, output_root: Path | None = None) -> dict[str, Path]:
+    """Return the artifact paths for one clip, grouped in a per-tweet directory.
+
+    Each artifact name starts with the zero-padded ``media_index`` so several
+    clips attached to the same tweet can share that directory.
+    """
+    clip_dir = (output_root or DEFAULT_OUTPUT_ROOT) / str(tweet_id)
+    stem = f"{int(media_index):03d}"
+    return {
+        "clip_dir": clip_dir,
+        "audio_path": clip_dir / f"{stem}_audio.wav",
+        "keyframes_dir": clip_dir / f"{stem}_keyframes",
+        "metadata_path": clip_dir / f"{stem}_metadata.json",
+        "transcript_path": clip_dir / f"{stem}_transcript.json",
+    }
+
+
+def ffprobe_json(path: Path) -> dict[str, Any]:
+    """Run ``ffprobe`` on ``path`` and return its parsed JSON report.
+
+    Raises ``subprocess.CalledProcessError`` when ffprobe exits non-zero.
+    """
+    result = subprocess.run(
+        [
+            "ffprobe",
+            "-v",
+            "error",
+            "-show_entries",
+            "format=duration,bit_rate:stream=codec_type,width,height,avg_frame_rate,sample_rate",
+            "-of",
+            "json",
+            str(path),
+        ],
+        capture_output=True,
+        text=True,
+        check=True,
+    )
+    return json.loads(result.stdout)
+
+
+def _parse_ratio(value: str | None) -> float | None:
+    """Convert an ffprobe rate such as ``"30/1"`` to a float, or ``None`` if undefined."""
+    if not value or value in {"0/0", "N/A"}:
+        return None
+    if "/" in value:
+        left, right = value.split("/", 1)
+        right_num = float(right)
+        if right_num == 0:
+            return None
+        return round(float(left) / right_num, 3)
+    return float(value)
+
+
+def summarize_probe(probe: dict[str, Any]) -> dict[str, Any]:
+    """Reduce an ffprobe report to duration, bit rate and first-stream facts.
+
+    Missing fields fall back to ``0``; the frame rate is ``None`` when undefined
+    and the sample rate is ``None`` when the clip has no audio stream.
+    """
+    streams = probe.get("streams", [])
+    video = next((stream for stream in streams if stream.get("codec_type") == "video"), {})
+    audio = next((stream for stream in streams if stream.get("codec_type") == "audio"), {})
+    container = probe.get("format") or {}
+    return {
+        "duration_s": round(float(container.get("duration") or 0.0), 3),
+        "bit_rate": int(container.get("bit_rate") or 0),
+        "video": {
+            "width": int(video.get("width") or 0),
+            "height": int(video.get("height") or 0),
+            "fps": _parse_ratio(video.get("avg_frame_rate")),
+        },
+        "audio": {
+            "present": bool(audio),
+            "sample_rate": int(audio.get("sample_rate") or 0) if audio else None,
+        },
+    }
+
+
+def extract_audio(input_path: Path, output_path: Path) -> None:
+    """Write the clip's audio track to ``output_path`` as mono 16 kHz audio.
+
+    Raises ``subprocess.CalledProcessError`` when ffmpeg fails, which includes
+    inputs that have no audio stream.
+    """
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    subprocess.run(
+        [
+            "ffmpeg",
+            "-y",
+            "-i",
+            str(input_path),
+            "-vn",
+            "-ac",
+            "1",
+            "-ar",
+            "16000",
+            str(output_path),
+        ],
+        capture_output=True,
+        check=True,
+    )
+
+
+def extract_keyframes(input_path: Path, keyframes_dir: Path) -> None:
+    """Sample one JPEG frame per second of video into ``keyframes_dir``."""
+    keyframes_dir.mkdir(parents=True, exist_ok=True)
+    subprocess.run(
+        [
+            "ffmpeg",
+            "-y",
+            "-i",
+            str(input_path),
+            "-vf",
+            "fps=1",
+            str(keyframes_dir / "frame_%03d.jpg"),
+        ],
+        capture_output=True,
+        check=True,
+    )
+
+
+def write_transcript_placeholder(path: Path) -> None:
+    """Write an empty transcript stub that marks the clip as awaiting local ASR."""
+    write_json(path, {"status": "pending_local_asr", "segments": []})
+
+
+def run_decomposition(input_path: Path, tweet_id: str, media_index: int, output_root: Path | None = None) -> dict[str, Any]:
+    """Probe one clip, extract its audio and keyframes, and write metadata files.
+
+    Clips without an audio stream skip audio extraction, because ffmpeg exits
+    with an error when the requested output would contain no stream; the
+    ``audio_path`` entry is ``None`` for those clips.
+
+    Returns a JSON-serialisable summary that includes the artifact paths.
+    Raises ``subprocess.CalledProcessError`` when ffprobe or ffmpeg fails.
+    """
+    paths = build_output_paths(tweet_id, media_index, output_root)
+    summary = summarize_probe(ffprobe_json(input_path))
+    # Create the clip directory up front so the JSON artifacts can always be
+    # written, whichever extraction steps end up running.
+    paths["clip_dir"].mkdir(parents=True, exist_ok=True)
+    audio_path: str | None = None
+    if summary["audio"]["present"]:
+        extract_audio(input_path, paths["audio_path"])
+        audio_path = str(paths["audio_path"])
+    extract_keyframes(input_path, paths["keyframes_dir"])
+    write_transcript_placeholder(paths["transcript_path"])
+    artifacts = {
+        "audio_path": audio_path,
+        "keyframes_dir": str(paths["keyframes_dir"]),
+        "transcript_path": str(paths["transcript_path"]),
+    }
+    metadata = {
+        "tweet_id": str(tweet_id),
+        "media_index": int(media_index),
+        "input_path": str(input_path),
+        **summary,
+        **artifacts,
+    }
+    write_json(paths["metadata_path"], metadata)
+    return {
+        "status": "ok",
+        "metadata_path": str(paths["metadata_path"]),
+        **artifacts,
+        **summary,
+    }
+
+
+def build_parser() -> argparse.ArgumentParser:
+    """Build the command-line parser for decomposing a single clip."""
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("--input", required=True, help="Local video path")
+    parser.add_argument("--tweet-id", required=True)
+    parser.add_argument("--media-index", type=int, default=1)
+    parser.add_argument("--output-root", help="Override output root")
+    return parser
+
+
+def main() -> None:
+    """Decompose the clip named on the command line and print the result as JSON."""
+    args = build_parser().parse_args()
+    output_root = Path(args.output_root).expanduser() if args.output_root else None
+    try:
+        result = run_decomposition(Path(args.input).expanduser(), args.tweet_id, args.media_index, output_root)
+    except subprocess.CalledProcessError as exc:
+        # The tools run with captured output, so surface their stderr instead of
+        # a bare traceback that hides why ffmpeg/ffprobe failed.
+        stderr = exc.stderr.decode("utf-8", "replace") if isinstance(exc.stderr, bytes) else (exc.stderr or "")
+        raise SystemExit(f"{exc.cmd[0]} exited with code {exc.returncode}: {stderr.strip()}") from exc
+    print(json.dumps(result))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_twitter_video_decomposition.py b/tests/test_twitter_video_decomposition.py
new file mode 100644
index 0000000..f0196ef
--- /dev/null
+++ b/tests/test_twitter_video_decomposition.py
@@ -0,0 +1,118 @@
+from __future__ import annotations
+
+import json
+import shutil
+import subprocess
+import sys
+from pathlib import Path
+
+import pytest
+
+from scripts.twitter_archive import decompose_media
+from scripts.twitter_archive.decompose_media import build_output_paths, summarize_probe
+
+requires_ffmpeg = pytest.mark.skipif(
+    shutil.which("ffmpeg") is None or shutil.which("ffprobe") is None,
+    reason="ffmpeg and ffprobe are required for the end-to-end CLI test",
+)
+
+
+def test_build_output_paths_creates_local_artifact_tree() -> None:
+    paths = build_output_paths("12345", 1)
+
+    assert paths["clip_dir"].parts[-3:] == ("media", "decomposed", "12345")
+    assert paths["audio_path"].name == "001_audio.wav"
+    assert paths["keyframes_dir"].name == "001_keyframes"
+    assert paths["metadata_path"].name == "001_metadata.json"
+    assert paths["transcript_path"].name == "001_transcript.json"
+
+
+def test_summarize_probe_extracts_duration_resolution_and_stream_flags() -> None:
+    probe = {
+        "format": {"duration": "4.015", "bit_rate": "832000"},
+        "streams": [
+            {"codec_type": "video", "width": 320, "height": 240, "avg_frame_rate": "30/1"},
+            {"codec_type": "audio", "sample_rate": "44100"},
+        ],
+    }
+
+    summary = summarize_probe(probe)
+
+    assert summary["duration_s"] == 4.015
+    assert summary["video"]["width"] == 320
+    assert summary["video"]["height"] == 240
+    assert summary["video"]["fps"] == 30.0
+    assert summary["audio"]["present"] is True
+    assert summary["audio"]["sample_rate"] == 44100
+
+
+def test_run_decomposition_skips_audio_for_silent_clip(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
+    silent_probe = {
+        "format": {"duration": "1.0"},
+        "streams": [{"codec_type": "video", "width": 160, "height": 120, "avg_frame_rate": "8/1"}],
+    }
+    steps: list[str] = []
+    monkeypatch.setattr(decompose_media, "ffprobe_json", lambda path: silent_probe)
+    monkeypatch.setattr(decompose_media, "extract_audio", lambda *args: steps.append("audio"))
+    monkeypatch.setattr(decompose_media, "extract_keyframes", lambda *args: steps.append("keyframes"))
+    monkeypatch.setattr(decompose_media, "write_json", lambda path, payload: None)
+
+    result = decompose_media.run_decomposition(tmp_path / "clip.mp4", "42", 1, tmp_path / "out")
+
+    assert steps == ["keyframes"]
+    assert result["status"] == "ok"
+    assert result["audio_path"] is None
+    assert result["audio"]["present"] is False
+
+
+@requires_ffmpeg
+def test_cli_decomposes_one_local_clip(tmp_path: Path) -> None:
+    clip = tmp_path / "clip.mp4"
+    subprocess.run(
+        [
+            "ffmpeg",
+            "-y",
+            "-f",
+            "lavfi",
+            "-i",
+            "testsrc=size=160x120:rate=8",
+            "-f",
+            "lavfi",
+            "-i",
+            "sine=frequency=880:sample_rate=16000",
+            "-t",
+            "2",
+            "-pix_fmt",
+            "yuv420p",
+            str(clip),
+        ],
+        capture_output=True,
+        check=True,
+    )
+
+    out_dir = tmp_path / "out"
+    result = subprocess.run(
+        [
+            sys.executable,
+            "-m",
+            "scripts.twitter_archive.decompose_media",
+            "--input",
+            str(clip),
+            "--tweet-id",
+            "999",
+            "--media-index",
+            "1",
+            "--output-root",
+            str(out_dir),
+        ],
+        capture_output=True,
+        text=True,
+        check=True,
+    )
+
+    payload = json.loads(result.stdout)
+    assert payload["status"] == "ok"
+    assert Path(payload["metadata_path"]).exists()
+    assert Path(payload["audio_path"]).exists()
+    assert Path(payload["keyframes_dir"]).exists()
+    assert list(Path(payload["keyframes_dir"]).glob("*.jpg"))