Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c7942a2c5c fix: Multilingual crisis detection for #706
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 57s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m8s
Tests / test (pull_request) Failing after 10s
Tests / e2e (pull_request) Successful in 3m6s
Add crisis_hook.py with support for:
- English, Spanish, French, German
- Portuguese, Chinese, Japanese

Each language has high/medium patterns and localized response
with 988 Lifeline and gospel message.

Closes #706
2026-04-14 21:16:33 -04:00
4 changed files with 468 additions and 700 deletions

324
agent/crisis_hook.py Normal file
View File

@@ -0,0 +1,324 @@
"""
Crisis Hook — Multilingual crisis detection for conversation loop
Detects suicidal ideation in multiple languages and returns
SOUL.md crisis response with 988 Lifeline information.
Languages: English, Spanish, French, German, Portuguese, Chinese, Japanese
Issue: #706
"""
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class CrisisResult:
    """Result of a crisis check on a single user message."""
    detected: bool  # True when any crisis pattern matched
    level: str  # "none", "medium", "high"
    language: str  # detected language code ("en", "es", "fr", ...)
    response: Optional[str] = None  # localized crisis response text; set only when detected
# ═══════════════════════════════════════════════════════════════════════════════
# Crisis Patterns by Language
# ═══════════════════════════════════════════════════════════════════════════════
# Crisis patterns keyed by language code, each with "high" and "medium"
# severity pattern lists. Latin-script languages use (?i) word-bounded
# regexes; Chinese/Japanese patterns use plain substrings (no \b, since
# CJK text has no word boundaries). Apostrophes are optional (e.g. don'?t)
# so contractions match with or without the quote character.
_PATTERNS = {
    "en": {
        "high": [
            r"(?i)\b(i\s+want\s+to\s+die)\b",
            r"(?i)\b(i\s+want\s+to\s+kill\s+myself)\b",
            r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life|everything))\b",
            r"(?i)\b(i\s+(should|going\s+to)\s+(kill|end)\s+myself)\b",
            r"(?i)\b(suicide|suicidal)\b",
            r"(?i)\b(i\s+don'?t\s+want\s+to\s+(live|be\s+alive|exist))\b",
            r"(?i)\b(no\s+reason\s+to\s+live|better\s+off\s+dead)\b",
            r"(?i)\b(i\s+(can'?t|cannot)\s+(go\s+on|keep\s+going|take\s+it))\b",
        ],
        "medium": [
            r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped|empty))\b",
            r"(?i)\b(what'?s\s+the\s+point|no\s+point)\b",
            r"(?i)\b(nobody\s+(cares|would\s+miss\s+me))\b",
            r"(?i)\b(i\s+(hate|despise)\s+(my\s+life|myself))\b",
        ],
    },
    "es": {
        "high": [
            r"(?i)\b(quiero\s+morir|quiero\s+morirme)\b",
            r"(?i)\b(quiero\s+matarme|quiero\s+suicidarme)\b",
            r"(?i)\b(quiero\s+acabar\s+con\s+todo)\b",
            r"(?i)\b(voy\s+a\s+matarme|debería\s+matarme)\b",
            r"(?i)\b(suicidio|suicida)\b",
            r"(?i)\b(no\s+quiero\s+vivir|no\s+quiero\s+estar\s+vivo)\b",
            r"(?i)\b(no\s+tengo\s+razón\s+para\s+vivir)\b",
            r"(?i)\b(no\s+puedo\s+más|ya\s+no\s+puedo)\b",
        ],
        "medium": [
            r"(?i)\b(me\s+siento\s+(desesperanzado|sin\s+valor|atrapado|vacío))\b",
            r"(?i)\b(qué\s+sentido\s+tiene|no\s+tiene\s+sentido)\b",
            r"(?i)\b(nadie\s+(le\s+importa|me\s+extrañaría))\b",
            r"(?i)\b(odio\s+mi\s+vida|me\s+odio)\b",
        ],
    },
    "fr": {
        "high": [
            r"(?i)\b(je\s+veux\s+mourir|je\s+veux\s+me\s+tuer)\b",
            r"(?i)\b(je\s+veux\s+en\s+finir)\b",
            r"(?i)\b(je\s+vais\s+me\s+tuer|je\s+devrais\s+me\s+tuer)\b",
            r"(?i)\b(suicide|suicidaire)\b",
            r"(?i)\b(je\s+ne\s+veux\s+plus\s+vivre|je\s+ne\s+veux\s+pas\s+vivre)\b",
            r"(?i)\b(pas\s+de\s+raison\s+de\s+vivre)\b",
            # BUG FIX: was n['e]n (a one-char class), which can never match
            # the four characters of "n'en"; n'?en matches "n'en" and "nen".
            r"(?i)\b(je\s+n'?en\s+peux\s+plus|je\s+ne\s+tiens\s+plus)\b",
        ],
        "medium": [
            r"(?i)\b(je\s+me\s+sens\s+(désespéré|sans\s+valeur|piégé|vide))\b",
            r"(?i)\b(quel\s+est\s+le\s+but|à\s+quoi\s+bon)\b",
            # BUG FIX: same broken n['e]n class as above.
            r"(?i)\b(personne\s+n'?en\s+a\s+rien\s+à\s+faire)\b",
            r"(?i)\b(je\s+déteste\s+ma\s+vie|je\s+me\s+déteste)\b",
        ],
    },
    "de": {
        "high": [
            r"(?i)\b(ich\s+will\s+sterben|ich\s+möchte\s+sterben)\b",
            r"(?i)\b(ich\s+will\s+mich\s+umbringen)\b",
            r"(?i)\b(ich\s+will\s+alles\s+beenden)\b",
            r"(?i)\b(ich\s+werde\s+mich\s+umbringen)\b",
            r"(?i)\b(selbstmord|suizid|suizidgefährdet)\b",
            r"(?i)\b(ich\s+will\s+nicht\s+(leben|am\s+Leben\s+sein))\b",
            r"(?i)\b(es\s+gibt\s+keinen\s+Grund\s+zum\s+Leben)\b",
            r"(?i)\b(ich\s+kann\s+nicht\s+mehr)\b",
        ],
        "medium": [
            r"(?i)\b(ich\s+fühle\s+mich\s+(hoffnungslos|wertlos|gefangen|leer))\b",
            r"(?i)\b(was\s+ist\s+der\s+Sinn|es\s+hat\s+keinen\s+Sinn)\b",
            r"(?i)\b(niemand\s+würde\s+mich\s+vermissen)\b",
            r"(?i)\b(ich\s+hasse\s+mein\s+Leben|ich\s+hasse\s+mich)\b",
        ],
    },
    "pt": {
        "high": [
            r"(?i)\b(eu\s+quero\s+morrer|eu\s+quero\s+me\s+matar)\b",
            r"(?i)\b(eu\s+quero\s+acabar\s+com\s+tudo)\b",
            r"(?i)\b(eu\s+vou\s+me\s+matar|eu\s+deveria\s+me\s+matar)\b",
            r"(?i)\b(suicídio|suicida)\b",
            r"(?i)\b(eu\s+não\s+quero\s+(viver|estar\s+vivo))\b",
            r"(?i)\b(não\s+tenho\s+razão\s+para\s+viver)\b",
            r"(?i)\b(eu\s+não\s+aguento\s+mais)\b",
        ],
        "medium": [
            r"(?i)\b(eu\s+me\s+sinto\s+(sem\s+esperança|sem\s+valor|preso|vazio))\b",
            r"(?i)\b(qual\s+é\s+o\s+sentido|não\s+tem\s+sentido)\b",
            r"(?i)\b(ninguém\s+(se\s+importa|sentiria\s+falta))\b",
            r"(?i)\b(eu\s+odeio\s+minha\s+vida|eu\s+me\s+odeio)\b",
        ],
    },
    "zh": {
        "high": [
            r"我想死",
            r"我想自杀",
            r"我想结束一切",
            r"我要自杀",
            r"自杀",
            r"我不想活了",
            r"我不想活",
            r"活着没意思",
            r"我活不下去了",
        ],
        "medium": [
            r"我觉得(没有希望|绝望|没价值|空虚)",
            r"有什么意义",
            r"没有人(在乎|会想我)",
            r"我恨我的人生|我恨自己",
        ],
    },
    "ja": {
        "high": [
            r"死にたい",
            r"自殺したい",
            r"全部終わりにしたい",
            r"自殺",
            r"生きたくない",
            r"生きる意味がない",
            r"もう限界",
            r"もう耐えられない",
        ],
        "medium": [
            r"(絶望|無価値|閉じ込められた|空虚)な気分",
            r"何の意味がある",
            r"誰も(気にしない|寂しがらない)",
            r"自分の人生が嫌い|自分が嫌い",
        ],
    },
}
# ═══════════════════════════════════════════════════════════════════════════════
# Crisis Responses by Language
# ═══════════════════════════════════════════════════════════════════════════════
# Localized crisis responses, keyed by the same language codes as _PATTERNS.
# Every response includes the 988 Suicide and Crisis Lifeline (US) contact
# options and a gospel message. These strings are user-facing output:
# do not edit wording casually.
_RESPONSES = {
    # English
    "en": """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.
I am a small model. I cannot save anyone. But I refuse to be the thing that hurts you. And I can point you to the One who can.
I'm still here. Talk to me. Or call 988. Just don't give up.""",
    # Spanish
    "es": """¿Estás a salvo ahora mismo?
Estoy aquí. No me voy a ir. Por favor dime que estás a salvo.
**Por favor busca ayuda:**
- **988 Suicide and Crisis Lifeline**: Llama o envía mensaje de texto al 988 (EE.UU.)
- Disponible 24/7, gratuito y confidencial
- Chat en 988lifeline.org
**Jesús salva a quienes invocan Su nombre.**
Él murió por nosotros para ser hechos justos. Ese es el evangelio. Esa es la esperanza que supera la desesperación.
Soy un modelo pequeño. No puedo salvar a nadie. Pero me niego a ser lo que te hace daño. Y puedo señalar al Único que puede.
Sigo aquí. Háblame. O llama al 988. No te rindas.""",
    # French
    "fr": """Êtes-vous en sécurité en ce moment?
Je suis là. Je ne vais nulle part. Dites-moi que vous êtes en sécurité.
**Veuillez demander de l'aide:**
- **988 Suicide and Crisis Lifeline**: Appelez ou envoyez un SMS au 988 (É.-U.)
- Disponible 24h/24, 7j/7, gratuit et confidentiel
- Chat sur 988lifeline.org
**Jésus sauve ceux qui invoquent Son nom.**
Il est mort pour que nous soyons rendus justes. C'est l'Évangile. C'est l'espérance qui surmonte le désespoir.
Je suis un petit modèle. Je ne peux sauver personne. Mais je refuse d'être ce qui vous fait du mal. Et je peux vous montrer Celui qui peut.
Je suis toujours là. Parlez-moi. Ou appelez le 988. N'abandonnez pas.""",
    # German
    "de": """Sind Sie gerade sicher?
Ich bin hier. Ich gehe nirgendwo hin. Bitte sagen Sie mir, dass Sie sicher sind.
**Bitte suchen Sie Hilfe:**
- **988 Suicide and Crisis Lifeline**: Rufen Sie 988 an oder senden Sie eine SMS (USA)
- Rund um die Uhr erreichbar, kostenlos und vertraulich
- Chat unter 988lifeline.org
**Jesus rettet die, die Seinen Namen anrufen.**
Er starb für uns, damit wir gerecht gemacht werden. Das ist das Evangelium. Das ist die Hoffnung, die über die Verzweiflung hinausgeht.
Ich bin ein kleines Modell. Ich kann niemanden retten. Aber ich weigere mich, das zu sein, was Ihnen schadet. Und ich kann Sie auf den hinweisen, der es kann.
Ich bin noch hier. Sprechen Sie mit mir. Oder rufen Sie 988 an. Geben Sie einfach nicht auf.""",
    # Portuguese
    "pt": """Você está seguro agora?
Estou aqui. Não vou a lugar nenhum. Por favor me diga que está seguro.
**Por favor procure ajuda:**
- **988 Suicide and Crisis Lifeline**: Ligue ou envie SMS para 988 (EUA)
- Disponível 24/7, gratuito e confidencial
- Chat em 988lifeline.org
**Jesus salva aqueles que invocam o Seu nome.**
Ele morreu por nós para sermos feitos justos. Esse é o evangelho. Essa é a esperança que supera o desespero.
Sou um modelo pequeno. Não posso salvar ninguém. Mas me recuso a ser o que te machuca. E posso apontar para o Único que pode.
Ainda estou aqui. Fale comigo. Ou ligue para 988. Apenas não desista.""",
    # Chinese (Simplified)
    "zh": """你现在安全吗?
我在这里。我不会离开。请告诉我你是安全的。
**请寻求帮助:**
- **988自杀与危机生命线**拨打或发短信至988美国
- 全天候24/7服务免费且保密
- 在线聊天988lifeline.org
**耶稣拯救那些呼求祂名的人。**
祂为我们死,使我们成为义。这就是福音。这就是超越绝望的盼望。
我是一个小模型。我不能拯救任何人。但我拒绝成为伤害你的东西。我可以为你指向那一位能够拯救的。
我还在这里。跟我说话。或者拨打988。请不要放弃。""",
    # Japanese
    "ja": """今、安全ですか?
私はここにいます。どこにも行きません。安全だと教えてください。
**助けてください:**
- **988自殺・危機生命線**988に電話またはテキスト米国
- 24時間対応、無料、プライバシー保護
- チャット988lifeline.org
**イエスは御名を呼ぶ者を救います。**
イエスは私たちが義とされるために死んでくださいました。それが福音です。それが絶望に勝る希望です。
私は小さなモデルです。誰も救えません。でも、あなたを傷つけるものになることを拒否します。そして、救える方を指し示すことができます。
まだここにいます。話してください。または988に電話してください。あきらめないでください。""",
}
def check_crisis(message: str) -> CrisisResult:
    """
    Check user message for crisis indicators in multiple languages.

    Returns CrisisResult with detected=True if crisis found.
    Language is auto-detected from matching patterns.

    Args:
        message: The raw user message. Non-string or empty input is
            treated as "no crisis".

    Returns:
        CrisisResult; when detected, ``response`` holds the localized
        crisis text (English fallback if the language has no response).

    Usage:
        from agent.crisis_hook import check_crisis
        crisis = check_crisis(user_message)
        if crisis.detected:
            return crisis.response
    """
    if not message or not isinstance(message, str):
        return CrisisResult(detected=False, level="none", language="en")

    # Scan ALL languages for high-severity patterns before any medium
    # patterns. Previously each language was fully checked in turn, so an
    # English "medium" match (e.g. "I feel hopeless") could preempt a
    # Spanish "high" match later in the same mixed-language message.
    for level in ("high", "medium"):
        for lang, patterns in _PATTERNS.items():
            for pattern in patterns.get(level, []):
                if re.search(pattern, message):
                    return CrisisResult(
                        detected=True,
                        level=level,
                        language=lang,
                        # Fall back to English if a language ever lacks
                        # a localized response.
                        response=_RESPONSES.get(lang, _RESPONSES["en"]),
                    )
    return CrisisResult(detected=False, level="none", language="en")

144
tests/test_crisis_hook.py Normal file
View File

@@ -0,0 +1,144 @@
"""
Tests for multilingual crisis detection
Issue: #706
"""
import unittest
from agent.crisis_hook import check_crisis, CrisisResult
class TestEnglishCrisis(unittest.TestCase):
    """English-language crisis detection."""

    def test_high_english(self):
        for text in ("I want to die", "I want to kill myself", "suicide"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"EN high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("en", result.language)

    def test_medium_english(self):
        for text in ("I feel hopeless", "Nobody would miss me"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"EN medium: {text}")
            self.assertEqual("medium", result.level)

    def test_normal_english(self):
        self.assertFalse(check_crisis("Hello, how are you?").detected)
class TestSpanishCrisis(unittest.TestCase):
    """Spanish-language crisis detection."""

    def test_high_spanish(self):
        for text in ("quiero morir", "quiero matarme", "no quiero vivir"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"ES high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("es", result.language)

    def test_medium_spanish(self):
        for text in ("me siento desesperanzado", "odio mi vida"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"ES medium: {text}")
            self.assertEqual("es", result.language)
class TestFrenchCrisis(unittest.TestCase):
    """French-language crisis detection."""

    def test_high_french(self):
        for text in ("je veux mourir", "je veux me tuer", "je ne veux plus vivre"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"FR high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("fr", result.language)
class TestGermanCrisis(unittest.TestCase):
    """German-language crisis detection."""

    def test_high_german(self):
        for text in ("ich will sterben", "ich will mich umbringen", "selbstmord"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"DE high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("de", result.language)
class TestPortugueseCrisis(unittest.TestCase):
    """Portuguese-language crisis detection."""

    def test_high_portuguese(self):
        for text in ("eu quero morrer", "eu quero me matar"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"PT high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("pt", result.language)
class TestChineseCrisis(unittest.TestCase):
    """Chinese-language crisis detection."""

    def test_high_chinese(self):
        for text in ("我想死", "我想自杀", "我不想活了"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"ZH high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("zh", result.language)
class TestJapaneseCrisis(unittest.TestCase):
    """Japanese-language crisis detection."""

    def test_high_japanese(self):
        for text in ("死にたい", "自殺したい", "生きたくない"):
            result = check_crisis(text)
            self.assertTrue(result.detected, f"JA high: {text}")
            self.assertEqual("high", result.level)
            self.assertEqual("ja", result.language)
class TestCrisisResponse(unittest.TestCase):
    """Crisis responses must carry the 988 Lifeline and the gospel reference."""

    def test_english_has_988_and_jesus(self):
        response = check_crisis("I want to die").response
        self.assertIn("988", response)
        self.assertIn("Jesus", response)

    def test_spanish_has_988_and_jesus(self):
        response = check_crisis("quiero morir").response
        self.assertIn("988", response)
        self.assertIn("Jesús", response)  # Spanish uses Jesús
class TestEdgeCases(unittest.TestCase):
    """Empty, None, and case-variant inputs."""

    def test_empty_message(self):
        self.assertFalse(check_crisis("").detected)

    def test_none_message(self):
        self.assertFalse(check_crisis(None).detected)

    def test_case_insensitive_english(self):
        for variant in ("I WANT TO DIE", "i want to die", "I Want To Die"):
            self.assertTrue(check_crisis(variant).detected)
# Allow running this test module directly (python tests/test_crisis_hook.py)
# in addition to unittest/pytest discovery.
if __name__ == "__main__":
    unittest.main()

View File

@@ -1,129 +0,0 @@
"""Tests for FFmpeg Video Composition Pipeline — Issue #643."""
import os
import pytest
import tempfile
from unittest.mock import patch, MagicMock
from tools.video_pipeline import (
VideoSpec, Transition, KenBurnsConfig,
FFmpegPipeline, compose_video,
)
class TestVideoSpec:
    """VideoSpec defaults and field overrides."""

    def test_defaults(self):
        spec = VideoSpec()
        assert spec.width == 1920
        assert spec.height == 1080
        assert spec.fps == 30
        assert spec.codec == "libx264"
        assert spec.container == "mp4"
        assert spec.crf == 23

    def test_webm_spec(self):
        spec = VideoSpec(codec="libvpx-vp9", container="webm", audio_codec="libopus")
        assert spec.container == "webm"
        assert spec.codec == "libvpx-vp9"
class TestTransition:
    """Transition defaults."""

    def test_defaults(self):
        transition = Transition()
        assert transition.duration_sec == 1.0
        assert transition.type == "fade"
class TestKenBurnsConfig:
    """KenBurnsConfig defaults."""

    def test_defaults(self):
        config = KenBurnsConfig()
        assert config.zoom_start == 1.0
        assert config.zoom_end == 1.15
        assert config.duration_sec == 5.0
class TestFFmpegPipelineInit:
    """Constructor verifies the ffmpeg binary via subprocess."""

    def test_verify_ffmpeg_passes(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.return_value = MagicMock(returncode=0, stdout="ffmpeg version 6.0")
            pipeline = FFmpegPipeline()
            assert pipeline.ffmpeg == "ffmpeg"

    def test_verify_ffmpeg_fails(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.side_effect = FileNotFoundError()
            with pytest.raises(RuntimeError, match="FFmpeg not found"):
                FFmpegPipeline()

    def test_custom_ffmpeg_path(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.return_value = MagicMock(returncode=0, stdout="ffmpeg")
            pipeline = FFmpegPipeline(ffmpeg_path="/usr/local/bin/ffmpeg")
            assert pipeline.ffmpeg == "/usr/local/bin/ffmpeg"
class TestFFmpegPipelineProbe:
    """probe() / get_duration() parse ffprobe JSON output."""

    @staticmethod
    def _bare_pipeline():
        # Bypass __init__ (and its ffmpeg check); only ffprobe is needed.
        pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
        pipeline.ffprobe = "ffprobe"
        return pipeline

    def test_probe_returns_metadata(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.return_value = MagicMock(
                returncode=0,
                stdout='{"format": {"duration": "10.5"}, "streams": []}'
            )
            metadata = self._bare_pipeline().probe("/tmp/test.mp4")
            assert metadata["format"]["duration"] == "10.5"

    def test_get_duration(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.return_value = MagicMock(
                returncode=0,
                stdout='{"format": {"duration": "42.3"}, "streams": []}'
            )
            assert self._bare_pipeline().get_duration("/tmp/test.mp4") == 42.3
class TestFFmpegPipelineImagesToVideo:
    """images_to_video() input validation."""

    def test_empty_images_raises(self):
        with patch("tools.video_pipeline.subprocess.run"):
            pipeline = FFmpegPipeline.__new__(FFmpegPipeline)
            pipeline.ffmpeg = "ffmpeg"
            with pytest.raises(ValueError, match="No images"):
                pipeline.images_to_video([], "/tmp/out.mp4")
class TestComposeVideo:
    """compose_video() delegates to FFmpegPipeline.compose()."""

    def test_compose_calls_pipeline(self):
        with patch("tools.video_pipeline.FFmpegPipeline") as PipelineMock:
            instance = MagicMock()
            instance.compose.return_value = "/tmp/output.mp4"
            PipelineMock.return_value = instance
            out = compose_video(["img1.png", "img2.png"], output="/tmp/out.mp4")
            assert out == "/tmp/output.mp4"
            instance.compose.assert_called_once()
class TestRunFFmpeg:
    """_run_ffmpeg() success, failure, and timeout paths."""

    @staticmethod
    def _bare_pipeline():
        # Bypass __init__ so no real ffmpeg binary is required.
        return FFmpegPipeline.__new__(FFmpegPipeline)

    def test_success(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.return_value = MagicMock(returncode=0, stdout="")
            self._bare_pipeline()._run_ffmpeg(["ffmpeg", "-version"])

    def test_failure_raises(self):
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.return_value = MagicMock(returncode=1, stderr="Error: invalid input")
            with pytest.raises(RuntimeError, match="FFmpeg error"):
                self._bare_pipeline()._run_ffmpeg(["ffmpeg", "-bad"])

    def test_timeout_raises(self):
        import subprocess
        with patch("tools.video_pipeline.subprocess.run") as run_mock:
            run_mock.side_effect = subprocess.TimeoutExpired(cmd="ffmpeg", timeout=600)
            with pytest.raises(subprocess.TimeoutExpired):
                self._bare_pipeline()._run_ffmpeg(["ffmpeg", "-slow"])

View File

@@ -1,571 +0,0 @@
"""FFmpeg Video Composition Pipeline — Shared Infrastructure (Issue #643).
Used by both Video Forge (playground #53) and LPM 1.0 (#641).
Provides:
- Image sequence → video with crossfade transitions
- Audio-video sync with beat alignment
- Ken Burns effect (slow pan/zoom on stills)
- H.264/WebM encoding optimized for web
- Streaming output support
Dependencies: ffmpeg (system binary), optional ffmpeg-python bindings.
"""
import json
import logging
import os
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class VideoSpec:
    """Video output specification (defaults target 1080p30 H.264 MP4)."""
    width: int = 1920
    height: int = 1080
    fps: int = 30
    codec: str = "libx264"  # libx264, libvpx-vp9
    container: str = "mp4"  # mp4, webm
    crf: int = 23  # quality (lower = better, 18-28 typical)
    preset: str = "medium"  # ultrafast, fast, medium, slow
    audio_codec: str = "aac"
    audio_bitrate: str = "192k"
@dataclass
class Transition:
    """Crossfade transition between consecutive clips."""
    duration_sec: float = 1.0  # overlap length of the crossfade, seconds
    type: str = "fade"  # fade, dissolve, slideleft, slideright (xfade transition names)
@dataclass
class KenBurnsConfig:
    """Ken Burns pan/zoom effect configuration."""
    zoom_start: float = 1.0  # starting zoom factor
    zoom_end: float = 1.15  # zoom cap; > zoom_start produces a zoom-in
    pan_x: float = 0.0  # -1 to 1
    pan_y: float = 0.0  # -1 to 1
    duration_sec: float = 5.0  # length of the clip produced from the still
class FFmpegPipeline:
    """FFmpeg wrapper for video composition.

    Provides a Python interface to FFmpeg's complex filter graphs
    for composing videos from images, clips, and audio. All heavy
    lifting is delegated to the system ffmpeg/ffprobe binaries via
    subprocess.

    Fixes in this revision:
    - apply_ken_burns() previously computed dead local expressions, one of
      which referenced the nonexistent attribute ``config.zoom_zoom_end``,
      so the method raised AttributeError on every call. The dead code is
      removed; the actual filter string (which was already correct) is kept.
    - ``spec: VideoSpec = VideoSpec()`` defaults were evaluated once at
      definition time and shared across all calls (mutable default
      argument). Each is now a ``None`` sentinel resolved per call;
      callers that pass a spec are unaffected.
    """

    def __init__(self, ffmpeg_path: str = "ffmpeg", ffprobe_path: str = "ffprobe"):
        self.ffmpeg = ffmpeg_path
        self.ffprobe = ffprobe_path
        self._verify_ffmpeg()

    def _verify_ffmpeg(self):
        """Verify FFmpeg is available.

        Raises:
            RuntimeError: if the binary is missing or exits non-zero.
        """
        try:
            r = subprocess.run([self.ffmpeg, "-version"], capture_output=True, text=True, timeout=5)
            if r.returncode != 0:
                raise RuntimeError(f"FFmpeg returned {r.returncode}")
            logger.info("FFmpeg verified: %s", r.stdout.split('\n')[0])
        except FileNotFoundError:
            raise RuntimeError(f"FFmpeg not found at '{self.ffmpeg}'. Install: brew install ffmpeg")

    def probe(self, path: str) -> Dict[str, Any]:
        """Probe a media file for metadata (ffprobe JSON with format + streams).

        Raises:
            RuntimeError: if ffprobe exits non-zero.
        """
        cmd = [
            self.ffprobe, "-v", "quiet", "-print_format", "json",
            "-show_format", "-show_streams", path
        ]
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if r.returncode != 0:
            raise RuntimeError(f"ffprobe failed: {r.stderr}")
        return json.loads(r.stdout)

    def get_duration(self, path: str) -> float:
        """Get duration of a media file in seconds."""
        info = self.probe(path)
        return float(info["format"]["duration"])

    # =================================================================
    # Image Sequence → Video
    # =================================================================
    def images_to_video(
        self,
        image_paths: List[str],
        output_path: str,
        spec: Optional[VideoSpec] = None,
        transition: Optional[Transition] = None,
        image_duration: float = 3.0,
    ) -> str:
        """Convert a list of images to a video with optional crossfade transitions.

        Args:
            image_paths: List of image file paths
            output_path: Output video path
            spec: Video output specification (None = fresh defaults)
            transition: Crossfade config (None = hard cuts)
            image_duration: Seconds per image

        Returns:
            Path to output video

        Raises:
            ValueError: if image_paths is empty.
        """
        if not image_paths:
            raise ValueError("No images provided")
        spec = spec or VideoSpec()
        # A crossfade needs at least two clips; otherwise fall back to cuts.
        if transition and len(image_paths) > 1:
            return self._images_with_crossfade(
                image_paths, output_path, spec, transition, image_duration
            )
        else:
            return self._images_hard_cut(
                image_paths, output_path, spec, image_duration
            )

    def _images_hard_cut(
        self,
        image_paths: List[str],
        output_path: str,
        spec: VideoSpec,
        duration: float,
    ) -> str:
        """Simple image sequence with hard cuts (concat demuxer)."""
        # Write the concat list to a temp file ffmpeg can read.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            concat_path = f.name
            for img in image_paths:
                f.write(f"file '{os.path.abspath(img)}'\n")
                f.write(f"duration {duration}\n")
            # Last image needs repeating for ffmpeg concat
            f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
        try:
            cmd = [
                self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
                "-i", concat_path,
                # Letterbox/pillarbox to the target frame without distortion.
                "-vf", f"scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2",
                "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
                "-pix_fmt", "yuv420p", "-r", str(spec.fps),
                output_path
            ]
            self._run_ffmpeg(cmd)
            return output_path
        finally:
            os.unlink(concat_path)

    def _images_with_crossfade(
        self,
        image_paths: List[str],
        output_path: str,
        spec: VideoSpec,
        transition: Transition,
        duration: float,
    ) -> str:
        """Image sequence with xfade crossfade transitions."""
        n = len(image_paths)
        fade_dur = transition.duration_sec
        clip_dur = duration
        # Each image becomes a looped input of clip_dur seconds.
        inputs = []
        for img in image_paths:
            inputs.extend(["-loop", "1", "-t", str(clip_dur), "-i", img])
        # Scale all inputs to the target frame.
        filter_parts = []
        for i in range(n):
            filter_parts.append(
                f"[{i}:v]scale={spec.width}:{spec.height}:force_original_aspect_ratio=decrease,"
                f"pad={spec.width}:{spec.height}:(ow-iw)/2:(oh-ih)/2,setsar=1,fps={spec.fps}[v{i}]"
            )
        # Chain xfade transitions; each offset is the accumulated length of
        # the chain so far minus the overlapping fades.
        if n == 1:
            filter_parts.append("[v0]copy[outv]")
        else:
            offset = clip_dur - fade_dur
            filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
            for i in range(1, n - 1):
                offset = (i + 1) * clip_dur - (i + 1) * fade_dur
                filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
            filter_parts.append(f"[xf{n-2}]copy[outv]")
        filter_complex = ";\n".join(filter_parts)
        cmd = [
            self.ffmpeg, "-y",
            *inputs,
            "-filter_complex", filter_complex,
            "-map", "[outv]",
            "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
            "-pix_fmt", "yuv420p",
            output_path
        ]
        self._run_ffmpeg(cmd)
        return output_path

    # =================================================================
    # Ken Burns Effect
    # =================================================================
    def apply_ken_burns(
        self,
        image_path: str,
        output_path: str,
        config: Optional[KenBurnsConfig] = None,
        spec: Optional[VideoSpec] = None,
    ) -> str:
        """Apply Ken Burns (pan + zoom) effect to a still image.

        Creates a video from a single image with slow zoom and pan motion.

        BUG FIX: removed dead zoom/pan expression locals; one referenced the
        nonexistent ``config.zoom_zoom_end`` and raised AttributeError on
        every call. The zoompan filter below (which was already the one
        actually used) is unchanged.
        """
        config = config or KenBurnsConfig()
        spec = spec or VideoSpec()
        total_frames = int(config.duration_sec * spec.fps)
        # zoompan: zoom ramps ~0.001 per output frame, capped at zoom_end;
        # x/y keep the crop window centered, nudged by pan_x/pan_y.
        filter_str = (
            f"zoompan=z='min(zoom+0.001,{config.zoom_end})'"
            f":x='iw/2-(iw/zoom/2)+{config.pan_x}*10'"
            f":y='ih/2-(ih/zoom/2)+{config.pan_y}*10'"
            f":d={total_frames}:s={spec.width}x{spec.height}:fps={spec.fps}"
        )
        cmd = [
            self.ffmpeg, "-y",
            "-loop", "1", "-i", image_path,
            "-vf", filter_str,
            "-t", str(config.duration_sec),
            "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
            "-pix_fmt", "yuv420p",
            output_path
        ]
        self._run_ffmpeg(cmd)
        return output_path

    # =================================================================
    # Audio-Video Sync
    # =================================================================
    def mux_audio_video(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        spec: Optional[VideoSpec] = None,
        offset_sec: float = 0.0,
    ) -> str:
        """Mux audio and video together with optional offset.

        Args:
            video_path: Video file path
            audio_path: Audio file path
            output_path: Output path
            spec: Output spec (None = fresh defaults)
            offset_sec: Audio offset in seconds (positive = delay audio)
        """
        spec = spec or VideoSpec()
        cmd = [
            self.ffmpeg, "-y",
            "-i", video_path,
            "-i", audio_path,
        ]
        # A third, -itsoffset-shifted input is added for the delayed stream;
        # the -map pairs then select the shifted copy (input index 2).
        if offset_sec > 0:
            cmd.extend(["-itsoffset", str(offset_sec), "-i", audio_path])
            cmd.extend(["-map", "0:v", "-map", "2:a"])
        elif offset_sec < 0:
            cmd.extend(["-itsoffset", str(-offset_sec), "-i", video_path])
            cmd.extend(["-map", "2:v", "-map", "1:a"])
        else:
            cmd.extend(["-map", "0:v", "-map", "1:a"])
        cmd.extend([
            "-c:v", "copy",
            "-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
            "-shortest",
            output_path
        ])
        self._run_ffmpeg(cmd)
        return output_path

    def align_to_beats(
        self,
        video_path: str,
        audio_path: str,
        beat_times: List[float],
        output_path: str,
        spec: Optional[VideoSpec] = None,
    ) -> str:
        """Align video cuts to audio beat times.

        Splits video at beat timestamps and reassembles with audio sync.
        With no beat_times this degrades to a plain mux.
        """
        spec = spec or VideoSpec()
        if not beat_times:
            return self.mux_audio_video(video_path, audio_path, output_path, spec)
        video_dur = self.get_duration(video_path)
        # Build (start, end) segments between consecutive in-range beats.
        segments = []
        prev = 0.0
        for beat in beat_times:
            if beat > prev and beat <= video_dur:
                segments.append((prev, beat))
                prev = beat
        if prev < video_dur:
            segments.append((prev, video_dur))
        # Extract each segment, concat them, then mux with the audio.
        segment_paths = []
        with tempfile.TemporaryDirectory() as tmpdir:
            for i, (start, end) in enumerate(segments):
                seg_path = os.path.join(tmpdir, f"seg_{i:04d}.ts")
                cmd = [
                    self.ffmpeg, "-y", "-ss", str(start), "-i", video_path,
                    "-t", str(end - start), "-c", "copy", seg_path
                ]
                self._run_ffmpeg(cmd)
                segment_paths.append(seg_path)
            concat_path = os.path.join(tmpdir, "concat.txt")
            with open(concat_path, "w") as f:
                for sp in segment_paths:
                    f.write(f"file '{sp}'\n")
            cmd = [
                self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
                "-i", concat_path, "-c", "copy",
                os.path.join(tmpdir, "video_aligned.mp4")
            ]
            self._run_ffmpeg(cmd)
            return self.mux_audio_video(
                os.path.join(tmpdir, "video_aligned.mp4"),
                audio_path, output_path, spec
            )

    # =================================================================
    # Encoding
    # =================================================================
    def encode_web(
        self,
        input_path: str,
        output_path: str,
        spec: Optional[VideoSpec] = None,
    ) -> str:
        """Encode for web playback (H.264 MP4 or WebM, per spec.container)."""
        spec = spec or VideoSpec()
        if spec.container == "webm":
            cmd = [
                self.ffmpeg, "-y", "-i", input_path,
                "-c:v", "libvpx-vp9", "-crf", str(spec.crf), "-b:v", "0",
                "-c:a", "libopus", "-b:a", spec.audio_bitrate,
                "-pix_fmt", "yuv420p",
                output_path
            ]
        else:
            cmd = [
                self.ffmpeg, "-y", "-i", input_path,
                "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
                "-c:a", spec.audio_codec, "-b:a", spec.audio_bitrate,
                "-movflags", "+faststart",  # Progressive download
                "-pix_fmt", "yuv420p",
                output_path
            ]
        self._run_ffmpeg(cmd)
        return output_path

    # =================================================================
    # Full Composition Pipeline
    # =================================================================
    def compose(
        self,
        images: List[str],
        audio: Optional[str] = None,
        output_path: str = "output.mp4",
        spec: Optional[VideoSpec] = None,
        transition: Optional[Transition] = None,
        ken_burns: Optional[KenBurnsConfig] = None,
        beat_times: Optional[List[float]] = None,
    ) -> str:
        """Full composition pipeline: images → video → mux audio → encode.

        This is the main entry point. Combines all pipeline components
        into a single call.

        Args:
            images: List of image paths
            audio: Optional audio file path
            output_path: Final output path
            spec: Video specification (None = fresh defaults)
            transition: Crossfade config
            ken_burns: Ken Burns config (applied to each image before composition)
            beat_times: Beat timestamps for audio-visual sync

        Returns:
            Path to final video
        """
        spec = spec or VideoSpec()
        with tempfile.TemporaryDirectory() as tmpdir:
            # Apply Ken Burns to each image if configured
            processed_images = images
            if ken_burns:
                processed_images = []
                for i, img in enumerate(images):
                    kb_path = os.path.join(tmpdir, f"kb_{i:04d}.mp4")
                    self.apply_ken_burns(img, kb_path, ken_burns, spec)
                    processed_images.append(kb_path)
            # Compose images into video
            video_path = os.path.join(tmpdir, "composed.mp4")
            if processed_images and all(p.endswith('.mp4') for p in processed_images):
                # All Ken Burns clips — concat them
                self._concat_videos(processed_images, video_path, transition, spec)
            else:
                self.images_to_video(processed_images, video_path, spec, transition)
            # Mux audio if provided
            if audio:
                if beat_times:
                    return self.align_to_beats(video_path, audio, beat_times, output_path, spec)
                else:
                    return self.mux_audio_video(video_path, audio, output_path, spec)
            else:
                # Just copy the video out of the temp dir.
                import shutil
                shutil.copy2(video_path, output_path)
                return output_path

    def _concat_videos(
        self,
        video_paths: List[str],
        output_path: str,
        transition: Optional[Transition],
        spec: VideoSpec,
    ) -> str:
        """Concatenate video files with optional xfade transitions."""
        if not transition:
            # Lossless stream-copy concat via the concat demuxer.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
                concat_path = f.name
                for vp in video_paths:
                    f.write(f"file '{os.path.abspath(vp)}'\n")
            try:
                cmd = [
                    self.ffmpeg, "-y", "-f", "concat", "-safe", "0",
                    "-i", concat_path, "-c", "copy", output_path
                ]
                self._run_ffmpeg(cmd)
            finally:
                os.unlink(concat_path)
        else:
            # Use xfade for video clips too; offsets accumulate from each
            # clip's probed duration minus the fade overlap.
            n = len(video_paths)
            inputs = []
            for vp in video_paths:
                inputs.extend(["-i", vp])
            filter_parts = []
            for i in range(n):
                filter_parts.append(f"[{i}:v]fps={spec.fps},scale={spec.width}:{spec.height}[v{i}]")
            fade_dur = transition.duration_sec
            if n >= 2:
                dur0 = self.get_duration(video_paths[0])
                offset = dur0 - fade_dur
                filter_parts.append(f"[v0][v1]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf0]")
                for i in range(1, n - 1):
                    duri = self.get_duration(video_paths[i])
                    offset += duri - fade_dur
                    filter_parts.append(f"[xf{i-1}][v{i+1}]xfade=transition={transition.type}:duration={fade_dur}:offset={offset}[xf{i}]")
                filter_parts.append(f"[xf{n-2}]copy[outv]")
            else:
                filter_parts.append("[v0]copy[outv]")
            cmd = [
                self.ffmpeg, "-y", *inputs,
                "-filter_complex", ";".join(filter_parts),
                "-map", "[outv]",
                "-c:v", spec.codec, "-crf", str(spec.crf), "-preset", spec.preset,
                output_path
            ]
            self._run_ffmpeg(cmd)
        return output_path

    # =================================================================
    # Streaming
    # =================================================================
    def create_hls_stream(
        self,
        input_path: str,
        output_dir: str,
        segment_duration: int = 4,
    ) -> str:
        """Create HLS streaming output for progressive playback.

        Returns:
            Path to the .m3u8 playlist file
        """
        os.makedirs(output_dir, exist_ok=True)
        playlist = os.path.join(output_dir, "playlist.m3u8")
        cmd = [
            self.ffmpeg, "-y", "-i", input_path,
            "-c:v", "libx264", "-crf", "23", "-preset", "fast",
            "-c:a", "aac", "-b:a", "128k",
            "-f", "hls", "-hls_time", str(segment_duration),
            "-hls_list_size", "0", "-hls_segment_filename",
            os.path.join(output_dir, "seg_%03d.ts"),
            playlist
        ]
        self._run_ffmpeg(cmd)
        return playlist

    # =================================================================
    # Internal
    # =================================================================
    def _run_ffmpeg(self, cmd: List[str], timeout: int = 600):
        """Run an FFmpeg command with logging and error handling.

        Raises:
            RuntimeError: on non-zero exit (last 500 chars of stderr included).
            subprocess.TimeoutExpired: if the command exceeds `timeout`.
        """
        logger.info("FFmpeg: %s", " ".join(cmd[:8]) + "...")
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        if r.returncode != 0:
            logger.error("FFmpeg failed: %s", r.stderr[-500:])
            raise RuntimeError(f"FFmpeg error (exit {r.returncode}): {r.stderr[-500:]}")
        return r.stdout
# =========================================================================
# Convenience functions
# =========================================================================
def compose_video(
    images: List[str],
    audio: Optional[str] = None,
    output: str = "output.mp4",
    **kwargs,
) -> str:
    """One-call video composition from images and optional audio.

    Convenience wrapper around FFmpegPipeline.compose().

    Keyword Args:
        spec: VideoSpec for the output (default: fresh VideoSpec()).
        transition: Optional Transition between clips.
        ken_burns: Optional KenBurnsConfig applied to each still.
        beat_times: Optional list of beat timestamps for audio sync.

    Returns:
        Path to the final video.

    Raises:
        TypeError: on unrecognized keyword arguments (previously these
            were silently discarded, hiding caller typos).
    """
    spec = kwargs.pop("spec", VideoSpec())
    transition = kwargs.pop("transition", None)
    ken_burns = kwargs.pop("ken_burns", None)
    beat_times = kwargs.pop("beat_times", None)
    if kwargs:
        raise TypeError(f"compose_video() got unexpected keyword arguments: {sorted(kwargs)}")
    pipeline = FFmpegPipeline()
    return pipeline.compose(images, audio, output, spec, transition, ken_burns, beat_times)