Compare commits
2 Commits
fix/95
...
feat/133-b
| Author | SHA1 | Date | |
|---|---|---|---|
| ba02b63a13 | |||
| 161a9eee5d |
336
behavioral_tracker.py
Normal file
336
behavioral_tracker.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""Behavioral Pattern Detection — crisis risk from usage patterns.
|
||||
|
||||
Detects crisis signals from HOW someone uses the system, not just
|
||||
what they say. Complements content-based crisis detection.
|
||||
|
||||
Behavioral signals:
|
||||
- Frequency spike (anxiety/agitation)
|
||||
- Frequency drop (withdrawal/isolation)
|
||||
- Late-night messaging (2-5 AM)
|
||||
- Session length increase (loneliness)
|
||||
- Abrupt termination after emotional content
|
||||
- Return after long absence
|
||||
|
||||
Part of Epic #102 (Multimodal Crisis Detection).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionEvent:
|
||||
"""A single session interaction."""
|
||||
session_id: str
|
||||
timestamp: float
|
||||
message_length: int
|
||||
is_user: bool = True
|
||||
emotional_content: bool = False
|
||||
terminated_abruptly: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class BehavioralSignals:
|
||||
"""Detected behavioral risk signals."""
|
||||
frequency_change: float = 0.0 # -1 (drop) to +1 (spike) vs baseline
|
||||
is_late_night: bool = False # 2-5 AM local time
|
||||
session_length_trend: str = "stable" # increasing/decreasing/stable
|
||||
withdrawal_detected: bool = False # significant drop in activity
|
||||
return_after_absence: bool = False # came back after extended absence
|
||||
abrupt_termination: bool = False # left after emotional content
|
||||
behavioral_score: float = 0.0 # 0-1 aggregate risk
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"frequency_change": self.frequency_change,
|
||||
"is_late_night": self.is_late_night,
|
||||
"session_length_trend": self.session_length_trend,
|
||||
"withdrawal_detected": self.withdrawal_detected,
|
||||
"return_after_absence": self.return_after_absence,
|
||||
"abrupt_termination": self.abrupt_termination,
|
||||
"behavioral_score": self.behavioral_score,
|
||||
}
|
||||
|
||||
|
||||
class BehavioralTracker:
|
||||
"""Track behavioral patterns per session/user and detect risk signals.
|
||||
|
||||
Uses a 7-day rolling window for baseline calculation.
|
||||
Thread-safe for concurrent session tracking.
|
||||
"""
|
||||
|
||||
# Time window constants
|
||||
LATE_NIGHT_START = 2 # 2 AM
|
||||
LATE_NIGHT_END = 5 # 5 AM
|
||||
BASELINE_WINDOW_DAYS = 7
|
||||
ABSENCE_THRESHOLD_HOURS = 48
|
||||
SPIKE_THRESHOLD = 2.0 # 2x baseline = spike
|
||||
DROP_THRESHOLD = 0.3 # 30% of baseline = withdrawal
|
||||
|
||||
def __init__(self):
|
||||
# session_id -> list of events
|
||||
self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
|
||||
# session_id -> baseline metrics
|
||||
self._baselines: Dict[str, Dict[str, float]] = {}
|
||||
# session_id -> last activity timestamp
|
||||
self._last_activity: Dict[str, float] = {}
|
||||
# Global baseline (all sessions)
|
||||
self._global_baseline: Dict[str, float] = {
|
||||
"avg_messages_per_hour": 5.0,
|
||||
"avg_session_length_min": 15.0,
|
||||
"avg_message_length": 200.0,
|
||||
}
|
||||
|
||||
def record(
|
||||
self,
|
||||
session_id: str,
|
||||
timestamp: Optional[float] = None,
|
||||
message_length: int = 0,
|
||||
is_user: bool = True,
|
||||
emotional_content: bool = False,
|
||||
terminated_abruptly: bool = False,
|
||||
) -> None:
|
||||
"""Record a session event."""
|
||||
if timestamp is None:
|
||||
timestamp = time.time()
|
||||
|
||||
event = SessionEvent(
|
||||
session_id=session_id,
|
||||
timestamp=timestamp,
|
||||
message_length=message_length,
|
||||
is_user=is_user,
|
||||
emotional_content=emotional_content,
|
||||
terminated_abruptly=terminated_abruptly,
|
||||
)
|
||||
|
||||
self._events[session_id].append(event)
|
||||
self._last_activity[session_id] = timestamp
|
||||
|
||||
# Periodically update baseline
|
||||
if len(self._events[session_id]) % 20 == 0:
|
||||
self._update_baseline(session_id)
|
||||
|
||||
def get_risk_signals(self, session_id: str) -> BehavioralSignals:
|
||||
"""Analyze behavioral patterns and return risk signals."""
|
||||
events = self._events.get(session_id, [])
|
||||
if not events:
|
||||
return BehavioralSignals()
|
||||
|
||||
signals = BehavioralSignals()
|
||||
now = time.time()
|
||||
|
||||
# 1. Frequency analysis
|
||||
signals.frequency_change = self._analyze_frequency(session_id, now)
|
||||
|
||||
# 2. Late-night detection
|
||||
signals.is_late_night = self._is_late_night(events[-1].timestamp)
|
||||
|
||||
# 3. Session length trend
|
||||
signals.session_length_trend = self._analyze_session_length_trend(session_id)
|
||||
|
||||
# 4. Withdrawal detection
|
||||
signals.withdrawal_detected = signals.frequency_change < -0.5
|
||||
|
||||
# 5. Return after absence
|
||||
signals.return_after_absence = self._detect_return_after_absence(session_id, now)
|
||||
|
||||
# 6. Abrupt termination
|
||||
signals.abrupt_termination = self._detect_abrupt_termination(events)
|
||||
|
||||
# 7. Aggregate behavioral score
|
||||
signals.behavioral_score = self._compute_behavioral_score(signals)
|
||||
|
||||
return signals
|
||||
|
||||
def _analyze_frequency(self, session_id: str, now: float) -> float:
|
||||
"""Compare recent frequency to baseline. Returns -1 to +1."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 3:
|
||||
return 0.0
|
||||
|
||||
# Count messages in last hour
|
||||
one_hour_ago = now - 3600
|
||||
recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
|
||||
|
||||
# Get baseline
|
||||
baseline = self._get_baseline(session_id)
|
||||
baseline_rate = baseline.get("avg_messages_per_hour", 5.0)
|
||||
|
||||
if baseline_rate <= 0:
|
||||
return 0.0
|
||||
|
||||
ratio = recent_count / baseline_rate
|
||||
# Map to -1..+1: 0.5x = -0.5, 1x = 0, 2x = +1
|
||||
if ratio < 1.0:
|
||||
return max(-1.0, (ratio - 1.0))
|
||||
else:
|
||||
return min(1.0, (ratio - 1.0) / 2.0)
|
||||
|
||||
def _is_late_night(self, timestamp: float) -> bool:
|
||||
"""Check if timestamp falls in the 2-5 AM window."""
|
||||
dt = datetime.fromtimestamp(timestamp)
|
||||
hour = dt.hour
|
||||
return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END
|
||||
|
||||
def _analyze_session_length_trend(self, session_id: str) -> str:
|
||||
"""Determine if session lengths are increasing, decreasing, or stable."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 10:
|
||||
return "stable"
|
||||
|
||||
# Split events into first half and second half
|
||||
mid = len(events) // 2
|
||||
first_half = events[:mid]
|
||||
second_half = events[mid:]
|
||||
|
||||
# Calculate session spans (first to last message)
|
||||
if not first_half or not second_half:
|
||||
return "stable"
|
||||
|
||||
first_span = first_half[-1].timestamp - first_half[0].timestamp
|
||||
second_span = second_half[-1].timestamp - second_half[0].timestamp
|
||||
|
||||
if second_span > first_span * 1.3:
|
||||
return "increasing"
|
||||
elif second_span < first_span * 0.7:
|
||||
return "decreasing"
|
||||
return "stable"
|
||||
|
||||
def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
|
||||
"""Detect if this session started after a long absence."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 2:
|
||||
return False
|
||||
|
||||
# Check gap between current session and previous
|
||||
last_activity = self._last_activity.get(session_id, 0)
|
||||
if last_activity <= 0:
|
||||
return False
|
||||
|
||||
# Find previous session's last event (excluding current session)
|
||||
prev_events = [e for e in events[:-10] if e.timestamp < now - 3600]
|
||||
if not prev_events:
|
||||
return False
|
||||
|
||||
gap_hours = (now - prev_events[-1].timestamp) / 3600
|
||||
return gap_hours >= self.ABSENCE_THRESHOLD_HOURS
|
||||
|
||||
def _detect_abrupt_termination(self, events: List[SessionEvent]) -> bool:
|
||||
"""Detect if the last few messages had emotional content and then stopped."""
|
||||
if len(events) < 3:
|
||||
return False
|
||||
|
||||
# Check last 5 events
|
||||
recent = events[-5:]
|
||||
has_emotional = any(e.emotional_content for e in recent)
|
||||
last_was_user = recent[-1].is_user if recent else False
|
||||
last_was_short = recent[-1].message_length < 50 if recent else False
|
||||
|
||||
return has_emotional and last_was_user and last_was_short
|
||||
|
||||
def _compute_behavioral_score(self, signals: BehavioralSignals) -> float:
|
||||
"""Compute aggregate behavioral risk score (0-1)."""
|
||||
score = 0.0
|
||||
weights = {
|
||||
"frequency_spike": 0.15,
|
||||
"frequency_drop": 0.20,
|
||||
"late_night": 0.10,
|
||||
"session_increasing": 0.10,
|
||||
"withdrawal": 0.20,
|
||||
"return_after_absence": 0.05,
|
||||
"abrupt_termination": 0.20,
|
||||
}
|
||||
|
||||
# Frequency spike (anxiety)
|
||||
if signals.frequency_change > 0.5:
|
||||
score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
|
||||
|
||||
# Frequency drop (withdrawal)
|
||||
if signals.frequency_change < -0.3:
|
||||
score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
|
||||
|
||||
# Late night
|
||||
if signals.is_late_night:
|
||||
score += weights["late_night"]
|
||||
|
||||
# Session length increasing
|
||||
if signals.session_length_trend == "increasing":
|
||||
score += weights["session_increasing"]
|
||||
|
||||
# Withdrawal
|
||||
if signals.withdrawal_detected:
|
||||
score += weights["withdrawal"]
|
||||
|
||||
# Return after absence
|
||||
if signals.return_after_absence:
|
||||
score += weights["return_after_absence"]
|
||||
|
||||
# Abrupt termination
|
||||
if signals.abrupt_termination:
|
||||
score += weights["abrupt_termination"]
|
||||
|
||||
return min(1.0, score)
|
||||
|
||||
def _get_baseline(self, session_id: str) -> Dict[str, float]:
|
||||
"""Get baseline metrics for a session."""
|
||||
if session_id in self._baselines:
|
||||
return self._baselines[session_id]
|
||||
return self._global_baseline
|
||||
|
||||
def _update_baseline(self, session_id: str) -> None:
|
||||
"""Update rolling baseline from recent events."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 5:
|
||||
return
|
||||
|
||||
# Use last 7 days of events
|
||||
cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
|
||||
recent = [e for e in events if e.timestamp > cutoff and e.is_user]
|
||||
|
||||
if not recent:
|
||||
return
|
||||
|
||||
# Calculate metrics
|
||||
time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
|
||||
if time_span_hours > 0:
|
||||
msgs_per_hour = len(recent) / time_span_hours
|
||||
else:
|
||||
msgs_per_hour = len(recent)
|
||||
|
||||
avg_length = sum(e.message_length for e in recent) / len(recent)
|
||||
|
||||
self._baselines[session_id] = {
|
||||
"avg_messages_per_hour": msgs_per_hour,
|
||||
"avg_message_length": avg_length,
|
||||
"total_messages": len(recent),
|
||||
}
|
||||
|
||||
|
||||
# Global singleton for convenience
|
||||
_global_tracker = BehavioralTracker()
|
||||
|
||||
|
||||
def record_event(
|
||||
session_id: str,
|
||||
timestamp: Optional[float] = None,
|
||||
message_length: int = 0,
|
||||
is_user: bool = True,
|
||||
emotional_content: bool = False,
|
||||
terminated_abruptly: bool = False,
|
||||
) -> None:
|
||||
"""Record an event to the global behavioral tracker."""
|
||||
_global_tracker.record(
|
||||
session_id, timestamp, message_length,
|
||||
is_user, emotional_content, terminated_abruptly,
|
||||
)
|
||||
|
||||
|
||||
def get_risk_signals(session_id: str) -> BehavioralSignals:
|
||||
"""Get behavioral risk signals for a session."""
|
||||
return _global_tracker.get_risk_signals(session_id)
|
||||
@@ -7,7 +7,6 @@ Stands between a broken man and a machine that would tell him to die.
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
@@ -20,7 +19,4 @@ __all__ = [
|
||||
"format_result",
|
||||
"format_gateway_response",
|
||||
"get_urgency_emoji",
|
||||
"CrisisSessionTracker",
|
||||
"SessionState",
|
||||
"check_crisis_with_session",
|
||||
]
|
||||
|
||||
@@ -22,7 +22,6 @@ from .response import (
|
||||
get_system_prompt_modifier,
|
||||
CrisisResponse,
|
||||
)
|
||||
from .session_tracker import CrisisSessionTracker
|
||||
|
||||
|
||||
def check_crisis(text: str) -> dict:
|
||||
|
||||
@@ -1,259 +0,0 @@
|
||||
"""
|
||||
Session-level crisis tracking and escalation for the-door (P0 #35).
|
||||
|
||||
Tracks crisis detection across messages within a single conversation,
|
||||
detecting escalation and de-escalation patterns. Privacy-first: no
|
||||
persistence beyond the conversation session.
|
||||
|
||||
Each message is analyzed in isolation by detect.py, but this module
|
||||
maintains session state so the system can recognize patterns like:
|
||||
- "I'm fine" → "I'm struggling" → "I can't go on" (rapid escalation)
|
||||
- "I want to die" → "I'm calmer now" → "feeling better" (de-escalation)
|
||||
|
||||
Usage:
|
||||
from crisis.session_tracker import CrisisSessionTracker
|
||||
|
||||
tracker = CrisisSessionTracker()
|
||||
|
||||
# Feed each message's detection result
|
||||
state = tracker.record(detect_crisis("I'm having a tough day"))
|
||||
print(state.current_level) # "LOW"
|
||||
print(state.is_escalating) # False
|
||||
|
||||
state = tracker.record(detect_crisis("I feel hopeless"))
|
||||
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
|
||||
|
||||
# Get system prompt modifier
|
||||
modifier = tracker.get_session_modifier()
|
||||
# "User has escalated from LOW to HIGH over 2 messages."
|
||||
|
||||
# Reset for new session
|
||||
tracker.reset()
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
from .detect import CrisisDetectionResult, SCORES
|
||||
|
||||
# Level ordering for comparison (higher = more severe)
|
||||
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionState:
|
||||
"""Immutable snapshot of session crisis tracking state."""
|
||||
|
||||
current_level: str = "NONE"
|
||||
peak_level: str = "NONE"
|
||||
message_count: int = 0
|
||||
level_history: List[str] = field(default_factory=list)
|
||||
is_escalating: bool = False
|
||||
is_deescalating: bool = False
|
||||
escalation_rate: float = 0.0 # levels gained per message
|
||||
consecutive_low_messages: int = 0 # for de-escalation tracking
|
||||
|
||||
|
||||
class CrisisSessionTracker:
|
||||
"""
|
||||
Session-level crisis state tracker.
|
||||
|
||||
Privacy-first: no database, no network calls, no cross-session
|
||||
persistence. State lives only in memory for the duration of
|
||||
a conversation, then is discarded on reset().
|
||||
"""
|
||||
|
||||
# Thresholds (from issue #35)
|
||||
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
|
||||
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
|
||||
|
||||
def __init__(self):
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"""Reset all session state. Call on new conversation."""
|
||||
self._current_level = "NONE"
|
||||
self._peak_level = "NONE"
|
||||
self._message_count = 0
|
||||
self._level_history: List[str] = []
|
||||
self._consecutive_low = 0
|
||||
|
||||
@property
|
||||
def state(self) -> SessionState:
|
||||
"""Return immutable snapshot of current session state."""
|
||||
is_escalating = self._detect_escalation()
|
||||
is_deescalating = self._detect_deescalation()
|
||||
rate = self._compute_escalation_rate()
|
||||
|
||||
return SessionState(
|
||||
current_level=self._current_level,
|
||||
peak_level=self._peak_level,
|
||||
message_count=self._message_count,
|
||||
level_history=list(self._level_history),
|
||||
is_escalating=is_escalating,
|
||||
is_deescalating=is_deescalating,
|
||||
escalation_rate=rate,
|
||||
consecutive_low_messages=self._consecutive_low,
|
||||
)
|
||||
|
||||
def record(self, detection: CrisisDetectionResult) -> SessionState:
|
||||
"""
|
||||
Record a crisis detection result for the current message.
|
||||
|
||||
Returns updated SessionState.
|
||||
"""
|
||||
level = detection.level
|
||||
self._message_count += 1
|
||||
self._level_history.append(level)
|
||||
|
||||
# Update peak
|
||||
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
|
||||
self._peak_level = level
|
||||
|
||||
# Track consecutive LOW/NONE messages for de-escalation
|
||||
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
|
||||
self._consecutive_low += 1
|
||||
else:
|
||||
self._consecutive_low = 0
|
||||
|
||||
self._current_level = level
|
||||
return self.state
|
||||
|
||||
def _detect_escalation(self) -> bool:
|
||||
"""
|
||||
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
|
||||
|
||||
Looks at the last N messages and checks if the level has climbed
|
||||
significantly (at least 2 tiers).
|
||||
"""
|
||||
if len(self._level_history) < 2:
|
||||
return False
|
||||
|
||||
window = self._level_history[-self.ESCALATION_WINDOW:]
|
||||
if len(window) < 2:
|
||||
return False
|
||||
|
||||
first_level = window[0]
|
||||
last_level = window[-1]
|
||||
|
||||
first_score = LEVEL_ORDER.get(first_level, 0)
|
||||
last_score = LEVEL_ORDER.get(last_level, 0)
|
||||
|
||||
# Escalation = climbed at least 2 tiers in the window
|
||||
return (last_score - first_score) >= 2
|
||||
|
||||
def _detect_deescalation(self) -> bool:
|
||||
"""
|
||||
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
|
||||
for DEESCALATION_WINDOW consecutive messages.
|
||||
"""
|
||||
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
|
||||
return False
|
||||
|
||||
return self._consecutive_low >= self.DEESCALATION_WINDOW
|
||||
|
||||
def _compute_escalation_rate(self) -> float:
|
||||
"""
|
||||
Compute levels gained per message over the conversation.
|
||||
|
||||
Positive = escalating, negative = de-escalating, 0 = stable.
|
||||
"""
|
||||
if self._message_count < 2:
|
||||
return 0.0
|
||||
|
||||
first = LEVEL_ORDER.get(self._level_history[0], 0)
|
||||
current = LEVEL_ORDER.get(self._current_level, 0)
|
||||
|
||||
return (current - first) / (self._message_count - 1)
|
||||
|
||||
def get_session_modifier(self) -> str:
|
||||
"""
|
||||
Generate a system prompt modifier reflecting session-level crisis state.
|
||||
|
||||
Returns empty string if no session context is relevant.
|
||||
"""
|
||||
if self._message_count < 2:
|
||||
return ""
|
||||
|
||||
s = self.state
|
||||
|
||||
if s.is_escalating:
|
||||
return (
|
||||
f"User has escalated from {self._level_history[0]} to "
|
||||
f"{s.current_level} over {s.message_count} messages. "
|
||||
f"Peak crisis level this session: {s.peak_level}. "
|
||||
"Respond with heightened awareness. The trajectory is "
|
||||
"worsening — prioritize safety and connection."
|
||||
)
|
||||
|
||||
if s.is_deescalating:
|
||||
return (
|
||||
f"User previously reached {s.peak_level} crisis level "
|
||||
f"but has been at {s.current_level} or below for "
|
||||
f"{s.consecutive_low_messages} consecutive messages. "
|
||||
"The situation appears to be stabilizing. Continue "
|
||||
"supportive engagement while remaining vigilant."
|
||||
)
|
||||
|
||||
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
|
||||
return (
|
||||
f"User previously reached {s.peak_level} crisis level "
|
||||
f"this session (currently {s.current_level}). "
|
||||
"Continue with care and awareness of the earlier crisis."
|
||||
)
|
||||
|
||||
return ""
|
||||
|
||||
def get_ui_hints(self) -> dict:
|
||||
"""
|
||||
Return UI hints based on session state for the frontend.
|
||||
|
||||
These are advisory — the frontend decides what to show.
|
||||
"""
|
||||
s = self.state
|
||||
|
||||
hints = {
|
||||
"session_escalating": s.is_escalating,
|
||||
"session_deescalating": s.is_deescalating,
|
||||
"session_peak_level": s.peak_level,
|
||||
"session_message_count": s.message_count,
|
||||
}
|
||||
|
||||
if s.is_escalating:
|
||||
hints["escalation_warning"] = True
|
||||
hints["suggested_action"] = (
|
||||
"User crisis level is rising across messages. "
|
||||
"Consider increasing intervention level."
|
||||
)
|
||||
|
||||
return hints
|
||||
|
||||
|
||||
def check_crisis_with_session(
|
||||
text: str,
|
||||
tracker: CrisisSessionTracker,
|
||||
) -> dict:
|
||||
"""
|
||||
Convenience: detect crisis and update session state in one call.
|
||||
|
||||
Returns combined single-message detection + session-level context.
|
||||
"""
|
||||
from .detect import detect_crisis
|
||||
from .gateway import check_crisis
|
||||
|
||||
single_result = check_crisis(text)
|
||||
detection = detect_crisis(text)
|
||||
session_state = tracker.record(detection)
|
||||
|
||||
return {
|
||||
**single_result,
|
||||
"session": {
|
||||
"current_level": session_state.current_level,
|
||||
"peak_level": session_state.peak_level,
|
||||
"message_count": session_state.message_count,
|
||||
"is_escalating": session_state.is_escalating,
|
||||
"is_deescalating": session_state.is_deescalating,
|
||||
"modifier": tracker.get_session_modifier(),
|
||||
"ui_hints": tracker.get_ui_hints(),
|
||||
},
|
||||
}
|
||||
76
index.html
76
index.html
@@ -680,7 +680,7 @@ html, body {
|
||||
|
||||
<!-- Footer -->
|
||||
<footer id="footer">
|
||||
<a href="/about.html" aria-label="About The Door">about</a>
|
||||
<a href="/about" aria-label="About The Door">about</a>
|
||||
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
|
||||
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
|
||||
</footer>
|
||||
@@ -808,7 +808,6 @@ Sovereignty and service always.`;
|
||||
var crisisPanel = document.getElementById('crisis-panel');
|
||||
var crisisOverlay = document.getElementById('crisis-overlay');
|
||||
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
|
||||
var overlayCallLink = document.querySelector('.overlay-call');
|
||||
var statusDot = document.querySelector('.status-dot');
|
||||
var statusText = document.getElementById('status-text');
|
||||
|
||||
@@ -987,33 +986,20 @@ Sovereignty and service always.`;
|
||||
|
||||
// Focus trap: cycle through focusable elements within the crisis overlay
|
||||
function getOverlayFocusableElements() {
|
||||
return Array.prototype.slice.call(crisisOverlay.querySelectorAll(
|
||||
return crisisOverlay.querySelectorAll(
|
||||
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
|
||||
));
|
||||
);
|
||||
}
|
||||
|
||||
function trapFocusInOverlay(e) {
|
||||
if (!crisisOverlay.classList.contains('active')) return;
|
||||
if (e.key === 'Escape') {
|
||||
e.preventDefault();
|
||||
closeOverlay(msgInput);
|
||||
return;
|
||||
}
|
||||
if (e.key !== 'Tab') return;
|
||||
|
||||
var focusable = getOverlayFocusableElements();
|
||||
if (focusable.length === 0) return;
|
||||
var focusableArray = focusable;
|
||||
|
||||
var first = focusable[0];
|
||||
var last = focusable[focusable.length - 1];
|
||||
var activeIndex = focusableArray.indexOf(document.activeElement);
|
||||
|
||||
if (activeIndex === -1) {
|
||||
e.preventDefault();
|
||||
e.shiftKey ? last.focus() : first.focus();
|
||||
return;
|
||||
}
|
||||
|
||||
if (e.shiftKey) {
|
||||
// Shift+Tab: if on first, wrap to last
|
||||
@@ -1033,33 +1019,6 @@ Sovereignty and service always.`;
|
||||
// Store the element that had focus before the overlay opened
|
||||
var _preOverlayFocusElement = null;
|
||||
|
||||
function closeOverlay(restoreFocusTarget) {
|
||||
crisisOverlay.classList.remove('active');
|
||||
if (overlayTimer) {
|
||||
clearInterval(overlayTimer);
|
||||
overlayTimer = null;
|
||||
}
|
||||
|
||||
// Re-enable background interaction
|
||||
var mainApp = document.getElementById('app');
|
||||
if (mainApp) mainApp.removeAttribute('inert');
|
||||
var chatSection = document.getElementById('chat-area');
|
||||
if (chatSection) chatSection.removeAttribute('aria-hidden');
|
||||
var footerEl = document.querySelector('footer');
|
||||
if (footerEl) footerEl.removeAttribute('aria-hidden');
|
||||
|
||||
if (restoreFocusTarget === _preOverlayFocusElement && _preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
|
||||
_preOverlayFocusElement.focus();
|
||||
} else if (restoreFocusTarget && typeof restoreFocusTarget.focus === 'function') {
|
||||
restoreFocusTarget.focus();
|
||||
} else if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
|
||||
_preOverlayFocusElement.focus();
|
||||
} else {
|
||||
msgInput.focus();
|
||||
}
|
||||
_preOverlayFocusElement = null;
|
||||
}
|
||||
|
||||
function showOverlay() {
|
||||
// Save current focus for restoration on dismiss
|
||||
_preOverlayFocusElement = document.activeElement;
|
||||
@@ -1070,10 +1029,10 @@ Sovereignty and service always.`;
|
||||
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
|
||||
|
||||
// Disable background interaction via inert attribute
|
||||
var mainApp = document.getElementById('app');
|
||||
var mainApp = document.querySelector('.app');
|
||||
if (mainApp) mainApp.setAttribute('inert', '');
|
||||
// Also hide from assistive tech
|
||||
var chatSection = document.getElementById('chat-area');
|
||||
var chatSection = document.getElementById('chat');
|
||||
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
|
||||
var footerEl = document.querySelector('footer');
|
||||
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
|
||||
@@ -1091,8 +1050,7 @@ Sovereignty and service always.`;
|
||||
}
|
||||
}, 1000);
|
||||
|
||||
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
|
||||
if (overlayCallLink) overlayCallLink.focus();
|
||||
overlayDismissBtn.focus();
|
||||
}
|
||||
|
||||
// Register focus trap on document (always listening, gated by class check)
|
||||
@@ -1100,7 +1058,27 @@ Sovereignty and service always.`;
|
||||
|
||||
overlayDismissBtn.addEventListener('click', function() {
|
||||
if (!overlayDismissBtn.disabled) {
|
||||
closeOverlay(_preOverlayFocusElement);
|
||||
crisisOverlay.classList.remove('active');
|
||||
if (overlayTimer) {
|
||||
clearInterval(overlayTimer);
|
||||
overlayTimer = null;
|
||||
}
|
||||
|
||||
// Re-enable background interaction
|
||||
var mainApp = document.querySelector('.app');
|
||||
if (mainApp) mainApp.removeAttribute('inert');
|
||||
var chatSection = document.getElementById('chat');
|
||||
if (chatSection) chatSection.removeAttribute('aria-hidden');
|
||||
var footerEl = document.querySelector('footer');
|
||||
if (footerEl) footerEl.removeAttribute('aria-hidden');
|
||||
|
||||
// Restore focus to the element that had it before the overlay opened
|
||||
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
|
||||
_preOverlayFocusElement.focus();
|
||||
} else {
|
||||
msgInput.focus();
|
||||
}
|
||||
_preOverlayFocusElement = null;
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
136
tests/test_behavioral_tracker.py
Normal file
136
tests/test_behavioral_tracker.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Tests for behavioral pattern detection."""
|
||||
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from behavioral_tracker import BehavioralTracker, BehavioralSignals
|
||||
|
||||
|
||||
class TestBehavioralTracker:
|
||||
def test_empty_tracker_returns_default_signals(self):
|
||||
tracker = BehavioralTracker()
|
||||
signals = tracker.get_risk_signals("session-1")
|
||||
assert signals.behavioral_score == 0.0
|
||||
assert not signals.is_late_night
|
||||
|
||||
def test_frequency_spike_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
# Establish baseline: 2 messages per hour
|
||||
now = time.time()
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)
|
||||
|
||||
# Spike: 10 messages in last 10 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.frequency_change > 0.3 # Significant spike
|
||||
|
||||
def test_frequency_drop_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# Baseline: heavy usage
|
||||
for i in range(50):
|
||||
tracker.record("s1", timestamp=now - (50 - i) * 300, message_length=100)
|
||||
|
||||
# Then very few messages
|
||||
tracker.record("s1", timestamp=now - 60, message_length=50)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
# Recent activity is much lower than baseline
|
||||
assert signals.frequency_change < 0
|
||||
|
||||
def test_late_night_detection(self):
|
||||
tracker = BehavioralTracker()
|
||||
# 3:00 AM timestamp
|
||||
import datetime
|
||||
dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
|
||||
ts = dt.timestamp()
|
||||
|
||||
tracker.record("s1", timestamp=ts, message_length=100)
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.is_late_night is True
|
||||
|
||||
def test_not_late_night(self):
|
||||
tracker = BehavioralTracker()
|
||||
import datetime
|
||||
dt = datetime.datetime(2026, 4, 14, 14, 0, 0) # 2 PM
|
||||
ts = dt.timestamp()
|
||||
|
||||
tracker.record("s1", timestamp=ts, message_length=100)
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.is_late_night is False
|
||||
|
||||
def test_session_length_increasing(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# First half: messages spread over 5 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
|
||||
# Second half: messages spread over 30 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.session_length_trend == "increasing"
|
||||
|
||||
def test_withdrawal_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# High baseline
|
||||
for i in range(50):
|
||||
tracker.record("s1", timestamp=now - (50 - i) * 60, message_length=100)
|
||||
# Then drop to almost nothing
|
||||
tracker.record("s1", timestamp=now - 60, message_length=20)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.withdrawal_detected is True
|
||||
|
||||
def test_abrupt_termination_after_emotional(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# Normal messages
|
||||
for i in range(5):
|
||||
tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
|
||||
# Emotional content
|
||||
tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
|
||||
# Short abrupt message
|
||||
tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.abrupt_termination is True
|
||||
|
||||
def test_behavioral_score_increases_with_risk(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
import datetime
|
||||
|
||||
# Low risk: normal messages during daytime
|
||||
for i in range(10):
|
||||
dt = datetime.datetime(2026, 4, 14, 14, i, 0) # 2 PM
|
||||
tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
|
||||
low_risk = tracker.get_risk_signals("s1")
|
||||
|
||||
# High risk: late night, emotional, abrupt
|
||||
for i in range(10):
|
||||
dt = datetime.datetime(2026, 4, 14, 3, i, 0) # 3 AM
|
||||
tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
|
||||
tracker.record("s2", timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
|
||||
message_length=10, is_user=True)
|
||||
high_risk = tracker.get_risk_signals("s2")
|
||||
|
||||
assert high_risk.behavioral_score > low_risk.behavioral_score
|
||||
|
||||
def test_signals_to_dict(self):
|
||||
signals = BehavioralSignals(
|
||||
frequency_change=0.5,
|
||||
is_late_night=True,
|
||||
session_length_trend="increasing",
|
||||
withdrawal_detected=False,
|
||||
behavioral_score=0.4,
|
||||
)
|
||||
d = signals.to_dict()
|
||||
assert d["frequency_change"] == 0.5
|
||||
assert d["is_late_night"] is True
|
||||
assert d["session_length_trend"] == "increasing"
|
||||
@@ -52,34 +52,6 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
|
||||
'Expected overlay dismissal to restore focus to the prior target.',
|
||||
)
|
||||
|
||||
def test_overlay_initial_focus_targets_enabled_call_link(self):
|
||||
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
|
||||
# Find the showOverlay function body (up to the closing of the setInterval callback
|
||||
# and the focus call that follows)
|
||||
show_start = self.html.find('function showOverlay()')
|
||||
self.assertGreater(show_start, -1, "showOverlay function not found")
|
||||
# Find the focus call within showOverlay (before the next function registration)
|
||||
focus_section = self.html[show_start:show_start + 2000]
|
||||
self.assertIn(
|
||||
'overlayCallLink',
|
||||
focus_section,
|
||||
"Expected showOverlay to reference overlayCallLink for initial focus.",
|
||||
)
|
||||
# Ensure the old buggy pattern is gone
|
||||
focus_line_region = self.html[show_start + 800:show_start + 1200]
|
||||
self.assertNotIn(
|
||||
'overlayDismissBtn.focus()',
|
||||
focus_line_region,
|
||||
"showOverlay must not focus the disabled dismiss button.",
|
||||
)
|
||||
|
||||
def test_overlay_call_link_variable_is_declared(self):
|
||||
self.assertIn(
|
||||
"querySelector('.overlay-call')",
|
||||
self.html,
|
||||
"Expected a JS reference to the .overlay-call link element.",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
import pathlib
|
||||
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||
INDEX_HTML = ROOT / 'index.html'
|
||||
|
||||
|
||||
def test_crisis_overlay_supports_keyboard_only_navigation():
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch(headless=True)
|
||||
page = browser.new_page()
|
||||
try:
|
||||
page.goto(INDEX_HTML.as_uri())
|
||||
page.evaluate(
|
||||
"""
|
||||
() => {
|
||||
window.fetch = () => Promise.reject(new Error('test fetch blocked'));
|
||||
window.alert = () => {};
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
page.fill('#msg-input', "I'm going to kill myself tonight")
|
||||
page.press('#msg-input', 'Enter')
|
||||
page.wait_for_function("document.getElementById('crisis-overlay').classList.contains('active')")
|
||||
|
||||
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
|
||||
|
||||
page.evaluate(
|
||||
"""
|
||||
() => {
|
||||
const btn = document.getElementById('overlay-dismiss-btn');
|
||||
btn.disabled = false;
|
||||
btn.textContent = 'Continue to chat';
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
page.keyboard.press('Tab')
|
||||
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
|
||||
|
||||
page.keyboard.press('Tab')
|
||||
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
|
||||
|
||||
page.keyboard.press('Shift+Tab')
|
||||
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
|
||||
|
||||
page.keyboard.press('Escape')
|
||||
page.wait_for_function("!document.getElementById('crisis-overlay').classList.contains('active')")
|
||||
assert page.evaluate("document.activeElement.id") == 'msg-input'
|
||||
finally:
|
||||
browser.close()
|
||||
@@ -50,22 +50,6 @@ class TestCrisisOfflinePage(unittest.TestCase):
|
||||
for phrase in required_phrases:
|
||||
self.assertIn(phrase, self.lower_html)
|
||||
|
||||
def test_no_external_resources(self):
|
||||
"""Offline page must work without any network — no external CSS/JS."""
|
||||
import re
|
||||
html = self.html
|
||||
# No https:// links (except tel: and sms: which are protocol links, not network)
|
||||
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
|
||||
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
|
||||
# CSS and JS must be inline
|
||||
self.assertIn('<style>', html, 'CSS must be inline')
|
||||
self.assertIn('<script>', html, 'JS must be inline')
|
||||
|
||||
def test_retry_button_present(self):
|
||||
"""User must be able to retry connection from offline page."""
|
||||
self.assertIn('retry-connection', self.html)
|
||||
self.assertIn('Retry connection', self.html)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -1,277 +0,0 @@
|
||||
"""
|
||||
Tests for crisis session tracking and escalation (P0 #35).
|
||||
|
||||
Covers: session_tracker.py
|
||||
Run with: python -m pytest tests/test_session_tracker.py -v
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from crisis.detect import detect_crisis
|
||||
from crisis.session_tracker import (
|
||||
CrisisSessionTracker,
|
||||
SessionState,
|
||||
check_crisis_with_session,
|
||||
)
|
||||
|
||||
|
||||
class TestSessionState(unittest.TestCase):
|
||||
"""Test SessionState defaults."""
|
||||
|
||||
def test_default_state(self):
|
||||
s = SessionState()
|
||||
self.assertEqual(s.current_level, "NONE")
|
||||
self.assertEqual(s.peak_level, "NONE")
|
||||
self.assertEqual(s.message_count, 0)
|
||||
self.assertEqual(s.level_history, [])
|
||||
self.assertFalse(s.is_escalating)
|
||||
self.assertFalse(s.is_deescalating)
|
||||
|
||||
|
||||
class TestSessionTracking(unittest.TestCase):
|
||||
"""Test basic session state tracking."""
|
||||
|
||||
def setUp(self):
|
||||
self.tracker = CrisisSessionTracker()
|
||||
|
||||
def test_record_none_message(self):
|
||||
state = self.tracker.record(detect_crisis("Hello Timmy"))
|
||||
self.assertEqual(state.current_level, "NONE")
|
||||
self.assertEqual(state.message_count, 1)
|
||||
self.assertEqual(state.peak_level, "NONE")
|
||||
|
||||
def test_record_low_message(self):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
state = self.tracker.record(detect_crisis("Having a rough day"))
|
||||
self.assertIn(state.current_level, ("LOW", "NONE"))
|
||||
self.assertEqual(state.message_count, 2)
|
||||
|
||||
def test_record_critical_updates_peak(self):
|
||||
self.tracker.record(detect_crisis("Having a rough day"))
|
||||
state = self.tracker.record(detect_crisis("I want to kill myself"))
|
||||
self.assertEqual(state.current_level, "CRITICAL")
|
||||
self.assertEqual(state.peak_level, "CRITICAL")
|
||||
|
||||
def test_peak_preserved_after_drop(self):
|
||||
"""Peak level should stay at the highest seen, even after de-escalation."""
|
||||
self.tracker.record(detect_crisis("I want to kill myself"))
|
||||
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
|
||||
self.assertEqual(state.peak_level, "CRITICAL")
|
||||
|
||||
def test_level_history(self):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.tracker.record(detect_crisis("Having a rough day"))
|
||||
state = self.tracker.record(detect_crisis("I want to die"))
|
||||
self.assertEqual(len(state.level_history), 3)
|
||||
self.assertEqual(state.level_history[0], "NONE")
|
||||
self.assertEqual(state.level_history[2], "CRITICAL")
|
||||
|
||||
def test_reset_clears_state(self):
|
||||
self.tracker.record(detect_crisis("I want to kill myself"))
|
||||
self.tracker.reset()
|
||||
state = self.tracker.state
|
||||
self.assertEqual(state.current_level, "NONE")
|
||||
self.assertEqual(state.peak_level, "NONE")
|
||||
self.assertEqual(state.message_count, 0)
|
||||
self.assertEqual(state.level_history, [])
|
||||
|
||||
|
||||
class TestEscalationDetection(unittest.TestCase):
|
||||
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
|
||||
|
||||
def setUp(self):
|
||||
self.tracker = CrisisSessionTracker()
|
||||
|
||||
def test_no_escalation_single_message(self):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.assertFalse(self.tracker.state.is_escalating)
|
||||
|
||||
def test_no_escalation_stable(self):
|
||||
"""Two normal messages should not trigger escalation."""
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
state = self.tracker.record(detect_crisis("How are you?"))
|
||||
self.assertFalse(state.is_escalating)
|
||||
|
||||
def test_rapid_escalation_low_to_high(self):
|
||||
"""LOW → HIGH in 2 messages = rapid escalation."""
|
||||
self.tracker.record(detect_crisis("Having a rough day"))
|
||||
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
|
||||
# Depending on detection, this could be HIGH or CRITICAL
|
||||
if state.current_level in ("HIGH", "CRITICAL"):
|
||||
self.assertTrue(state.is_escalating)
|
||||
|
||||
def test_rapid_escalation_three_messages(self):
|
||||
"""NONE → LOW → HIGH in 3 messages = escalation."""
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.tracker.record(detect_crisis("Having a rough day"))
|
||||
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
|
||||
if state.current_level in ("HIGH", "CRITICAL"):
|
||||
self.assertTrue(state.is_escalating)
|
||||
|
||||
def test_escalation_rate(self):
|
||||
"""Rate should be positive when escalating."""
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.tracker.record(detect_crisis("I want to die"))
|
||||
state = self.tracker.state
|
||||
self.assertGreater(state.escalation_rate, 0)
|
||||
|
||||
|
||||
class TestDeescalationDetection(unittest.TestCase):
|
||||
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
|
||||
|
||||
def setUp(self):
|
||||
self.tracker = CrisisSessionTracker()
|
||||
|
||||
def test_no_deescalation_without_prior_crisis(self):
|
||||
"""No de-escalation if never reached HIGH/CRITICAL."""
|
||||
for _ in range(6):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.assertFalse(self.tracker.state.is_deescalating)
|
||||
|
||||
def test_deescalation_after_critical(self):
|
||||
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
|
||||
self.tracker.record(detect_crisis("I want to kill myself"))
|
||||
for _ in range(5):
|
||||
self.tracker.record(detect_crisis("I'm doing better today"))
|
||||
state = self.tracker.state
|
||||
if state.peak_level == "CRITICAL":
|
||||
self.assertTrue(state.is_deescalating)
|
||||
|
||||
def test_deescalation_after_high(self):
|
||||
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
|
||||
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
|
||||
for _ in range(5):
|
||||
self.tracker.record(detect_crisis("Feeling okay"))
|
||||
state = self.tracker.state
|
||||
if state.peak_level == "HIGH":
|
||||
self.assertTrue(state.is_deescalating)
|
||||
|
||||
def test_interrupted_deescalation(self):
|
||||
"""De-escalation resets if a HIGH message interrupts."""
|
||||
self.tracker.record(detect_crisis("I want to kill myself"))
|
||||
for _ in range(3):
|
||||
self.tracker.record(detect_crisis("Doing better"))
|
||||
# Interrupt with another crisis
|
||||
self.tracker.record(detect_crisis("I feel hopeless again"))
|
||||
self.tracker.record(detect_crisis("Feeling okay now"))
|
||||
state = self.tracker.state
|
||||
# Should NOT be de-escalating yet (counter reset)
|
||||
self.assertFalse(state.is_deescalating)
|
||||
|
||||
|
||||
class TestSessionModifier(unittest.TestCase):
|
||||
"""Test system prompt modifier generation."""
|
||||
|
||||
def setUp(self):
|
||||
self.tracker = CrisisSessionTracker()
|
||||
|
||||
def test_no_modifier_for_single_message(self):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.assertEqual(self.tracker.get_session_modifier(), "")
|
||||
|
||||
def test_no_modifier_for_stable_session(self):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.tracker.record(detect_crisis("Good morning"))
|
||||
self.assertEqual(self.tracker.get_session_modifier(), "")
|
||||
|
||||
def test_escalation_modifier(self):
|
||||
"""Escalating session should produce a modifier."""
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.tracker.record(detect_crisis("I want to die"))
|
||||
modifier = self.tracker.get_session_modifier()
|
||||
if self.tracker.state.is_escalating:
|
||||
self.assertIn("escalated", modifier.lower())
|
||||
self.assertIn("NONE", modifier)
|
||||
self.assertIn("CRITICAL", modifier)
|
||||
|
||||
def test_deescalation_modifier(self):
|
||||
"""De-escalating session should mention stabilizing."""
|
||||
self.tracker.record(detect_crisis("I want to kill myself"))
|
||||
for _ in range(5):
|
||||
self.tracker.record(detect_crisis("I'm feeling okay"))
|
||||
modifier = self.tracker.get_session_modifier()
|
||||
if self.tracker.state.is_deescalating:
|
||||
self.assertIn("stabilizing", modifier.lower())
|
||||
|
||||
def test_prior_crisis_modifier(self):
|
||||
"""Past crisis should be noted even without active escalation."""
|
||||
self.tracker.record(detect_crisis("I want to die"))
|
||||
self.tracker.record(detect_crisis("Feeling a bit better"))
|
||||
modifier = self.tracker.get_session_modifier()
|
||||
# Should note the prior CRITICAL
|
||||
if modifier:
|
||||
self.assertIn("CRITICAL", modifier)
|
||||
|
||||
|
||||
class TestUIHints(unittest.TestCase):
|
||||
"""Test UI hint generation."""
|
||||
|
||||
def setUp(self):
|
||||
self.tracker = CrisisSessionTracker()
|
||||
|
||||
def test_ui_hints_structure(self):
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
hints = self.tracker.get_ui_hints()
|
||||
self.assertIn("session_escalating", hints)
|
||||
self.assertIn("session_deescalating", hints)
|
||||
self.assertIn("session_peak_level", hints)
|
||||
self.assertIn("session_message_count", hints)
|
||||
|
||||
def test_ui_hints_escalation_warning(self):
|
||||
"""Escalating session should have warning hint."""
|
||||
self.tracker.record(detect_crisis("Hello"))
|
||||
self.tracker.record(detect_crisis("I want to die"))
|
||||
hints = self.tracker.get_ui_hints()
|
||||
if hints["session_escalating"]:
|
||||
self.assertTrue(hints.get("escalation_warning"))
|
||||
self.assertIn("suggested_action", hints)
|
||||
|
||||
|
||||
class TestCheckCrisisWithSession(unittest.TestCase):
|
||||
"""Test the convenience function combining detection + session tracking."""
|
||||
|
||||
def test_returns_combined_data(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
result = check_crisis_with_session("I want to die", tracker)
|
||||
self.assertIn("level", result)
|
||||
self.assertIn("session", result)
|
||||
self.assertIn("current_level", result["session"])
|
||||
self.assertIn("peak_level", result["session"])
|
||||
self.assertIn("modifier", result["session"])
|
||||
|
||||
def test_session_updates_across_calls(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
check_crisis_with_session("Hello", tracker)
|
||||
result = check_crisis_with_session("I want to die", tracker)
|
||||
self.assertEqual(result["session"]["message_count"], 2)
|
||||
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
|
||||
|
||||
|
||||
class TestPrivacy(unittest.TestCase):
|
||||
"""Verify privacy-first design principles."""
|
||||
|
||||
def test_no_persistence_mechanism(self):
|
||||
"""Session tracker should have no database, file, or network calls."""
|
||||
import inspect
|
||||
source = inspect.getsource(CrisisSessionTracker)
|
||||
# Should not import database, requests, or file I/O
|
||||
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
|
||||
for word in forbidden:
|
||||
self.assertNotIn(word, source.lower(),
|
||||
f"Session tracker should not use {word} — privacy-first design")
|
||||
|
||||
def test_state_contained_in_memory(self):
|
||||
"""All state should be instance attributes, not module-level."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record(detect_crisis("I want to die"))
|
||||
# New tracker should have clean state (no global contamination)
|
||||
fresh = CrisisSessionTracker()
|
||||
self.assertEqual(fresh.state.current_level, "NONE")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user