diff --git a/voice_analysis.py b/voice_analysis.py
new file mode 100644
index 0000000..a570f39
--- /dev/null
+++ b/voice_analysis.py
@@ -0,0 +1,350 @@
+"""
+voice_analysis.py — Voice message distress analysis via paralinguistic features.
+
+Epic: #102 (Multimodal Crisis Detection)
+Issue: #131
+
+Analyzes voice messages (OGG/Telegram format) for distress signals:
+- Speech rate changes (very slow or very fast)
+- Pitch variability reduction (monotone = depression indicator)
+- Long pauses / silence ratio
+- Vocal tremor / shakiness
+- Volume drops
+
+Integrates with crisis_detector.py text-based detection for multimodal coverage.
+"""
+
+import os
+import subprocess
+import tempfile
+from dataclasses import dataclass, field, asdict
+
+
+@dataclass
+class VoiceAnalysisResult:
+    """Result of paralinguistic analysis on a voice message."""
+    transcript: str = ""
+    speech_rate: float = 0.0        # words per minute
+    pitch_mean: float = 0.0         # Hz, average fundamental frequency
+    pitch_variability: float = 0.0  # std dev of pitch (low = monotone)
+    silence_ratio: float = 0.0      # 0-1, fraction of audio that is silence
+    tremor_score: float = 0.0       # 0-1, vocal shakiness estimate
+    volume_drop_score: float = 0.0  # 0-1, sudden volume decreases
+    distress_score: float = 0.0     # 0-1, composite distress indicator
+    signals_detected: list = field(default_factory=list)
+
+    def to_dict(self) -> dict:
+        return asdict(self)
+
+
+# === THRESHOLDS ===
+
+# Speech rate: normal is ~120-150 WPM.
+# Very slow (<80) or very fast (>200) are distress indicators.
+SPEECH_RATE_SLOW = 80
+SPEECH_RATE_FAST = 200
+SPEECH_RATE_NORMAL_LOW = 100
+SPEECH_RATE_NORMAL_HIGH = 170
+
+# Pitch variability: normal conversation has a std dev of ~30-50 Hz.
+# Monotone (<15 Hz) is a depression indicator.
+PITCH_VARIABILITY_LOW = 15.0  # Hz — monotone threshold
+PITCH_VARIABILITY_NORMAL = 30.0
+
+# Silence ratio: normal speech has ~10-20% silence.
+# Excessive silence (>40%) or very little (<3%) may indicate distress.
+SILENCE_RATIO_HIGH = 0.4
+SILENCE_RATIO_LOW = 0.03
+
+# Composite thresholds
+DISTRESS_LOW = 0.3
+DISTRESS_MEDIUM = 0.7
+
+
+# === CORE ANALYSIS ===
+
+def _convert_to_wav(audio_path: str) -> str:
+    """Convert audio to 16 kHz mono WAV for analysis. Returns path to a temp WAV file."""
+    fd, wav_path = tempfile.mkstemp(suffix='.wav')
+    os.close(fd)
+    try:
+        proc = subprocess.run(
+            ['ffmpeg', '-i', audio_path, '-ar', '16000', '-ac', '1', '-y', wav_path],
+            capture_output=True, timeout=30
+        )
+        if proc.returncode != 0 or os.path.getsize(wav_path) == 0:
+            # Conversion failed: fall back to analyzing the original file
+            os.unlink(wav_path)
+            return audio_path
+        return wav_path
+    except (FileNotFoundError, subprocess.TimeoutExpired):
+        # ffmpeg not installed or hung: fall back to the original file
+        if os.path.exists(wav_path):
+            os.unlink(wav_path)
+        return audio_path
+
+
+def _transcribe(audio_path: str) -> str:
+    """Transcribe audio using whisper (if available) or return an empty string."""
+    try:
+        import whisper
+        model = whisper.load_model("base")
+        result = model.transcribe(audio_path)
+        return result.get("text", "").strip()
+    except ImportError:
+        # Whisper not available — skip transcription
+        return ""
+    except Exception:
+        # Transcription failed: degrade to acoustic-only analysis
+        return ""
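+
+
+# Dependency notes: every heavy dependency is imported lazily and any failure
+# degrades to a neutral value, so the module stays importable on a minimal install.
+#   - ffmpeg (CLI)                       -> OGG/MP3 to WAV conversion (falls back to the raw file)
+#   - whisper                            -> transcript, and therefore speech rate (falls back to "")
+#   - librosa (or soundfile for loading) -> pitch, silence, tremor and volume features
+#                                           (each falls back to 0.0 when unavailable)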
+
+
+def _load_audio_numpy(audio_path: str) -> tuple:
+    """Load audio as numpy array. Returns (samples, sample_rate) or (None, None)."""
+    try:
+        import librosa
+        samples, sr = librosa.load(audio_path, sr=16000, mono=True)
+        return samples, sr
+    except ImportError:
+        pass
+
+    try:
+        import soundfile as sf
+        samples, sr = sf.read(audio_path)
+        if len(samples.shape) > 1:
+            samples = samples.mean(axis=1)  # mono
+        return samples, sr
+    except ImportError:
+        pass
+
+    return None, None
+
+
+def _analyze_speech_rate(transcript: str, duration_sec: float) -> float:
+    """Calculate words per minute from transcript and audio duration."""
+    if not transcript or duration_sec <= 0:
+        return 0.0
+    words = len(transcript.split())
+    minutes = duration_sec / 60.0
+    return words / minutes if minutes > 0 else 0.0
+
+
+def _analyze_pitch(samples, sr) -> tuple:
+    """Analyze pitch (F0) from audio samples. Returns (mean_hz, variability_hz)."""
+    try:
+        import librosa
+        import numpy as np
+        f0, voiced_flag, _ = librosa.pyin(
+            samples, fmin=librosa.note_to_hz('C2'),
+            fmax=librosa.note_to_hz('C7'), sr=sr
+        )
+        f0_clean = f0[~np.isnan(f0)]
+        if len(f0_clean) == 0:
+            return 0.0, 0.0
+        return float(np.mean(f0_clean)), float(np.std(f0_clean))
+    except Exception:
+        return 0.0, 0.0
+
+
+def _analyze_silence(samples, sr, threshold_db: float = -40.0) -> float:
+    """Calculate ratio of silence in audio (0-1)."""
+    try:
+        import librosa
+        import numpy as np
+        rms = librosa.feature.rms(y=samples)[0]
+        rms_db = librosa.amplitude_to_db(rms, ref=np.max)
+        silence_frames = np.sum(rms_db < threshold_db)
+        return float(silence_frames / len(rms_db)) if len(rms_db) > 0 else 0.0
+    except Exception:
+        return 0.0
+
+
+def _analyze_tremor(samples, sr) -> float:
+    """
+    Detect vocal tremor/shakiness via amplitude modulation analysis.
+
+    Tremor manifests as periodic amplitude fluctuations (3-12 Hz range).
+    Returns 0-1 score where 1 = strong tremor detected.
+    """
+    try:
+        import librosa
+        import numpy as np
+        # Extract amplitude envelope
+        rms = librosa.feature.rms(y=samples, frame_length=2048, hop_length=512)[0]
+        # Compute modulation spectrum
+        fft = np.abs(np.fft.rfft(rms))
+        freqs = np.fft.rfftfreq(len(rms), d=512/sr)
+        # Look for energy in tremor band (3-12 Hz)
+        tremor_mask = (freqs >= 3) & (freqs <= 12)
+        tremor_energy = np.sum(fft[tremor_mask])
+        total_energy = np.sum(fft[1:])  # skip DC
+        if total_energy == 0:
+            return 0.0
+        ratio = tremor_energy / total_energy
+        return float(min(1.0, ratio * 5))  # normalize — typical tremor is 0.1-0.3 of total
+    except Exception:
+        return 0.0
+
+
+def _analyze_volume_drops(samples, sr) -> float:
+    """Detect sudden volume drops that may indicate emotional distress."""
+    try:
+        import librosa
+        import numpy as np
+        rms = librosa.feature.rms(y=samples, frame_length=2048, hop_length=512)[0]
+        if len(rms) < 2:
+            return 0.0
+        # Look for consecutive frames where volume drops >50%
+        drops = 0
+        for i in range(1, len(rms)):
+            if rms[i-1] > 0 and (rms[i-1] - rms[i]) / rms[i-1] > 0.5:
+                drops += 1
+        return float(min(1.0, drops / (len(rms) * 0.1)))
+    except Exception:
+        return 0.0
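+
+
+# Quick sanity check for the acoustic features (illustrative sketch only; it
+# assumes numpy and librosa are installed, since the analyzers silently return
+# 0.0 without them). A steady pure tone has a known pitch, very little pitch
+# variability and no quiet frames:
+#
+#     import numpy as np
+#     sr = 16000
+#     t = np.arange(sr * 2) / sr                    # 2 seconds of audio
+#     tone = 0.5 * np.sin(2 * np.pi * 220 * t)      # constant 220 Hz tone
+#     mean_hz, var_hz = _analyze_pitch(tone, sr)    # expect mean_hz near 220, var_hz low
+#     _analyze_silence(tone, sr)                    # expect roughly 0.0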
+ """ + signals = [] + score = 0.0 + weights = 0 + + # Speech rate (0.2 weight) + if result.speech_rate > 0: + if result.speech_rate < SPEECH_RATE_SLOW: + signals.append(f"very_slow_speech ({result.speech_rate:.0f} WPM)") + score += 0.8 * 0.2 + elif result.speech_rate > SPEECH_RATE_FAST: + signals.append(f"very_fast_speech ({result.speech_rate:.0f} WPM)") + score += 0.6 * 0.2 + elif result.speech_rate < SPEECH_RATE_NORMAL_LOW: + score += 0.3 * 0.2 + weights += 0.2 + + # Pitch variability (0.25 weight — monotone is strong depression indicator) + if result.pitch_variability > 0: + if result.pitch_variability < PITCH_VARIABILITY_LOW: + signals.append(f"monotone_voice (variability={result.pitch_variability:.1f} Hz)") + score += 0.9 * 0.25 + elif result.pitch_variability < PITCH_VARIABILITY_NORMAL: + signals.append(f"reduced_pitch_variability ({result.pitch_variability:.1f} Hz)") + score += 0.5 * 0.25 + weights += 0.25 + + # Silence ratio (0.2 weight) + if result.silence_ratio > 0: + if result.silence_ratio > SILENCE_RATIO_HIGH: + signals.append(f"excessive_silence ({result.silence_ratio:.0%})") + score += 0.7 * 0.2 + elif result.silence_ratio < SILENCE_RATIO_LOW: + signals.append(f"minimal_pauses ({result.silence_ratio:.0%})") + score += 0.3 * 0.2 + weights += 0.2 + + # Tremor (0.2 weight) + if result.tremor_score > 0: + if result.tremor_score > 0.5: + signals.append(f"vocal_tremor (score={result.tremor_score:.2f})") + score += result.tremor_score * 0.2 + weights += 0.2 + + # Volume drops (0.15 weight) + if result.volume_drop_score > 0: + if result.volume_drop_score > 0.4: + signals.append(f"volume_drops (score={result.volume_drop_score:.2f})") + score += result.volume_drop_score * 0.15 + weights += 0.15 + + # Normalize by available weights + if weights > 0: + score = score / weights + + return min(1.0, score), signals + + +# === PUBLIC API === + +def analyze_voice_message(audio_path: str) -> dict: + """ + Analyze a voice message for paralinguistic distress signals. + + Args: + audio_path: Path to audio file (OGG, WAV, MP3, etc.) 
+
+
+# === PUBLIC API ===
+
+def analyze_voice_message(audio_path: str) -> dict:
+    """
+    Analyze a voice message for paralinguistic distress signals.
+
+    Args:
+        audio_path: Path to audio file (OGG, WAV, MP3, etc.)
+
+    Returns:
+        dict with: transcript, speech_rate, pitch_mean, pitch_variability,
+        silence_ratio, tremor_score, volume_drop_score, distress_score,
+        signals_detected, distress_level
+
+    Usage:
+        result = analyze_voice_message("/path/to/voice_message.ogg")
+        if result["distress_level"] in ("medium", "high"):
+            # Escalate — combine with text crisis detection
+            escalate_crisis(result)
+    """
+    result = VoiceAnalysisResult()
+
+    # Convert to WAV for analysis
+    wav_path = _convert_to_wav(audio_path)
+
+    # Transcribe
+    result.transcript = _transcribe(wav_path)
+
+    # Load audio for feature extraction
+    samples, sr = _load_audio_numpy(wav_path)
+
+    if samples is not None and sr is not None:
+        duration = len(samples) / sr
+
+        # Speech rate from transcript + duration
+        result.speech_rate = _analyze_speech_rate(result.transcript, duration)
+
+        # Pitch analysis
+        result.pitch_mean, result.pitch_variability = _analyze_pitch(samples, sr)
+
+        # Silence ratio
+        result.silence_ratio = _analyze_silence(samples, sr)
+
+        # Tremor detection
+        result.tremor_score = _analyze_tremor(samples, sr)
+
+        # Volume drops
+        result.volume_drop_score = _analyze_volume_drops(samples, sr)
+
+    # Composite distress score
+    result.distress_score, result.signals_detected = _compute_distress_score(result)
+
+    # Clean up temp file
+    if wav_path != audio_path and os.path.exists(wav_path):
+        os.unlink(wav_path)
+
+    # Classify distress level
+    if result.distress_score >= DISTRESS_MEDIUM:
+        distress_level = "high"
+    elif result.distress_score >= DISTRESS_LOW:
+        distress_level = "medium"
+    elif result.distress_score > 0:
+        distress_level = "low"
+    else:
+        distress_level = "none"
+
+    output = result.to_dict()
+    output["distress_level"] = distress_level
+    return output
+
+
+def get_audio_duration(audio_path: str) -> float:
+    """Get audio duration in seconds."""
+    try:
+        import librosa
+        duration = librosa.get_duration(path=audio_path)
+        return float(duration)
+    except Exception:
+        try:
+            import soundfile as sf
+            info = sf.info(audio_path)
+            return float(info.duration)
+        except Exception:
+            return 0.0
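+
+
+# --- Multimodal fusion (illustrative sketch) ---
+# The module docstring mentions integration with crisis_detector.py, but that
+# module's interface is not part of this diff. The helper below is therefore
+# only a sketch: combine_with_text_score is a hypothetical name, and it assumes
+# the text-based detector also produces a 0-1 score. Adapt it to the real API.
+
+def combine_with_text_score(voice_result: dict, text_score: float) -> float:
+    """Illustrative fusion of voice distress and text crisis scores (both 0-1)."""
+    voice_score = float(voice_result.get("distress_score", 0.0))
+    # Take the stronger modality so a calm-sounding voice cannot mask an
+    # explicit textual crisis signal, and add a small boost when both agree.
+    fused = max(voice_score, min(1.0, max(0.0, text_score)))
+    if voice_score >= DISTRESS_LOW and text_score >= DISTRESS_LOW:
+        fused = min(1.0, fused + 0.1)
+    return fused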