Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c1e57a9d86 feat: add local video decomposition pipeline 2026-04-05 13:26:48 -04:00
2 changed files with 253 additions and 0 deletions

View File

@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""Local-first decomposition of Twitter archive video clips."""
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
from typing import Any
from .common import ARCHIVE_DIR, write_json
DEFAULT_OUTPUT_ROOT = ARCHIVE_DIR / "media" / "decomposed"
def build_output_paths(tweet_id: str, media_index: int, output_root: Path | None = None) -> dict[str, Path]:
root = (output_root or DEFAULT_OUTPUT_ROOT) / str(tweet_id)
clip_dir = root
stem = f"{int(media_index):03d}"
return {
"clip_dir": clip_dir,
"audio_path": clip_dir / f"{stem}_audio.wav",
"keyframes_dir": clip_dir / f"{stem}_keyframes",
"metadata_path": clip_dir / f"{stem}_metadata.json",
"transcript_path": clip_dir / f"{stem}_transcript.json",
}
def ffprobe_json(path: Path) -> dict[str, Any]:
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration,bit_rate:stream=codec_type,width,height,avg_frame_rate,sample_rate",
"-of",
"json",
str(path),
],
capture_output=True,
text=True,
check=True,
)
return json.loads(result.stdout)
def _parse_ratio(value: str | None) -> float | None:
if not value or value in {"0/0", "N/A"}:
return None
if "/" in value:
left, right = value.split("/", 1)
right_num = float(right)
if right_num == 0:
return None
return round(float(left) / right_num, 3)
return float(value)
def summarize_probe(probe: dict[str, Any]) -> dict[str, Any]:
video = next((stream for stream in probe.get("streams", []) if stream.get("codec_type") == "video"), {})
audio = next((stream for stream in probe.get("streams", []) if stream.get("codec_type") == "audio"), {})
return {
"duration_s": round(float((probe.get("format") or {}).get("duration") or 0.0), 3),
"bit_rate": int((probe.get("format") or {}).get("bit_rate") or 0),
"video": {
"width": int(video.get("width") or 0),
"height": int(video.get("height") or 0),
"fps": _parse_ratio(video.get("avg_frame_rate")),
},
"audio": {
"present": bool(audio),
"sample_rate": int(audio.get("sample_rate") or 0) if audio else None,
},
}
def extract_audio(input_path: Path, output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
subprocess.run(
[
"ffmpeg",
"-y",
"-i",
str(input_path),
"-vn",
"-ac",
"1",
"-ar",
"16000",
str(output_path),
],
capture_output=True,
check=True,
)
def extract_keyframes(input_path: Path, keyframes_dir: Path) -> None:
keyframes_dir.mkdir(parents=True, exist_ok=True)
subprocess.run(
[
"ffmpeg",
"-y",
"-i",
str(input_path),
"-vf",
"fps=1",
str(keyframes_dir / "frame_%03d.jpg"),
],
capture_output=True,
check=True,
)
def write_transcript_placeholder(path: Path) -> None:
write_json(path, {"status": "pending_local_asr", "segments": []})
def run_decomposition(input_path: Path, tweet_id: str, media_index: int, output_root: Path | None = None) -> dict[str, Any]:
paths = build_output_paths(tweet_id, media_index, output_root)
probe = ffprobe_json(input_path)
summary = summarize_probe(probe)
extract_audio(input_path, paths["audio_path"])
extract_keyframes(input_path, paths["keyframes_dir"])
write_transcript_placeholder(paths["transcript_path"])
metadata = {
"tweet_id": str(tweet_id),
"media_index": int(media_index),
"input_path": str(input_path),
**summary,
"audio_path": str(paths["audio_path"]),
"keyframes_dir": str(paths["keyframes_dir"]),
"transcript_path": str(paths["transcript_path"]),
}
write_json(paths["metadata_path"], metadata)
return {
"status": "ok",
"metadata_path": str(paths["metadata_path"]),
"audio_path": str(paths["audio_path"]),
"keyframes_dir": str(paths["keyframes_dir"]),
"transcript_path": str(paths["transcript_path"]),
**summary,
}
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--input", required=True, help="Local video path")
parser.add_argument("--tweet-id", required=True)
parser.add_argument("--media-index", type=int, default=1)
parser.add_argument("--output-root", help="Override output root")
return parser
def main() -> None:
args = build_parser().parse_args()
output_root = Path(args.output_root).expanduser() if args.output_root else None
result = run_decomposition(Path(args.input).expanduser(), args.tweet_id, args.media_index, output_root)
print(json.dumps(result))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
import json
import subprocess
import sys
from pathlib import Path
from scripts.twitter_archive.decompose_media import build_output_paths, summarize_probe
def test_build_output_paths_creates_local_artifact_tree() -> None:
paths = build_output_paths("12345", 1)
assert paths["clip_dir"].parts[-3:] == ("media", "decomposed", "12345")
assert paths["audio_path"].name == "001_audio.wav"
assert paths["keyframes_dir"].name == "001_keyframes"
assert paths["metadata_path"].name == "001_metadata.json"
assert paths["transcript_path"].name == "001_transcript.json"
def test_summarize_probe_extracts_duration_resolution_and_stream_flags() -> None:
probe = {
"format": {"duration": "4.015", "bit_rate": "832000"},
"streams": [
{"codec_type": "video", "width": 320, "height": 240, "avg_frame_rate": "30/1"},
{"codec_type": "audio", "sample_rate": "44100"},
],
}
summary = summarize_probe(probe)
assert summary["duration_s"] == 4.015
assert summary["video"]["width"] == 320
assert summary["video"]["height"] == 240
assert summary["video"]["fps"] == 30.0
assert summary["audio"]["present"] is True
assert summary["audio"]["sample_rate"] == 44100
def test_cli_decomposes_one_local_clip(tmp_path: Path) -> None:
clip = tmp_path / "clip.mp4"
subprocess.run(
[
"ffmpeg",
"-y",
"-f",
"lavfi",
"-i",
"testsrc=size=160x120:rate=8",
"-f",
"lavfi",
"-i",
"sine=frequency=880:sample_rate=16000",
"-t",
"2",
"-pix_fmt",
"yuv420p",
str(clip),
],
capture_output=True,
check=True,
)
out_dir = tmp_path / "out"
result = subprocess.run(
[
sys.executable,
"-m",
"scripts.twitter_archive.decompose_media",
"--input",
str(clip),
"--tweet-id",
"999",
"--media-index",
"1",
"--output-root",
str(out_dir),
],
capture_output=True,
text=True,
check=True,
)
payload = json.loads(result.stdout)
assert payload["status"] == "ok"
assert Path(payload["metadata_path"]).exists()
assert Path(payload["audio_path"]).exists()
assert Path(payload["keyframes_dir"]).exists()
assert list(Path(payload["keyframes_dir"]).glob("*.jpg"))