feat: add local video decomposition pipeline
This commit is contained in:
164
scripts/twitter_archive/decompose_media.py
Normal file
164
scripts/twitter_archive/decompose_media.py
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Local-first decomposition of Twitter archive video clips."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from .common import ARCHIVE_DIR, write_json
|
||||||
|
|
||||||
|
DEFAULT_OUTPUT_ROOT = ARCHIVE_DIR / "media" / "decomposed"
|
||||||
|
|
||||||
|
|
||||||
|
def build_output_paths(tweet_id: str, media_index: int, output_root: Path | None = None) -> dict[str, Path]:
    """Compute where the decomposition artifacts of one clip are stored.

    All artifacts of a tweet share the directory ``<root>/<tweet_id>``, where
    ``<root>`` is ``output_root`` when one is given and ``DEFAULT_OUTPUT_ROOT``
    otherwise. Every artifact name starts with the media index zero-padded to
    three digits. Nothing is created on disk.

    Args:
        tweet_id: Identifier of the tweet; used verbatim as directory name.
        media_index: Position of the clip within the tweet (coerced to int).
        output_root: Optional replacement for ``DEFAULT_OUTPUT_ROOT``.

    Returns:
        A dict with the keys ``clip_dir``, ``audio_path``, ``keyframes_dir``,
        ``metadata_path`` and ``transcript_path``.
    """
    base = output_root if output_root else DEFAULT_OUTPUT_ROOT
    clip_dir = base / str(tweet_id)
    prefix = f"{int(media_index):03d}"

    # Suffix appended to the zero-padded index for each artifact.
    suffix_by_key = (
        ("audio_path", "_audio.wav"),
        ("keyframes_dir", "_keyframes"),
        ("metadata_path", "_metadata.json"),
        ("transcript_path", "_transcript.json"),
    )
    paths: dict[str, Path] = {"clip_dir": clip_dir}
    for key, suffix in suffix_by_key:
        paths[key] = clip_dir / (prefix + suffix)
    return paths
|
||||||
|
|
||||||
|
|
||||||
|
def ffprobe_json(path: Path) -> dict[str, Any]:
    """Run ``ffprobe`` on ``path`` and return its JSON report as a dict.

    Only the container's duration and bit rate plus each stream's codec type,
    width, height, average frame rate and sample rate are requested.

    Raises:
        subprocess.CalledProcessError: ``ffprobe`` exited with a non-zero
            status (the call uses ``check=True``).
    """
    wanted_entries = "format=duration,bit_rate:stream=codec_type,width,height,avg_frame_rate,sample_rate"
    command = [
        "ffprobe",
        "-v", "error",
        "-show_entries", wanted_entries,
        "-of", "json",
        str(path),
    ]
    completed = subprocess.run(command, capture_output=True, text=True, check=True)
    return json.loads(completed.stdout)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ratio(value: str | None) -> float | None:
    """Convert an ffprobe rational such as ``"30000/1001"`` into a float.

    Args:
        value: A ``"numerator/denominator"`` string, a plain number string,
            or ``None``.

    Returns:
        The value rounded to three decimals, or ``None`` when it is missing,
        one of ffprobe's "unknown" markers (``"0/0"``, ``"N/A"``), has a zero
        denominator, or is not numeric at all.
    """
    if not value or value in {"0/0", "N/A"}:
        return None

    numerator, slash, denominator = value.partition("/")
    try:
        top = float(numerator)
        # A plain number ("25") is treated as a ratio over 1.
        bottom = float(denominator) if slash else 1.0
    except ValueError:
        # The frame rate is optional metadata: an unparsable value is
        # reported as unknown rather than aborting the whole decomposition.
        return None
    if bottom == 0:
        return None
    # Round both input forms so the precision does not depend on the format.
    return round(top / bottom, 3)
|
||||||
|
|
||||||
|
|
||||||
|
def summarize_probe(probe: dict[str, Any]) -> dict[str, Any]:
    """Condense an ffprobe JSON report into the fields stored in the metadata.

    The first stream whose ``codec_type`` is ``"video"`` and the first whose
    ``codec_type`` is ``"audio"`` are used; later streams are ignored.

    Returns:
        A dict with ``duration_s`` (float, three decimals), ``bit_rate``
        (int), ``video`` (``width``, ``height``, ``fps``) and ``audio``
        (``present``, ``sample_rate``). Missing numeric values become ``0``;
        ``sample_rate`` is ``None`` when there is no audio stream.
    """

    def first_stream(kind: str) -> dict[str, Any]:
        # Return the first stream of the requested codec type, or {} if none.
        for entry in probe.get("streams", []):
            if entry.get("codec_type") == kind:
                return entry
        return {}

    video = first_stream("video")
    audio = first_stream("audio")
    container = probe.get("format") or {}

    duration_s = round(float(container.get("duration") or 0.0), 3)
    bit_rate = int(container.get("bit_rate") or 0)

    video_summary = {
        "width": int(video.get("width") or 0),
        "height": int(video.get("height") or 0),
        "fps": _parse_ratio(video.get("avg_frame_rate")),
    }

    # An empty dict means no audio stream was found.
    if audio:
        sample_rate = int(audio.get("sample_rate") or 0)
    else:
        sample_rate = None
    audio_summary = {"present": bool(audio), "sample_rate": sample_rate}

    return {
        "duration_s": duration_s,
        "bit_rate": bit_rate,
        "video": video_summary,
        "audio": audio_summary,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def extract_audio(input_path: Path, output_path: Path) -> None:
    """Write the audio track of ``input_path`` to ``output_path`` via ``ffmpeg``.

    Video is dropped (``-vn``) and the audio is converted to a single channel
    (``-ac 1``) at a 16000 Hz sample rate (``-ar 16000``). The parent
    directory of ``output_path`` is created when missing, and ``-y`` lets
    ffmpeg overwrite an existing output file.

    Raises:
        subprocess.CalledProcessError: ``ffmpeg`` exited with a non-zero
            status.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    command = [
        "ffmpeg",
        "-y",
        "-i", str(input_path),
        "-vn",
        "-ac", "1",
        "-ar", "16000",
        str(output_path),
    ]
    subprocess.run(command, capture_output=True, check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_keyframes(input_path: Path, keyframes_dir: Path) -> None:
    """Dump still frames of ``input_path`` into ``keyframes_dir`` via ``ffmpeg``.

    Frames are sampled with the ``fps=1`` video filter (a fixed rate of one
    frame per second, not the codec's own keyframes) and written as JPEGs
    named by the pattern ``frame_%03d.jpg``. ``keyframes_dir`` is created when
    missing.

    Raises:
        subprocess.CalledProcessError: ``ffmpeg`` exited with a non-zero
            status.
    """
    keyframes_dir.mkdir(parents=True, exist_ok=True)
    frame_pattern = keyframes_dir / "frame_%03d.jpg"
    command = [
        "ffmpeg",
        "-y",
        "-i", str(input_path),
        "-vf", "fps=1",
        str(frame_pattern),
    ]
    subprocess.run(command, capture_output=True, check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def write_transcript_placeholder(path: Path) -> None:
    """Store a stub transcript at ``path``: status ``pending_local_asr``, no segments."""
    placeholder = {"status": "pending_local_asr", "segments": []}
    write_json(path, placeholder)
|
||||||
|
|
||||||
|
|
||||||
|
def run_decomposition(input_path: Path, tweet_id: str, media_index: int, output_root: Path | None = None) -> dict[str, Any]:
    """Decompose one local video clip into audio, frames and metadata files.

    Steps: probe the clip with ffprobe, extract its audio track (only when
    one exists), dump one frame per second, write a transcript placeholder
    and finally write a metadata JSON that ties the artifacts together.

    Args:
        input_path: Local video file to process.
        tweet_id: Tweet the clip belongs to; selects the output directory.
        media_index: Index of the clip within the tweet; prefixes file names.
        output_root: Optional override for the default output root.

    Returns:
        A dict with ``status`` (``"ok"``), the artifact paths as strings
        (``metadata_path``, ``audio_path``, ``keyframes_dir``,
        ``transcript_path``) and the probe summary fields. ``audio_path`` is
        ``None`` when the clip has no audio stream.

    Raises:
        subprocess.CalledProcessError: ``ffprobe`` or ``ffmpeg`` failed.
    """
    paths = build_output_paths(tweet_id, media_index, output_root)
    summary = summarize_probe(ffprobe_json(input_path))

    # With "-vn" and no audio stream ffmpeg has nothing to write and exits
    # with an error, which used to abort the whole run for silent clips.
    # Skip the extraction in that case and report the audio path as None.
    audio_path: str | None = None
    if summary["audio"]["present"]:
        extract_audio(input_path, paths["audio_path"])
        audio_path = str(paths["audio_path"])

    extract_keyframes(input_path, paths["keyframes_dir"])
    write_transcript_placeholder(paths["transcript_path"])

    artifacts = {
        "audio_path": audio_path,
        "keyframes_dir": str(paths["keyframes_dir"]),
        "transcript_path": str(paths["transcript_path"]),
    }
    metadata = {
        "tweet_id": str(tweet_id),
        "media_index": int(media_index),
        "input_path": str(input_path),
        **summary,
        **artifacts,
    }
    write_json(paths["metadata_path"], metadata)

    return {
        "status": "ok",
        "metadata_path": str(paths["metadata_path"]),
        **artifacts,
        **summary,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; the module docstring is its description.

    Options: ``--input`` and ``--tweet-id`` (both required), ``--media-index``
    (int, default ``1``) and ``--output-root`` (optional).
    """
    option_specs = (
        ("--input", {"required": True, "help": "Local video path"}),
        ("--tweet-id", {"required": True}),
        ("--media-index", {"type": int, "default": 1}),
        ("--output-root", {"help": "Override output root"}),
    )
    cli = argparse.ArgumentParser(description=__doc__)
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """CLI entry point: decompose one clip and print the result as one JSON line."""
    args = build_parser().parse_args()

    # "~" is expanded in both user-supplied paths.
    output_root = None
    if args.output_root:
        output_root = Path(args.output_root).expanduser()
    clip_path = Path(args.input).expanduser()

    outcome = run_decomposition(clip_path, args.tweet_id, args.media_index, output_root)
    print(json.dumps(outcome))
|
||||||
|
|
||||||
|
|
||||||
|
# Run the CLI only when the module is executed, not when it is imported.
if __name__ == "__main__":
    main()
|
||||||
89
tests/test_twitter_video_decomposition.py
Normal file
89
tests/test_twitter_video_decomposition.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from scripts.twitter_archive.decompose_media import build_output_paths, summarize_probe
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_output_paths_creates_local_artifact_tree() -> None:
    """Default paths end in ``media/decomposed/<tweet_id>`` and use ``001_`` names."""
    paths = build_output_paths("12345", 1)

    assert paths["clip_dir"].parts[-3:] == ("media", "decomposed", "12345")

    expected_names = {
        "audio_path": "001_audio.wav",
        "keyframes_dir": "001_keyframes",
        "metadata_path": "001_metadata.json",
        "transcript_path": "001_transcript.json",
    }
    for key, expected in expected_names.items():
        assert paths[key].name == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_summarize_probe_extracts_duration_resolution_and_stream_flags() -> None:
    """A probe with one video and one audio stream is summarized field by field."""
    video_stream = {"codec_type": "video", "width": 320, "height": 240, "avg_frame_rate": "30/1"}
    audio_stream = {"codec_type": "audio", "sample_rate": "44100"}
    probe = {
        "format": {"duration": "4.015", "bit_rate": "832000"},
        "streams": [video_stream, audio_stream],
    }

    summary = summarize_probe(probe)
    video = summary["video"]
    audio = summary["audio"]

    assert summary["duration_s"] == 4.015
    assert video["width"] == 320
    assert video["height"] == 240
    assert video["fps"] == 30.0
    assert audio["present"] is True
    assert audio["sample_rate"] == 44100
|
||||||
|
|
||||||
|
|
||||||
|
def test_cli_decomposes_one_local_clip(tmp_path: Path) -> None:
    """End to end: synthesize a 2 s clip with ffmpeg, run the CLI, check artifacts."""
    clip = tmp_path / "clip.mp4"
    # Synthetic test pattern plus a sine tone, so the clip has video and audio.
    make_clip = [
        "ffmpeg",
        "-y",
        "-f", "lavfi",
        "-i", "testsrc=size=160x120:rate=8",
        "-f", "lavfi",
        "-i", "sine=frequency=880:sample_rate=16000",
        "-t", "2",
        "-pix_fmt", "yuv420p",
        str(clip),
    ]
    subprocess.run(make_clip, capture_output=True, check=True)

    out_dir = tmp_path / "out"
    run_cli = [
        sys.executable,
        "-m", "scripts.twitter_archive.decompose_media",
        "--input", str(clip),
        "--tweet-id", "999",
        "--media-index", "1",
        "--output-root", str(out_dir),
    ]
    result = subprocess.run(run_cli, capture_output=True, text=True, check=True)

    payload = json.loads(result.stdout)
    assert payload["status"] == "ok"
    assert Path(payload["metadata_path"]).exists()
    assert Path(payload["audio_path"]).exists()

    keyframes_dir = Path(payload["keyframes_dir"])
    assert keyframes_dir.exists()
    assert list(keyframes_dir.glob("*.jpg"))
|
||||||
Reference in New Issue
Block a user