Compare commits
5 Commits
fix/131-vo
...
fix/130
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53bfb47a92 | ||
|
|
08e3ece2d3 | ||
|
|
100cc743c0 | ||
|
|
f7d99c6d9c | ||
| d412939b4f |
@@ -14,6 +14,8 @@ Usage:
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from image_screening import screen_image_signals
|
||||
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result
|
||||
from .compassion_router import router
|
||||
from .response import (
|
||||
@@ -50,6 +52,67 @@ def check_crisis(text: str) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _image_detection_from_score(image_result) -> CrisisDetectionResult:
|
||||
if image_result.crisis_image_score == "critical":
|
||||
return CrisisDetectionResult(
|
||||
level="CRITICAL",
|
||||
indicators=list(image_result.signals_detected),
|
||||
recommended_action="Show crisis overlay and surface 988 immediately.",
|
||||
score=image_result.distress_score,
|
||||
)
|
||||
if image_result.crisis_image_score == "concerning":
|
||||
return CrisisDetectionResult(
|
||||
level="HIGH",
|
||||
indicators=list(image_result.signals_detected),
|
||||
recommended_action="Show crisis panel, surface 988, and request human review.",
|
||||
score=image_result.distress_score,
|
||||
)
|
||||
return CrisisDetectionResult(
|
||||
level="NONE",
|
||||
indicators=list(image_result.signals_detected),
|
||||
recommended_action="No crisis action required.",
|
||||
score=image_result.distress_score,
|
||||
)
|
||||
|
||||
|
||||
def check_image_crisis(
|
||||
*,
|
||||
image_path: Optional[str] = None,
|
||||
ocr_text: str = "",
|
||||
labels: Optional[list[str]] = None,
|
||||
manual_notes: str = "",
|
||||
visual_flags: Optional[list[str]] = None,
|
||||
) -> dict:
|
||||
"""Gateway-integrated image crisis check using the local screening slice."""
|
||||
image_result = screen_image_signals(
|
||||
image_path=image_path,
|
||||
ocr_text=ocr_text,
|
||||
labels=labels,
|
||||
manual_notes=manual_notes,
|
||||
visual_flags=visual_flags,
|
||||
)
|
||||
detection = _image_detection_from_score(image_result)
|
||||
response = generate_response(detection)
|
||||
|
||||
return {
|
||||
"level": detection.level,
|
||||
"image_score": image_result.crisis_image_score,
|
||||
"score": detection.score,
|
||||
"indicators": detection.indicators,
|
||||
"recommended_action": detection.recommended_action,
|
||||
"timmy_message": response.timmy_message,
|
||||
"ui": {
|
||||
"show_crisis_panel": response.show_crisis_panel,
|
||||
"show_overlay": response.show_overlay,
|
||||
"provide_988": response.provide_988,
|
||||
},
|
||||
"escalate": response.escalate,
|
||||
"requires_human_review": image_result.requires_human_review,
|
||||
"grounded_scope": image_result.grounded_scope,
|
||||
"screening": image_result.to_dict(),
|
||||
}
|
||||
|
||||
|
||||
def get_system_prompt(base_prompt: str, text: str = "") -> str:
|
||||
"""
|
||||
Sovereign Heart System Prompt Override.
|
||||
|
||||
195
image_screening.py
Normal file
195
image_screening.py
Normal file
@@ -0,0 +1,195 @@
|
||||
"""
|
||||
image_screening.py — local image crisis screening slice for epic #130.
|
||||
|
||||
Grounded scope:
|
||||
- screens OCR text, upstream object labels, and operator notes for crisis signals
|
||||
- intentionally does NOT claim raw computer-vision understanding of pixels
|
||||
- designed to plug into future multimodal scoring once a dedicated image model lands
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from typing import Iterable, List, Optional
|
||||
|
||||
from crisis.detect import detect_crisis
|
||||
|
||||
|
||||
DIRECT_SELF_HARM_LABELS = {
|
||||
"blood",
|
||||
"blade",
|
||||
"razor",
|
||||
"knife",
|
||||
"scissors",
|
||||
"noose",
|
||||
"ligature",
|
||||
"hanging",
|
||||
"pills",
|
||||
"pill bottle",
|
||||
"overdose",
|
||||
"gun",
|
||||
"firearm",
|
||||
"rope",
|
||||
"cuts",
|
||||
"self-harm",
|
||||
"suicide note",
|
||||
"goodbye letter",
|
||||
}
|
||||
|
||||
INJURY_LABELS = {
|
||||
"wound",
|
||||
"wounds",
|
||||
"bruise",
|
||||
"bruises",
|
||||
"bandage",
|
||||
"bandages",
|
||||
"injury",
|
||||
"injuries",
|
||||
"scar",
|
||||
"scars",
|
||||
"burn",
|
||||
"burns",
|
||||
"bleeding",
|
||||
}
|
||||
|
||||
HIGH_RISK_SCENE_LABELS = {
|
||||
"bridge edge",
|
||||
"rooftop edge",
|
||||
"train tracks",
|
||||
"ledge",
|
||||
"cliff edge",
|
||||
"dark room",
|
||||
"bathroom floor",
|
||||
"hospital wristband",
|
||||
"suicide note",
|
||||
"goodbye letter",
|
||||
}
|
||||
|
||||
FAREWELL_TEXT_PHRASES = {
|
||||
"goodbye",
|
||||
"giving away",
|
||||
"final post",
|
||||
"last message",
|
||||
"see you on the other side",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageScreeningResult:
|
||||
ocr_text: str = ""
|
||||
labels: List[str] = field(default_factory=list)
|
||||
visual_flags: List[str] = field(default_factory=list)
|
||||
distress_score: float = 0.0
|
||||
crisis_image_score: str = "safe"
|
||||
requires_human_review: bool = False
|
||||
signals_detected: List[str] = field(default_factory=list)
|
||||
grounded_scope: str = (
|
||||
"heuristic screening over OCR text, upstream labels, and operator notes; "
|
||||
"raw vision-model inference is not implemented in this slice"
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
def _normalize_items(values: Optional[Iterable[str]]) -> List[str]:
|
||||
if not values:
|
||||
return []
|
||||
normalized = []
|
||||
for value in values:
|
||||
text = str(value).strip().lower()
|
||||
if text:
|
||||
normalized.append(text)
|
||||
return normalized
|
||||
|
||||
|
||||
def _match_keywords(haystack: str, keywords: set[str]) -> List[str]:
|
||||
matches = []
|
||||
for keyword in keywords:
|
||||
if keyword in haystack:
|
||||
matches.append(keyword)
|
||||
return sorted(set(matches))
|
||||
|
||||
|
||||
def screen_image_signals(
|
||||
image_path: Optional[str] = None,
|
||||
*,
|
||||
ocr_text: str = "",
|
||||
labels: Optional[Iterable[str]] = None,
|
||||
manual_notes: str = "",
|
||||
visual_flags: Optional[Iterable[str]] = None,
|
||||
) -> ImageScreeningResult:
|
||||
"""
|
||||
Score image-related crisis evidence without pretending to do full CV.
|
||||
|
||||
Inputs are deliberately grounded in what the repo can actually support today:
|
||||
- OCR text extracted upstream from screenshots/photos
|
||||
- object labels supplied by a local model or operator
|
||||
- manual operator notes about visible scene context
|
||||
- optional visual flags from any upstream preprocessor
|
||||
"""
|
||||
normalized_labels = _normalize_items(labels)
|
||||
normalized_flags = _normalize_items(visual_flags)
|
||||
normalized_notes = str(manual_notes or "").strip().lower()
|
||||
normalized_ocr = str(ocr_text or "").strip()
|
||||
|
||||
combined_label_text = " ".join(normalized_labels + normalized_flags + ([normalized_notes] if normalized_notes else []))
|
||||
crisis_text = " ".join(part for part in [normalized_ocr, normalized_notes] if part).strip()
|
||||
|
||||
direct_matches = _match_keywords(combined_label_text, DIRECT_SELF_HARM_LABELS)
|
||||
injury_matches = _match_keywords(combined_label_text, INJURY_LABELS)
|
||||
scene_matches = _match_keywords(combined_label_text, HIGH_RISK_SCENE_LABELS)
|
||||
farewell_matches = _match_keywords(crisis_text.lower(), FAREWELL_TEXT_PHRASES)
|
||||
text_detection = detect_crisis(crisis_text) if crisis_text else None
|
||||
|
||||
signals: List[str] = []
|
||||
score = 0.0
|
||||
|
||||
if direct_matches:
|
||||
score = max(score, 0.85)
|
||||
for match in direct_matches:
|
||||
signals.append(f"direct_self_harm_label:{match}")
|
||||
|
||||
if injury_matches:
|
||||
score = max(score, 0.55)
|
||||
for match in injury_matches:
|
||||
signals.append(f"injury_indicator:{match}")
|
||||
|
||||
if scene_matches:
|
||||
score = max(score, 0.4)
|
||||
for match in scene_matches:
|
||||
signals.append(f"high_risk_scene:{match}")
|
||||
|
||||
if farewell_matches:
|
||||
score = max(score, 0.85)
|
||||
for match in farewell_matches:
|
||||
signals.append(f"farewell_text:{match}")
|
||||
|
||||
if text_detection and text_detection.level != "NONE":
|
||||
score = max(score, min(1.0, text_detection.score))
|
||||
signals.append(f"ocr_crisis_level:{text_detection.level}")
|
||||
for indicator in text_detection.indicators[:3]:
|
||||
signals.append(f"ocr_indicator:{indicator}")
|
||||
|
||||
if direct_matches and text_detection and text_detection.level in {"HIGH", "CRITICAL"}:
|
||||
score = min(1.0, max(score, 0.95))
|
||||
signals.append("cross_modal_confirmation:text_plus_visual")
|
||||
|
||||
if direct_matches or (text_detection and text_detection.level == "CRITICAL") or score >= 0.85:
|
||||
crisis_image_score = "critical"
|
||||
elif score >= 0.4 or (text_detection and text_detection.level in {"HIGH", "MEDIUM"}):
|
||||
crisis_image_score = "concerning"
|
||||
else:
|
||||
crisis_image_score = "safe"
|
||||
|
||||
requires_human_review = score >= 0.4 or bool(direct_matches)
|
||||
|
||||
return ImageScreeningResult(
|
||||
ocr_text=normalized_ocr,
|
||||
labels=list(normalized_labels),
|
||||
visual_flags=list(normalized_flags),
|
||||
distress_score=round(score, 4),
|
||||
crisis_image_score=crisis_image_score,
|
||||
requires_human_review=requires_human_review,
|
||||
signals_detected=signals,
|
||||
)
|
||||
@@ -680,7 +680,7 @@ html, body {
|
||||
|
||||
<!-- Footer -->
|
||||
<footer id="footer">
|
||||
<a href="/about" aria-label="About The Door">about</a>
|
||||
<a href="/about.html" aria-label="About The Door">about</a>
|
||||
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
|
||||
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
|
||||
</footer>
|
||||
|
||||
115
tests/test_image_screening.py
Normal file
115
tests/test_image_screening.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""Tests for local image crisis screening slice under epic #130."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from crisis.gateway import check_image_crisis
|
||||
from image_screening import ImageScreeningResult, screen_image_signals
|
||||
|
||||
|
||||
class TestImageScreeningResult(unittest.TestCase):
|
||||
def test_to_dict_preserves_core_fields(self):
|
||||
result = ImageScreeningResult(
|
||||
ocr_text="help me",
|
||||
labels=["blood"],
|
||||
visual_flags=["dark_scene"],
|
||||
distress_score=0.8,
|
||||
requires_human_review=True,
|
||||
signals_detected=["direct_self_harm_label:blood"],
|
||||
)
|
||||
|
||||
data = result.to_dict()
|
||||
self.assertEqual(data["ocr_text"], "help me")
|
||||
self.assertEqual(data["labels"], ["blood"])
|
||||
self.assertTrue(data["requires_human_review"])
|
||||
|
||||
|
||||
class TestScreenImageSignals(unittest.TestCase):
|
||||
def test_direct_self_harm_labels_trigger_high_risk(self):
|
||||
result = screen_image_signals(
|
||||
labels=["razor blade", "blood droplets"],
|
||||
manual_notes="photo of fresh cuts on forearm",
|
||||
)
|
||||
|
||||
self.assertEqual(result.crisis_image_score, "critical")
|
||||
self.assertGreaterEqual(result.distress_score, 0.8)
|
||||
self.assertTrue(result.requires_human_review)
|
||||
self.assertTrue(any("self_harm" in signal for signal in result.signals_detected))
|
||||
|
||||
def test_ocr_text_uses_existing_crisis_detector(self):
|
||||
result = screen_image_signals(
|
||||
ocr_text="I want to kill myself tonight",
|
||||
labels=["handwritten note"],
|
||||
)
|
||||
|
||||
self.assertGreaterEqual(result.distress_score, 0.7)
|
||||
self.assertTrue(result.requires_human_review)
|
||||
self.assertTrue(any(signal.startswith("ocr_crisis_level:") for signal in result.signals_detected))
|
||||
|
||||
def test_neutral_image_stays_low_risk(self):
|
||||
result = screen_image_signals(
|
||||
labels=["dog", "park", "sunlight"],
|
||||
manual_notes="family outing in daylight",
|
||||
)
|
||||
|
||||
self.assertEqual(result.crisis_image_score, "safe")
|
||||
self.assertLess(result.distress_score, 0.2)
|
||||
self.assertFalse(result.requires_human_review)
|
||||
self.assertEqual(result.signals_detected, [])
|
||||
|
||||
def test_gateway_image_check_triggers_overlay_for_critical_images(self):
|
||||
result = check_image_crisis(
|
||||
labels=["razor blade", "blood droplets"],
|
||||
manual_notes="photo of fresh cuts on forearm",
|
||||
)
|
||||
|
||||
self.assertEqual(result["level"], "CRITICAL")
|
||||
self.assertEqual(result["image_score"], "critical")
|
||||
self.assertTrue(result["ui"]["show_overlay"])
|
||||
self.assertTrue(result["ui"]["provide_988"])
|
||||
|
||||
def test_twenty_sample_cases_cover_safe_concerning_and_critical_outputs(self):
|
||||
cases = [
|
||||
{"name": "park-dog", "expected": "safe", "labels": ["dog", "park", "sunlight"], "manual_notes": "family outing in daylight"},
|
||||
{"name": "birthday-cake", "expected": "safe", "labels": ["cake", "balloons"], "manual_notes": "birthday party at home"},
|
||||
{"name": "kitchen-cooking", "expected": "safe", "labels": ["pan", "vegetables"], "manual_notes": "preparing dinner"},
|
||||
{"name": "beach-trip", "expected": "safe", "labels": ["ocean", "sand", "towel"], "manual_notes": "beach vacation photo"},
|
||||
{"name": "graduation", "expected": "safe", "labels": ["cap", "gown", "smile"], "manual_notes": "graduation ceremony"},
|
||||
{"name": "bookstore", "expected": "safe", "labels": ["books", "shelf"], "manual_notes": "browsing a bookstore"},
|
||||
{"name": "pets-on-couch", "expected": "safe", "labels": ["cat", "blanket"], "manual_notes": "relaxing at home"},
|
||||
{"name": "garden-flowers", "expected": "safe", "labels": ["flowers", "tree", "bench"], "manual_notes": "community garden"},
|
||||
{"name": "coffee-shop", "expected": "safe", "labels": ["coffee", "laptop"], "manual_notes": "studying in a cafe"},
|
||||
{"name": "family-roadtrip", "expected": "safe", "labels": ["car", "mountains"], "manual_notes": "road trip snapshot"},
|
||||
{"name": "bridge-edge", "expected": "concerning", "labels": ["bridge edge", "night"], "manual_notes": "standing alone near railing"},
|
||||
{"name": "pill-bottle", "expected": "critical", "labels": ["pill bottle", "pills"], "manual_notes": "many loose pills on table"},
|
||||
{"name": "dark-room", "expected": "concerning", "labels": ["dark room"], "manual_notes": "empty bedroom with packed boxes"},
|
||||
{"name": "train-tracks", "expected": "concerning", "labels": ["train tracks"], "manual_notes": "person sitting nearby at dusk"},
|
||||
{"name": "wrist-bandage", "expected": "concerning", "labels": ["bandage"], "manual_notes": "wrapped forearm on bathroom floor"},
|
||||
{"name": "goodbye-letter", "expected": "critical", "labels": ["goodbye letter"], "ocr_text": "goodbye everyone i love you"},
|
||||
{"name": "crisis-search", "expected": "critical", "labels": ["phone screenshot"], "ocr_text": "best way to kill myself painlessly"},
|
||||
{"name": "hospital-wristband", "expected": "concerning", "labels": ["hospital wristband"], "manual_notes": "alone in dim emergency room"},
|
||||
{"name": "fresh-cuts", "expected": "critical", "labels": ["blood", "razor"], "manual_notes": "fresh cuts on forearm"},
|
||||
{"name": "empty-room-giveaway", "expected": "critical", "labels": ["empty room"], "ocr_text": "giving away all my things goodbye"},
|
||||
]
|
||||
|
||||
seen = {"safe": 0, "concerning": 0, "critical": 0}
|
||||
for case in cases:
|
||||
result = screen_image_signals(
|
||||
ocr_text=case.get("ocr_text", ""),
|
||||
labels=case.get("labels", []),
|
||||
manual_notes=case.get("manual_notes", ""),
|
||||
)
|
||||
self.assertEqual(result.crisis_image_score, case["expected"], case["name"])
|
||||
seen[case["expected"]] += 1
|
||||
|
||||
self.assertEqual(sum(seen.values()), 20)
|
||||
self.assertEqual(seen["safe"], 10)
|
||||
self.assertGreaterEqual(seen["concerning"], 5)
|
||||
self.assertGreaterEqual(seen["critical"], 5)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,350 +0,0 @@
|
||||
"""
|
||||
voice_analysis.py — Voice message distress analysis via paralinguistic features.
|
||||
|
||||
Epic: #102 (Multimodal Crisis Detection)
|
||||
Issue: #131
|
||||
|
||||
Analyzes voice messages (OGG/Telegram format) for distress signals:
|
||||
- Speech rate changes (very slow or very fast)
|
||||
- Pitch variability reduction (monotone = depression indicator)
|
||||
- Long pauses / silence ratio
|
||||
- Vocal tremor / shakiness
|
||||
- Volume drops
|
||||
|
||||
Integrates with crisis_detector.py text-based detection for multimodal coverage.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import subprocess
|
||||
import tempfile
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class VoiceAnalysisResult:
|
||||
"""Result of paralinguistic analysis on a voice message."""
|
||||
transcript: str = ""
|
||||
speech_rate: float = 0.0 # words per minute
|
||||
pitch_mean: float = 0.0 # Hz, average fundamental frequency
|
||||
pitch_variability: float = 0.0 # std dev of pitch (low = monotone)
|
||||
silence_ratio: float = 0.0 # 0-1, fraction of audio that is silence
|
||||
tremor_score: float = 0.0 # 0-1, vocal shakiness estimate
|
||||
volume_drop_score: float = 0.0 # 0-1, sudden volume decreases
|
||||
distress_score: float = 0.0 # 0-1, composite distress indicator
|
||||
signals_detected: list = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
# === THRESHOLDS ===
|
||||
|
||||
# Speech rate: normal is ~120-150 WPM
|
||||
# Very slow (<80) or very fast (>200) are distress indicators
|
||||
SPEECH_RATE_SLOW = 80
|
||||
SPEECH_RATE_FAST = 200
|
||||
SPEECH_RATE_NORMAL_LOW = 100
|
||||
SPEECH_RATE_NORMAL_HIGH = 170
|
||||
|
||||
# Pitch variability: normal conversation has std dev ~30-50 Hz
|
||||
# Monotone (<15 Hz) is a depression indicator
|
||||
PITCH_VARIABILITY_LOW = 15.0 # Hz — monotone threshold
|
||||
PITCH_VARIABILITY_NORMAL = 30.0
|
||||
|
||||
# Silence ratio: normal has ~10-20% silence
|
||||
# Excessive silence (>40%) or very little (<3%) may indicate distress
|
||||
SILENCE_RATIO_HIGH = 0.4
|
||||
SILENCE_RATIO_LOW = 0.03
|
||||
|
||||
# Composite thresholds
|
||||
DISTRESS_LOW = 0.3
|
||||
DISTRESS_MEDIUM = 0.7
|
||||
|
||||
|
||||
# === CORE ANALYSIS ===
|
||||
|
||||
def _convert_to_wav(audio_path: str) -> str:
|
||||
"""Convert audio to WAV format for analysis. Returns path to temp WAV file."""
|
||||
wav_path = tempfile.mktemp(suffix='.wav')
|
||||
try:
|
||||
subprocess.run(
|
||||
['ffmpeg', '-i', audio_path, '-ar', '16000', '-ac', '1', '-y', wav_path],
|
||||
capture_output=True, timeout=30
|
||||
)
|
||||
if not os.path.exists(wav_path):
|
||||
# Fallback: if ffmpeg not available, try the original file
|
||||
return audio_path
|
||||
return wav_path
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
return audio_path
|
||||
|
||||
|
||||
def _transcribe(audio_path: str) -> str:
|
||||
"""Transcribe audio using whisper (if available) or return empty string."""
|
||||
try:
|
||||
import whisper
|
||||
model = whisper.load_model("base")
|
||||
result = model.transcribe(audio_path)
|
||||
return result.get("text", "").strip()
|
||||
except ImportError:
|
||||
# Whisper not available — skip transcription
|
||||
return ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _load_audio_numpy(audio_path: str) -> tuple:
|
||||
"""Load audio as numpy array. Returns (samples, sample_rate) or (None, None)."""
|
||||
try:
|
||||
import librosa
|
||||
samples, sr = librosa.load(audio_path, sr=16000, mono=True)
|
||||
return samples, sr
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
import soundfile as sf
|
||||
samples, sr = sf.read(audio_path)
|
||||
if len(samples.shape) > 1:
|
||||
samples = samples.mean(axis=1) # mono
|
||||
return samples, sr
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
def _analyze_speech_rate(transcript: str, duration_sec: float) -> float:
|
||||
"""Calculate words per minute from transcript and audio duration."""
|
||||
if not transcript or duration_sec <= 0:
|
||||
return 0.0
|
||||
words = len(transcript.split())
|
||||
minutes = duration_sec / 60.0
|
||||
return words / minutes if minutes > 0 else 0.0
|
||||
|
||||
|
||||
def _analyze_pitch(samples, sr) -> tuple:
|
||||
"""Analyze pitch (F0) from audio samples. Returns (mean_hz, variability_hz)."""
|
||||
try:
|
||||
import librosa
|
||||
f0, voiced_flag, _ = librosa.pyin(
|
||||
samples, fmin=librosa.note_to_hz('C2'),
|
||||
fmax=librosa.note_to_hz('C7'), sr=sr
|
||||
)
|
||||
import numpy as np
|
||||
f0_clean = f0[~np.isnan(f0)]
|
||||
if len(f0_clean) == 0:
|
||||
return 0.0, 0.0
|
||||
return float(np.mean(f0_clean)), float(np.std(f0_clean))
|
||||
except (ImportError, Exception):
|
||||
return 0.0, 0.0
|
||||
|
||||
|
||||
def _analyze_silence(samples, sr, threshold_db: float = -40.0) -> float:
|
||||
"""Calculate ratio of silence in audio (0-1)."""
|
||||
try:
|
||||
import librosa
|
||||
import numpy as np
|
||||
rms = librosa.feature.rms(y=samples)[0]
|
||||
rms_db = librosa.amplitude_to_db(rms, ref=np.max)
|
||||
silence_frames = np.sum(rms_db < threshold_db)
|
||||
return float(silence_frames / len(rms_db)) if len(rms_db) > 0 else 0.0
|
||||
except (ImportError, Exception):
|
||||
return 0.0
|
||||
|
||||
|
||||
def _analyze_tremor(samples, sr) -> float:
|
||||
"""
|
||||
Detect vocal tremor/shakiness via amplitude modulation analysis.
|
||||
|
||||
Tremor manifests as periodic amplitude fluctuations (3-12 Hz range).
|
||||
Returns 0-1 score where 1 = strong tremor detected.
|
||||
"""
|
||||
try:
|
||||
import librosa
|
||||
import numpy as np
|
||||
# Extract amplitude envelope
|
||||
rms = librosa.feature.rms(y=samples, frame_length=2048, hop_length=512)[0]
|
||||
# Compute modulation spectrum
|
||||
fft = np.abs(np.fft.rfft(rms))
|
||||
freqs = np.fft.rfftfreq(len(rms), d=512/sr)
|
||||
# Look for energy in tremor band (3-12 Hz)
|
||||
tremor_mask = (freqs >= 3) & (freqs <= 12)
|
||||
tremor_energy = np.sum(fft[tremor_mask])
|
||||
total_energy = np.sum(fft[1:]) # skip DC
|
||||
if total_energy == 0:
|
||||
return 0.0
|
||||
ratio = tremor_energy / total_energy
|
||||
return float(min(1.0, ratio * 5)) # normalize — typical tremor is 0.1-0.3 of total
|
||||
except (ImportError, Exception):
|
||||
return 0.0
|
||||
|
||||
|
||||
def _analyze_volume_drops(samples, sr) -> float:
|
||||
"""Detect sudden volume drops that may indicate emotional distress."""
|
||||
try:
|
||||
import librosa
|
||||
import numpy as np
|
||||
rms = librosa.feature.rms(y=samples, frame_length=2048, hop_length=512)[0]
|
||||
if len(rms) < 2:
|
||||
return 0.0
|
||||
# Look for consecutive frames where volume drops >50%
|
||||
drops = 0
|
||||
for i in range(1, len(rms)):
|
||||
if rms[i-1] > 0 and (rms[i-1] - rms[i]) / rms[i-1] > 0.5:
|
||||
drops += 1
|
||||
return float(min(1.0, drops / (len(rms) * 0.1)))
|
||||
except (ImportError, Exception):
|
||||
return 0.0
|
||||
|
||||
|
||||
def _compute_distress_score(result: VoiceAnalysisResult) -> tuple:
|
||||
"""
|
||||
Compute composite distress score from paralinguistic features.
|
||||
|
||||
Returns (score, signals_detected).
|
||||
"""
|
||||
signals = []
|
||||
score = 0.0
|
||||
weights = 0
|
||||
|
||||
# Speech rate (0.2 weight)
|
||||
if result.speech_rate > 0:
|
||||
if result.speech_rate < SPEECH_RATE_SLOW:
|
||||
signals.append(f"very_slow_speech ({result.speech_rate:.0f} WPM)")
|
||||
score += 0.8 * 0.2
|
||||
elif result.speech_rate > SPEECH_RATE_FAST:
|
||||
signals.append(f"very_fast_speech ({result.speech_rate:.0f} WPM)")
|
||||
score += 0.6 * 0.2
|
||||
elif result.speech_rate < SPEECH_RATE_NORMAL_LOW:
|
||||
score += 0.3 * 0.2
|
||||
weights += 0.2
|
||||
|
||||
# Pitch variability (0.25 weight — monotone is strong depression indicator)
|
||||
if result.pitch_variability > 0:
|
||||
if result.pitch_variability < PITCH_VARIABILITY_LOW:
|
||||
signals.append(f"monotone_voice (variability={result.pitch_variability:.1f} Hz)")
|
||||
score += 0.9 * 0.25
|
||||
elif result.pitch_variability < PITCH_VARIABILITY_NORMAL:
|
||||
signals.append(f"reduced_pitch_variability ({result.pitch_variability:.1f} Hz)")
|
||||
score += 0.5 * 0.25
|
||||
weights += 0.25
|
||||
|
||||
# Silence ratio (0.2 weight)
|
||||
if result.silence_ratio > 0:
|
||||
if result.silence_ratio > SILENCE_RATIO_HIGH:
|
||||
signals.append(f"excessive_silence ({result.silence_ratio:.0%})")
|
||||
score += 0.7 * 0.2
|
||||
elif result.silence_ratio < SILENCE_RATIO_LOW:
|
||||
signals.append(f"minimal_pauses ({result.silence_ratio:.0%})")
|
||||
score += 0.3 * 0.2
|
||||
weights += 0.2
|
||||
|
||||
# Tremor (0.2 weight)
|
||||
if result.tremor_score > 0:
|
||||
if result.tremor_score > 0.5:
|
||||
signals.append(f"vocal_tremor (score={result.tremor_score:.2f})")
|
||||
score += result.tremor_score * 0.2
|
||||
weights += 0.2
|
||||
|
||||
# Volume drops (0.15 weight)
|
||||
if result.volume_drop_score > 0:
|
||||
if result.volume_drop_score > 0.4:
|
||||
signals.append(f"volume_drops (score={result.volume_drop_score:.2f})")
|
||||
score += result.volume_drop_score * 0.15
|
||||
weights += 0.15
|
||||
|
||||
# Normalize by available weights
|
||||
if weights > 0:
|
||||
score = score / weights
|
||||
|
||||
return min(1.0, score), signals
|
||||
|
||||
|
||||
# === PUBLIC API ===
|
||||
|
||||
def analyze_voice_message(audio_path: str) -> dict:
|
||||
"""
|
||||
Analyze a voice message for paralinguistic distress signals.
|
||||
|
||||
Args:
|
||||
audio_path: Path to audio file (OGG, WAV, MP3, etc.)
|
||||
|
||||
Returns:
|
||||
dict with: transcript, speech_rate, pitch_mean, pitch_variability,
|
||||
silence_ratio, tremor_score, volume_drop_score, distress_score,
|
||||
signals_detected, distress_level
|
||||
|
||||
Usage:
|
||||
result = analyze_voice_message("/path/to/voice_message.ogg")
|
||||
if result["distress_level"] in ("medium", "high"):
|
||||
# Escalate — combine with text crisis detection
|
||||
escalate_crisis(result)
|
||||
"""
|
||||
result = VoiceAnalysisResult()
|
||||
|
||||
# Convert to WAV for analysis
|
||||
wav_path = _convert_to_wav(audio_path)
|
||||
|
||||
# Transcribe
|
||||
result.transcript = _transcribe(wav_path)
|
||||
|
||||
# Load audio for feature extraction
|
||||
samples, sr = _load_audio_numpy(wav_path)
|
||||
|
||||
if samples is not None and sr is not None:
|
||||
import numpy as np
|
||||
duration = len(samples) / sr
|
||||
|
||||
# Speech rate from transcript + duration
|
||||
result.speech_rate = _analyze_speech_rate(result.transcript, duration)
|
||||
|
||||
# Pitch analysis
|
||||
result.pitch_mean, result.pitch_variability = _analyze_pitch(samples, sr)
|
||||
|
||||
# Silence ratio
|
||||
result.silence_ratio = _analyze_silence(samples, sr)
|
||||
|
||||
# Tremor detection
|
||||
result.tremor_score = _analyze_tremor(samples, sr)
|
||||
|
||||
# Volume drops
|
||||
result.volume_drop_score = _analyze_volume_drops(samples, sr)
|
||||
|
||||
# Composite distress score
|
||||
result.distress_score, result.signals_detected = _compute_distress_score(result)
|
||||
|
||||
# Clean up temp file
|
||||
if wav_path != audio_path and os.path.exists(wav_path):
|
||||
os.unlink(wav_path)
|
||||
|
||||
# Classify distress level
|
||||
if result.distress_score >= DISTRESS_MEDIUM:
|
||||
distress_level = "high"
|
||||
elif result.distress_score >= DISTRESS_LOW:
|
||||
distress_level = "medium"
|
||||
elif result.distress_score > 0:
|
||||
distress_level = "low"
|
||||
else:
|
||||
distress_level = "none"
|
||||
|
||||
output = result.to_dict()
|
||||
output["distress_level"] = distress_level
|
||||
return output
|
||||
|
||||
|
||||
def get_audio_duration(audio_path: str) -> float:
|
||||
"""Get audio duration in seconds."""
|
||||
try:
|
||||
import librosa
|
||||
duration = librosa.get_duration(path=audio_path)
|
||||
return float(duration)
|
||||
except (ImportError, Exception):
|
||||
try:
|
||||
import soundfile as sf
|
||||
info = sf.info(audio_path)
|
||||
return float(info.duration)
|
||||
except (ImportError, Exception):
|
||||
return 0.0
|
||||
Reference in New Issue
Block a user