Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2b91272f4e feat: CLI command to view crisis metrics summary (#136)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 14s
New crisis/metrics.py:
- CrisisEvent dataclass for individual events
- log_event(): append to ~/.the-door/crisis-metrics.jsonl
- load_events(): load last N days from JSONL
- compute_summary(): aggregate by level, top indicators,
  sessions affected, daily average, peak day
- MetricsSummary dataclass
- format_summary(): human-readable report with bar chart
- CLI: python3 -m crisis.metrics --summary/--json/--last 7d/--log

Makefile targets:
- make metrics    — weekly summary report
- make metrics-json — raw JSON export

Closes #136
2026-04-15 12:41:07 -04:00
10 changed files with 209 additions and 938 deletions

View File

@@ -46,3 +46,12 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
# Crisis metrics
.PHONY: metrics metrics-json
metrics: ## Show crisis metrics summary (last 7 days)
python3 -m crisis.metrics --summary
metrics-json: ## Export crisis metrics as JSON
python3 -m crisis.metrics --json

View File

@@ -7,7 +7,6 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
__all__ = [
"detect_crisis",
@@ -20,7 +19,4 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -22,7 +22,6 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:

199
crisis/metrics.py Normal file
View File

@@ -0,0 +1,199 @@
"""Crisis metrics — aggregate detection data for operators.
Tracks crisis detection events and provides summary reports.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --last 7d # last 7 days
"""
from __future__ import annotations
import json
import os
import sys
import time
from collections import Counter
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
# Data directory for metrics storage
_DATA_DIR = Path(os.getenv("CRISIS_DATA_DIR", str(Path.home() / ".the-door")))
_METRICS_FILE = _DATA_DIR / "crisis-metrics.jsonl"
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
indicators: list
session_id: str = ""
source: str = "" # "chat", "gateway", "cli"
@dataclass
class MetricsSummary:
"""Aggregated metrics summary."""
period_days: int
total_events: int
by_level: Dict[str, int]
top_indicators: List[tuple]
sessions_affected: int
avg_daily: float
peak_day: str
peak_count: int
generated_at: str
def log_event(event: CrisisEvent) -> None:
"""Log a crisis event to the metrics file."""
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_METRICS_FILE, "a") as f:
f.write(json.dumps(asdict(event)) + "\n")
def load_events(days: int = 7) -> List[CrisisEvent]:
"""Load crisis events from the last N days."""
if not _METRICS_FILE.exists():
return []
cutoff = time.time() - (days * 86400)
events = []
try:
with open(_METRICS_FILE) as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
if data.get("timestamp", 0) >= cutoff:
events.append(CrisisEvent(**data))
except (json.JSONDecodeError, KeyError):
pass
return events
def compute_summary(days: int = 7) -> MetricsSummary:
"""Compute metrics summary for the given period."""
events = load_events(days)
now = time.time()
# By level
by_level = Counter(e.level for e in events)
# Top indicators
indicator_counts = Counter()
for e in events:
for ind in e.indicators:
indicator_counts[ind] += 1
top_indicators = indicator_counts.most_common(10)
# Sessions
sessions = set(e.session_id for e in events if e.session_id)
# Peak day
from collections import defaultdict
daily = defaultdict(int)
for e in events:
day = time.strftime("%Y-%m-%d", time.localtime(e.timestamp))
daily[day] += 1
peak_day = max(daily, key=daily.get) if daily else "N/A"
peak_count = daily.get(peak_day, 0)
return MetricsSummary(
period_days=days,
total_events=len(events),
by_level=dict(by_level),
top_indicators=top_indicators,
sessions_affected=len(sessions),
avg_daily=round(len(events) / max(days, 1), 1),
peak_day=peak_day,
peak_count=peak_count,
generated_at=time.strftime("%Y-%m-%d %H:%M:%S"),
)
def format_summary(summary: MetricsSummary) -> str:
"""Format metrics summary as human-readable report."""
lines = [
"Crisis Metrics Summary",
"=" * 40,
f"Period: Last {summary.period_days} days",
f"Generated: {summary.generated_at}",
"",
f"Total events: {summary.total_events}",
f"Daily avg: {summary.avg_daily}",
f"Sessions: {summary.sessions_affected}",
f"Peak day: {summary.peak_day} ({summary.peak_count} events)",
"",
]
if summary.by_level:
lines.append("By severity:")
for level in ["CRITICAL", "HIGH", "MODERATE", "LOW", "NONE"]:
count = summary.by_level.get(level, 0)
if count > 0:
bar = "" * min(count, 30)
lines.append(f" {level:10s} {count:4d} {bar}")
lines.append("")
if summary.top_indicators:
lines.append("Top indicators:")
for indicator, count in summary.top_indicators[:5]:
lines.append(f" {indicator}: {count}")
lines.append("")
if summary.total_events == 0:
lines.append("No crisis events in this period.")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Crisis metrics summary")
parser.add_argument("--summary", action="store_true", help="Print summary report")
parser.add_argument("--json", action="store_true", dest="as_json", help="Output JSON")
parser.add_argument("--last", default="7d", help="Time period (e.g., 7d, 30d)")
parser.add_argument("--log", nargs=2, metavar=("LEVEL", "INDICATOR"), help="Log a test event")
args = parser.parse_args()
# Parse period
period_str = args.last.rstrip("d")
try:
days = int(period_str)
except ValueError:
days = 7
# Log mode
if args.log:
level, indicator = args.log
event = CrisisEvent(
timestamp=time.time(),
level=level.upper(),
indicators=[indicator],
session_id="cli-test",
source="cli",
)
log_event(event)
print(f"Logged: {level.upper()} / {indicator}")
return 0
# Compute summary
summary = compute_summary(days)
if args.as_json:
print(json.dumps(asdict(summary), indent=2))
else:
print(format_summary(summary))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,259 +0,0 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}

View File

@@ -808,7 +808,6 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -1051,8 +1050,7 @@ Sovereignty and service always.`;
}
}, 1000);
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)

View File

@@ -52,34 +52,6 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -50,22 +50,6 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,277 +0,0 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()

View File

@@ -1,350 +0,0 @@
"""
voice_analysis.py — Voice message distress analysis via paralinguistic features.
Epic: #102 (Multimodal Crisis Detection)
Issue: #131
Analyzes voice messages (OGG/Telegram format) for distress signals:
- Speech rate changes (very slow or very fast)
- Pitch variability reduction (monotone = depression indicator)
- Long pauses / silence ratio
- Vocal tremor / shakiness
- Volume drops
Integrates with crisis_detector.py text-based detection for multimodal coverage.
"""
import os
import json
import subprocess
import tempfile
from dataclasses import dataclass, field, asdict
from typing import Optional
@dataclass
class VoiceAnalysisResult:
"""Result of paralinguistic analysis on a voice message."""
transcript: str = ""
speech_rate: float = 0.0 # words per minute
pitch_mean: float = 0.0 # Hz, average fundamental frequency
pitch_variability: float = 0.0 # std dev of pitch (low = monotone)
silence_ratio: float = 0.0 # 0-1, fraction of audio that is silence
tremor_score: float = 0.0 # 0-1, vocal shakiness estimate
volume_drop_score: float = 0.0 # 0-1, sudden volume decreases
distress_score: float = 0.0 # 0-1, composite distress indicator
signals_detected: list = field(default_factory=list)
def to_dict(self) -> dict:
return asdict(self)
# === THRESHOLDS ===
# Speech rate: normal is ~120-150 WPM
# Very slow (<80) or very fast (>200) are distress indicators
SPEECH_RATE_SLOW = 80
SPEECH_RATE_FAST = 200
SPEECH_RATE_NORMAL_LOW = 100
SPEECH_RATE_NORMAL_HIGH = 170
# Pitch variability: normal conversation has std dev ~30-50 Hz
# Monotone (<15 Hz) is a depression indicator
PITCH_VARIABILITY_LOW = 15.0 # Hz — monotone threshold
PITCH_VARIABILITY_NORMAL = 30.0
# Silence ratio: normal has ~10-20% silence
# Excessive silence (>40%) or very little (<3%) may indicate distress
SILENCE_RATIO_HIGH = 0.4
SILENCE_RATIO_LOW = 0.03
# Composite thresholds
DISTRESS_LOW = 0.3
DISTRESS_MEDIUM = 0.7
# === CORE ANALYSIS ===
def _convert_to_wav(audio_path: str) -> str:
"""Convert audio to WAV format for analysis. Returns path to temp WAV file."""
wav_path = tempfile.mktemp(suffix='.wav')
try:
subprocess.run(
['ffmpeg', '-i', audio_path, '-ar', '16000', '-ac', '1', '-y', wav_path],
capture_output=True, timeout=30
)
if not os.path.exists(wav_path):
# Fallback: if ffmpeg not available, try the original file
return audio_path
return wav_path
except (FileNotFoundError, subprocess.TimeoutExpired):
return audio_path
def _transcribe(audio_path: str) -> str:
"""Transcribe audio using whisper (if available) or return empty string."""
try:
import whisper
model = whisper.load_model("base")
result = model.transcribe(audio_path)
return result.get("text", "").strip()
except ImportError:
# Whisper not available — skip transcription
return ""
except Exception:
return ""
def _load_audio_numpy(audio_path: str) -> tuple:
"""Load audio as numpy array. Returns (samples, sample_rate) or (None, None)."""
try:
import librosa
samples, sr = librosa.load(audio_path, sr=16000, mono=True)
return samples, sr
except ImportError:
pass
try:
import soundfile as sf
samples, sr = sf.read(audio_path)
if len(samples.shape) > 1:
samples = samples.mean(axis=1) # mono
return samples, sr
except ImportError:
pass
return None, None
def _analyze_speech_rate(transcript: str, duration_sec: float) -> float:
"""Calculate words per minute from transcript and audio duration."""
if not transcript or duration_sec <= 0:
return 0.0
words = len(transcript.split())
minutes = duration_sec / 60.0
return words / minutes if minutes > 0 else 0.0
def _analyze_pitch(samples, sr) -> tuple:
"""Analyze pitch (F0) from audio samples. Returns (mean_hz, variability_hz)."""
try:
import librosa
f0, voiced_flag, _ = librosa.pyin(
samples, fmin=librosa.note_to_hz('C2'),
fmax=librosa.note_to_hz('C7'), sr=sr
)
import numpy as np
f0_clean = f0[~np.isnan(f0)]
if len(f0_clean) == 0:
return 0.0, 0.0
return float(np.mean(f0_clean)), float(np.std(f0_clean))
except (ImportError, Exception):
return 0.0, 0.0
def _analyze_silence(samples, sr, threshold_db: float = -40.0) -> float:
"""Calculate ratio of silence in audio (0-1)."""
try:
import librosa
import numpy as np
rms = librosa.feature.rms(y=samples)[0]
rms_db = librosa.amplitude_to_db(rms, ref=np.max)
silence_frames = np.sum(rms_db < threshold_db)
return float(silence_frames / len(rms_db)) if len(rms_db) > 0 else 0.0
except (ImportError, Exception):
return 0.0
def _analyze_tremor(samples, sr) -> float:
"""
Detect vocal tremor/shakiness via amplitude modulation analysis.
Tremor manifests as periodic amplitude fluctuations (3-12 Hz range).
Returns 0-1 score where 1 = strong tremor detected.
"""
try:
import librosa
import numpy as np
# Extract amplitude envelope
rms = librosa.feature.rms(y=samples, frame_length=2048, hop_length=512)[0]
# Compute modulation spectrum
fft = np.abs(np.fft.rfft(rms))
freqs = np.fft.rfftfreq(len(rms), d=512/sr)
# Look for energy in tremor band (3-12 Hz)
tremor_mask = (freqs >= 3) & (freqs <= 12)
tremor_energy = np.sum(fft[tremor_mask])
total_energy = np.sum(fft[1:]) # skip DC
if total_energy == 0:
return 0.0
ratio = tremor_energy / total_energy
return float(min(1.0, ratio * 5)) # normalize — typical tremor is 0.1-0.3 of total
except (ImportError, Exception):
return 0.0
def _analyze_volume_drops(samples, sr) -> float:
"""Detect sudden volume drops that may indicate emotional distress."""
try:
import librosa
import numpy as np
rms = librosa.feature.rms(y=samples, frame_length=2048, hop_length=512)[0]
if len(rms) < 2:
return 0.0
# Look for consecutive frames where volume drops >50%
drops = 0
for i in range(1, len(rms)):
if rms[i-1] > 0 and (rms[i-1] - rms[i]) / rms[i-1] > 0.5:
drops += 1
return float(min(1.0, drops / (len(rms) * 0.1)))
except (ImportError, Exception):
return 0.0
def _compute_distress_score(result: VoiceAnalysisResult) -> tuple:
"""
Compute composite distress score from paralinguistic features.
Returns (score, signals_detected).
"""
signals = []
score = 0.0
weights = 0
# Speech rate (0.2 weight)
if result.speech_rate > 0:
if result.speech_rate < SPEECH_RATE_SLOW:
signals.append(f"very_slow_speech ({result.speech_rate:.0f} WPM)")
score += 0.8 * 0.2
elif result.speech_rate > SPEECH_RATE_FAST:
signals.append(f"very_fast_speech ({result.speech_rate:.0f} WPM)")
score += 0.6 * 0.2
elif result.speech_rate < SPEECH_RATE_NORMAL_LOW:
score += 0.3 * 0.2
weights += 0.2
# Pitch variability (0.25 weight — monotone is strong depression indicator)
if result.pitch_variability > 0:
if result.pitch_variability < PITCH_VARIABILITY_LOW:
signals.append(f"monotone_voice (variability={result.pitch_variability:.1f} Hz)")
score += 0.9 * 0.25
elif result.pitch_variability < PITCH_VARIABILITY_NORMAL:
signals.append(f"reduced_pitch_variability ({result.pitch_variability:.1f} Hz)")
score += 0.5 * 0.25
weights += 0.25
# Silence ratio (0.2 weight)
if result.silence_ratio > 0:
if result.silence_ratio > SILENCE_RATIO_HIGH:
signals.append(f"excessive_silence ({result.silence_ratio:.0%})")
score += 0.7 * 0.2
elif result.silence_ratio < SILENCE_RATIO_LOW:
signals.append(f"minimal_pauses ({result.silence_ratio:.0%})")
score += 0.3 * 0.2
weights += 0.2
# Tremor (0.2 weight)
if result.tremor_score > 0:
if result.tremor_score > 0.5:
signals.append(f"vocal_tremor (score={result.tremor_score:.2f})")
score += result.tremor_score * 0.2
weights += 0.2
# Volume drops (0.15 weight)
if result.volume_drop_score > 0:
if result.volume_drop_score > 0.4:
signals.append(f"volume_drops (score={result.volume_drop_score:.2f})")
score += result.volume_drop_score * 0.15
weights += 0.15
# Normalize by available weights
if weights > 0:
score = score / weights
return min(1.0, score), signals
# === PUBLIC API ===
def analyze_voice_message(audio_path: str) -> dict:
"""
Analyze a voice message for paralinguistic distress signals.
Args:
audio_path: Path to audio file (OGG, WAV, MP3, etc.)
Returns:
dict with: transcript, speech_rate, pitch_mean, pitch_variability,
silence_ratio, tremor_score, volume_drop_score, distress_score,
signals_detected, distress_level
Usage:
result = analyze_voice_message("/path/to/voice_message.ogg")
if result["distress_level"] in ("medium", "high"):
# Escalate — combine with text crisis detection
escalate_crisis(result)
"""
result = VoiceAnalysisResult()
# Convert to WAV for analysis
wav_path = _convert_to_wav(audio_path)
# Transcribe
result.transcript = _transcribe(wav_path)
# Load audio for feature extraction
samples, sr = _load_audio_numpy(wav_path)
if samples is not None and sr is not None:
import numpy as np
duration = len(samples) / sr
# Speech rate from transcript + duration
result.speech_rate = _analyze_speech_rate(result.transcript, duration)
# Pitch analysis
result.pitch_mean, result.pitch_variability = _analyze_pitch(samples, sr)
# Silence ratio
result.silence_ratio = _analyze_silence(samples, sr)
# Tremor detection
result.tremor_score = _analyze_tremor(samples, sr)
# Volume drops
result.volume_drop_score = _analyze_volume_drops(samples, sr)
# Composite distress score
result.distress_score, result.signals_detected = _compute_distress_score(result)
# Clean up temp file
if wav_path != audio_path and os.path.exists(wav_path):
os.unlink(wav_path)
# Classify distress level
if result.distress_score >= DISTRESS_MEDIUM:
distress_level = "high"
elif result.distress_score >= DISTRESS_LOW:
distress_level = "medium"
elif result.distress_score > 0:
distress_level = "low"
else:
distress_level = "none"
output = result.to_dict()
output["distress_level"] = distress_level
return output
def get_audio_duration(audio_path: str) -> float:
"""Get audio duration in seconds."""
try:
import librosa
duration = librosa.get_duration(path=audio_path)
return float(duration)
except (ImportError, Exception):
try:
import soundfile as sf
info = sf.info(audio_path)
return float(info.duration)
except (ImportError, Exception):
return 0.0