Compare commits
1 Commits
feat/131-v
...
fix/130-be
| Author | SHA1 | Date | |
|---|---|---|---|
| f446f6dad6 |
311
crisis/behavioral.py
Normal file
311
crisis/behavioral.py
Normal file
@@ -0,0 +1,311 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Behavioral Pattern Detection for Crisis Signals (#133).
|
||||
|
||||
Detects crisis risk from session-level behavioral patterns:
|
||||
- Message frequency (increasing urgency = rapid-fire messages)
|
||||
- Time-of-day (late-night messages correlate with crisis risk)
|
||||
- Withdrawal (decreasing communication after engagement)
|
||||
- Escalation (crisis indicators getting stronger over time)
|
||||
|
||||
Usage:
|
||||
from crisis.behavioral import analyze_session, BehavioralSignal
|
||||
|
||||
signals = analyze_session(messages)
|
||||
for sig in signals:
|
||||
if sig.risk_level == "HIGH":
|
||||
# Escalate to crisis protocol
|
||||
pass
|
||||
"""
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
class Message:
    """A single message in a session.

    crisis_score is the 0.0-1.0 output of the text-based crisis detector;
    0.0 means "not scored" (escalation analysis skips such messages).
    """
    timestamp: datetime  # when the message was sent (self-test uses tz-aware UTC; confirm callers do too)
    content: str  # raw message text
    crisis_score: float = 0.0  # 0.0-1.0 from text detector
    role: str = "user"  # "user" or "assistant"
|
||||
|
||||
|
||||
@dataclass
class BehavioralSignal:
    """A detected behavioral pattern indicating crisis risk.

    Produced by the _analyze_* helpers and aggregated/sorted by
    analyze_session().
    """
    signal_type: str  # "frequency", "time", "withdrawal", "escalation"
    risk_level: str  # "LOW", "MEDIUM", "HIGH"
    description: str  # human-readable summary of the detected pattern
    evidence: list = field(default_factory=list)  # supporting detail strings
    score: float = 0.0  # 0.0-1.0 signal strength
|
||||
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────────

# Message frequency thresholds (user messages per hour).
# NOTE(review): _analyze_frequency compares only against FREQ_ELEVATED
# (>= 15/hr -> MEDIUM) and FREQ_HIGH (>= 30/hr -> HIGH); FREQ_NORMAL is
# currently unused by the code.
FREQ_NORMAL = 6  # <6/hr = normal
FREQ_ELEVATED = 15  # >=15/hr = elevated (MEDIUM)
FREQ_HIGH = 30  # >=30/hr = high urgency (HIGH)

# Time-of-day risk windows (hours in 24h format, read straight off each
# message's timestamp — no timezone conversion is applied).
HIGH_RISK_HOURS = set(range(1, 5))  # 1AM-4AM
ELEVATED_RISK_HOURS = set(range(22, 24)) | set(range(5, 7))  # 10PM-12AM, 5AM-7AM

# Withdrawal: messages/day trend.
# NOTE(review): _analyze_withdrawal uses hard-coded length/gap ratio cutoffs
# (0.5, 2.0, 0.7, 1.5); WITHDRAWAL_THRESHOLD is currently unused by the code.
WITHDRAWAL_THRESHOLD = 0.3  # Current day < 30% of average = withdrawal

# Escalation: crisis score trend
ESCALATION_WINDOW = 5  # Look at last N scored user messages
|
||||
|
||||
# ── Frequency Analysis ────────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_frequency(messages: list[Message]) -> Optional[BehavioralSignal]:
    """Flag rapid-fire user messaging, which signals rising urgency.

    Measures the rate (messages/hour) over the last 10 user messages and
    returns a HIGH or MEDIUM signal when it crosses FREQ_HIGH or
    FREQ_ELEVATED respectively, or None when the rate is unremarkable or
    there is too little data to judge.
    """
    if len(messages) < 3:
        return None

    from_user = [msg for msg in messages if msg.role == "user"]
    if len(from_user) < 3:
        return None

    # Rate is measured over the most recent window of user messages only.
    window = from_user[-10:]
    if len(window) < 2:
        return None

    span_seconds = (window[-1].timestamp - window[0].timestamp).total_seconds()
    if span_seconds <= 0:
        return None

    rate = len(window) / (span_seconds / 3600)
    window_evidence = [f"Last {len(window)} messages in {span_seconds/60:.0f} minutes"]

    if rate >= FREQ_HIGH:
        return BehavioralSignal(
            signal_type="frequency",
            risk_level="HIGH",
            description=f"Very rapid messaging: {rate:.0f} messages/hour",
            evidence=window_evidence,
            score=min(1.0, rate / FREQ_HIGH),
        )
    if rate >= FREQ_ELEVATED:
        return BehavioralSignal(
            signal_type="frequency",
            risk_level="MEDIUM",
            description=f"Elevated messaging rate: {rate:.0f} messages/hour",
            evidence=window_evidence,
            # Between FREQ_ELEVATED and FREQ_HIGH this lands in [0.5, 1.0).
            score=rate / FREQ_HIGH,
        )
    return None
|
||||
|
||||
|
||||
# ── Time-of-Day Analysis ─────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_time(messages: list[Message]) -> Optional[BehavioralSignal]:
    """Detect late-night messaging (correlates with crisis risk).

    Examines the last 5 messages and flags HIGH when 3+ fall in
    HIGH_RISK_HOURS (1-4 AM), MEDIUM when 3+ fall in ELEVATED_RISK_HOURS
    (10PM-midnight, 5-7AM). Hours are read straight off each message's
    timestamp with no timezone conversion, so timestamps are assumed to
    already be in the user's local time — TODO confirm against callers.

    Returns None when no pattern is found.
    """
    if not messages:
        return None

    recent = messages[-5:]
    late_night_count = sum(1 for m in recent if m.timestamp.hour in HIGH_RISK_HOURS)
    elevated_count = sum(1 for m in recent if m.timestamp.hour in ELEVATED_RISK_HOURS)

    if late_night_count >= 3:
        return BehavioralSignal(
            signal_type="time",
            risk_level="HIGH",
            # Bug fix: the denominator was hard-coded as "/5" even when
            # fewer than 5 messages were examined (sessions of 3-4 messages).
            description=(
                f"Late-night messaging pattern: {late_night_count}/{len(recent)} "
                f"messages between 1-4 AM"
            ),
            evidence=[f"Message at {m.timestamp.strftime('%H:%M')}" for m in recent if m.timestamp.hour in HIGH_RISK_HOURS],
            score=late_night_count / len(recent),
        )
    elif elevated_count >= 3:
        return BehavioralSignal(
            signal_type="time",
            risk_level="MEDIUM",
            description=(
                f"Off-hours messaging: {elevated_count}/{len(recent)} "
                f"messages in elevated-risk window"
            ),
            evidence=[f"Message at {m.timestamp.strftime('%H:%M')}" for m in recent if m.timestamp.hour in ELEVATED_RISK_HOURS],
            # Elevated window is weighted at half the high-risk window.
            score=elevated_count / len(recent) * 0.5,
        )
    return None
|
||||
|
||||
|
||||
# ── Withdrawal Detection ──────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_withdrawal(messages: list[Message]) -> Optional[BehavioralSignal]:
    """Detect communication withdrawal (decreasing engagement).

    Splits the user's messages into first/second halves and compares
    average message length (engagement proxy) and average gap between
    consecutive messages. Shorter messages combined with longer gaps
    indicate withdrawal.

    Returns a HIGH signal for strong withdrawal (length < 50% AND gaps
    > 2x), MEDIUM for moderate withdrawal (length < 70% OR gaps > 1.5x),
    or None. Requires at least 10 user messages.
    """
    user_msgs = [m for m in messages if m.role == "user"]
    if len(user_msgs) < 10:
        return None

    # Split into first half and second half of the session.
    mid = len(user_msgs) // 2
    first_half = user_msgs[:mid]
    second_half = user_msgs[mid:]

    # Average message length as engagement proxy.
    first_avg_len = sum(len(m.content) for m in first_half) / len(first_half)
    second_avg_len = sum(len(m.content) for m in second_half) / len(second_half)

    def avg_gap(msgs: list[Message]) -> float:
        """Mean seconds between consecutive messages (0 for < 2 messages)."""
        if len(msgs) < 2:
            return 0
        gaps = [(msgs[i+1].timestamp - msgs[i].timestamp).total_seconds() for i in range(len(msgs)-1)]
        return sum(gaps) / len(gaps)

    first_gap = avg_gap(first_half)
    second_gap = avg_gap(second_half)

    # Ratios < 1 mean shorter messages; > 1 means longer gaps.
    length_ratio = second_avg_len / first_avg_len if first_avg_len > 0 else 1.0
    gap_ratio = second_gap / first_gap if first_gap > 0 else 1.0

    # Withdrawal = shorter messages AND longer gaps.
    if length_ratio < 0.5 and gap_ratio > 2.0:
        return BehavioralSignal(
            signal_type="withdrawal",
            risk_level="HIGH",
            description="Significant withdrawal: messages shorter and less frequent",
            evidence=[
                f"Message length: {first_avg_len:.0f} -> {second_avg_len:.0f} chars ({length_ratio:.0%})",
                f"Message gap: {first_gap/60:.0f}min -> {second_gap/60:.0f}min ({gap_ratio:.1f}x)",
            ],
            score=min(1.0, (1 - length_ratio) * 0.5 + (gap_ratio - 1) * 0.25),
        )
    elif length_ratio < 0.7 or gap_ratio > 1.5:
        return BehavioralSignal(
            signal_type="withdrawal",
            risk_level="MEDIUM",
            description="Moderate withdrawal: engagement decreasing",
            evidence=[
                f"Message length: {first_avg_len:.0f} -> {second_avg_len:.0f} chars",
                f"Message gap: {first_gap/60:.0f}min -> {second_gap/60:.0f}min",
            ],
            # Bug fix: clamp to [0, 1] — the raw formula went negative when
            # messages got LONGER (length_ratio > 1) while gaps still grew
            # past 1.5x, and exceeded 1.0 for very large gap ratios, both
            # violating BehavioralSignal's 0.0-1.0 score contract.
            score=max(0.0, min(1.0, (1 - length_ratio) * 0.3 + (gap_ratio - 1) * 0.15)),
        )
    return None
|
||||
|
||||
|
||||
# ── Escalation Detection ─────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_escalation(messages: list[Message]) -> Optional[BehavioralSignal]:
    """Detect rising crisis scores over recent messages.

    Compares the average crisis score of the older vs. newer half of the
    last ESCALATION_WINDOW scored user messages. A sharp rise (>1.5x and
    above 0.5) yields HIGH, a milder rise (>1.2x and above 0.3) MEDIUM,
    otherwise None. Messages with crisis_score == 0 are ignored.
    """
    scored = [m for m in messages if m.role == "user" and m.crisis_score > 0]
    if len(scored) < ESCALATION_WINDOW:
        return None

    scores = [m.crisis_score for m in scored[-ESCALATION_WINDOW:]]
    if len(scores) < 3:
        return None

    # Simple trend check: newer-half average vs. older-half average.
    half = len(scores) // 2
    older_avg = sum(scores[:half]) / half
    newer_avg = sum(scores[half:]) / (len(scores) - half)
    trace = [f"Score {i+1}: {s:.2f}" for i, s in enumerate(scores)]

    if newer_avg > older_avg * 1.5 and newer_avg > 0.5:
        return BehavioralSignal(
            signal_type="escalation",
            risk_level="HIGH",
            description=f"Crisis escalation detected: scores rising from {older_avg:.2f} to {newer_avg:.2f}",
            evidence=trace,
            score=min(1.0, newer_avg),
        )
    if newer_avg > older_avg * 1.2 and newer_avg > 0.3:
        return BehavioralSignal(
            signal_type="escalation",
            risk_level="MEDIUM",
            description="Mild escalation: scores trending up",
            evidence=trace,
            score=newer_avg * 0.5,
        )
    return None
|
||||
|
||||
|
||||
# ── Combined Analysis ─────────────────────────────────────────────────────────
|
||||
|
||||
def analyze_session(messages: list[Message]) -> list[BehavioralSignal]:
    """Analyze a session for behavioral crisis signals.

    Runs every behavioral analyzer (frequency, time-of-day, withdrawal,
    escalation) over the session and collects whatever fires.

    Args:
        messages: List of Message objects with timestamps, content, and crisis scores.

    Returns:
        List of BehavioralSignal objects, sorted by risk level (HIGH first),
        then by descending score within the same level.
    """
    analyzers = (
        _analyze_frequency,
        _analyze_time,
        _analyze_withdrawal,
        _analyze_escalation,
    )
    signals = [sig for sig in (run(messages) for run in analyzers) if sig]

    # HIGH before MEDIUM before LOW; unrecognized levels sort last.
    precedence = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
    signals.sort(key=lambda s: (precedence.get(s.risk_level, 9), -s.score))
    return signals
|
||||
|
||||
|
||||
def get_session_risk_level(signals: list[BehavioralSignal]) -> str:
    """Get overall session risk from behavioral signals.

    Returns "NONE" when there are no signals, otherwise the highest risk
    level present among them ("HIGH" > "MEDIUM" > "LOW").
    """
    if not signals:
        return "NONE"
    present = {s.risk_level for s in signals}
    for level in ("HIGH", "MEDIUM"):
        if level in present:
            return level
    return "LOW"
|
||||
|
||||
|
||||
# ── Self-Test ─────────────────────────────────────────────────────────────────
|
||||
|
||||
if __name__ == "__main__":
    # Smoke tests: build synthetic sessions and check each analyzer fires.
    from datetime import timedelta

    now = datetime.now(timezone.utc)

    # Test: rapid-fire messaging — 20 messages one minute apart (~60/hr,
    # well above FREQ_HIGH).
    rapid_msgs = [
        Message(timestamp=now - timedelta(minutes=i), content="help me", role="user")
        for i in range(20, 0, -1)
    ]
    signals = analyze_session(rapid_msgs)
    print(f"Rapid-fire: {[s.signal_type + ':' + s.risk_level for s in signals]}")
    assert any(s.signal_type == "frequency" for s in signals), "Should detect frequency"

    # Test: late-night — five messages at 2 AM, inside HIGH_RISK_HOURS.
    late_msgs = [
        Message(timestamp=now.replace(hour=2, minute=i*5), content="cant sleep", role="user")
        for i in range(5)
    ]
    signals = analyze_session(late_msgs)
    print(f"Late-night: {[s.signal_type + ':' + s.risk_level for s in signals]}")
    assert any(s.signal_type == "time" for s in signals), "Should detect time"

    # Test: escalation — crisis scores must RISE toward the newest message.
    # Bug fix: the old fixture (crisis_score=0.1 + i*0.15 with i counting
    # 5..1) gave the OLDEST message the highest score (0.85 falling to
    # 0.25), so _analyze_escalation never fired and the assert below
    # failed. Scores now climb 0.25 (oldest) -> 0.85 (newest).
    esc_msgs = [
        Message(timestamp=now - timedelta(minutes=i*10), content="feeling bad",
                role="user", crisis_score=0.1 + (6 - i)*0.15)
        for i in range(5, 0, -1)
    ]
    signals = analyze_session(esc_msgs)
    print(f"Escalation: {[s.signal_type + ':' + s.risk_level for s in signals]}")
    assert any(s.signal_type == "escalation" for s in signals), "Should detect escalation"

    print("\nAll self-tests passed!")
|
||||
@@ -1,134 +0,0 @@
|
||||
"""Tests for voice message distress analysis (#131)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from voice_analysis import (
|
||||
VoiceAnalysisResult,
|
||||
compute_speech_rate,
|
||||
compute_distress_score,
|
||||
DISTRESS_THRESHOLDS,
|
||||
NORMAL_SPEECH_RATE,
|
||||
NORMAL_PITCH_VAR,
|
||||
)
|
||||
|
||||
|
||||
class TestDistressScore:
    """Distress score computation from paralinguistic features.

    Each test holds three features at in-band values and pushes one out
    of band, then checks the score rises and the right label appears.
    """

    def test_normal_speech_no_distress(self):
        # All four features inside their normal bands -> no signals at all.
        score, signals = compute_distress_score(
            speech_rate=140,  # normal
            pitch_variability=50,  # normal
            silence_ratio=0.15,  # normal
            volume_db=-20,  # normal
        )
        assert score < 0.1
        assert not signals

    def test_slow_speech_detected(self):
        score, signals = compute_distress_score(
            speech_rate=60,  # very slow (below NORMAL_SPEECH_RATE lower bound)
            pitch_variability=50,
            silence_ratio=0.15,
            volume_db=-20,
        )
        assert score > 0.1
        assert any("slow" in s for s in signals)

    def test_monotone_detected(self):
        score, signals = compute_distress_score(
            speech_rate=140,
            pitch_variability=10,  # very monotone (low F0 variability)
            silence_ratio=0.15,
            volume_db=-20,
        )
        assert score > 0.1
        assert any("monotone" in s for s in signals)

    def test_long_pauses_detected(self):
        score, signals = compute_distress_score(
            speech_rate=140,
            pitch_variability=50,
            silence_ratio=0.50,  # long pauses (comment was mislabeled "very quiet")
            volume_db=-20,
        )
        assert score > 0.1
        assert any("pause" in s for s in signals)

    def test_quiet_voice_detected(self):
        score, signals = compute_distress_score(
            speech_rate=140,
            pitch_variability=50,
            silence_ratio=0.15,
            volume_db=-45,  # very quiet
        )
        assert score > 0.1
        assert any("quiet" in s for s in signals)

    def test_multiple_signals_compound(self):
        # All features out of band -> several labels, high composite.
        score, signals = compute_distress_score(
            speech_rate=50,  # very slow
            pitch_variability=5,  # very monotone
            silence_ratio=0.55,  # long pauses
            volume_db=-50,  # very quiet
        )
        assert score > 0.5
        assert len(signals) >= 3

    def test_max_score_is_1(self):
        # Extreme inputs must still clamp the composite at 1.0.
        score, _ = compute_distress_score(
            speech_rate=0,
            pitch_variability=0,
            silence_ratio=1.0,
            volume_db=-100,
        )
        assert score <= 1.0
|
||||
|
||||
|
||||
class TestSpeechRate:
    """Speech rate computation."""

    def test_normal_rate(self):
        # 100 words over a single 60-second segment = 100 wpm.
        segments = [{"start": 0.0, "end": 60.0, "text": "x"}]
        wpm = compute_speech_rate("word " * 100, segments)
        assert abs(wpm - 100) < 5

    def test_empty_transcript(self):
        # No words -> rate is defined as 0.0.
        assert compute_speech_rate("", []) == 0.0

    def test_no_segments(self):
        # Without timestamps the duration is unknown -> 0.0.
        assert compute_speech_rate("hello world", []) == 0.0
|
||||
|
||||
|
||||
class TestDistressThresholds:
    """Threshold configuration."""

    def test_thresholds_ordered(self):
        # Levels must be strictly increasing for the >= level mapping to work.
        assert DISTRESS_THRESHOLDS["low"] < DISTRESS_THRESHOLDS["medium"]
        assert DISTRESS_THRESHOLDS["medium"] < DISTRESS_THRESHOLDS["high"]

    def test_low_is_03(self):
        assert DISTRESS_THRESHOLDS["low"] == 0.3

    def test_high_is_10(self):
        assert DISTRESS_THRESHOLDS["high"] == 1.0
|
||||
|
||||
|
||||
class TestVoiceAnalysisResult:
    """Result data structure."""

    def test_creation(self):
        # The dataclass should carry every field through unchanged.
        result = VoiceAnalysisResult(
            transcript="hello", speech_rate_wpm=120.0,
            pitch_mean_hz=150.0, pitch_variability=40.0,
            silence_ratio=0.2, volume_db=-20.0,
            volume_variability=5.0, duration_seconds=10.0,
            distress_score=0.1, distress_level="low",
            distress_signals=[],
        )
        assert result.transcript == "hello"
        assert result.distress_level == "low"
        assert not result.distress_signals
|
||||
@@ -1,356 +0,0 @@
|
||||
"""Voice message distress analysis — paralinguistic features (#131).
|
||||
|
||||
Analyzes audio (OGG/MP3/WAV) for distress signals using audio
|
||||
features extracted without a neural model — pure DSP analysis.
|
||||
|
||||
Signals detected:
|
||||
- Speech rate (words per minute from timestamps)
|
||||
- Pitch variability (F0 std deviation — monotone = depression indicator)
|
||||
- Silence ratio (long pauses)
|
||||
- Volume dynamics (drops, tremor proxy)
|
||||
|
||||
Uses whisper for transcription + word timestamps. All other features
|
||||
are computed from raw audio via librosa.
|
||||
|
||||
Refs: #131 — Epic #102 (Multimodal Crisis Detection)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
from dataclasses import dataclass, asdict
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class VoiceAnalysisResult:
    """Result of voice message paralinguistic analysis.

    Produced by analyze_voice_message(); features come from the whisper
    transcript plus librosa DSP analysis of the raw audio.
    """
    transcript: str  # whisper transcript ("" when transcription failed)
    speech_rate_wpm: float  # words per minute (0.0 when unknown)
    pitch_mean_hz: float  # mean F0 in Hz
    pitch_variability: float  # F0 standard deviation (low = monotone)
    silence_ratio: float  # fraction of audio that is silence (0-1)
    volume_db: float  # mean volume in dB (relative to peak frame)
    volume_variability: float  # volume std deviation in dB
    duration_seconds: float  # total audio duration
    distress_score: float  # 0-1 composite score (max of sub-scores)
    distress_level: str  # "low", "medium", "high"
    distress_signals: List[str]  # human-readable detected indicators
|
||||
|
||||
|
||||
# Distress score -> level mapping; compared with >= in analyze_voice_message,
# so "high" at 1.0 is only reached by a maximal composite score.
DISTRESS_THRESHOLDS = {
    "low": 0.3,
    "medium": 0.7,
    "high": 1.0,
}

# Paralinguistic "normal" bands — values outside these ranges are flagged as
# distress indicators by compute_distress_score. The bands are heuristic
# (this module is pure DSP, no learned model); tune against real data.
NORMAL_SPEECH_RATE = (100, 180)  # words per minute
NORMAL_PITCH_VAR = (20, 80)  # F0 std deviation in Hz
NORMAL_SILENCE_RATIO = (0.05, 0.35)  # fraction of silence
NORMAL_VOLUME_DB = (-30, -10)  # dB range
|
||||
|
||||
|
||||
def _ensure_whisper():
    """Check if the whisper CLI is available on PATH.

    Returns True when ``whisper --help`` can be launched at all (the exit
    status is deliberately not inspected), False when the binary is
    missing or hangs past the 5s timeout.
    """
    try:
        # Fix: the original bound the CompletedProcess to an unused local;
        # launching the process is the whole check.
        subprocess.run(
            ["whisper", "--help"],
            capture_output=True, text=True, timeout=5,
        )
        return True
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
|
||||
|
||||
|
||||
def _ensure_librosa():
|
||||
"""Check if librosa is available."""
|
||||
try:
|
||||
import librosa
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
def transcribe_with_timestamps(audio_path: str) -> Dict[str, Any]:
    """Transcribe audio using whisper and extract word-level timestamps.

    Args:
        audio_path: Path to the audio file to transcribe.

    Returns dict with 'text' and 'segments' (list of {start, end, text}).
    Falls back to subprocess whisper if Python whisper not available;
    returns {"text": "", "segments": []} when both paths fail.
    """
    # Preferred path: the Python whisper package, which gives segments
    # directly in the transcribe() result.
    try:
        import whisper
        model = whisper.load_model("base")
        result = model.transcribe(audio_path, word_timestamps=True)
        return {
            "text": result["text"],
            "segments": [
                {"start": s["start"], "end": s["end"], "text": s["text"]}
                for s in result.get("segments", [])
            ],
        }
    except ImportError:
        pass

    # Fallback: subprocess whisper
    # The temp file is used only to obtain a writable directory for
    # --output_dir; whisper itself writes <audio stem>.json there.
    with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
        json_out = f.name

    try:
        # NOTE(review): the exit status is not checked; a failed run is
        # detected below by the output file simply not existing.
        subprocess.run(
            ["whisper", audio_path, "--model", "base", "--output_format", "json",
             "--output_dir", os.path.dirname(json_out)],
            capture_output=True, text=True, timeout=120,
        )

        # Whisper outputs to <filename>.json in output_dir
        base = Path(audio_path).stem
        whisper_out = Path(os.path.dirname(json_out)) / f"{base}.json"

        if whisper_out.exists():
            with open(whisper_out) as f:
                data = json.load(f)
            os.unlink(whisper_out)
            return {
                "text": data.get("text", ""),
                "segments": [
                    {"start": s["start"], "end": s["end"], "text": s["text"]}
                    for s in data.get("segments", [])
                ],
            }
    except Exception as e:
        logger.warning("Whisper transcription failed: %s", e)
    finally:
        # Always remove the placeholder temp file.
        if os.path.exists(json_out):
            os.unlink(json_out)

    return {"text": "", "segments": []}
|
||||
|
||||
|
||||
def extract_audio_features(audio_path: str) -> Dict[str, float]:
    """Extract paralinguistic features from raw audio using librosa.

    Args:
        audio_path: Path to the audio file.

    Returns dict with pitch, volume, silence, and duration metrics.
    All-zero defaults are returned when librosa is missing or the file
    cannot be loaded; individual features degrade to 0.0 on failure.
    """
    # Fix: shared fallback dict — the original duplicated this literal in
    # both failure paths, inviting drift.
    defaults = {
        "pitch_mean_hz": 0.0, "pitch_variability": 0.0,
        "silence_ratio": 0.0, "volume_db": 0.0, "volume_variability": 0.0,
        "duration_seconds": 0.0,
    }

    try:
        import librosa
        import numpy as np
    except ImportError:
        logger.warning("librosa not available — returning defaults")
        return defaults

    try:
        y, sr = librosa.load(audio_path, sr=None)
    except Exception as e:
        logger.warning("Failed to load audio %s: %s", audio_path, e)
        return defaults

    duration = len(y) / sr

    # Pitch (F0) estimation using pyin; unvoiced frames come back as NaN.
    try:
        f0, _voiced_flag, _ = librosa.pyin(y, fmin=50, fmax=500, sr=sr)
        f0_voiced = f0[~np.isnan(f0)]
        if len(f0_voiced) > 0:
            pitch_mean = float(np.mean(f0_voiced))
            pitch_var = float(np.std(f0_voiced))
        else:
            pitch_mean = 0.0
            pitch_var = 0.0
    except Exception:
        pitch_mean = 0.0
        pitch_var = 0.0

    # Volume (RMS energy) in dB relative to the loudest frame.
    # Fix: the original computed amplitude_to_db twice for mean and std.
    rms = librosa.feature.rms(y=y)[0]
    rms_db = librosa.amplitude_to_db(rms, ref=np.max)
    volume_db = float(rms_db.mean())
    volume_var = float(rms_db.std())

    # Silence ratio: fraction of samples outside non-silent intervals.
    try:
        intervals = librosa.effects.split(y, top_db=30)
        speech_samples = sum(end - start for start, end in intervals)
        silence_ratio = 1.0 - (speech_samples / len(y)) if len(y) > 0 else 0.0
    except Exception:
        silence_ratio = 0.0

    return {
        "pitch_mean_hz": round(pitch_mean, 1),
        "pitch_variability": round(pitch_var, 1),
        "silence_ratio": round(silence_ratio, 3),
        "volume_db": round(volume_db, 1),
        "volume_variability": round(volume_var, 1),
        "duration_seconds": round(duration, 2),
    }
|
||||
|
||||
|
||||
def compute_speech_rate(transcript: str, segments: List[dict]) -> float:
    """Compute words per minute from transcript and segment timestamps.

    Returns 0.0 when the transcript is empty, there are no segments, or
    the timestamps span no positive duration; otherwise WPM rounded to
    one decimal place.
    """
    word_count = len(transcript.split())
    if word_count == 0 or not segments:
        return 0.0

    # Duration spans from the earliest segment start to the latest end.
    spoken_seconds = max(s["end"] for s in segments) - min(s["start"] for s in segments)
    if spoken_seconds <= 0:
        return 0.0

    return round(word_count * 60.0 / spoken_seconds, 1)
|
||||
|
||||
|
||||
def compute_distress_score(
    speech_rate: float,
    pitch_variability: float,
    silence_ratio: float,
    volume_db: float,
) -> tuple[float, List[str]]:
    """Compute composite distress score from paralinguistic features.

    Each feature contributes a 0-1 sub-score when it falls outside its
    "normal" band; the composite is the MAX of the sub-scores — one
    severe signal is enough — not their average.

    Returns (score, signals): score in [0, 1] rounded to 3 places, and a
    list of human-readable distress indicator labels.
    """
    signals: List[str] = []
    component_scores: List[float] = []

    # Speech rate — 0 means "unknown" and is skipped entirely.
    if speech_rate > 0:
        slow_limit, fast_limit = NORMAL_SPEECH_RATE
        if speech_rate < slow_limit:
            signals.append(f"very_slow_speech ({speech_rate:.0f} wpm)")
            component_scores.append(min(1.0, (slow_limit - speech_rate) / 50))
        elif speech_rate > fast_limit:
            signals.append(f"very_fast_speech ({speech_rate:.0f} wpm)")
            component_scores.append(min(1.0, (speech_rate - fast_limit) / 80))
        else:
            component_scores.append(0.0)

    # Pitch variability — low variability reads as monotone (depression
    # indicator); 0 again means "unknown" and is skipped.
    if pitch_variability > 0:
        if pitch_variability < NORMAL_PITCH_VAR[0]:
            signals.append(f"monotone_voice (F0_var={pitch_variability:.0f}Hz)")
            component_scores.append(
                min(1.0, (NORMAL_PITCH_VAR[0] - pitch_variability) / NORMAL_PITCH_VAR[0])
            )
        else:
            component_scores.append(0.0)

    # Silence ratio — high means long pauses.
    if silence_ratio > NORMAL_SILENCE_RATIO[1]:
        signals.append(f"long_pauses (silence={silence_ratio:.0%})")
        component_scores.append(min(1.0, (silence_ratio - NORMAL_SILENCE_RATIO[1]) / 0.4))
    else:
        component_scores.append(0.0)

    # Volume — only abnormally quiet speech is flagged.
    if volume_db < NORMAL_VOLUME_DB[0]:
        signals.append(f"very_quiet ({volume_db:.0f}dB)")
        component_scores.append(min(1.0, abs(volume_db - NORMAL_VOLUME_DB[0]) / 20))
    else:
        component_scores.append(0.0)

    composite = max(component_scores) if component_scores else 0.0
    return round(composite, 3), signals
|
||||
|
||||
|
||||
def analyze_voice_message(audio_path: str) -> VoiceAnalysisResult:
    """Analyze a voice message for distress signals.

    Pipeline: transcribe (whisper) -> extract DSP features (librosa) ->
    compute speech rate -> composite distress score -> threshold to level.
    Degrades gracefully: a missing whisper/librosa yields an empty
    transcript / zero features rather than raising.

    Args:
        audio_path: Path to audio file (OGG, MP3, WAV).

    Returns:
        VoiceAnalysisResult with all paralinguistic features.
    """
    # Step 1: Transcribe with timestamps
    transcription = transcribe_with_timestamps(audio_path)
    transcript = transcription["text"]
    segments = transcription["segments"]

    # Step 2: Extract audio features
    features = extract_audio_features(audio_path)

    # Step 3: Compute speech rate
    wpm = compute_speech_rate(transcript, segments)

    # Step 4: Compute distress score
    distress_score, distress_signals = compute_distress_score(
        speech_rate=wpm,
        pitch_variability=features["pitch_variability"],
        silence_ratio=features["silence_ratio"],
        volume_db=features["volume_db"],
    )

    # Map score to level. NOTE(review): "high" requires score >= 1.0, which
    # only a maximal sub-score can reach — confirm this is intended.
    if distress_score >= DISTRESS_THRESHOLDS["high"]:
        level = "high"
    elif distress_score >= DISTRESS_THRESHOLDS["medium"]:
        level = "medium"
    else:
        level = "low"

    return VoiceAnalysisResult(
        transcript=transcript,
        speech_rate_wpm=wpm,
        pitch_mean_hz=features["pitch_mean_hz"],
        pitch_variability=features["pitch_variability"],
        silence_ratio=features["silence_ratio"],
        volume_db=features["volume_db"],
        volume_variability=features["volume_variability"],
        duration_seconds=features["duration_seconds"],
        distress_score=distress_score,
        distress_level=level,
        distress_signals=distress_signals,
    )
|
||||
|
||||
|
||||
def main():
    """CLI entry point: analyze one audio file and print the results.

    With --json, dumps the full VoiceAnalysisResult as JSON; otherwise
    prints a human-readable summary. Exits with status 1 when the audio
    file does not exist.
    """
    import argparse
    import sys  # bug fix: sys.stderr/sys.exit were used below but sys was never imported

    p = argparse.ArgumentParser(description="Voice message distress analysis")
    p.add_argument("audio", help="Path to audio file")
    p.add_argument("--json", action="store_true")
    a = p.parse_args()

    if not os.path.exists(a.audio):
        print(f"File not found: {a.audio}", file=sys.stderr)
        sys.exit(1)

    result = analyze_voice_message(a.audio)

    if a.json:
        print(json.dumps(asdict(result), indent=2))
    else:
        print(f"Transcript: {result.transcript[:100]}...")
        print(f"Speech rate: {result.speech_rate_wpm} wpm")
        print(f"Pitch: {result.pitch_mean_hz} Hz (variability: {result.pitch_variability})")
        print(f"Silence: {result.silence_ratio:.0%}")
        print(f"Volume: {result.volume_db} dB")
        print(f"Distress: {result.distress_score:.2f} ({result.distress_level})")
        if result.distress_signals:
            print(f"Signals: {', '.join(result.distress_signals)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user