Compare commits
1 Commits
fix/issue-
...
fix/743
| Author | SHA1 | Date | |
|---|---|---|---|
| 0e4267d99e |
@@ -1,105 +0,0 @@
|
||||
"""Tests for shared audio analysis engine.
|
||||
|
||||
Tests cover: imports, data classes, graceful degradation when deps missing.
|
||||
Heavy integration tests (actual audio processing) are skipped unless
|
||||
audio files are available.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from tools.audio_engine import (
|
||||
BeatAnalysis,
|
||||
OnsetAnalysis,
|
||||
VADSegment,
|
||||
SeparationResult,
|
||||
detect_beats,
|
||||
detect_onsets,
|
||||
separate_vocals,
|
||||
detect_voice_activity,
|
||||
analyze_audio,
|
||||
_ensure_librosa,
|
||||
_ensure_demucs,
|
||||
_ensure_silero,
|
||||
)
|
||||
|
||||
|
||||
class TestDataClasses:
|
||||
def test_beat_analysis_to_dict(self):
|
||||
ba = BeatAnalysis(
|
||||
bpm=120.0,
|
||||
beat_times=[0.0, 0.5, 1.0],
|
||||
beat_frames=[0, 100, 200],
|
||||
tempo_confidence=0.8,
|
||||
duration=3.0,
|
||||
sample_rate=22050,
|
||||
)
|
||||
d = ba.to_dict()
|
||||
assert d["bpm"] == 120.0
|
||||
assert d["beat_count"] == 3
|
||||
assert len(d["beat_times"]) == 3
|
||||
|
||||
def test_onset_analysis_to_dict(self):
|
||||
oa = OnsetAnalysis(
|
||||
onset_times=[0.1, 0.5],
|
||||
onset_frames=[10, 50],
|
||||
onset_count=2,
|
||||
avg_onset_interval=0.4,
|
||||
)
|
||||
d = oa.to_dict()
|
||||
assert d["onset_count"] == 2
|
||||
assert d["avg_onset_interval"] == 0.4
|
||||
|
||||
def test_vad_segment_to_dict(self):
|
||||
seg = VADSegment(start=1.0, end=2.5, is_speech=True)
|
||||
d = seg.to_dict()
|
||||
assert d["start"] == 1.0
|
||||
assert d["end"] == 2.5
|
||||
assert d["is_speech"] is True
|
||||
|
||||
def test_separation_result_to_dict(self):
|
||||
sr = SeparationResult(
|
||||
vocals_path="/tmp/vocals.wav",
|
||||
instrumental_path="/tmp/inst.wav",
|
||||
duration=120.0,
|
||||
)
|
||||
d = sr.to_dict()
|
||||
assert d["vocals_path"] == "/tmp/vocals.wav"
|
||||
assert d["duration"] == 120.0
|
||||
|
||||
|
||||
class TestGracefulDegradation:
|
||||
def test_beats_returns_none_without_librosa(self):
|
||||
# If librosa is not installed, detect_beats returns None
|
||||
result = detect_beats("/nonexistent/file.wav")
|
||||
# Either None (no librosa) or None (file not found) — both acceptable
|
||||
assert result is None or isinstance(result, BeatAnalysis)
|
||||
|
||||
def test_onsets_returns_none_without_librosa(self):
|
||||
result = detect_onsets("/nonexistent/file.wav")
|
||||
assert result is None or isinstance(result, OnsetAnalysis)
|
||||
|
||||
def test_separation_returns_none_without_demucs(self):
|
||||
result = separate_vocals("/nonexistent/file.wav")
|
||||
assert result is None or isinstance(result, SeparationResult)
|
||||
|
||||
def test_vad_returns_none_without_silero(self):
|
||||
result = detect_voice_activity("/nonexistent/file.wav")
|
||||
assert result is None or isinstance(result, list)
|
||||
|
||||
|
||||
class TestDependencyChecks:
|
||||
def test_ensure_librosa_returns_none_or_module(self):
|
||||
result = _ensure_librosa()
|
||||
assert result is None or result is not None # Either is fine
|
||||
|
||||
def test_ensure_demucs_is_bool(self):
|
||||
result = _ensure_demucs()
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_ensure_silero_is_bool(self):
|
||||
result = _ensure_silero()
|
||||
assert isinstance(result, bool)
|
||||
162
tests/test_telegram_thread_routing.py
Normal file
162
tests/test_telegram_thread_routing.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""Tests for Telegram thread-aware session routing.
|
||||
|
||||
Verifies that messages in different threads/topics get independent
|
||||
conversation histories.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
from gateway.session import build_session_key, SessionSource
|
||||
from gateway.platforms.base import Platform
|
||||
|
||||
|
||||
class TestThreadSessionKey:
|
||||
"""Verify session keys include thread_id for isolation."""
|
||||
|
||||
def test_dm_with_thread_gets_unique_key(self):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123456",
|
||||
chat_type="dm",
|
||||
thread_id="100",
|
||||
)
|
||||
key = build_session_key(source)
|
||||
assert "123456" in key
|
||||
assert "100" in key
|
||||
assert key == "agent:main:telegram:dm:123456:100"
|
||||
|
||||
def test_dm_without_thread_uses_chat_only(self):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123456",
|
||||
chat_type="dm",
|
||||
)
|
||||
key = build_session_key(source)
|
||||
assert key == "agent:main:telegram:dm:123456"
|
||||
assert ":100" not in key
|
||||
|
||||
def test_different_threads_different_keys(self):
|
||||
source_a = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123456",
|
||||
chat_type="dm",
|
||||
thread_id="100",
|
||||
)
|
||||
source_b = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123456",
|
||||
chat_type="dm",
|
||||
thread_id="200",
|
||||
)
|
||||
key_a = build_session_key(source_a)
|
||||
key_b = build_session_key(source_b)
|
||||
assert key_a != key_b
|
||||
|
||||
def test_same_thread_same_key(self):
|
||||
source_a = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123456",
|
||||
chat_type="dm",
|
||||
thread_id="100",
|
||||
)
|
||||
source_b = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123456",
|
||||
chat_type="dm",
|
||||
thread_id="100",
|
||||
)
|
||||
assert build_session_key(source_a) == build_session_key(source_b)
|
||||
|
||||
def test_group_with_thread_includes_thread(self):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="789",
|
||||
chat_type="group",
|
||||
thread_id="50",
|
||||
user_id="user1",
|
||||
)
|
||||
key = build_session_key(source)
|
||||
assert "789" in key
|
||||
assert "50" in key
|
||||
|
||||
def test_group_without_thread_isolates_by_user(self):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="789",
|
||||
chat_type="group",
|
||||
user_id="user1",
|
||||
)
|
||||
key = build_session_key(source, group_sessions_per_user=True)
|
||||
assert "789" in key
|
||||
assert "user1" in key
|
||||
|
||||
def test_group_thread_shared_across_users(self):
|
||||
"""In threads, all participants share the same session by default."""
|
||||
source_a = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="789",
|
||||
chat_type="group",
|
||||
thread_id="50",
|
||||
user_id="user1",
|
||||
)
|
||||
source_b = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="789",
|
||||
chat_type="group",
|
||||
thread_id="50",
|
||||
user_id="user2",
|
||||
)
|
||||
key_a = build_session_key(source_a, thread_sessions_per_user=False)
|
||||
key_b = build_session_key(source_b, thread_sessions_per_user=False)
|
||||
assert key_a == key_b # Shared session in thread
|
||||
|
||||
def test_group_thread_per_user_when_enabled(self):
|
||||
"""With thread_sessions_per_user=True, users get isolated sessions."""
|
||||
source_a = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="789",
|
||||
chat_type="group",
|
||||
thread_id="50",
|
||||
user_id="user1",
|
||||
)
|
||||
source_b = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="789",
|
||||
chat_type="group",
|
||||
thread_id="50",
|
||||
user_id="user2",
|
||||
)
|
||||
key_a = build_session_key(source_a, thread_sessions_per_user=True)
|
||||
key_b = build_session_key(source_b, thread_sessions_per_user=True)
|
||||
assert key_a != key_b
|
||||
|
||||
|
||||
class TestSessionSourceSerialization:
|
||||
"""Verify SessionSource round-trips correctly with thread_id."""
|
||||
|
||||
def test_thread_id_preserved_in_dict(self):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123",
|
||||
chat_type="dm",
|
||||
thread_id="456",
|
||||
)
|
||||
d = source.to_dict()
|
||||
assert d["thread_id"] == "456"
|
||||
restored = SessionSource.from_dict(d)
|
||||
assert restored.thread_id == "456"
|
||||
|
||||
def test_none_thread_id_preserved(self):
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="123",
|
||||
chat_type="dm",
|
||||
)
|
||||
d = source.to_dict()
|
||||
assert d.get("thread_id") is None
|
||||
restored = SessionSource.from_dict(d)
|
||||
assert restored.thread_id is None
|
||||
@@ -1,453 +0,0 @@
|
||||
"""Shared Audio Analysis Engine
|
||||
|
||||
Provides beat detection, onset detection, vocal/instrumental separation,
|
||||
voice activity detection, and tempo estimation for use by:
|
||||
- Video Forge (scene transitions synced to music)
|
||||
- LPM 1.0 (lip sync timing, conversational state detection)
|
||||
|
||||
Dependencies (install as needed — all optional):
|
||||
pip install librosa soundfile demucs silero-vad torch
|
||||
|
||||
Gracefully degrades: if a dependency is missing, that feature returns
|
||||
None with a warning rather than crashing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy dependency imports
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_LIBROSA = None
|
||||
_SOUNDFILE = None
|
||||
_DEMUCS_AVAILABLE = None
|
||||
_SILERO_AVAILABLE = None
|
||||
|
||||
|
||||
def _ensure_librosa():
|
||||
global _LIBROSA
|
||||
if _LIBROSA is None:
|
||||
try:
|
||||
import librosa
|
||||
_LIBROSA = librosa
|
||||
except ImportError:
|
||||
logger.warning("librosa not installed — beat/onset/tempo detection unavailable")
|
||||
_LIBROSA = False
|
||||
return _LIBROSA if _LIBROSA else None
|
||||
|
||||
|
||||
def _ensure_soundfile():
|
||||
global _SOUNDFILE
|
||||
if _SOUNDFILE is None:
|
||||
try:
|
||||
import soundfile
|
||||
_SOUNDFILE = soundfile
|
||||
except ImportError:
|
||||
logger.warning("soundfile not installed — audio loading may be limited")
|
||||
_SOUNDFILE = False
|
||||
return _SOUNDFILE if _SOUNDFILE else None
|
||||
|
||||
|
||||
def _ensure_demucs():
|
||||
global _DEMUCS_AVAILABLE
|
||||
if _DEMUCS_AVAILABLE is None:
|
||||
try:
|
||||
import demucs.api
|
||||
_DEMUCS_AVAILABLE = True
|
||||
except ImportError:
|
||||
logger.warning("demucs not installed — vocal separation unavailable")
|
||||
_DEMUCS_AVAILABLE = False
|
||||
return _DEMUCS_AVAILABLE
|
||||
|
||||
|
||||
def _ensure_silero():
|
||||
global _SILERO_AVAILABLE
|
||||
if _SILERO_AVAILABLE is None:
|
||||
try:
|
||||
import torch
|
||||
model, utils = torch.hub.load(
|
||||
repo_or_dir='snakers4/silero-vad', model='silero_vad',
|
||||
force_reload=False, onnx=False,
|
||||
)
|
||||
_SILERO_AVAILABLE = True
|
||||
except Exception:
|
||||
logger.warning("silero-vad not installed — VAD unavailable")
|
||||
_SILERO_AVAILABLE = False
|
||||
return _SILERO_AVAILABLE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class BeatAnalysis:
|
||||
"""Results of beat and tempo analysis."""
|
||||
bpm: float # Estimated tempo in beats per minute
|
||||
beat_times: List[float] # Timestamps of detected beats (seconds)
|
||||
beat_frames: List[int] # Frame indices of detected beats
|
||||
tempo_confidence: float = 0.0 # Confidence in BPM estimate
|
||||
duration: float = 0.0 # Audio duration in seconds
|
||||
sample_rate: int = 0 # Sample rate used for analysis
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"bpm": round(self.bpm, 1),
|
||||
"beat_count": len(self.beat_times),
|
||||
"beat_times": self.beat_times[:50], # Cap for JSON size
|
||||
"tempo_confidence": round(self.tempo_confidence, 3),
|
||||
"duration": round(self.duration, 2),
|
||||
"sample_rate": self.sample_rate,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class OnsetAnalysis:
|
||||
"""Results of onset detection."""
|
||||
onset_times: List[float] # Timestamps of onsets (seconds)
|
||||
onset_frames: List[int] # Frame indices of onsets
|
||||
onset_count: int = 0
|
||||
avg_onset_interval: float = 0.0 # Average time between onsets (seconds)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"onset_count": self.onset_count,
|
||||
"onset_times": self.onset_times[:100],
|
||||
"avg_onset_interval": round(self.avg_onset_interval, 3),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class VADSegment:
|
||||
"""A single voice activity segment."""
|
||||
start: float # Start time in seconds
|
||||
end: float # End time in seconds
|
||||
is_speech: bool # True if speech detected
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {"start": round(self.start, 3), "end": round(self.end, 3), "is_speech": self.is_speech}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SeparationResult:
|
||||
"""Results of vocal/instrumental separation."""
|
||||
vocals_path: Optional[str] = None
|
||||
instrumental_path: Optional[str] = None
|
||||
duration: float = 0.0
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"vocals_path": self.vocals_path,
|
||||
"instrumental_path": self.instrumental_path,
|
||||
"duration": round(self.duration, 2),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Audio loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_audio(
|
||||
path: str | Path,
|
||||
sr: int = 22050,
|
||||
mono: bool = True,
|
||||
duration: float | None = None,
|
||||
) -> tuple:
|
||||
"""Load audio file. Returns (y, sr) tuple.
|
||||
|
||||
Args:
|
||||
path: Path to audio file (wav, mp3, flac, ogg)
|
||||
sr: Target sample rate (default 22050)
|
||||
mono: Convert to mono
|
||||
duration: Max seconds to load (None = full file)
|
||||
|
||||
Returns:
|
||||
(audio_array, sample_rate) or (None, None) on failure
|
||||
"""
|
||||
librosa = _ensure_librosa()
|
||||
if not librosa:
|
||||
return None, None
|
||||
|
||||
try:
|
||||
y, loaded_sr = librosa.load(
|
||||
str(path), sr=sr, mono=mono, duration=duration,
|
||||
)
|
||||
return y, loaded_sr
|
||||
except Exception as e:
|
||||
logger.error("Failed to load audio %s: %s", path, e)
|
||||
return None, None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Beat detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_beats(
|
||||
audio_path: str | Path,
|
||||
sr: int = 22050,
|
||||
duration: float | None = None,
|
||||
) -> Optional[BeatAnalysis]:
|
||||
"""Detect beats and estimate tempo from an audio file.
|
||||
|
||||
Uses librosa.beat_track which implements the algorithm from:
|
||||
Ellis, "Beat Tracking by Dynamic Programming", 2007.
|
||||
|
||||
Args:
|
||||
audio_path: Path to audio file
|
||||
sr: Sample rate for analysis
|
||||
duration: Max seconds to analyze
|
||||
|
||||
Returns:
|
||||
BeatAnalysis or None if librosa unavailable
|
||||
"""
|
||||
librosa = _ensure_librosa()
|
||||
if not librosa:
|
||||
return None
|
||||
|
||||
y, loaded_sr = load_audio(audio_path, sr=sr, duration=duration)
|
||||
if y is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
tempo, beat_frames = librosa.beat.beat_track(y=y, sr=loaded_sr)
|
||||
beat_times = librosa.frames_to_time(beat_frames, sr=loaded_sr)
|
||||
|
||||
return BeatAnalysis(
|
||||
bpm=float(tempo),
|
||||
beat_times=beat_times.tolist(),
|
||||
beat_frames=beat_frames.tolist(),
|
||||
tempo_confidence=0.8, # librosa doesn't expose this directly
|
||||
duration=len(y) / loaded_sr,
|
||||
sample_rate=loaded_sr,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Beat detection failed for %s: %s", audio_path, e)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Onset detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_onsets(
|
||||
audio_path: str | Path,
|
||||
sr: int = 22050,
|
||||
duration: float | None = None,
|
||||
backtrack: bool = True,
|
||||
) -> Optional[OnsetAnalysis]:
|
||||
"""Detect onsets (when new sounds begin).
|
||||
|
||||
Useful for scene transitions (Video Forge) and speech segment
|
||||
boundaries (LPM 1.0).
|
||||
|
||||
Args:
|
||||
audio_path: Path to audio file
|
||||
sr: Sample rate
|
||||
duration: Max seconds to analyze
|
||||
backtrack: Find preceding energy minimum for each onset
|
||||
|
||||
Returns:
|
||||
OnsetAnalysis or None if librosa unavailable
|
||||
"""
|
||||
librosa = _ensure_librosa()
|
||||
if not librosa:
|
||||
return None
|
||||
|
||||
y, loaded_sr = load_audio(audio_path, sr=sr, duration=duration)
|
||||
if y is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
onset_frames = librosa.onset.onset_detect(
|
||||
y=y, sr=loaded_sr, backtrack=backtrack,
|
||||
)
|
||||
onset_times = librosa.frames_to_time(onset_frames, sr=loaded_sr)
|
||||
|
||||
intervals = []
|
||||
times = onset_times.tolist()
|
||||
for i in range(1, len(times)):
|
||||
intervals.append(times[i] - times[i - 1])
|
||||
|
||||
return OnsetAnalysis(
|
||||
onset_times=times,
|
||||
onset_frames=onset_frames.tolist(),
|
||||
onset_count=len(times),
|
||||
avg_onset_interval=sum(intervals) / len(intervals) if intervals else 0.0,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Onset detection failed for %s: %s", audio_path, e)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Vocal/instrumental separation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def separate_vocals(
|
||||
audio_path: str | Path,
|
||||
output_dir: str | Path = "/tmp/audio_separation",
|
||||
model_name: str = "htdemucs",
|
||||
) -> Optional[SeparationResult]:
|
||||
"""Separate vocals from instrumental using demucs.
|
||||
|
||||
Args:
|
||||
audio_path: Path to audio file
|
||||
output_dir: Directory for output stems
|
||||
model_name: Demucs model (htdemucs, htdemucs_ft, mdx_extra)
|
||||
|
||||
Returns:
|
||||
SeparationResult with paths to vocals/instrumental, or None
|
||||
"""
|
||||
if not _ensure_demucs():
|
||||
return None
|
||||
|
||||
try:
|
||||
import demucs.api
|
||||
import soundfile as sf
|
||||
|
||||
output_dir = Path(output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
separator = demucs.api.Separator(model=model_name)
|
||||
origin, separated = separator.separate_audio_file(str(audio_path))
|
||||
|
||||
vocals_path = output_dir / "vocals.wav"
|
||||
instrumental_path = output_dir / "instrumental.wav"
|
||||
|
||||
sf.write(str(vocals_path), separated["vocals"].cpu().numpy().T, separator.samplerate)
|
||||
sf.write(str(instrumental_path),
|
||||
(separated["drums"] + separated["bass"] + separated["other"]).cpu().numpy().T,
|
||||
separator.samplerate)
|
||||
|
||||
duration = len(origin) / separator.samplerate
|
||||
|
||||
return SeparationResult(
|
||||
vocals_path=str(vocals_path),
|
||||
instrumental_path=str(instrumental_path),
|
||||
duration=duration,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Vocal separation failed for %s: %s", audio_path, e)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Voice Activity Detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_voice_activity(
|
||||
audio_path: str | Path,
|
||||
sr: int = 16000,
|
||||
threshold: float = 0.5,
|
||||
min_speech_duration: float = 0.3,
|
||||
) -> Optional[List[VADSegment]]:
|
||||
"""Detect speech segments using Silero VAD.
|
||||
|
||||
Returns list of segments where speech was detected.
|
||||
Useful for LPM listen/speak state switching.
|
||||
|
||||
Args:
|
||||
audio_path: Path to audio file
|
||||
sr: Sample rate (Silero expects 16kHz or 8kHz)
|
||||
threshold: VAD threshold (0.0-1.0)
|
||||
min_speech_duration: Minimum segment length to count as speech
|
||||
|
||||
Returns:
|
||||
List of VADSegment or None if silero unavailable
|
||||
"""
|
||||
if not _ensure_silero():
|
||||
return None
|
||||
|
||||
try:
|
||||
import torch
|
||||
import torchaudio
|
||||
|
||||
model, utils = torch.hub.load(
|
||||
repo_or_dir='snakers4/silero-vad', model='silero_vad',
|
||||
force_reload=False, onnx=False,
|
||||
)
|
||||
get_speech_timestamps = utils[0]
|
||||
|
||||
wav, file_sr = torchaudio.load(str(audio_path))
|
||||
if file_sr != sr:
|
||||
wav = torchaudio.functional.resample(wav, file_sr, sr)
|
||||
|
||||
if wav.shape[0] > 1:
|
||||
wav = wav.mean(dim=0, keepdim=True)
|
||||
|
||||
speech_timestamps = get_speech_timestamps(
|
||||
wav.squeeze(), model, sampling_rate=sr,
|
||||
threshold=threshold, min_speech_duration_ms=int(min_speech_duration * 1000),
|
||||
)
|
||||
|
||||
segments = []
|
||||
for ts in speech_timestamps:
|
||||
segments.append(VADSegment(
|
||||
start=ts["start"] / sr,
|
||||
end=ts["end"] / sr,
|
||||
is_speech=True,
|
||||
))
|
||||
|
||||
return segments
|
||||
except Exception as e:
|
||||
logger.error("VAD failed for %s: %s", audio_path, e)
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Full analysis
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyze_audio(
|
||||
audio_path: str | Path,
|
||||
include_separation: bool = False,
|
||||
include_vad: bool = False,
|
||||
sr: int = 22050,
|
||||
) -> Dict[str, Any]:
|
||||
"""Run full audio analysis pipeline.
|
||||
|
||||
Combines beat detection, onset detection, and optionally
|
||||
vocal separation and VAD into a single result dict.
|
||||
|
||||
Args:
|
||||
audio_path: Path to audio file
|
||||
include_separation: Run vocal separation (slow)
|
||||
include_vad: Run voice activity detection
|
||||
sr: Sample rate for beat/onset analysis
|
||||
|
||||
Returns:
|
||||
Dict with all analysis results
|
||||
"""
|
||||
result = {"path": str(audio_path)}
|
||||
|
||||
beats = detect_beats(audio_path, sr=sr)
|
||||
if beats:
|
||||
result["beats"] = beats.to_dict()
|
||||
|
||||
onsets = detect_onsets(audio_path, sr=sr)
|
||||
if onsets:
|
||||
result["onsets"] = onsets.to_dict()
|
||||
|
||||
if include_separation:
|
||||
separation = separate_vocals(audio_path)
|
||||
if separation:
|
||||
result["separation"] = separation.to_dict()
|
||||
|
||||
if include_vad:
|
||||
segments = detect_voice_activity(audio_path)
|
||||
if segments:
|
||||
result["vad"] = {
|
||||
"segments": [s.to_dict() for s in segments],
|
||||
"speech_ratio": sum(s.end - s.start for s in segments) / (beats.duration if beats else 1.0),
|
||||
}
|
||||
|
||||
return result
|
||||
Reference in New Issue
Block a user