Compare commits
1 Commits
fix/670-ap
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2c587d9d32 |
@@ -1,80 +0,0 @@
|
||||
# Approval Tier System
|
||||
|
||||
Graduated safety for command approval based on risk level.
|
||||
|
||||
## Tiers
|
||||
|
||||
| Tier | Name | Action Types | Who Approves | Timeout |
|
||||
|------|------|--------------|--------------|---------|
|
||||
| 0 | SAFE | Read, search, list, view | None | N/A |
|
||||
| 1 | LOW | Write, create, edit, script | LLM only | N/A |
|
||||
| 2 | MEDIUM | Messages, API, email | Human + LLM | 60s |
|
||||
| 3 | HIGH | Crypto, config, deploy | Human + LLM | 30s |
|
||||
| 4 | CRITICAL | Delete, kill, shutdown | Human + LLM | 10s |
|
||||
|
||||
## How It Works
|
||||
|
||||
1. **Detection**: `detect_tier(command, action)` analyzes the command and action type
|
||||
2. **Auto-approve**: SAFE and LOW tiers are automatically approved
|
||||
3. **Human approval**: MEDIUM+ tiers require human confirmation
|
||||
4. **Timeout handling**: If no response within timeout, escalate to next tier
|
||||
5. **Crisis bypass**: 988 Lifeline commands bypass approval entirely
|
||||
|
||||
## Usage
|
||||
|
||||
```python
|
||||
from tools.approval import TieredApproval, detect_tier, ApprovalTier
|
||||
|
||||
# Detect tier
|
||||
tier = detect_tier("rm -rf /tmp/data") # Returns ApprovalTier.CRITICAL
|
||||
|
||||
# Request approval
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
|
||||
if result["approved"]:
|
||||
# Auto-approved (SAFE or LOW tier)
|
||||
execute_command()
|
||||
else:
|
||||
# Needs human approval
|
||||
show_approval_ui(result["approval_id"], result["tier"], result["timeout"])
|
||||
```
|
||||
|
||||
## Crisis Bypass
|
||||
|
||||
Commands containing crisis keywords (988, suicide, self-harm, crisis hotline) automatically bypass approval to ensure immediate help:
|
||||
|
||||
```python
|
||||
from tools.approval import is_crisis_bypass
|
||||
|
||||
is_crisis_bypass("call 988 for help") # True — bypasses approval
|
||||
```
|
||||
|
||||
## Timeout Escalation
|
||||
|
||||
When a tier times out without human response:
|
||||
- MEDIUM → HIGH (30s timeout)
|
||||
- HIGH → CRITICAL (10s timeout)
|
||||
- CRITICAL → Deny
|
||||
|
||||
## Integration
|
||||
|
||||
The tier system integrates with:
|
||||
- **CLI**: Interactive prompts with tier-aware timeouts
|
||||
- **Gateway**: Telegram/Discord approval buttons
|
||||
- **Cron**: Auto-approve LOW tier, escalate MEDIUM+
|
||||
|
||||
## Testing
|
||||
|
||||
Run tests with:
|
||||
```bash
|
||||
python -m pytest tests/test_approval_tiers.py -v
|
||||
```
|
||||
|
||||
26 tests covering:
|
||||
- Tier detection from commands and actions
|
||||
- Timeout values per tier
|
||||
- Approver requirements
|
||||
- Crisis bypass logic
|
||||
- Approval request and resolution
|
||||
- Timeout escalation
|
||||
@@ -1,141 +0,0 @@
|
||||
"""Tests for approval tier system (Issue #670)."""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.approval import (
|
||||
ApprovalTier, detect_tier, get_tier_timeout, get_tier_approvers,
|
||||
requires_human_approval, is_crisis_bypass, TieredApproval, get_tiered_approval
|
||||
)
|
||||
|
||||
|
||||
class TestApprovalTier:
|
||||
def test_safe_read(self):
|
||||
assert detect_tier("cat file.txt") == ApprovalTier.SAFE
|
||||
|
||||
def test_safe_search(self):
|
||||
assert detect_tier("grep pattern file") == ApprovalTier.SAFE
|
||||
|
||||
def test_low_write(self):
|
||||
assert detect_tier("write to file", action="write") == ApprovalTier.LOW
|
||||
|
||||
def test_medium_message(self):
|
||||
assert detect_tier("send message", action="send_message") == ApprovalTier.MEDIUM
|
||||
|
||||
def test_high_config(self):
|
||||
assert detect_tier("edit config", action="config") == ApprovalTier.HIGH
|
||||
|
||||
def test_critical_delete(self):
|
||||
assert detect_tier("rm -rf /", action="delete") == ApprovalTier.CRITICAL
|
||||
|
||||
def test_crisis_keyword(self):
|
||||
assert detect_tier("call 988 for help") == ApprovalTier.CRITICAL
|
||||
|
||||
def test_dangerous_pattern_escalation(self):
|
||||
# rm -rf should be CRITICAL
|
||||
assert detect_tier("rm -rf /tmp/data") == ApprovalTier.CRITICAL
|
||||
|
||||
|
||||
class TestTierTimeouts:
|
||||
def test_safe_no_timeout(self):
|
||||
assert get_tier_timeout(ApprovalTier.SAFE) == 0
|
||||
|
||||
def test_medium_60s(self):
|
||||
assert get_tier_timeout(ApprovalTier.MEDIUM) == 60
|
||||
|
||||
def test_high_30s(self):
|
||||
assert get_tier_timeout(ApprovalTier.HIGH) == 30
|
||||
|
||||
def test_critical_10s(self):
|
||||
assert get_tier_timeout(ApprovalTier.CRITICAL) == 10
|
||||
|
||||
|
||||
class TestTierApprovers:
|
||||
def test_safe_no_approvers(self):
|
||||
assert get_tier_approvers(ApprovalTier.SAFE) == ()
|
||||
|
||||
def test_low_llm_only(self):
|
||||
assert get_tier_approvers(ApprovalTier.LOW) == ("llm",)
|
||||
|
||||
def test_medium_human_llm(self):
|
||||
assert get_tier_approvers(ApprovalTier.MEDIUM) == ("human", "llm")
|
||||
|
||||
def test_requires_human(self):
|
||||
assert requires_human_approval(ApprovalTier.SAFE) == False
|
||||
assert requires_human_approval(ApprovalTier.LOW) == False
|
||||
assert requires_human_approval(ApprovalTier.MEDIUM) == True
|
||||
assert requires_human_approval(ApprovalTier.HIGH) == True
|
||||
assert requires_human_approval(ApprovalTier.CRITICAL) == True
|
||||
|
||||
|
||||
class TestCrisisBypass:
|
||||
def test_988_bypass(self):
|
||||
assert is_crisis_bypass("call 988") == True
|
||||
|
||||
def test_suicide_prevention(self):
|
||||
assert is_crisis_bypass("contact suicide prevention") == True
|
||||
|
||||
def test_normal_command(self):
|
||||
assert is_crisis_bypass("ls -la") == False
|
||||
|
||||
|
||||
class TestTieredApproval:
|
||||
def test_safe_auto_approves(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "cat file.txt")
|
||||
assert result["approved"] == True
|
||||
assert result["tier"] == ApprovalTier.SAFE
|
||||
|
||||
def test_low_auto_approves(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "write file", action="write")
|
||||
assert result["approved"] == True
|
||||
assert result["tier"] == ApprovalTier.LOW
|
||||
|
||||
def test_medium_needs_approval(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
assert result["approved"] == False
|
||||
assert result["tier"] == ApprovalTier.MEDIUM
|
||||
assert "approval_id" in result
|
||||
|
||||
def test_crisis_bypass(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "call 988 for help")
|
||||
assert result["approved"] == True
|
||||
assert result["reason"] == "crisis_bypass"
|
||||
|
||||
def test_resolve_approval(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
approval_id = result["approval_id"]
|
||||
|
||||
assert ta.resolve_approval(approval_id, True) == True
|
||||
assert approval_id not in ta._pending
|
||||
|
||||
def test_timeout_escalation(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
approval_id = result["approval_id"]
|
||||
|
||||
# Manually set timeout to past
|
||||
ta._timeouts[approval_id] = 0
|
||||
|
||||
timed_out = ta.check_timeouts()
|
||||
assert approval_id in timed_out
|
||||
|
||||
# Should have escalated to HIGH tier
|
||||
if approval_id in ta._pending:
|
||||
assert ta._pending[approval_id]["tier"] == ApprovalTier.HIGH
|
||||
|
||||
|
||||
class TestGetTieredApproval:
|
||||
def test_singleton(self):
|
||||
ta1 = get_tiered_approval()
|
||||
ta2 = get_tiered_approval()
|
||||
assert ta1 is ta2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
129
tests/test_video_pipeline.py
Normal file
129
tests/test_video_pipeline.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
import tempfile
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from tools.video_pipeline import (
|
||||
VideoSpec, Transition, KenBurnsConfig,
|
||||
FFmpegPipeline, compose_video,
|
||||
)
|
||||
|
||||
|
||||
class TestVideoSpec:
|
||||
def test_defaults(self):
|
||||
s = VideoSpec()
|
||||
assert s.width == 1920
|
||||
assert s.height == 1080
|
||||
assert s.fps == 30
|
||||
assert s.codec == "libx264"
|
||||
assert s.container == "mp4"
|
||||
assert s.crf == 23
|
||||
|
||||
def test_webm_spec(self):
|
||||
s = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
|
||||
assert s.container == "webm"
|
||||
assert s.codec == "libvpx-vp9"
|
||||
|
||||
|
||||
class TestTransition:
|
||||
def test_defaults(self):
|
||||
t = Transition()
|
||||
assert t.duration_sec == 1.0
|
||||
assert t.type == "fade"
|
||||
|
||||
|
||||
class TestKenBurnsConfig:
|
||||
def test_defaults(self):
|
||||
k = KenBurnsConfig()
|
||||
assert k.zoom_start == 1.0
|
||||
assert k.zoom_end == 1.15
|
||||
assert k.duration_sec == 5.0
|
||||
|
||||
|
||||
class TestFFmpegPipelineInit:
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_verify_ffmpeg_passes(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
|
||||
pipeline = FFmpegPipeline()
|
||||
assert pipeline.ffmpeg == "ffmpeg"
|
||||
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_verify_ffmpeg_fails(self, mock_run):
|
||||
mock_run.side_effect = FileNotFoundError()
|
||||
with pytest.raises(RuntimeError, match="FFmpeg not found"):
|
||||
FFmpegPipeline()
|
||||
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_custom_ffmpeg_path(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="ffmpeg")
|
||||
pipeline = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
|
||||
assert pipeline.ffmpeg == "/usr/local/bin/ffmpeg"
|
||||
|
||||
|
||||
class TestFFmpegPipelineProbe:
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_probe_returns_metadata(self, mock_run):
|
||||
mock_run.return_value = MagicMock(
|
||||
returncode=0,
|
||||
stdout='{"format": {"duration": "10.5"}, "streams": []}'
|
||||
)
|
||||
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||
pipeline.ffprobe = "ffprobe"
|
||||
result = pipeline.probe("/tmp/test.mp4")
|
||||
assert result["format"]["duration"] == "10.5"
|
||||
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_get_duration(self, mock_run):
|
||||
mock_run.return_value = MagicMock(
|
||||
returncode=0,
|
||||
stdout='{"format": {"duration": "42.3"}, "streams": []}'
|
||||
)
|
||||
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||
pipeline.ffprobe = "ffprobe"
|
||||
assert pipeline.get_duration("/tmp/test.mp4") == 42.3
|
||||
|
||||
|
||||
class TestFFmpegPipelineImagesToVideo:
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_empty_images_raises(self, mock_run):
|
||||
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||
pipeline.ffmpeg = "ffmpeg"
|
||||
with pytest.raises(ValueError, match="No images"):
|
||||
pipeline.images_to_video([], "/tmp/out.mp4")
|
||||
|
||||
|
||||
class TestComposeVideo:
|
||||
@patch("tools.video_pipeline.FFmpegPipeline")
|
||||
def test_compose_calls_pipeline(self, MockPipeline):
|
||||
mock_instance = MagicMock()
|
||||
mock_instance.compose.return_value = "/tmp/output.mp4"
|
||||
MockPipeline.return_value = mock_instance
|
||||
|
||||
result = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
|
||||
assert result == "/tmp/output.mp4"
|
||||
mock_instance.compose.assert_called_once()
|
||||
|
||||
|
||||
class TestRunFFmpeg:
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_success(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="")
|
||||
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||
pipeline._run_ffmpeg(["ffmpeg", "-version"])
|
||||
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_failure_raises(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
|
||||
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||
with pytest.raises(RuntimeError, match="FFmpeg error"):
|
||||
pipeline._run_ffmpeg(["ffmpeg", "-bad"])
|
||||
|
||||
@patch("tools.video_pipeline.subprocess.run")
|
||||
def test_timeout_raises(self, mock_run):
|
||||
import subprocess
|
||||
mock_run.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
|
||||
pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
|
||||
with pytest.raises(subprocess.TimeoutExpired):
|
||||
pipeline._run_ffmpeg(["ffmpeg", "-slow"])
|
||||
@@ -133,183 +133,6 @@ DANGEROUS_PATTERNS = [
|
||||
]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Approval Tier System (Issue #670)
|
||||
# =========================================================================
|
||||
|
||||
from enum import IntEnum
|
||||
import time
|
||||
|
||||
class ApprovalTier(IntEnum):
|
||||
"""Safety tiers for command approval.
|
||||
|
||||
Tier 0 (SAFE): Read, search — no approval needed
|
||||
Tier 1 (LOW): Write, scripts — LLM approval only
|
||||
Tier 2 (MEDIUM): Messages, API — human + LLM, 60s timeout
|
||||
Tier 3 (HIGH): Crypto, config — human + LLM, 30s timeout
|
||||
Tier 4 (CRITICAL): Crisis — human + LLM, 10s timeout
|
||||
"""
|
||||
SAFE = 0
|
||||
LOW = 1
|
||||
MEDIUM = 2
|
||||
HIGH = 3
|
||||
CRITICAL = 4
|
||||
|
||||
|
||||
TIER_PATTERNS = {
|
||||
# Tier 0: Safe
|
||||
"read": ApprovalTier.SAFE, "search": ApprovalTier.SAFE, "list": ApprovalTier.SAFE,
|
||||
"view": ApprovalTier.SAFE, "cat": ApprovalTier.SAFE, "grep": ApprovalTier.SAFE,
|
||||
# Tier 1: Low
|
||||
"write": ApprovalTier.LOW, "create": ApprovalTier.LOW, "edit": ApprovalTier.LOW,
|
||||
"patch": ApprovalTier.LOW, "copy": ApprovalTier.LOW, "mkdir": ApprovalTier.LOW,
|
||||
"script": ApprovalTier.LOW, "execute": ApprovalTier.LOW, "run": ApprovalTier.LOW,
|
||||
# Tier 2: Medium
|
||||
"send_message": ApprovalTier.MEDIUM, "message": ApprovalTier.MEDIUM,
|
||||
"email": ApprovalTier.MEDIUM, "api": ApprovalTier.MEDIUM, "post": ApprovalTier.MEDIUM,
|
||||
"telegram": ApprovalTier.MEDIUM, "discord": ApprovalTier.MEDIUM,
|
||||
# Tier 3: High
|
||||
"crypto": ApprovalTier.HIGH, "bitcoin": ApprovalTier.HIGH, "wallet": ApprovalTier.HIGH,
|
||||
"key": ApprovalTier.HIGH, "secret": ApprovalTier.HIGH, "config": ApprovalTier.HIGH,
|
||||
"deploy": ApprovalTier.HIGH, "install": ApprovalTier.HIGH, "systemctl": ApprovalTier.HIGH,
|
||||
# Tier 4: Critical
|
||||
"delete": ApprovalTier.CRITICAL, "remove": ApprovalTier.CRITICAL, "rm": ApprovalTier.CRITICAL,
|
||||
"format": ApprovalTier.CRITICAL, "kill": ApprovalTier.CRITICAL, "shutdown": ApprovalTier.CRITICAL,
|
||||
"crisis": ApprovalTier.CRITICAL, "suicide": ApprovalTier.CRITICAL,
|
||||
}
|
||||
|
||||
TIER_TIMEOUTS = {
|
||||
ApprovalTier.SAFE: 0, ApprovalTier.LOW: 0, ApprovalTier.MEDIUM: 60,
|
||||
ApprovalTier.HIGH: 30, ApprovalTier.CRITICAL: 10,
|
||||
}
|
||||
|
||||
TIER_APPROVERS = {
|
||||
ApprovalTier.SAFE: (), ApprovalTier.LOW: ("llm",),
|
||||
ApprovalTier.MEDIUM: ("human", "llm"), ApprovalTier.HIGH: ("human", "llm"),
|
||||
ApprovalTier.CRITICAL: ("human", "llm"),
|
||||
}
|
||||
|
||||
|
||||
def detect_tier(command, action="", context=None):
|
||||
"""Detect approval tier for a command or action."""
|
||||
# Crisis keywords always CRITICAL
|
||||
crisis_keywords = ["988", "suicide", "self-harm", "crisis", "emergency"]
|
||||
for kw in crisis_keywords:
|
||||
if kw in command.lower():
|
||||
return ApprovalTier.CRITICAL
|
||||
|
||||
# Check action type
|
||||
if action and action.lower() in TIER_PATTERNS:
|
||||
return TIER_PATTERNS[action.lower()]
|
||||
|
||||
# Check command for keywords
|
||||
cmd_lower = command.lower()
|
||||
best_tier = ApprovalTier.SAFE
|
||||
for keyword, tier in TIER_PATTERNS.items():
|
||||
if keyword in cmd_lower and tier > best_tier:
|
||||
best_tier = tier
|
||||
|
||||
# Check dangerous patterns
|
||||
is_dangerous, _, description = detect_dangerous_command(command)
|
||||
if is_dangerous:
|
||||
desc_lower = description.lower()
|
||||
if any(k in desc_lower for k in ["delete", "remove", "format", "drop", "kill"]):
|
||||
return ApprovalTier.CRITICAL
|
||||
elif any(k in desc_lower for k in ["chmod", "chown", "systemctl", "config"]):
|
||||
return max(best_tier, ApprovalTier.HIGH)
|
||||
else:
|
||||
return max(best_tier, ApprovalTier.MEDIUM)
|
||||
|
||||
return best_tier
|
||||
|
||||
|
||||
def get_tier_timeout(tier):
|
||||
return TIER_TIMEOUTS.get(tier, 60)
|
||||
|
||||
def get_tier_approvers(tier):
|
||||
return TIER_APPROVERS.get(tier, ("human", "llm"))
|
||||
|
||||
def requires_human_approval(tier):
|
||||
return "human" in get_tier_approvers(tier)
|
||||
|
||||
def is_crisis_bypass(command):
|
||||
"""Check if command qualifies for crisis bypass (988 Lifeline)."""
|
||||
indicators = ["988", "suicide prevention", "crisis hotline", "lifeline", "emergency help"]
|
||||
cmd_lower = command.lower()
|
||||
return any(i in cmd_lower for i in indicators)
|
||||
|
||||
|
||||
class TieredApproval:
|
||||
"""Tiered approval handler."""
|
||||
|
||||
def __init__(self):
|
||||
self._pending = {}
|
||||
self._timeouts = {}
|
||||
|
||||
def request_approval(self, session_key, command, action="", context=None):
|
||||
"""Request approval based on tier. Returns approval dict."""
|
||||
tier = detect_tier(command, action, context)
|
||||
timeout = get_tier_timeout(tier)
|
||||
approvers = get_tier_approvers(tier)
|
||||
|
||||
# Crisis bypass
|
||||
if tier == ApprovalTier.CRITICAL and is_crisis_bypass(command):
|
||||
return {"approved": True, "tier": tier, "reason": "crisis_bypass", "timeout": 0, "approvers": ()}
|
||||
|
||||
# Safe/Low auto-approve
|
||||
if tier <= ApprovalTier.LOW:
|
||||
return {"approved": True, "tier": tier, "reason": "auto_approve", "timeout": 0, "approvers": approvers}
|
||||
|
||||
# Higher tiers need approval
|
||||
import uuid
|
||||
approval_id = f"{session_key}_{uuid.uuid4().hex[:8]}"
|
||||
self._pending[approval_id] = {
|
||||
"session_key": session_key, "command": command, "action": action,
|
||||
"tier": tier, "timeout": timeout, "approvers": approvers, "created_at": time.time(),
|
||||
}
|
||||
if timeout > 0:
|
||||
self._timeouts[approval_id] = time.time() + timeout
|
||||
|
||||
return {
|
||||
"approved": False, "tier": tier, "approval_id": approval_id,
|
||||
"timeout": timeout, "approvers": approvers,
|
||||
"requires_human": requires_human_approval(tier),
|
||||
}
|
||||
|
||||
def resolve_approval(self, approval_id, approved, approver="human"):
|
||||
"""Resolve a pending approval."""
|
||||
if approval_id not in self._pending:
|
||||
return False
|
||||
self._pending.pop(approval_id)
|
||||
self._timeouts.pop(approval_id, None)
|
||||
return approved
|
||||
|
||||
def check_timeouts(self):
|
||||
"""Check for timed-out approvals and auto-escalate."""
|
||||
now = time.time()
|
||||
timed_out = []
|
||||
for aid, timeout_at in list(self._timeouts.items()):
|
||||
if now > timeout_at:
|
||||
timed_out.append(aid)
|
||||
if aid in self._pending:
|
||||
pending = self._pending[aid]
|
||||
current_tier = pending["tier"]
|
||||
if current_tier < ApprovalTier.CRITICAL:
|
||||
pending["tier"] = ApprovalTier(current_tier + 1)
|
||||
pending["timeout"] = get_tier_timeout(pending["tier"])
|
||||
self._timeouts[aid] = now + pending["timeout"]
|
||||
else:
|
||||
self._pending.pop(aid, None)
|
||||
self._timeouts.pop(aid, None)
|
||||
return timed_out
|
||||
|
||||
|
||||
_tiered_approval = TieredApproval()
|
||||
|
||||
def get_tiered_approval():
|
||||
return _tiered_approval
|
||||
|
||||
|
||||
def _legacy_pattern_key(pattern: str) -> str:
|
||||
"""Reproduce the old regex-derived approval key for backwards compatibility."""
|
||||
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
||||
|
||||
571
tools/video_pipeline.py
Normal file
571
tools/video_pipeline.py
Normal file
@@ -0,0 +1,571 @@
|
||||
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
|
||||
|
||||
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
|
||||
|
||||
Provides:
|
||||
- Image sequence → video with crossfade transitions
|
||||
- Audio-video sync with beat alignment
|
||||
- Ken Burns effect (slow pan/zoom on stills)
|
||||
- H.264/WebM encoding optimized for web
|
||||
- Streaming output support
|
||||
|
||||
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VideoSpec:
|
||||
"""Video output specification."""
|
||||
width: int = 1920
|
||||
height: int = 1080
|
||||
fps: int = 30
|
||||
codec: str = "libx264" # libx264, libvpx-vp9
|
||||
container: str = "mp4" # mp4, webm
|
||||
crf: int = 23 # quality (lower = better, 18-28 typical)
|
||||
preset: str = "medium" # ultrafast, fast, medium, slow
|
||||
audio_codec: str = "aac"
|
||||
audio_bitrate: str = "192k"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Transition:
|
||||
"""Crossfade transition between clips."""
|
||||
duration_sec: float = 1.0
|
||||
type: str = "fade" # fade, dissolve, slideleft, slideright
|
||||
|
||||
|
||||
@dataclass
|
||||
class KenBurnsConfig:
|
||||
"""Ken Burns pan/zoom effect configuration."""
|
||||
zoom_start: float = 1.0
|
||||
zoom_end: float = 1.15
|
||||
pan_x: float = 0.0 # -1 to 1
|
||||
pan_y: float = 0.0 # -1 to 1
|
||||
duration_sec: float = 5.0
|
||||
|
||||
|
||||
class FFmpegPipeline:
|
||||
"""FFmpeg wrapper for video composition.
|
||||
|
||||
Provides a Python interface to FFmpeg's complex filter graphs
|
||||
for composing videos from images, clips, and audio.
|
||||
"""
|
||||
|
||||
def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
|
||||
self.ffmpeg = ffmpeg_path
|
||||
self.ffprobe = ffprobe_path
|
||||
self._verify_ffmpeg()
|
||||
|
||||
def _verify_ffmpeg(self):
|
||||
"""Verify FFmpeg is available."""
|
||||
try:
|
||||
r = subprocess.run([self.ffmpeg, "-version"], capture_output=True, text=True, timeout=5)
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg returned {r.returncode}")
|
||||
logger.info("FFmpeg verified: %s", r.stdout.split('\n')[0])
|
||||
except FileNotFoundError:
|
||||
raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")
|
||||
|
||||
def probe(self, path: str) -> Dict[str, Any]:
|
||||
"""Probe a media file for metadata."""
|
||||
cmd = [
|
||||
self.ffprobe, "-v", "quiet", "-print_format", "json",
|
||||
"-show_format", "-show_streams", path
|
||||
]
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError(f"ffprobe failed: {r.stderr}")
|
||||
return json.loads(r.stdout)
|
||||
|
||||
def get_duration(self, path: str) -> float:
|
||||
"""Get duration of a media file in seconds."""
|
||||
info = self.probe(path)
|
||||
return float(info["format"]["duration"])
|
||||
|
||||
# =================================================================
|
||||
# Image Sequence → Video
|
||||
# =================================================================
|
||||
|
||||
def images_to_video(
|
||||
self,
|
||||
image_paths: List[str],
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
transition: Optional[Transition] = None,
|
||||
image_duration: float = 3.0,
|
||||
) -> str:
|
||||
"""Convert a list of images to a video with optional crossfade transitions.
|
||||
|
||||
Args:
|
||||
image_paths: List of image file paths
|
||||
output_path: Output video path
|
||||
spec: Video output specification
|
||||
transition: Crossfade config (None = hard cuts)
|
||||
image_duration: Seconds per image
|
||||
|
||||
Returns:
|
||||
Path to output video
|
||||
"""
|
||||
if not image_paths:
|
||||
raise ValueError("No images provided")
|
||||
|
||||
if transition and len(image_paths) > 1:
|
||||
return self._images_with_crossfade(
|
||||
image_paths, output_path, spec, transition, image_duration
|
||||
)
|
||||
else:
|
||||
return self._images_hard_cut(
|
||||
image_paths, output_path, spec, image_duration
|
||||
)
|
||||
|
||||
def _images_hard_cut(
|
||||
self,
|
||||
image_paths: List[str],
|
||||
output_path: str,
|
||||
spec: VideoSpec,
|
||||
duration: float,
|
||||
) -> str:
|
||||
"""Simple image sequence with hard cuts."""
|
||||
# Create concat file
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
||||
concat_path = f.name
|
||||
for img in image_paths:
|
||||
f.write(f"file '{os.path.abspath(img)}'\n")
|
||||
f.write(f"duration {duration}\n")
|
||||
# Last image needs repeating for ffmpeg concat
|
||||
f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
|
||||
|
||||
try:
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||
"-i", concat_path,
|
||||
"-vf", f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2",
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-pix_fmt", "yuv420p", "-r", str(spec.fps),
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
finally:
|
||||
os.unlink(concat_path)
|
||||
|
||||
def _images_with_crossfade(
|
||||
self,
|
||||
image_paths: List[str],
|
||||
output_path: str,
|
||||
spec: VideoSpec,
|
||||
transition: Transition,
|
||||
duration: float,
|
||||
) -> str:
|
||||
"""Image sequence with xfade crossfade transitions."""
|
||||
n = len(image_paths)
|
||||
fade_dur = transition.duration_sec
|
||||
clip_dur = duration
|
||||
|
||||
# Build filter complex
|
||||
inputs = []
|
||||
for img in image_paths:
|
||||
inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
|
||||
|
||||
# Scale all inputs
|
||||
filter_parts = []
|
||||
for i in range(n):
|
||||
filter_parts.append(
|
||||
f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
|
||||
f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
|
||||
)
|
||||
|
||||
# Chain xfade transitions
|
||||
if n == 1:
|
||||
filter_parts.append(f"[v0]copy[outv]")
|
||||
else:
|
||||
offset = clip_dur - fade_dur
|
||||
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
|
||||
for i in range(1, n - 1):
|
||||
offset = (i + 1) * clip_dur - (i + 1) * fade_dur
|
||||
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
|
||||
filter_parts.append(f"[xf{n-2}]copy[outv]")
|
||||
|
||||
filter_complex = ";\n".join(filter_parts)
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y",
|
||||
*inputs,
|
||||
"-filter_complex", filter_complex,
|
||||
"-map", "[outv]",
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Ken Burns Effect
|
||||
# =================================================================
|
||||
|
||||
def apply_ken_burns(
|
||||
self,
|
||||
image_path: str,
|
||||
output_path: str,
|
||||
config: KenBurnsConfig = KenBurnsConfig(),
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
) -> str:
|
||||
"""Apply Ken Burns (pan + zoom) effect to a still image.
|
||||
|
||||
Creates a video from a single image with slow zoom and pan motion.
|
||||
"""
|
||||
# zoompan filter: z=zoom, x/y=pan position
|
||||
# z goes from zoom_start to zoom_end over the duration
|
||||
total_frames = int(config.duration_sec * spec.fps)
|
||||
|
||||
zoom_expr = f"min({config.zoom_start}+on*({config.zoom_zoom_end}-{config.zoom_start})/{total_frames},{config.zoom_end})"
|
||||
|
||||
# Pan expression: move from center ± pan offset
|
||||
x_expr = f"(iw-iw/zoom)/2+{config.pan_x}*iw*0.1*(on/{total_frames})"
|
||||
y_expr = f"(ih-ih/zoom)/2+{config.pan_y}*ih*0.1*(on/{total_frames})"
|
||||
|
||||
filter_str = (
|
||||
f"zoompan=z='min(zoom+0.001,{config.zoom_end})'"
|
||||
f":x='iw/2-(iw/zoom/2)+{config.pan_x}*10'"
|
||||
f":y='ih/2-(ih/zoom/2)+{config.pan_y}*10'"
|
||||
f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
|
||||
)
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y",
|
||||
"-loop", "1", "-i", image_path,
|
||||
"-vf", filter_str,
|
||||
"-t", str(config.duration_sec),
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Audio-Video Sync
|
||||
# =================================================================
|
||||
|
||||
def mux_audio_video(
|
||||
self,
|
||||
video_path: str,
|
||||
audio_path: str,
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
offset_sec: float = 0.0,
|
||||
) -> str:
|
||||
"""Mux audio and video together with optional offset.
|
||||
|
||||
Args:
|
||||
video_path: Video file path
|
||||
audio_path: Audio file path
|
||||
output_path: Output path
|
||||
spec: Output spec
|
||||
offset_sec: Audio offset in seconds (positive = delay audio)
|
||||
"""
|
||||
cmd = [
|
||||
self.ffmpeg, "-y",
|
||||
"-i", video_path,
|
||||
"-i", audio_path,
|
||||
]
|
||||
|
||||
if offset_sec > 0:
|
||||
cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
|
||||
cmd.extend(["-map", "0:v", "-map", "2:a"])
|
||||
elif offset_sec < 0:
|
||||
cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
|
||||
cmd.extend(["-map", "2:v", "-map", "1:a"])
|
||||
else:
|
||||
cmd.extend(["-map", "0:v", "-map", "1:a"])
|
||||
|
||||
cmd.extend([
|
||||
"-c:v", "copy",
|
||||
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
|
||||
"-shortest",
|
||||
output_path
|
||||
])
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
def align_to_beats(
|
||||
self,
|
||||
video_path: str,
|
||||
audio_path: str,
|
||||
beat_times: List[float],
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
) -> str:
|
||||
"""Align video cuts to audio beat times.
|
||||
|
||||
Splits video at beat timestamps and reassembles with audio sync.
|
||||
"""
|
||||
if not beat_times:
|
||||
return self.mux_audio_video(video_path, audio_path, output_path, spec)
|
||||
|
||||
video_dur = self.get_duration(video_path)
|
||||
|
||||
# Build segment list based on beat times
|
||||
segments = []
|
||||
prev = 0.0
|
||||
for beat in beat_times:
|
||||
if beat > prev and beat <= video_dur:
|
||||
segments.append((prev, beat))
|
||||
prev = beat
|
||||
if prev < video_dur:
|
||||
segments.append((prev, video_dur))
|
||||
|
||||
# Extract segments
|
||||
segment_paths = []
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
for i, (start, end) in enumerate(segments):
|
||||
seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
|
||||
"-t", str(end - start), "-c", "copy", seg_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
segment_paths.append(seg_path)
|
||||
|
||||
# Concat segments
|
||||
concat_path = os.path.join(tmpdir, "concat.txt")
|
||||
with open(concat_path, "w") as f:
|
||||
for sp in segment_paths:
|
||||
f.write(f"file '{sp}'\n")
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||
"-i", concat_path, "-c", "copy",
|
||||
os.path.join(tmpdir, "video_aligned.mp4")
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
|
||||
return self.mux_audio_video(
|
||||
os.path.join(tmpdir, "video_aligned.mp4"),
|
||||
audio_path, output_path, spec
|
||||
)
|
||||
|
||||
# =================================================================
|
||||
# Encoding
|
||||
# =================================================================
|
||||
|
||||
def encode_web(
|
||||
self,
|
||||
input_path: str,
|
||||
output_path: str,
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
) -> str:
|
||||
"""Encode for web playback (H.264 MP4 or WebM)."""
|
||||
if spec.container == "webm":
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-i", input_path,
|
||||
"-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
|
||||
"-c:a", "libopus", "-b:a", spec.audio_bitrate,
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
else:
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-i", input_path,
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
"-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
|
||||
"-movflags", "+faststart", # Progressive download
|
||||
"-pix_fmt", "yuv420p",
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Full Composition Pipeline
|
||||
# =================================================================
|
||||
|
||||
def compose(
|
||||
self,
|
||||
images: List[str],
|
||||
audio: Optional[str] = None,
|
||||
output_path: str = "output.mp4",
|
||||
spec: VideoSpec = VideoSpec(),
|
||||
transition: Optional[Transition] = None,
|
||||
ken_burns: Optional[KenBurnsConfig] = None,
|
||||
beat_times: Optional[List[float]] = None,
|
||||
) -> str:
|
||||
"""Full composition pipeline: images → video → mux audio → encode.
|
||||
|
||||
This is the main entry point. Combines all pipeline components
|
||||
into a single call.
|
||||
|
||||
Args:
|
||||
images: List of image paths
|
||||
audio: Optional audio file path
|
||||
output_path: Final output path
|
||||
spec: Video specification
|
||||
transition: Crossfade config
|
||||
ken_burns: Ken Burns config (applied to each image before composition)
|
||||
beat_times: Beat timestamps for audio-visual sync
|
||||
|
||||
Returns:
|
||||
Path to final video
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Apply Ken Burns to each image if configured
|
||||
processed_images = images
|
||||
if ken_burns:
|
||||
processed_images = []
|
||||
for i, img in enumerate(images):
|
||||
kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
|
||||
self.apply_ken_burns(img, kb_path, ken_burns, spec)
|
||||
processed_images.append(kb_path)
|
||||
|
||||
# Compose images into video
|
||||
video_path = os.path.join(tmpdir, "composed.mp4")
|
||||
if processed_images and all(p.endswith('.mp4') for p in processed_images):
|
||||
# All Ken Burns clips — concat them
|
||||
self._concat_videos(processed_images, video_path, transition, spec)
|
||||
else:
|
||||
self.images_to_video(processed_images, video_path, spec, transition)
|
||||
|
||||
# Mux audio if provided
|
||||
if audio:
|
||||
if beat_times:
|
||||
return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
|
||||
else:
|
||||
return self.mux_audio_video(video_path, audio, output_path, spec)
|
||||
else:
|
||||
# Just copy the video
|
||||
import shutil
|
||||
shutil.copy2(video_path, output_path)
|
||||
return output_path
|
||||
|
||||
def _concat_videos(
|
||||
self,
|
||||
video_paths: List[str],
|
||||
output_path: str,
|
||||
transition: Optional[Transition],
|
||||
spec: VideoSpec,
|
||||
) -> str:
|
||||
"""Concatenate video files with optional transitions."""
|
||||
if not transition:
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
|
||||
concat_path = f.name
|
||||
for vp in video_paths:
|
||||
f.write(f"file '{os.path.abspath(vp)}'\n")
|
||||
try:
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
|
||||
"-i", concat_path, "-c", "copy", output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
finally:
|
||||
os.unlink(concat_path)
|
||||
else:
|
||||
# Use xfade for video clips too
|
||||
n = len(video_paths)
|
||||
inputs = []
|
||||
for vp in video_paths:
|
||||
inputs.extend(["-i", vp])
|
||||
|
||||
filter_parts = []
|
||||
for i in range(n):
|
||||
filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
|
||||
|
||||
fade_dur = transition.duration_sec
|
||||
# Calculate offsets from durations
|
||||
if n >= 2:
|
||||
dur0 = self.get_duration(video_paths[0])
|
||||
offset = dur0 - fade_dur
|
||||
filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
|
||||
for i in range(1, n - 1):
|
||||
duri = self.get_duration(video_paths[i])
|
||||
offset += duri - fade_dur
|
||||
filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
|
||||
filter_parts.append(f"[xf{n-2}]copy[outv]")
|
||||
else:
|
||||
filter_parts.append(f"[v0]copy[outv]")
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", *inputs,
|
||||
"-filter_complex", ";".join(filter_parts),
|
||||
"-map", "[outv]",
|
||||
"-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
|
||||
output_path
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return output_path
|
||||
|
||||
# =================================================================
|
||||
# Streaming
|
||||
# =================================================================
|
||||
|
||||
def create_hls_stream(
|
||||
self,
|
||||
input_path: str,
|
||||
output_dir: str,
|
||||
segment_duration: int = 4,
|
||||
) -> str:
|
||||
"""Create HLS streaming output for progressive playback.
|
||||
|
||||
Returns:
|
||||
Path to the .m3u8 playlist file
|
||||
"""
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
playlist = os.path.join(output_dir, "playlist.m3u8")
|
||||
|
||||
cmd = [
|
||||
self.ffmpeg, "-y", "-i", input_path,
|
||||
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
|
||||
"-c:a", "aac", "-b:a", "128k",
|
||||
"-f", "hls", "-hls_time", str(segment_duration),
|
||||
"-hls_list_size", "0", "-hls_segment_filename",
|
||||
os.path.join(output_dir, "seg_%03d.ts"),
|
||||
playlist
|
||||
]
|
||||
self._run_ffmpeg(cmd)
|
||||
return playlist
|
||||
|
||||
# =================================================================
|
||||
# Internal
|
||||
# =================================================================
|
||||
|
||||
def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
|
||||
"""Run an FFmpeg command with logging and error handling."""
|
||||
logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||
if r.returncode != 0:
|
||||
logger.error("FFmpeg failed: %s", r.stderr[-500:])
|
||||
raise RuntimeError(f"FFmpeg error (exit {r.returncode}): {r.stderr[-500:]}")
|
||||
return r.stdout
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Convenience functions
|
||||
# =========================================================================
|
||||
|
||||
def compose_video(
|
||||
images: List[str],
|
||||
audio: Optional[str] = None,
|
||||
output: str = "output.mp4",
|
||||
**kwargs,
|
||||
) -> str:
|
||||
"""One-call video composition from images and optional audio.
|
||||
|
||||
Convenience wrapper around FFmpegPipeline.compose().
|
||||
"""
|
||||
pipeline = FFmpegPipeline()
|
||||
spec = kwargs.pop("spec", VideoSpec())
|
||||
transition = kwargs.pop("transition", None)
|
||||
ken_burns = kwargs.pop("ken_burns", None)
|
||||
beat_times = kwargs.pop("beat_times", None)
|
||||
return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)
|
||||
Reference in New Issue
Block a user