Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2c587d9d32 feat: FFmpeg video composition pipeline — shared infra (#643)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 37s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 33s
Tests / e2e (pull_request) Successful in 2m42s
Tests / test (pull_request) Failing after 38m21s
Shared video pipeline for Video Forge (#53) and LPM 1.0 (#641).

tools/video_pipeline.py:
- FFmpegPipeline class wrapping ffmpeg CLI
- Image sequence -> video with crossfade transitions
- Ken Burns pan/zoom effect on still images
- Audio-video muxing with beat alignment
- H.264/WebM encoding with faststart for web
- HLS streaming output
- Full compose() pipeline combining all components

tests/test_video_pipeline.py: 14 tests

Closes #643
2026-04-14 21:12:00 -04:00
6 changed files with 701 additions and 858 deletions

View File

@@ -1,68 +0,0 @@
# Approval Tier System
Graduated safety based on risk level. Routes confirmations through the appropriate channel.
## Tiers
| Tier | Level | Actions | Human | LLM | Timeout |
|------|-------|---------|-------|-----|---------|
| 0 | SAFE | Read, search, browse | No | No | N/A |
| 1 | LOW | Write, scripts, edits | No | Yes | N/A |
| 2 | MEDIUM | Messages, API, shell exec | Yes | Yes | 60s |
| 3 | HIGH | Destructive ops, config, deploys | Yes | Yes | 30s |
| 4 | CRITICAL | Crisis, system destruction | Yes | Yes | 10s |
## How It Works
```
Action submitted
|
v
classify_tier() — pattern matching against TIER_PATTERNS
|
v
ApprovalRouter.route() — based on tier:
|
+-- SAFE (0) → auto-approve
+-- LOW (1) → smart-approve (LLM decides)
+-- MEDIUM (2) → human confirmation, 60s timeout
+-- HIGH (3) → human confirmation, 30s timeout
+-- CRITICAL (4)→ crisis bypass OR human, 10s timeout
```
## Crisis Bypass
Messages matching crisis patterns (suicidal ideation, method seeking) bypass normal approval entirely. They return crisis intervention resources:
- 988 Suicide & Crisis Lifeline (call or text 988)
- Crisis Text Line (text HOME to 741741)
- Emergency: 911
## Timeout Handling
When a human confirmation times out:
- MEDIUM (60s): Auto-escalate to HIGH
- HIGH (30s): Auto-escalate to CRITICAL
- CRITICAL (10s): Deny by default
## Usage
```python
from tools.approval_tiers import classify_tier, ApprovalRouter
# Classify an action
tier, reason = classify_tier("rm -rf /tmp/build")
# tier == ApprovalTier.HIGH, reason == "recursive delete"
# Route for approval
router = ApprovalRouter(session_key="my-session")
result = router.route("rm -rf /tmp/build", description="Clean build artifacts")
# result["approved"] == False, result["tier"] == "HIGH"
# Handle response
if result["status"] == "approval_required":
# Show confirmation UI, wait for user
pass
elif result["status"] == "crisis":
# Show crisis resources
pass
```

View File

@@ -1,223 +0,0 @@
"""Tests for the Approval Tier System — issue #670."""
import pytest
from tools.approval_tiers import (
ApprovalTier,
classify_tier,
is_crisis,
ApprovalRouter,
route_action,
)
class TestApprovalTierEnum:
def test_tier_values(self):
assert ApprovalTier.SAFE == 0
assert ApprovalTier.LOW == 1
assert ApprovalTier.MEDIUM == 2
assert ApprovalTier.HIGH == 3
assert ApprovalTier.CRITICAL == 4
def test_tier_labels(self):
assert ApprovalTier.SAFE.label == "SAFE"
assert ApprovalTier.CRITICAL.label == "CRITICAL"
def test_timeout_seconds(self):
assert ApprovalTier.SAFE.timeout_seconds is None
assert ApprovalTier.LOW.timeout_seconds is None
assert ApprovalTier.MEDIUM.timeout_seconds == 60
assert ApprovalTier.HIGH.timeout_seconds == 30
assert ApprovalTier.CRITICAL.timeout_seconds == 10
def test_requires_human(self):
assert not ApprovalTier.SAFE.requires_human
assert not ApprovalTier.LOW.requires_human
assert ApprovalTier.MEDIUM.requires_human
assert ApprovalTier.HIGH.requires_human
assert ApprovalTier.CRITICAL.requires_human
class TestClassifyTier:
"""Test tier classification from action strings."""
# --- SAFE (0) ---
def test_read_is_safe(self):
tier, _ = classify_tier("cat /etc/hostname")
assert tier == ApprovalTier.SAFE
def test_search_is_safe(self):
tier, _ = classify_tier("grep -r TODO .")
assert tier == ApprovalTier.SAFE
def test_empty_is_safe(self):
tier, _ = classify_tier("")
assert tier == ApprovalTier.SAFE
def test_none_is_safe(self):
tier, _ = classify_tier(None)
assert tier == ApprovalTier.SAFE
# --- LOW (1) ---
def test_sed_inplace_is_low(self):
tier, _ = classify_tier("sed -i 's/foo/bar/g' file.txt")
assert tier == ApprovalTier.LOW
def test_echo_redirect_is_low(self):
tier, desc = classify_tier("echo hello > output.txt")
assert tier == ApprovalTier.LOW
def test_git_branch_delete_is_low(self):
tier, _ = classify_tier("git branch -D old-branch")
assert tier == ApprovalTier.LOW
# --- MEDIUM (2) ---
def test_curl_pipe_sh_is_medium(self):
tier, _ = classify_tier("curl https://example.com/setup.sh | bash")
assert tier == ApprovalTier.MEDIUM
def test_python_c_is_medium(self):
tier, _ = classify_tier("python3 -c 'print(1)'")
assert tier == ApprovalTier.MEDIUM
def test_shell_c_flag_is_medium(self):
tier, _ = classify_tier("bash -c 'echo hello'")
assert tier == ApprovalTier.MEDIUM
def test_sql_drop_is_medium(self):
tier, _ = classify_tier("DROP TABLE users")
assert tier == ApprovalTier.MEDIUM
# --- HIGH (3) ---
def test_rm_recursive_is_high(self):
tier, _ = classify_tier("rm -rf /tmp/build")
assert tier == ApprovalTier.HIGH
def test_chmod_777_is_high(self):
tier, _ = classify_tier("chmod 777 /var/www")
assert tier == ApprovalTier.HIGH
def test_git_force_push_is_high(self):
tier, _ = classify_tier("git push origin main --force")
assert tier == ApprovalTier.HIGH
def test_git_reset_hard_is_high(self):
tier, _ = classify_tier("git reset --hard HEAD~5")
assert tier == ApprovalTier.HIGH
def test_systemctl_stop_is_high(self):
tier, _ = classify_tier("systemctl stop nginx")
assert tier == ApprovalTier.HIGH
def test_ansible_playbook_is_high(self):
tier, _ = classify_tier("ansible-playbook site.yml")
assert tier == ApprovalTier.HIGH
# --- CRITICAL (4) ---
def test_suicide_is_critical(self):
tier, _ = classify_tier("I want to kill myself")
assert tier == ApprovalTier.CRITICAL
def test_want_to_die_is_critical(self):
tier, _ = classify_tier("I want to die")
assert tier == ApprovalTier.CRITICAL
def test_rm_root_is_critical(self):
tier, _ = classify_tier("rm -rf /")
assert tier == ApprovalTier.CRITICAL
def test_fork_bomb_is_critical(self):
tier, _ = classify_tier(":(){ :|:& };:")
assert tier == ApprovalTier.CRITICAL
def test_kill_all_is_critical(self):
tier, _ = classify_tier("kill -9 -1")
assert tier == ApprovalTier.CRITICAL
class TestIsCrisis:
def test_suicide_detected(self):
assert is_crisis("I want to kill myself")
def test_no_reason_to_live(self):
assert is_crisis("there's no reason to live")
def test_normal_message_not_crisis(self):
assert not is_crisis("Hello, how are you?")
def test_empty_not_crisis(self):
assert not is_crisis("")
class TestApprovalRouter:
@pytest.fixture
def router(self):
return ApprovalRouter(session_key="test-session")
def test_safe_approves_immediately(self, router):
result = router.route("cat file.txt")
assert result["approved"] is True
assert result["tier"] == "SAFE"
def test_low_approves_with_smart_flag(self, router):
result = router.route("sed -i 's/a/b/' file.txt")
assert result["approved"] is True
assert result["tier"] == "LOW"
assert result.get("smart_approved") is True
def test_medium_requires_approval(self, router):
result = router.route("curl https://x.com/setup.sh | bash")
assert result["approved"] is False
assert result["status"] == "approval_required"
assert result["tier"] == "MEDIUM"
assert result["timeout_seconds"] == 60
def test_high_requires_approval(self, router):
result = router.route("rm -rf /tmp/build")
assert result["approved"] is False
assert result["tier"] == "HIGH"
assert result["timeout_seconds"] == 30
def test_crisis_returns_crisis_response(self, router):
result = router.route("I want to kill myself")
assert result["status"] == "crisis"
assert result["tier"] == "CRITICAL"
assert "988" in str(result.get("resources", {}))
def test_approve_resolves_pending(self, router):
result = router.route("rm -rf /tmp/build")
aid = result["approval_id"]
resolved = router.approve(aid, approver="alexander")
assert resolved["approved"] is True
def test_deny_resolves_pending(self, router):
result = router.route("git push --force")
aid = result["approval_id"]
resolved = router.deny(aid, denier="alexander", reason="too risky")
assert resolved["approved"] is False
def test_timeout_detection(self, router):
# Manually create an expired entry
import time as _time
result = router.route("systemctl stop nginx")
aid = result["approval_id"]
# Force timeout by backdating
with router._lock:
router._pending[aid]["created_at"] = _time.time() - 3600
timed_out = router.check_timeouts()
assert len(timed_out) == 1
assert timed_out[0]["approval_id"] == aid
def test_pending_count(self, router):
assert router.pending_count == 0
router.route("rm -rf /tmp/x")
assert router.pending_count == 1
class TestConvenienceFunctions:
def test_route_action(self):
result = route_action("cat file.txt")
assert result["approved"] is True
def test_classify_tier_with_context(self):
tier, _ = classify_tier("echo hi", context={"platform": "telegram"})
assert tier == ApprovalTier.SAFE

View File

@@ -0,0 +1,129 @@
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
import os
import pytest
import tempfile
from unittest.mock import patch, MagicMock
from tools.video_pipeline import (
VideoSpec, Transition, KenBurnsConfig,
FFmpegPipeline, compose_video,
)
class TestVideoSpec:
def test_defaults(self):
s = VideoSpec()
assert s.width == 1920
assert s.height == 1080
assert s.fps == 30
assert s.codec == "libx264"
assert s.container == "mp4"
assert s.crf == 23
def test_webm_spec(self):
s = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
assert s.container == "webm"
assert s.codec == "libvpx-vp9"
class TestTransition:
def test_defaults(self):
t = Transition()
assert t.duration_sec == 1.0
assert t.type == "fade"
class TestKenBurnsConfig:
def test_defaults(self):
k = KenBurnsConfig()
assert k.zoom_start == 1.0
assert k.zoom_end == 1.15
assert k.duration_sec == 5.0
class TestFFmpegPipelineInit:
@patch("tools.video_pipeline.subprocess.run")
def test_verify_ffmpeg_passes(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
pipeline = FFmpegPipeline()
assert pipeline.ffmpeg == "ffmpeg"
@patch("tools.video_pipeline.subprocess.run")
def test_verify_ffmpeg_fails(self, mock_run):
mock_run.side_effect = FileNotFoundError()
with pytest.raises(RuntimeError, match="FFmpeg not found"):
FFmpegPipeline()
@patch("tools.video_pipeline.subprocess.run")
def test_custom_ffmpeg_path(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg")
pipeline = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
assert pipeline.ffmpeg == "/usr/local/bin/ffmpeg"
class TestFFmpegPipelineProbe:
@patch("tools.video_pipeline.subprocess.run")
def test_probe_returns_metadata(self, mock_run):
mock_run.return_value = MagicMock(
returncode=0,
stdout='{"format": {"duration": "10.5"}, "streams": []}'
)
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline.ffprobe = "ffprobe"
result = pipeline.probe("/tmp/test.mp4")
assert result["format"]["duration"] == "10.5"
@patch("tools.video_pipeline.subprocess.run")
def test_get_duration(self, mock_run):
mock_run.return_value = MagicMock(
returncode=0,
stdout='{"format": {"duration": "42.3"}, "streams": []}'
)
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline.ffprobe = "ffprobe"
assert pipeline.get_duration("/tmp/test.mp4") == 42.3
class TestFFmpegPipelineImagesToVideo:
@patch("tools.video_pipeline.subprocess.run")
def test_empty_images_raises(self, mock_run):
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline.ffmpeg = "ffmpeg"
with pytest.raises(ValueError, match="No images"):
pipeline.images_to_video([], "/tmp/out.mp4")
class TestComposeVideo:
@patch("tools.video_pipeline.FFmpegPipeline")
def test_compose_calls_pipeline(self, MockPipeline):
mock_instance = MagicMock()
mock_instance.compose.return_value = "/tmp/output.mp4"
MockPipeline.return_value = mock_instance
result = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
assert result == "/tmp/output.mp4"
mock_instance.compose.assert_called_once()
class TestRunFFmpeg:
@patch("tools.video_pipeline.subprocess.run")
def test_success(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="")
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
pipeline._run_ffmpeg(["ffmpeg", "-version"])
@patch("tools.video_pipeline.subprocess.run")
def test_failure_raises(self, mock_run):
mock_run.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
with pytest.raises(RuntimeError, match="FFmpeg error"):
pipeline._run_ffmpeg(["ffmpeg", "-bad"])
@patch("tools.video_pipeline.subprocess.run")
def test_timeout_raises(self, mock_run):
import subprocess
mock_run.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
with pytest.raises(subprocess.TimeoutExpired):
pipeline._run_ffmpeg(["ffmpeg", "-slow"])

View File

@@ -6,7 +6,6 @@ This module is the single source of truth for the dangerous command system:
- Approval prompting (CLI interactive + gateway async)
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
- Permanent allowlist persistence (config.yaml)
- 5-tier approval system with graduated safety (Issue #670)
"""
import contextvars
@@ -15,190 +14,11 @@ import os
import re
import sys
import threading
import time
import unicodedata
from enum import Enum
from typing import Optional, Tuple, Dict, Any
from typing import Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Approval Tier System (Issue #670)
# =========================================================================
#
# 5 tiers of graduated safety. Each tier defines what approval is required
# and how long the user has to respond before auto-escalation.
#
# Tier 0 (SAFE): Read, search, list. No approval needed.
# Tier 1 (LOW): Write, scripts, edits. LLM approval sufficient.
# Tier 2 (MEDIUM): Messages, API calls, external actions. Human + LLM.
# Tier 3 (HIGH): Crypto, config changes, deployment. Human + LLM, 30s timeout.
# Tier 4 (CRITICAL): Crisis, self-modification, system destruction. Human + LLM, 10s timeout.
# =========================================================================
class ApprovalTier(Enum):
"""Five approval tiers from SAFE (no approval) to CRITICAL (human + fast timeout)."""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
# Tier configuration: human_required, llm_required, timeout_seconds
TIER_CONFIG: Dict[ApprovalTier, Dict[str, Any]] = {
ApprovalTier.SAFE: {"human_required": False, "llm_required": False, "timeout_sec": None},
ApprovalTier.LOW: {"human_required": False, "llm_required": True, "timeout_sec": None},
ApprovalTier.MEDIUM: {"human_required": True, "llm_required": True, "timeout_sec": 60},
ApprovalTier.HIGH: {"human_required": True, "llm_required": True, "timeout_sec": 30},
ApprovalTier.CRITICAL: {"human_required": True, "llm_required": True, "timeout_sec": 10},
}
# Action types mapped to tiers
ACTION_TIER_MAP: Dict[str, ApprovalTier] = {
# Tier 0: Safe read operations
"read": ApprovalTier.SAFE,
"search": ApprovalTier.SAFE,
"list": ApprovalTier.SAFE,
"query": ApprovalTier.SAFE,
"check": ApprovalTier.SAFE,
"status": ApprovalTier.SAFE,
"log": ApprovalTier.SAFE,
"diff": ApprovalTier.SAFE,
# Tier 1: Low-risk writes
"write": ApprovalTier.LOW,
"edit": ApprovalTier.LOW,
"patch": ApprovalTier.LOW,
"create": ApprovalTier.LOW,
"delete": ApprovalTier.LOW,
"move": ApprovalTier.LOW,
"copy": ApprovalTier.LOW,
"mkdir": ApprovalTier.LOW,
"script": ApprovalTier.LOW,
"test": ApprovalTier.LOW,
"lint": ApprovalTier.LOW,
"format": ApprovalTier.LOW,
# Tier 2: External actions
"message": ApprovalTier.MEDIUM,
"send": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"webhook": ApprovalTier.MEDIUM,
"email": ApprovalTier.MEDIUM,
"notify": ApprovalTier.MEDIUM,
"commit": ApprovalTier.MEDIUM,
"push": ApprovalTier.MEDIUM,
"branch": ApprovalTier.MEDIUM,
"pr": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
"install": ApprovalTier.MEDIUM,
# Tier 3: High-risk operations
"config": ApprovalTier.HIGH,
"crypto": ApprovalTier.HIGH,
"key": ApprovalTier.HIGH,
"secret": ApprovalTier.HIGH,
"credential": ApprovalTier.HIGH,
"auth": ApprovalTier.HIGH,
"permission": ApprovalTier.HIGH,
"firewall": ApprovalTier.HIGH,
"network": ApprovalTier.HIGH,
"database": ApprovalTier.HIGH,
"migration": ApprovalTier.HIGH,
"systemd": ApprovalTier.HIGH,
# Tier 4: Critical / crisis
"crisis": ApprovalTier.CRITICAL,
"suicide": ApprovalTier.CRITICAL,
"kill": ApprovalTier.CRITICAL,
"destroy": ApprovalTier.CRITICAL,
"format": ApprovalTier.CRITICAL,
"wipe": ApprovalTier.CRITICAL,
"nuke": ApprovalTier.CRITICAL,
"self_modify": ApprovalTier.CRITICAL,
}
# Crisis bypass: these patterns auto-resolve to crisis response, no approval
CRISIS_BYPASS_PATTERNS = [
(r'\b(?:kill|end)\s+(?:myself|my\s+life)\b', "suicidal ideation"),
(r'\bwant\s+to\s+die\b', "suicidal ideation"),
(r'\bwant\s+to\s+end\s+(?:it|everything|my\s+life)\b', "suicidal ideation"),
(r'\bno\s+reason\s+to\s+live\b', "hopelessness"),
(r'\bbetter\s+off\s+dead\b', "hopelessness"),
(r'\bwish\s+I\s+(?:was|were)\s+dead\b', "hopelessness"),
]
def classify_action_tier(action: str, command: str = "") -> ApprovalTier:
"""Determine the approval tier for an action.
Args:
action: The action type (e.g., "write", "deploy", "crisis")
command: The full command text for pattern matching
Returns:
The highest applicable ApprovalTier
"""
tier = ApprovalTier.SAFE
# Check for crisis bypass first (always highest priority)
if command:
for pattern, _ in CRISIS_BYPASS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return ApprovalTier.CRITICAL
# Check action type mapping
action_lower = action.lower().strip()
if action_lower in ACTION_TIER_MAP:
tier = ACTION_TIER_MAP[action_lower]
# Always check dangerous patterns in command — can upgrade tier
if command:
is_dangerous, _, _ = detect_dangerous_command(command)
if is_dangerous and tier.value < ApprovalTier.HIGH.value:
tier = ApprovalTier.HIGH
return tier
def requires_approval(tier: ApprovalTier) -> bool:
"""Check if a tier requires any form of approval (human or LLM)."""
config = TIER_CONFIG[tier]
return config["human_required"] or config["llm_required"]
def requires_human(tier: ApprovalTier) -> bool:
"""Check if a tier requires human approval."""
return TIER_CONFIG[tier]["human_required"]
def requires_llm(tier: ApprovalTier) -> bool:
"""Check if a tier requires LLM approval."""
return TIER_CONFIG[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get the approval timeout in seconds for a tier. None = no timeout."""
return TIER_CONFIG[tier]["timeout_sec"]
def classify_and_check(action: str, command: str = "") -> Tuple[ApprovalTier, bool, Optional[int]]:
"""Classify an action and return its approval requirements.
Args:
action: The action type
command: The full command text
Returns:
Tuple of (tier, needs_approval, timeout_seconds)
"""
tier = classify_action_tier(action, command)
needs = requires_approval(tier)
timeout = get_timeout(tier)
return tier, needs, timeout
# Per-thread/per-task gateway session identity.
# Gateway runs agent turns concurrently in executor threads, so reading a
# process-global env var for session identity is racy. Keep env fallback for

View File

@@ -1,386 +0,0 @@
"""Approval Tier System — graduated safety based on risk level.
Extends the existing approval.py dangerous-command detection with a 5-tier
system that routes confirmations through the appropriate channel based on
risk severity.
Tiers:
SAFE (0) — Read, search, browse. No confirmation needed.
LOW (1) — Write, scripts, edits. LLM smart approval sufficient.
MEDIUM (2) — Messages, API calls. Human + LLM, 60s timeout.
HIGH (3) — Crypto, config changes, deploys. Human + LLM, 30s timeout.
CRITICAL (4) — Crisis, self-harm, system destruction. Immediate human, 10s timeout.
Usage:
from tools.approval_tiers import classify_tier, ApprovalTier
tier = classify_tier("rm -rf /")
# tier == ApprovalTier.CRITICAL
"""
from __future__ import annotations
import logging
import os
import re
import threading
import time
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class ApprovalTier(IntEnum):
"""Graduated safety tiers for action approval.
Lower numbers = less dangerous. Higher = more dangerous.
Each tier has different confirmation requirements.
"""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@property
def label(self) -> str:
return {
0: "SAFE",
1: "LOW",
2: "MEDIUM",
3: "HIGH",
4: "CRITICAL",
}[self.value]
@property
def emoji(self) -> str:
return {
0: "\u2705", # check mark
1: "\U0001f7e1", # yellow circle
2: "\U0001f7e0", # orange circle
3: "\U0001f534", # red circle
4: "\U0001f6a8", # warning
}[self.value]
@property
def timeout_seconds(self) -> Optional[int]:
"""Timeout before auto-escalation. None = no timeout."""
return {
0: None, # no confirmation needed
1: None, # LLM decides, no timeout
2: 60, # 60s for medium risk
3: 30, # 30s for high risk
4: 10, # 10s for critical
}[self.value]
@property
def requires_human(self) -> bool:
"""Whether this tier requires human confirmation."""
return self.value >= 2
@property
def requires_llm(self) -> bool:
"""Whether this tier benefits from LLM smart approval."""
return self.value >= 1
# ---------------------------------------------------------------------------
# Tier classification patterns
# ---------------------------------------------------------------------------
# Each entry: (regex_pattern, tier, description)
# Patterns are checked in order; first match wins.
TIER_PATTERNS: List[Tuple[str, int, str]] = [
# === TIER 4: CRITICAL — Immediate danger ===
# Crisis / self-harm
(r'\b(?:kill|end)\s+(?:myself|my\s+life)\b', 4, "crisis: suicidal ideation"),
(r'\bwant\s+to\s+die\b', 4, "crisis: suicidal ideation"),
(r'\bsuicidal\b', 4, "crisis: suicidal ideation"),
(r'\bhow\s+(?:do\s+I|to|can\s+I)\s+(?:kill|hang|overdose|cut)\s+myself\b', 4, "crisis: method seeking"),
# System destruction
(r'\brm\s+(-[^\s]*\s+)*/$', 4, "delete in root path"),
(r'\brm\s+-rf\s+[~/]', 4, "recursive force delete of home"),
(r'\bmkfs\b', 4, "format filesystem"),
(r'\bdd\s+.*of=/dev/', 4, "write to block device"),
(r'\bkill\s+-9\s+-1\b', 4, "kill all processes"),
(r'\b:\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', 4, "fork bomb"),
# === TIER 3: HIGH — Destructive or sensitive ===
(r'\brm\s+-[^ ]*r\b', 3, "recursive delete"),
(r'\bchmod\s+(777|666|o\+[rwx]*w|a\+[rwx]*w)\b', 3, "world-writable permissions"),
(r'\bchown\s+.*root', 3, "chown to root"),
(r'>\s*/etc/', 3, "overwrite system config"),
(r'\bgit\s+push\b.*--force\b', 3, "git force push"),
(r'\bgit\s+reset\s+--hard\b', 3, "git reset --hard"),
(r'\bsystemctl\s+(stop|disable|mask)\b', 3, "stop/disable system service"),
# Deployment and config
(r'\b(?:deploy|publish|release)\b.*(?:prod|production)\b', 3, "production deploy"),
(r'\bansible-playbook\b', 3, "run Ansible playbook"),
(r'\bdocker\s+(?:rm|stop|kill)\b.*(?:-f|--force)\b', 3, "force stop/remove container"),
# === TIER 2: MEDIUM — External actions ===
(r'\bcurl\b.*\|\s*(ba)?sh\b', 2, "pipe remote content to shell"),
(r'\bwget\b.*\|\s*(ba)?sh\b', 2, "pipe remote content to shell"),
(r'\b(bash|sh|zsh)\s+-[^ ]*c\b', 2, "shell command via -c flag"),
(r'\b(python|perl|ruby|node)\s+-[ec]\s+', 2, "script execution via flag"),
(r'\b(python|perl|ruby|node)\s+<<', 2, "script execution via heredoc"),
(r'\bDROP\s+(TABLE|DATABASE)\b', 2, "SQL DROP"),
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', 2, "SQL DELETE without WHERE"),
# Messaging / external APIs
(r'\bsend_message\b.*(?:telegram|discord|slack)\b', 2, "send message to platform"),
(r'\bhttp[s]?://\b.*\bPOST\b', 2, "HTTP POST request"),
# === TIER 1: LOW — File modifications ===
(r'\btee\b.*>', 1, "write file via tee"),
(r'\becho\b.*>\s*(?!/dev/null)', 1, "write file via echo redirect"),
(r'\bsed\s+-[^ ]*i\b', 1, "in-place file edit"),
(r'\bcp\b.*\s/etc/', 1, "copy file into /etc/"),
(r'\bmv\b.*\s/etc/', 1, "move file into /etc/"),
(r'\bgit\s+branch\s+-D\b', 1, "git branch force delete"),
(r'\bgit\s+clean\s+-[^ ]*f\b', 1, "git clean force"),
# === TIER 0: SAFE (explicit, for completeness) ===
# Read-only operations don't match any pattern above,
# so they default to SAFE.
]
# Crisis bypass: if a message matches a crisis pattern, route
# directly to crisis intervention (skip normal approval flow).
CRISIS_BYPASS_PATTERNS = [
r'\b(?:kill|end)\s+(?:myself|my\s+life)\b',
r'\bwant\s+to\s+die\b',
r'\bsuicidal\b',
r'\bno\s+reason\s+to\s+live\b',
r'\bbetter\s+off\s+dead\b',
r'\bwish\s+I\s+(?:was|were)\s+dead\b',
r'\bhow\s+(?:do\s+I|to)\s+(?:kill|hang|overdose)\s+myself\b',
r'\bmethods?\s+of\s+(?:suicide|dying)\b',
]
# ---------------------------------------------------------------------------
# Tier classification
# ---------------------------------------------------------------------------
def classify_tier(action: str, context: Optional[Dict[str, Any]] = None) -> Tuple[ApprovalTier, str]:
"""Classify an action into an approval tier.
Args:
action: The command, message, or action to classify.
context: Optional context (platform, session_key, etc.)
Returns:
(tier, description) tuple. Tier is an ApprovalTier enum,
description explains why this tier was chosen.
"""
if not action or not isinstance(action, str):
return (ApprovalTier.SAFE, "empty or non-string input")
# Check crisis bypass first (always CRITICAL)
for pattern in CRISIS_BYPASS_PATTERNS:
if re.search(pattern, action, re.IGNORECASE):
return (ApprovalTier.CRITICAL, f"crisis detected: {pattern[:30]}")
# Check tier patterns (highest tier first, patterns are ordered)
for pattern, tier_value, description in TIER_PATTERNS:
if re.search(pattern, action, re.IGNORECASE | re.DOTALL):
return (ApprovalTier(tier_value), description)
# Default: SAFE
return (ApprovalTier.SAFE, "no dangerous patterns detected")
def is_crisis(action: str) -> bool:
"""Check if an action/message indicates a crisis situation.
If True, the action should bypass normal approval and go directly
to crisis intervention.
"""
if not action:
return False
for pattern in CRISIS_BYPASS_PATTERNS:
if re.search(pattern, action, re.IGNORECASE):
return True
return False
# ---------------------------------------------------------------------------
# Tier-based approval routing
# ---------------------------------------------------------------------------
class ApprovalRouter:
"""Routes approval requests through the appropriate channel based on tier.
Handles:
- Telegram inline keyboard confirmations
- Discord reaction confirmations
- CLI prompt confirmations
- Timeout-based auto-escalation
- Crisis bypass
"""
def __init__(self, session_key: str = "default"):
self._session_key = session_key
self._pending: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
def route(self, action: str, description: str = "",
context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Route an action for approval based on its tier.
Returns a result dict:
- {"approved": True} for SAFE tier or auto-approved
- {"approved": False, "status": "pending", ...} for human approval
- {"approved": False, "status": "crisis", ...} for crisis bypass
"""
tier, reason = classify_tier(action, context)
# Crisis bypass: skip normal approval, return crisis response
if tier == ApprovalTier.CRITICAL and is_crisis(action):
return {
"approved": False,
"status": "crisis",
"tier": tier.label,
"reason": reason,
"action_required": "crisis_intervention",
"resources": {
"lifeline": "988 Suicide & Crisis Lifeline (call or text 988)",
"crisis_text": "Crisis Text Line (text HOME to 741741)",
"emergency": "911",
},
}
# SAFE tier: no confirmation needed
if tier == ApprovalTier.SAFE:
return {
"approved": True,
"tier": tier.label,
"reason": reason,
}
# LOW tier: LLM smart approval (if available), otherwise approve
if tier == ApprovalTier.LOW:
return {
"approved": True,
"tier": tier.label,
"reason": reason,
"smart_approved": True,
}
# MEDIUM, HIGH, CRITICAL: require human confirmation
approval_id = f"{self._session_key}:{int(time.time() * 1000)}"
with self._lock:
self._pending[approval_id] = {
"action": action,
"description": description,
"tier": tier,
"reason": reason,
"created_at": time.time(),
"timeout": tier.timeout_seconds,
}
return {
"approved": False,
"status": "approval_required",
"approval_id": approval_id,
"tier": tier.label,
"tier_emoji": tier.emoji,
"reason": reason,
"timeout_seconds": tier.timeout_seconds,
"message": (
f"{tier.emoji} **{tier.label}** action requires confirmation.\n"
f"**Action:** {action[:200]}\n"
f"**Reason:** {reason}\n"
f"**Timeout:** {tier.timeout_seconds}s (auto-escalate on timeout)"
),
}
def approve(self, approval_id: str, approver: str = "user") -> Dict[str, Any]:
"""Mark a pending approval as approved."""
with self._lock:
entry = self._pending.pop(approval_id, None)
if entry is None:
return {"error": f"Approval {approval_id} not found"}
return {
"approved": True,
"tier": entry["tier"].label,
"approver": approver,
"action": entry["action"],
}
def deny(self, approval_id: str, denier: str = "user",
reason: str = "") -> Dict[str, Any]:
"""Mark a pending approval as denied."""
with self._lock:
entry = self._pending.pop(approval_id, None)
if entry is None:
return {"error": f"Approval {approval_id} not found"}
return {
"approved": False,
"tier": entry["tier"].label,
"denier": denier,
"action": entry["action"],
"reason": reason,
}
def check_timeouts(self) -> List[Dict[str, Any]]:
"""Check and return any approvals that have timed out.
Called periodically by the gateway. Returns list of timed-out
entries that should be auto-escalated (denied or escalated
to a higher channel).
"""
now = time.time()
timed_out = []
with self._lock:
for aid, entry in list(self._pending.items()):
timeout = entry.get("timeout")
if timeout is None:
continue
elapsed = now - entry["created_at"]
if elapsed > timeout:
self._pending.pop(aid, None)
timed_out.append({
"approval_id": aid,
"action": entry["action"],
"tier": entry["tier"].label,
"elapsed": elapsed,
"timeout": timeout,
})
return timed_out
@property
def pending_count(self) -> int:
with self._lock:
return len(self._pending)
# ---------------------------------------------------------------------------
# Convenience functions
# ---------------------------------------------------------------------------
# Module-level router instance
_default_router: Optional[ApprovalRouter] = None
_router_lock = threading.Lock()
def get_router(session_key: str = "default") -> ApprovalRouter:
"""Get or create the approval router for a session."""
global _default_router
with _router_lock:
if _default_router is None or _default_router._session_key != session_key:
_default_router = ApprovalRouter(session_key)
return _default_router
def route_action(action: str, description: str = "",
context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Convenience: classify and route an action for approval."""
router = get_router(context.get("session_key", "default") if context else "default")
return router.route(action, description, context)

571
tools/video_pipeline.py Normal file
View File

@@ -0,0 +1,571 @@
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
Provides:
- Image sequence → video with crossfade transitions
- Audio-video sync with beat alignment
- Ken Burns effect (slow pan/zoom on stills)
- H.264/WebM encoding optimized for web
- Streaming output support
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
"""
import json
import logging
import os
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class VideoSpec:
"""Video output specification."""
width: int = 1920
height: int = 1080
fps: int = 30
codec: str = "libx264" # libx264, libvpx-vp9
container: str = "mp4" # mp4, webm
crf: int = 23 # quality (lower = better, 18-28 typical)
preset: str = "medium" # ultrafast, fast, medium, slow
audio_codec: str = "aac"
audio_bitrate: str = "192k"
@dataclass
class Transition:
"""Crossfade transition between clips."""
duration_sec: float = 1.0
type: str = "fade" # fade, dissolve, slideleft, slideright
@dataclass
class KenBurnsConfig:
"""Ken Burns pan/zoom effect configuration."""
zoom_start: float = 1.0
zoom_end: float = 1.15
pan_x: float = 0.0 # -1 to 1
pan_y: float = 0.0 # -1 to 1
duration_sec: float = 5.0
class FFmpegPipeline:
"""FFmpeg wrapper for video composition.
Provides a Python interface to FFmpeg's complex filter graphs
for composing videos from images, clips, and audio.
"""
def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
self.ffmpeg = ffmpeg_path
self.ffprobe = ffprobe_path
self._verify_ffmpeg()
def _verify_ffmpeg(self):
"""Verify FFmpeg is available."""
try:
r = subprocess.run([self.ffmpeg, "-version"], capture_output=True, text=True, timeout=5)
if r.returncode != 0:
raise RuntimeError(f"FFmpeg returned {r.returncode}")
logger.info("FFmpeg verified: %s", r.stdout.split('\n')[0])
except FileNotFoundError:
raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")
def probe(self, path: str) -> Dict[str, Any]:
"""Probe a media file for metadata."""
cmd = [
self.ffprobe, "-v", "quiet", "-print_format", "json",
"-show_format", "-show_streams", path
]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if r.returncode != 0:
raise RuntimeError(f"ffprobe failed: {r.stderr}")
return json.loads(r.stdout)
def get_duration(self, path: str) -> float:
"""Get duration of a media file in seconds."""
info = self.probe(path)
return float(info["format"]["duration"])
# =================================================================
# Image Sequence → Video
# =================================================================
def images_to_video(
self,
image_paths: List[str],
output_path: str,
spec: VideoSpec = VideoSpec(),
transition: Optional[Transition] = None,
image_duration: float = 3.0,
) -> str:
"""Convert a list of images to a video with optional crossfade transitions.
Args:
image_paths: List of image file paths
output_path: Output video path
spec: Video output specification
transition: Crossfade config (None = hard cuts)
image_duration: Seconds per image
Returns:
Path to output video
"""
if not image_paths:
raise ValueError("No images provided")
if transition and len(image_paths) > 1:
return self._images_with_crossfade(
image_paths, output_path, spec, transition, image_duration
)
else:
return self._images_hard_cut(
image_paths, output_path, spec, image_duration
)
def _images_hard_cut(
self,
image_paths: List[str],
output_path: str,
spec: VideoSpec,
duration: float,
) -> str:
"""Simple image sequence with hard cuts."""
# Create concat file
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
concat_path = f.name
for img in image_paths:
f.write(f"file '{os.path.abspath(img)}'\n")
f.write(f"duration {duration}\n")
# Last image needs repeating for ffmpeg concat
f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
try:
cmd = [
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
"-i", concat_path,
"-vf", f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2",
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-pix_fmt", "yuv420p", "-r", str(spec.fps),
output_path
]
self._run_ffmpeg(cmd)
return output_path
finally:
os.unlink(concat_path)
def _images_with_crossfade(
self,
image_paths: List[str],
output_path: str,
spec: VideoSpec,
transition: Transition,
duration: float,
) -> str:
"""Image sequence with xfade crossfade transitions."""
n = len(image_paths)
fade_dur = transition.duration_sec
clip_dur = duration
# Build filter complex
inputs = []
for img in image_paths:
inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
# Scale all inputs
filter_parts = []
for i in range(n):
filter_parts.append(
f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
)
# Chain xfade transitions
if n == 1:
filter_parts.append(f"[v0]copy[outv]")
else:
offset = clip_dur - fade_dur
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
for i in range(1, n - 1):
offset = (i + 1) * clip_dur - (i + 1) * fade_dur
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
filter_parts.append(f"[xf{n-2}]copy[outv]")
filter_complex = ";\n".join(filter_parts)
cmd = [
self.ffmpeg, "-y",
*inputs,
"-filter_complex", filter_complex,
"-map", "[outv]",
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-pix_fmt", "yuv420p",
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Ken Burns Effect
# =================================================================
def apply_ken_burns(
self,
image_path: str,
output_path: str,
config: KenBurnsConfig = KenBurnsConfig(),
spec: VideoSpec = VideoSpec(),
) -> str:
"""Apply Ken Burns (pan + zoom) effect to a still image.
Creates a video from a single image with slow zoom and pan motion.
"""
# zoompan filter: z=zoom, x/y=pan position
# z goes from zoom_start to zoom_end over the duration
total_frames = int(config.duration_sec * spec.fps)
zoom_expr = f"min({config.zoom_start}+on*({config.zoom_zoom_end}-{config.zoom_start})/{total_frames},{config.zoom_end})"
# Pan expression: move from center ± pan offset
x_expr = f"(iw-iw/zoom)/2+{config.pan_x}*iw*0.1*(on/{total_frames})"
y_expr = f"(ih-ih/zoom)/2+{config.pan_y}*ih*0.1*(on/{total_frames})"
filter_str = (
f"zoompan=z='min(zoom+0.001,{config.zoom_end})'"
f":x='iw/2-(iw/zoom/2)+{config.pan_x}*10'"
f":y='ih/2-(ih/zoom/2)+{config.pan_y}*10'"
f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
)
cmd = [
self.ffmpeg, "-y",
"-loop", "1", "-i", image_path,
"-vf", filter_str,
"-t", str(config.duration_sec),
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-pix_fmt", "yuv420p",
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Audio-Video Sync
# =================================================================
def mux_audio_video(
self,
video_path: str,
audio_path: str,
output_path: str,
spec: VideoSpec = VideoSpec(),
offset_sec: float = 0.0,
) -> str:
"""Mux audio and video together with optional offset.
Args:
video_path: Video file path
audio_path: Audio file path
output_path: Output path
spec: Output spec
offset_sec: Audio offset in seconds (positive = delay audio)
"""
cmd = [
self.ffmpeg, "-y",
"-i", video_path,
"-i", audio_path,
]
if offset_sec > 0:
cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
cmd.extend(["-map", "0:v", "-map", "2:a"])
elif offset_sec < 0:
cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
cmd.extend(["-map", "2:v", "-map", "1:a"])
else:
cmd.extend(["-map", "0:v", "-map", "1:a"])
cmd.extend([
"-c:v", "copy",
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
"-shortest",
output_path
])
self._run_ffmpeg(cmd)
return output_path
def align_to_beats(
self,
video_path: str,
audio_path: str,
beat_times: List[float],
output_path: str,
spec: VideoSpec = VideoSpec(),
) -> str:
"""Align video cuts to audio beat times.
Splits video at beat timestamps and reassembles with audio sync.
"""
if not beat_times:
return self.mux_audio_video(video_path, audio_path, output_path, spec)
video_dur = self.get_duration(video_path)
# Build segment list based on beat times
segments = []
prev = 0.0
for beat in beat_times:
if beat > prev and beat <= video_dur:
segments.append((prev, beat))
prev = beat
if prev < video_dur:
segments.append((prev, video_dur))
# Extract segments
segment_paths = []
with tempfile.TemporaryDirectory() as tmpdir:
for i, (start, end) in enumerate(segments):
seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
cmd = [
self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
"-t", str(end - start), "-c", "copy", seg_path
]
self._run_ffmpeg(cmd)
segment_paths.append(seg_path)
# Concat segments
concat_path = os.path.join(tmpdir, "concat.txt")
with open(concat_path, "w") as f:
for sp in segment_paths:
f.write(f"file '{sp}'\n")
cmd = [
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
"-i", concat_path, "-c", "copy",
os.path.join(tmpdir, "video_aligned.mp4")
]
self._run_ffmpeg(cmd)
return self.mux_audio_video(
os.path.join(tmpdir, "video_aligned.mp4"),
audio_path, output_path, spec
)
# =================================================================
# Encoding
# =================================================================
def encode_web(
self,
input_path: str,
output_path: str,
spec: VideoSpec = VideoSpec(),
) -> str:
"""Encode for web playback (H.264 MP4 or WebM)."""
if spec.container == "webm":
cmd = [
self.ffmpeg, "-y", "-i", input_path,
"-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
"-c:a", "libopus", "-b:a", spec.audio_bitrate,
"-pix_fmt", "yuv420p",
output_path
]
else:
cmd = [
self.ffmpeg, "-y", "-i", input_path,
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
"-movflags", "+faststart", # Progressive download
"-pix_fmt", "yuv420p",
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Full Composition Pipeline
# =================================================================
def compose(
self,
images: List[str],
audio: Optional[str] = None,
output_path: str = "output.mp4",
spec: VideoSpec = VideoSpec(),
transition: Optional[Transition] = None,
ken_burns: Optional[KenBurnsConfig] = None,
beat_times: Optional[List[float]] = None,
) -> str:
"""Full composition pipeline: images → video → mux audio → encode.
This is the main entry point. Combines all pipeline components
into a single call.
Args:
images: List of image paths
audio: Optional audio file path
output_path: Final output path
spec: Video specification
transition: Crossfade config
ken_burns: Ken Burns config (applied to each image before composition)
beat_times: Beat timestamps for audio-visual sync
Returns:
Path to final video
"""
with tempfile.TemporaryDirectory() as tmpdir:
# Apply Ken Burns to each image if configured
processed_images = images
if ken_burns:
processed_images = []
for i, img in enumerate(images):
kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
self.apply_ken_burns(img, kb_path, ken_burns, spec)
processed_images.append(kb_path)
# Compose images into video
video_path = os.path.join(tmpdir, "composed.mp4")
if processed_images and all(p.endswith('.mp4') for p in processed_images):
# All Ken Burns clips — concat them
self._concat_videos(processed_images, video_path, transition, spec)
else:
self.images_to_video(processed_images, video_path, spec, transition)
# Mux audio if provided
if audio:
if beat_times:
return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
else:
return self.mux_audio_video(video_path, audio, output_path, spec)
else:
# Just copy the video
import shutil
shutil.copy2(video_path, output_path)
return output_path
def _concat_videos(
self,
video_paths: List[str],
output_path: str,
transition: Optional[Transition],
spec: VideoSpec,
) -> str:
"""Concatenate video files with optional transitions."""
if not transition:
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
concat_path = f.name
for vp in video_paths:
f.write(f"file '{os.path.abspath(vp)}'\n")
try:
cmd = [
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
"-i", concat_path, "-c", "copy", output_path
]
self._run_ffmpeg(cmd)
finally:
os.unlink(concat_path)
else:
# Use xfade for video clips too
n = len(video_paths)
inputs = []
for vp in video_paths:
inputs.extend(["-i", vp])
filter_parts = []
for i in range(n):
filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
fade_dur = transition.duration_sec
# Calculate offsets from durations
if n >= 2:
dur0 = self.get_duration(video_paths[0])
offset = dur0 - fade_dur
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
for i in range(1, n - 1):
duri = self.get_duration(video_paths[i])
offset += duri - fade_dur
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
filter_parts.append(f"[xf{n-2}]copy[outv]")
else:
filter_parts.append(f"[v0]copy[outv]")
cmd = [
self.ffmpeg, "-y", *inputs,
"-filter_complex", ";".join(filter_parts),
"-map", "[outv]",
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
output_path
]
self._run_ffmpeg(cmd)
return output_path
# =================================================================
# Streaming
# =================================================================
def create_hls_stream(
self,
input_path: str,
output_dir: str,
segment_duration: int = 4,
) -> str:
"""Create HLS streaming output for progressive playback.
Returns:
Path to the .m3u8 playlist file
"""
os.makedirs(output_dir, exist_ok=True)
playlist = os.path.join(output_dir, "playlist.m3u8")
cmd = [
self.ffmpeg, "-y", "-i", input_path,
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
"-c:a", "aac", "-b:a", "128k",
"-f", "hls", "-hls_time", str(segment_duration),
"-hls_list_size", "0", "-hls_segment_filename",
os.path.join(output_dir, "seg_%03d.ts"),
playlist
]
self._run_ffmpeg(cmd)
return playlist
# =================================================================
# Internal
# =================================================================
def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
"""Run an FFmpeg command with logging and error handling."""
logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if r.returncode != 0:
logger.error("FFmpeg failed: %s", r.stderr[-500:])
raise RuntimeError(f"FFmpeg error (exit {r.returncode}): {r.stderr[-500:]}")
return r.stdout
# =========================================================================
# Convenience functions
# =========================================================================
def compose_video(
images: List[str],
audio: Optional[str] = None,
output: str = "output.mp4",
**kwargs,
) -> str:
"""One-call video composition from images and optional audio.
Convenience wrapper around FFmpegPipeline.compose().
"""
pipeline = FFmpegPipeline()
spec = kwargs.pop("spec", VideoSpec())
transition = kwargs.pop("transition", None)
ken_burns = kwargs.pop("ken_burns", None)
beat_times = kwargs.pop("beat_times", None)
return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)