Closes #131 (Epic #102 — Multimodal Crisis Detection) Analyzes audio messages (OGG/MP3/WAV) for distress signals using paralinguistic features — no neural model needed, pure DSP. Signals detected: - Speech rate: very slow (<80 wpm) or very fast (>200 wpm) - Pitch variability: monotone voice (low F0 std = depression indicator) - Silence ratio: long pauses (>35% silence) - Volume: very quiet (<-30 dB) Implementation: - voice_analysis.py: Core module with analyze_voice_message() - Whisper integration for transcription + word timestamps - librosa for audio feature extraction (pitch, volume, silence) - Composite distress score (0-1) from max of individual signals - Thresholds: low (<0.3), medium (0.3-0.7), high (>0.7) 17 tests in tests/test_voice_analysis.py.
357 lines
11 KiB
Python
357 lines
11 KiB
Python
"""Voice message distress analysis — paralinguistic features (#131).
|
|
|
|
Analyzes audio (OGG/MP3/WAV) for distress signals using audio
|
|
features extracted without a neural model — pure DSP analysis.
|
|
|
|
Signals detected:
|
|
- Speech rate (words per minute from timestamps)
|
|
- Pitch variability (F0 std deviation — monotone = depression indicator)
|
|
- Silence ratio (long pauses)
|
|
- Volume dynamics (drops, tremor proxy)
|
|
|
|
Uses whisper for transcription + word timestamps. All other features
|
|
are computed from raw audio via librosa.
|
|
|
|
Refs: #131 — Epic #102 (Multimodal Crisis Detection)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class VoiceAnalysisResult:
    """Result of voice message paralinguistic analysis.

    Bundles the transcript, the raw paralinguistic measurements, and
    the derived distress assessment for a single voice message.
    """

    transcript: str  # whisper transcript ("" if transcription failed)
    speech_rate_wpm: float  # words per minute (0.0 if not computable)
    pitch_mean_hz: float  # mean F0 in Hz (0.0 if pitch estimation failed)
    pitch_variability: float  # F0 standard deviation (low = monotone)
    silence_ratio: float  # fraction of audio that is silence (0-1)
    volume_db: float  # mean volume in dB
    volume_variability: float  # volume std deviation (dB)
    duration_seconds: float  # total audio duration
    distress_score: float  # 0-1 composite score (max of per-signal scores)
    distress_level: str  # "low", "medium", "high"
    distress_signals: List[str]  # human-readable detected signals
|
|
|
|
|
|
# Distress score -> level thresholds (upper bound of each band):
# low: [0, 0.3), medium: [0.3, 0.7), high: [0.7, 1.0].
DISTRESS_THRESHOLDS = {
    "low": 0.3,
    "medium": 0.7,
    "high": 1.0,
}

# Heuristic "normal" ranges for paralinguistic features; values falling
# outside these ranges are treated as distress indicators.
# NOTE: hand-tuned DSP thresholds — there is no learned model here
# (see module docstring: pure DSP analysis).
NORMAL_SPEECH_RATE = (100, 180)  # words per minute
NORMAL_PITCH_VAR = (20, 80)  # F0 std deviation in Hz
NORMAL_SILENCE_RATIO = (0.05, 0.35)  # fraction of silence
NORMAL_VOLUME_DB = (-30, -10)  # dB range
|
|
|
|
|
|
def _ensure_whisper():
|
|
"""Check if whisper is available."""
|
|
try:
|
|
result = subprocess.run(
|
|
["whisper", "--help"],
|
|
capture_output=True, text=True, timeout=5,
|
|
)
|
|
return True
|
|
except (FileNotFoundError, subprocess.TimeoutExpired):
|
|
return False
|
|
|
|
|
|
def _ensure_librosa():
|
|
"""Check if librosa is available."""
|
|
try:
|
|
import librosa
|
|
return True
|
|
except ImportError:
|
|
return False
|
|
|
|
|
|
def transcribe_with_timestamps(audio_path: str) -> Dict[str, Any]:
    """Transcribe audio with whisper, returning segment timestamps.

    Tries the Python ``whisper`` package first; if it is not installed,
    falls back to the ``whisper`` CLI writing JSON into a private
    scratch directory. On any fallback failure an empty transcript is
    returned rather than raising.

    Args:
        audio_path: Path to the audio file (OGG/MP3/WAV).

    Returns:
        Dict with 'text' (full transcript, "" on failure) and
        'segments' (list of {"start", "end", "text"} dicts).
    """

    def _shape(text: str, segments: list) -> Dict[str, Any]:
        # Normalize whisper output to the documented return shape.
        return {
            "text": text,
            "segments": [
                {"start": s["start"], "end": s["end"], "text": s["text"]}
                for s in segments
            ],
        }

    # Preferred path: in-process whisper with word timestamps.
    try:
        import whisper
    except ImportError:
        whisper = None

    if whisper is not None:
        model = whisper.load_model("base")
        result = model.transcribe(audio_path, word_timestamps=True)
        return _shape(result["text"], result.get("segments", []))

    # Fallback: whisper CLI. Use a private temporary directory instead
    # of the previous NamedTemporaryFile-dirname trick, which pointed
    # whisper at the shared system temp dir (collision-prone) and left
    # a dummy .json file to clean up.
    try:
        with tempfile.TemporaryDirectory() as out_dir:
            subprocess.run(
                ["whisper", audio_path, "--model", "base",
                 "--output_format", "json", "--output_dir", out_dir],
                capture_output=True, text=True, timeout=120,
            )
            # Whisper writes <audio stem>.json into the output dir.
            whisper_out = Path(out_dir) / f"{Path(audio_path).stem}.json"
            if whisper_out.exists():
                with open(whisper_out) as f:
                    data = json.load(f)
                return _shape(data.get("text", ""), data.get("segments", []))
    except Exception as e:
        logger.warning("Whisper transcription failed: %s", e)

    return {"text": "", "segments": []}
|
|
|
|
|
|
def extract_audio_features(audio_path: str) -> Dict[str, float]:
    """Extract paralinguistic features from raw audio using librosa.

    Args:
        audio_path: Path to an audio file readable by librosa.

    Returns:
        Dict with pitch (mean/std F0 in Hz), volume (mean/std dB),
        silence ratio (0-1) and duration in seconds. All-zero defaults
        are returned when librosa is unavailable or the file cannot be
        loaded.
    """
    # Single source of truth for the failure/default payload (was
    # duplicated verbatim in two places).
    defaults = {
        "pitch_mean_hz": 0.0, "pitch_variability": 0.0,
        "silence_ratio": 0.0, "volume_db": 0.0, "volume_variability": 0.0,
        "duration_seconds": 0.0,
    }

    try:
        import librosa
        import numpy as np
    except ImportError:
        logger.warning("librosa not available — returning defaults")
        return defaults

    try:
        y, sr = librosa.load(audio_path, sr=None)
    except Exception as e:
        logger.warning("Failed to load audio %s: %s", audio_path, e)
        return defaults

    duration = len(y) / sr

    # Pitch (F0) estimation using pyin; unvoiced frames come back NaN.
    try:
        f0, _voiced_flag, _ = librosa.pyin(y, fmin=50, fmax=500, sr=sr)
        f0_voiced = f0[~np.isnan(f0)]
        if len(f0_voiced) > 0:
            pitch_mean = float(np.mean(f0_voiced))
            pitch_var = float(np.std(f0_voiced))
        else:
            pitch_mean = 0.0
            pitch_var = 0.0
    except Exception:
        pitch_mean = 0.0
        pitch_var = 0.0

    # Volume (RMS energy -> dB). Convert once and reuse for mean/std
    # (was computed twice on the same RMS array).
    rms = librosa.feature.rms(y=y)[0]
    rms_db = librosa.amplitude_to_db(rms, ref=np.max)
    volume_db = float(rms_db.mean())
    volume_var = float(rms_db.std())

    # Silence ratio: fraction of samples outside non-silent intervals.
    try:
        intervals = librosa.effects.split(y, top_db=30)
        speech_samples = sum(end - start for start, end in intervals)
        silence_ratio = 1.0 - (speech_samples / len(y)) if len(y) > 0 else 0.0
    except Exception:
        silence_ratio = 0.0

    return {
        "pitch_mean_hz": round(pitch_mean, 1),
        "pitch_variability": round(pitch_var, 1),
        "silence_ratio": round(silence_ratio, 3),
        "volume_db": round(volume_db, 1),
        "volume_variability": round(volume_var, 1),
        "duration_seconds": round(duration, 2),
    }
|
|
|
|
|
|
def compute_speech_rate(transcript: str, segments: List[dict]) -> float:
    """Return words per minute derived from transcript and timestamps.

    Duration is taken as the span from the earliest segment start to the
    latest segment end. Returns 0.0 when there are no words, no
    segments, or a non-positive spoken duration.
    """
    word_count = len(transcript.split())
    if word_count == 0 or not segments:
        return 0.0

    starts = [s["start"] for s in segments]
    ends = [s["end"] for s in segments]
    spoken_seconds = max(ends) - min(starts)
    if spoken_seconds <= 0:
        return 0.0

    return round(word_count * 60.0 / spoken_seconds, 1)
|
|
|
|
|
|
def compute_distress_score(
    speech_rate: float,
    pitch_variability: float,
    silence_ratio: float,
    volume_db: float,
) -> tuple[float, List[str]]:
    """Compute a composite 0-1 distress score from paralinguistic features.

    Each feature contributes a 0-1 sub-score; the composite is the MAX of
    the sub-scores rather than the average — one severe signal is enough.

    Returns:
        (score, signals): score rounded to 3 decimals, signals a list of
        human-readable distress indicators that fired.
    """
    signals: List[str] = []
    scores: List[float] = []

    # Speech rate: flag both abnormally slow and abnormally fast speech.
    # A rate of 0 means "unknown" (no transcript) and contributes nothing.
    slow_cutoff, fast_cutoff = NORMAL_SPEECH_RATE
    if speech_rate > 0:
        if speech_rate < slow_cutoff:
            signals.append(f"very_slow_speech ({speech_rate:.0f} wpm)")
            scores.append(min(1.0, (slow_cutoff - speech_rate) / 50))
        elif speech_rate > fast_cutoff:
            signals.append(f"very_fast_speech ({speech_rate:.0f} wpm)")
            scores.append(min(1.0, (speech_rate - fast_cutoff) / 80))
        else:
            scores.append(0.0)

    # Pitch variability: low F0 spread reads as monotone (depression
    # indicator). Zero means pitch estimation failed — skipped.
    min_pitch_var = NORMAL_PITCH_VAR[0]
    if pitch_variability > 0:
        if pitch_variability < min_pitch_var:
            signals.append(f"monotone_voice (F0_var={pitch_variability:.0f}Hz)")
            scores.append(min(1.0, (min_pitch_var - pitch_variability) / min_pitch_var))
        else:
            scores.append(0.0)

    # Silence ratio: an unusually large fraction of silence = long pauses.
    max_silence = NORMAL_SILENCE_RATIO[1]
    if silence_ratio > max_silence:
        signals.append(f"long_pauses (silence={silence_ratio:.0%})")
        scores.append(min(1.0, (silence_ratio - max_silence) / 0.4))
    else:
        scores.append(0.0)

    # Volume: mean level below the normal floor = very quiet speech.
    quiet_floor = NORMAL_VOLUME_DB[0]
    if volume_db < quiet_floor:
        signals.append(f"very_quiet ({volume_db:.0f}dB)")
        scores.append(min(1.0, abs(volume_db - quiet_floor) / 20))
    else:
        scores.append(0.0)

    composite = max(scores) if scores else 0.0
    return round(composite, 3), signals
|
|
|
|
|
|
def analyze_voice_message(audio_path: str) -> VoiceAnalysisResult:
    """Analyze a voice message for distress signals.

    Pipeline: transcribe -> extract audio features -> speech rate ->
    composite distress score -> distress level.

    Args:
        audio_path: Path to audio file (OGG, MP3, WAV).

    Returns:
        VoiceAnalysisResult with transcript, paralinguistic features,
        and the composite distress assessment.
    """
    # Step 1: Transcribe with timestamps (empty transcript on failure).
    transcription = transcribe_with_timestamps(audio_path)
    transcript = transcription["text"]
    segments = transcription["segments"]

    # Step 2: Extract audio features (all-zero defaults on failure).
    features = extract_audio_features(audio_path)

    # Step 3: Compute speech rate from transcript + segment timestamps.
    wpm = compute_speech_rate(transcript, segments)

    # Step 4: Compute composite distress score (max of individual signals).
    distress_score, distress_signals = compute_distress_score(
        speech_rate=wpm,
        pitch_variability=features["pitch_variability"],
        silence_ratio=features["silence_ratio"],
        volume_db=features["volume_db"],
    )

    # Map score to a band. The thresholds are band upper bounds per the
    # documented spec (low <0.3, medium 0.3-0.7, high >0.7). The previous
    # comparisons used DISTRESS_THRESHOLDS["high"] (1.0) as the "high"
    # cutoff, so "high" was only reachable at exactly 1.0 and any score
    # in [0.3, 0.7) was mislabeled "low".
    if distress_score >= DISTRESS_THRESHOLDS["medium"]:
        level = "high"
    elif distress_score >= DISTRESS_THRESHOLDS["low"]:
        level = "medium"
    else:
        level = "low"

    return VoiceAnalysisResult(
        transcript=transcript,
        speech_rate_wpm=wpm,
        pitch_mean_hz=features["pitch_mean_hz"],
        pitch_variability=features["pitch_variability"],
        silence_ratio=features["silence_ratio"],
        volume_db=features["volume_db"],
        volume_variability=features["volume_variability"],
        duration_seconds=features["duration_seconds"],
        distress_score=distress_score,
        distress_level=level,
        distress_signals=distress_signals,
    )
|
|
|
|
|
|
def main():
    """CLI entry point: analyze one audio file and print the results."""
    import argparse
    import sys  # BUG FIX: sys was used below but never imported anywhere

    p = argparse.ArgumentParser(description="Voice message distress analysis")
    p.add_argument("audio", help="Path to audio file")
    p.add_argument("--json", action="store_true",
                   help="Emit the full result as JSON")
    a = p.parse_args()

    if not os.path.exists(a.audio):
        # Previously raised NameError here because sys was not imported.
        print(f"File not found: {a.audio}", file=sys.stderr)
        sys.exit(1)

    result = analyze_voice_message(a.audio)

    if a.json:
        print(json.dumps(asdict(result), indent=2))
    else:
        print(f"Transcript: {result.transcript[:100]}...")
        print(f"Speech rate: {result.speech_rate_wpm} wpm")
        print(f"Pitch: {result.pitch_mean_hz} Hz (variability: {result.pitch_variability})")
        print(f"Silence: {result.silence_ratio:.0%}")
        print(f"Volume: {result.volume_db} dB")
        print(f"Distress: {result.distress_score:.2f} ({result.distress_level})")
        if result.distress_signals:
            print(f"Signals: {', '.join(result.distress_signals)}")


if __name__ == "__main__":
    main()
|