Compare commits

..

13 Commits

Author SHA1 Message Date
Alexander Whitestone
44e0396a1f feat: add behavioral crisis pattern detection (#133)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-20 12:11:02 -04:00
Alexander Whitestone
ac2d230bc1 wip: add behavioral tracker regression tests 2026-04-20 12:03:08 -04:00
d412939b4f fix: footer /about link to point to static about.html
Fixes #59

The footer links to /about but the repo ships about.html. On a plain static server this results in a 404. Changed to /about.html so the link resolves correctly.
2026-04-17 05:37:40 +00:00
07c582aa08 Merge pull request 'fix: crisis overlay initial focus to enabled Call 988 link (#69)' (#126) from burn/69-1776264183 into main
Merge PR #126: fix: crisis overlay initial focus to enabled Call 988 link (#69)
2026-04-17 01:46:56 +00:00
5f95dc1e39 Merge pull request '[P3] Service worker: cache crisis resources for offline (#41)' (#122) from burn/41-1776264184 into main
Merge PR #122: [P3] Service worker: cache crisis resources for offline (#41)
2026-04-17 01:46:55 +00:00
b1f3cac36d Merge pull request 'feat: session-level crisis tracking and escalation (closes #35)' (#118) from door/issue-35 into main
Merge PR #118: feat: session-level crisis tracking and escalation (closes #35)
2026-04-17 01:46:53 +00:00
07b3f67845 fix: crisis overlay initial focus to enabled Call 988 link (#69)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 15s
2026-04-15 15:09:36 +00:00
c22bbbaf65 fix: crisis overlay initial focus to enabled Call 988 link (#69) 2026-04-15 15:09:32 +00:00
543cb1d40f test: add offline self-containment and retry button tests (#41)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:58:44 +00:00
3cfd01815a feat: session-level crisis tracking and escalation (closes #35)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 23s
2026-04-15 11:49:52 +00:00
5a7ba9f207 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:51 +00:00
8ed8f20a17 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:49 +00:00
9d7d26033e feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:47 +00:00
10 changed files with 1050 additions and 461 deletions

View File

@@ -1,336 +0,0 @@
"""Behavioral Pattern Detection — crisis risk from usage patterns.
Detects crisis signals from HOW someone uses the system, not just
what they say. Complements content-based crisis detection.
Behavioral signals:
- Frequency spike (anxiety/agitation)
- Frequency drop (withdrawal/isolation)
- Late-night messaging (2-5 AM)
- Session length increase (loneliness)
- Abrupt termination after emotional content
- Return after long absence
Part of Epic #102 (Multimodal Crisis Detection).
"""
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class SessionEvent:
"""A single session interaction."""
session_id: str
timestamp: float
message_length: int
is_user: bool = True
emotional_content: bool = False
terminated_abruptly: bool = False
@dataclass
class BehavioralSignals:
"""Detected behavioral risk signals."""
frequency_change: float = 0.0 # -1 (drop) to +1 (spike) vs baseline
is_late_night: bool = False # 2-5 AM local time
session_length_trend: str = "stable" # increasing/decreasing/stable
withdrawal_detected: bool = False # significant drop in activity
return_after_absence: bool = False # came back after extended absence
abrupt_termination: bool = False # left after emotional content
behavioral_score: float = 0.0 # 0-1 aggregate risk
def to_dict(self) -> Dict[str, Any]:
return {
"frequency_change": self.frequency_change,
"is_late_night": self.is_late_night,
"session_length_trend": self.session_length_trend,
"withdrawal_detected": self.withdrawal_detected,
"return_after_absence": self.return_after_absence,
"abrupt_termination": self.abrupt_termination,
"behavioral_score": self.behavioral_score,
}
class BehavioralTracker:
"""Track behavioral patterns per session/user and detect risk signals.
Uses a 7-day rolling window for baseline calculation.
Thread-safe for concurrent session tracking.
"""
# Time window constants
LATE_NIGHT_START = 2 # 2 AM
LATE_NIGHT_END = 5 # 5 AM
BASELINE_WINDOW_DAYS = 7
ABSENCE_THRESHOLD_HOURS = 48
SPIKE_THRESHOLD = 2.0 # 2x baseline = spike
DROP_THRESHOLD = 0.3 # 30% of baseline = withdrawal
def __init__(self):
# session_id -> list of events
self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
# session_id -> baseline metrics
self._baselines: Dict[str, Dict[str, float]] = {}
# session_id -> last activity timestamp
self._last_activity: Dict[str, float] = {}
# Global baseline (all sessions)
self._global_baseline: Dict[str, float] = {
"avg_messages_per_hour": 5.0,
"avg_session_length_min": 15.0,
"avg_message_length": 200.0,
}
def record(
self,
session_id: str,
timestamp: Optional[float] = None,
message_length: int = 0,
is_user: bool = True,
emotional_content: bool = False,
terminated_abruptly: bool = False,
) -> None:
"""Record a session event."""
if timestamp is None:
timestamp = time.time()
event = SessionEvent(
session_id=session_id,
timestamp=timestamp,
message_length=message_length,
is_user=is_user,
emotional_content=emotional_content,
terminated_abruptly=terminated_abruptly,
)
self._events[session_id].append(event)
self._last_activity[session_id] = timestamp
# Periodically update baseline
if len(self._events[session_id]) % 20 == 0:
self._update_baseline(session_id)
def get_risk_signals(self, session_id: str) -> BehavioralSignals:
"""Analyze behavioral patterns and return risk signals."""
events = self._events.get(session_id, [])
if not events:
return BehavioralSignals()
signals = BehavioralSignals()
now = time.time()
# 1. Frequency analysis
signals.frequency_change = self._analyze_frequency(session_id, now)
# 2. Late-night detection
signals.is_late_night = self._is_late_night(events[-1].timestamp)
# 3. Session length trend
signals.session_length_trend = self._analyze_session_length_trend(session_id)
# 4. Withdrawal detection
signals.withdrawal_detected = signals.frequency_change < -0.5
# 5. Return after absence
signals.return_after_absence = self._detect_return_after_absence(session_id, now)
# 6. Abrupt termination
signals.abrupt_termination = self._detect_abrupt_termination(events)
# 7. Aggregate behavioral score
signals.behavioral_score = self._compute_behavioral_score(signals)
return signals
def _analyze_frequency(self, session_id: str, now: float) -> float:
"""Compare recent frequency to baseline. Returns -1 to +1."""
events = self._events.get(session_id, [])
if len(events) < 3:
return 0.0
# Count messages in last hour
one_hour_ago = now - 3600
recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
# Get baseline
baseline = self._get_baseline(session_id)
baseline_rate = baseline.get("avg_messages_per_hour", 5.0)
if baseline_rate <= 0:
return 0.0
ratio = recent_count / baseline_rate
# Map to -1..+1: 0.5x = -0.5, 1x = 0, 2x = +1
if ratio < 1.0:
return max(-1.0, (ratio - 1.0))
else:
return min(1.0, (ratio - 1.0) / 2.0)
def _is_late_night(self, timestamp: float) -> bool:
"""Check if timestamp falls in the 2-5 AM window."""
dt = datetime.fromtimestamp(timestamp)
hour = dt.hour
return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END
def _analyze_session_length_trend(self, session_id: str) -> str:
"""Determine if session lengths are increasing, decreasing, or stable."""
events = self._events.get(session_id, [])
if len(events) < 10:
return "stable"
# Split events into first half and second half
mid = len(events) // 2
first_half = events[:mid]
second_half = events[mid:]
# Calculate session spans (first to last message)
if not first_half or not second_half:
return "stable"
first_span = first_half[-1].timestamp - first_half[0].timestamp
second_span = second_half[-1].timestamp - second_half[0].timestamp
if second_span > first_span * 1.3:
return "increasing"
elif second_span < first_span * 0.7:
return "decreasing"
return "stable"
def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
"""Detect if this session started after a long absence."""
events = self._events.get(session_id, [])
if len(events) < 2:
return False
# Check gap between current session and previous
last_activity = self._last_activity.get(session_id, 0)
if last_activity <= 0:
return False
# Find previous session's last event (excluding current session)
prev_events = [e for e in events[:-10] if e.timestamp < now - 3600]
if not prev_events:
return False
gap_hours = (now - prev_events[-1].timestamp) / 3600
return gap_hours >= self.ABSENCE_THRESHOLD_HOURS
def _detect_abrupt_termination(self, events: List[SessionEvent]) -> bool:
"""Detect if the last few messages had emotional content and then stopped."""
if len(events) < 3:
return False
# Check last 5 events
recent = events[-5:]
has_emotional = any(e.emotional_content for e in recent)
last_was_user = recent[-1].is_user if recent else False
last_was_short = recent[-1].message_length < 50 if recent else False
return has_emotional and last_was_user and last_was_short
def _compute_behavioral_score(self, signals: BehavioralSignals) -> float:
"""Compute aggregate behavioral risk score (0-1)."""
score = 0.0
weights = {
"frequency_spike": 0.15,
"frequency_drop": 0.20,
"late_night": 0.10,
"session_increasing": 0.10,
"withdrawal": 0.20,
"return_after_absence": 0.05,
"abrupt_termination": 0.20,
}
# Frequency spike (anxiety)
if signals.frequency_change > 0.5:
score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
# Frequency drop (withdrawal)
if signals.frequency_change < -0.3:
score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
# Late night
if signals.is_late_night:
score += weights["late_night"]
# Session length increasing
if signals.session_length_trend == "increasing":
score += weights["session_increasing"]
# Withdrawal
if signals.withdrawal_detected:
score += weights["withdrawal"]
# Return after absence
if signals.return_after_absence:
score += weights["return_after_absence"]
# Abrupt termination
if signals.abrupt_termination:
score += weights["abrupt_termination"]
return min(1.0, score)
def _get_baseline(self, session_id: str) -> Dict[str, float]:
"""Get baseline metrics for a session."""
if session_id in self._baselines:
return self._baselines[session_id]
return self._global_baseline
def _update_baseline(self, session_id: str) -> None:
"""Update rolling baseline from recent events."""
events = self._events.get(session_id, [])
if len(events) < 5:
return
# Use last 7 days of events
cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
recent = [e for e in events if e.timestamp > cutoff and e.is_user]
if not recent:
return
# Calculate metrics
time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
if time_span_hours > 0:
msgs_per_hour = len(recent) / time_span_hours
else:
msgs_per_hour = len(recent)
avg_length = sum(e.message_length for e in recent) / len(recent)
self._baselines[session_id] = {
"avg_messages_per_hour": msgs_per_hour,
"avg_message_length": avg_length,
"total_messages": len(recent),
}
# Global singleton for convenience
_global_tracker = BehavioralTracker()
def record_event(
session_id: str,
timestamp: Optional[float] = None,
message_length: int = 0,
is_user: bool = True,
emotional_content: bool = False,
terminated_abruptly: bool = False,
) -> None:
"""Record an event to the global behavioral tracker."""
_global_tracker.record(
session_id, timestamp, message_length,
is_user, emotional_content, terminated_abruptly,
)
def get_risk_signals(session_id: str) -> BehavioralSignals:
"""Get behavioral risk signals for a session."""
return _global_tracker.get_risk_signals(session_id)

View File

@@ -7,6 +7,8 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .behavioral import BehavioralTracker, BehavioralSignal
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
__all__ = [
"detect_crisis",
@@ -19,4 +21,9 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"BehavioralTracker",
"BehavioralSignal",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

304
crisis/behavioral.py Normal file
View File

@@ -0,0 +1,304 @@
"""Behavioral crisis pattern detection for the-door (#133).
Detects crisis risk from behavioral patterns, not just message content:
- message frequency spikes versus a 7-day rolling baseline
- late-night messaging (2-5 AM)
- withdrawal / isolation via a sharp drop from the recent daily baseline
- session length trend versus recent sessions
- return after long absence
- rising crisis-score trend across recent messages
Privacy-first:
- in-memory only
- no database
- no file I/O
- no network calls
"""
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any
HIGH_RISK_HOURS = {2, 3, 4}
ELEVATED_RISK_HOURS = {1, 5}
ROLLING_BASELINE_DAYS = 7
RETURN_AFTER_ABSENCE_DAYS = 7
@dataclass
class BehavioralEvent:
session_id: str
timestamp: datetime
message_length: int
crisis_score: float = 0.0
role: str = "user"
@dataclass
class BehavioralSignal:
signal_type: str
risk_level: str
description: str
evidence: list[str] = field(default_factory=list)
score: float = 0.0
def as_dict(self) -> dict[str, Any]:
return {
"signal_type": self.signal_type,
"risk_level": self.risk_level,
"description": self.description,
"evidence": list(self.evidence),
"score": self.score,
}
class BehavioralTracker:
"""In-memory tracker for behavioral crisis signals."""
def __init__(self) -> None:
self._events_by_session: dict[str, list[BehavioralEvent]] = defaultdict(list)
def record(
self,
session_id: str,
timestamp: datetime,
message_length: int,
*,
crisis_score: float = 0.0,
role: str = "user",
) -> None:
if timestamp.tzinfo is None:
timestamp = timestamp.replace(tzinfo=timezone.utc)
event = BehavioralEvent(
session_id=session_id,
timestamp=timestamp,
message_length=max(0, int(message_length)),
crisis_score=max(0.0, min(1.0, float(crisis_score))),
role=role,
)
self._events_by_session[session_id].append(event)
self._events_by_session[session_id].sort(key=lambda item: item.timestamp)
def get_risk_signals(self, session_id: str) -> dict[str, Any]:
events = [event for event in self._events_by_session.get(session_id, []) if event.role == "user"]
if not events:
return {
"frequency_change": 1.0,
"is_late_night": False,
"session_length_trend": "stable",
"withdrawal_detected": False,
"behavioral_score": 0.0,
"signals": [],
}
signals: list[BehavioralSignal] = []
frequency_change = self._compute_frequency_change(events)
frequency_signal = self._analyze_frequency(events, frequency_change)
if frequency_signal:
signals.append(frequency_signal)
time_signal = self._analyze_time(events)
if time_signal:
signals.append(time_signal)
withdrawal_signal = self._analyze_withdrawal(session_id, events)
if withdrawal_signal:
signals.append(withdrawal_signal)
absence_signal = self._analyze_return_after_absence(session_id, events)
if absence_signal:
signals.append(absence_signal)
escalation_signal = self._analyze_escalation(events)
if escalation_signal:
signals.append(escalation_signal)
session_length_trend = self._compute_session_length_trend(session_id, events)
behavioral_score = self._compute_behavioral_score(signals)
risk_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
signals.sort(key=lambda item: (risk_order.get(item.risk_level, 9), -item.score))
return {
"frequency_change": frequency_change,
"is_late_night": any(item.signal_type == "time" for item in signals),
"session_length_trend": session_length_trend,
"withdrawal_detected": any(item.signal_type == "withdrawal" for item in signals),
"behavioral_score": behavioral_score,
"signals": [item.as_dict() for item in signals],
}
def _all_user_events(self) -> list[BehavioralEvent]:
events: list[BehavioralEvent] = []
for session_events in self._events_by_session.values():
events.extend(event for event in session_events if event.role == "user")
events.sort(key=lambda item: item.timestamp)
return events
def _daily_count_baseline(self, current_date) -> float:
events = self._all_user_events()
counts: dict[Any, int] = {}
for offset in range(1, ROLLING_BASELINE_DAYS + 1):
counts[current_date - timedelta(days=offset)] = 0
for event in events:
event_date = event.timestamp.date()
if event_date in counts:
counts[event_date] += 1
return sum(counts.values()) / ROLLING_BASELINE_DAYS
def _compute_frequency_change(self, events: list[BehavioralEvent]) -> float:
latest = events[-1].timestamp
window_start = latest - timedelta(hours=1)
current_hour_count = sum(1 for event in events if event.timestamp >= window_start)
baseline_daily = self._daily_count_baseline(latest.date())
baseline_hourly = max(baseline_daily / 24.0, 0.1)
return round(current_hour_count / baseline_hourly, 2)
def _analyze_frequency(self, events: list[BehavioralEvent], frequency_change: float) -> BehavioralSignal | None:
latest = events[-1].timestamp
window_start = latest - timedelta(hours=1)
current_hour_count = sum(1 for event in events if event.timestamp >= window_start)
if current_hour_count >= 6 and frequency_change >= 3.0:
level = "HIGH" if frequency_change >= 6.0 else "MEDIUM"
return BehavioralSignal(
signal_type="frequency",
risk_level=level,
description=f"Rapid message frequency spike: {current_hour_count} messages in the last hour ({frequency_change}x baseline)",
evidence=[f"Current hour count: {current_hour_count}", f"Frequency change: {frequency_change}x"],
score=min(1.0, frequency_change / 8.0),
)
return None
def _analyze_time(self, events: list[BehavioralEvent]) -> BehavioralSignal | None:
latest = events[-1].timestamp
hour = latest.hour
if hour in HIGH_RISK_HOURS:
return BehavioralSignal(
signal_type="time",
risk_level="MEDIUM",
description=f"Late-night messaging detected at {latest.strftime('%H:%M')}",
evidence=[f"Latest message timestamp: {latest.isoformat()}"],
score=0.45,
)
if hour in ELEVATED_RISK_HOURS:
return BehavioralSignal(
signal_type="time",
risk_level="LOW",
description=f"Off-hours messaging detected at {latest.strftime('%H:%M')}",
evidence=[f"Latest message timestamp: {latest.isoformat()}"],
score=0.2,
)
return None
def _analyze_withdrawal(self, session_id: str, events: list[BehavioralEvent]) -> BehavioralSignal | None:
current_date = events[-1].timestamp.date()
baseline_daily = self._daily_count_baseline(current_date)
if baseline_daily < 3.0:
return None
current_day_count = sum(1 for event in events if event.timestamp.date() == current_date)
current_avg_len = sum(event.message_length for event in events if event.timestamp.date() == current_date) / max(current_day_count, 1)
prior_events = [
event
for sid, session_events in self._events_by_session.items()
if sid != session_id
for event in session_events
if event.role == "user" and event.timestamp.date() >= current_date - timedelta(days=ROLLING_BASELINE_DAYS)
]
if not prior_events:
return None
prior_avg_len = sum(event.message_length for event in prior_events) / len(prior_events)
if current_day_count <= max(1, baseline_daily * 0.3):
score = 0.55 if current_day_count == 1 else 0.4
if current_avg_len < prior_avg_len * 0.5:
score += 0.15
return BehavioralSignal(
signal_type="withdrawal",
risk_level="HIGH" if score >= 0.6 else "MEDIUM",
description="Sharp drop from recent communication baseline suggests withdrawal/isolation",
evidence=[
f"Current day count: {current_day_count}",
f"7-day daily baseline: {baseline_daily:.2f}",
f"Average message length: {current_avg_len:.1f} vs {prior_avg_len:.1f}",
],
score=min(1.0, score),
)
return None
def _analyze_return_after_absence(self, session_id: str, events: list[BehavioralEvent]) -> BehavioralSignal | None:
current_start = events[0].timestamp
prior_events = [
event
for sid, session_events in self._events_by_session.items()
if sid != session_id
for event in session_events
if event.role == "user" and event.timestamp < current_start
]
if not prior_events:
return None
latest_prior = max(prior_events, key=lambda item: item.timestamp)
gap = current_start - latest_prior.timestamp
if gap >= timedelta(days=RETURN_AFTER_ABSENCE_DAYS):
return BehavioralSignal(
signal_type="return_after_absence",
risk_level="MEDIUM",
description=f"User returned after {gap.days} days of silence",
evidence=[f"Last prior activity: {latest_prior.timestamp.isoformat()}"],
score=min(1.0, gap.days / 14.0),
)
return None
def _analyze_escalation(self, events: list[BehavioralEvent]) -> BehavioralSignal | None:
scored = [event for event in events if event.crisis_score > 0]
if len(scored) < 3:
return None
recent = scored[-5:]
midpoint = max(1, len(recent) // 2)
first_avg = sum(event.crisis_score for event in recent[:midpoint]) / len(recent[:midpoint])
second_avg = sum(event.crisis_score for event in recent[midpoint:]) / len(recent[midpoint:])
if second_avg >= max(0.4, first_avg * 1.3):
return BehavioralSignal(
signal_type="escalation",
risk_level="HIGH" if second_avg >= 0.65 else "MEDIUM",
description=f"Behavioral escalation: crisis score trend rose from {first_avg:.2f} to {second_avg:.2f}",
evidence=[f"Recent crisis scores: {[round(event.crisis_score, 2) for event in recent]}"],
score=min(1.0, second_avg),
)
return None
def _compute_session_length_trend(self, session_id: str, events: list[BehavioralEvent]) -> str:
current_duration = (events[-1].timestamp - events[0].timestamp).total_seconds()
previous_durations = []
current_start = events[0].timestamp
for sid, session_events in self._events_by_session.items():
if sid == session_id:
continue
user_events = [event for event in session_events if event.role == "user"]
if len(user_events) < 2:
continue
if user_events[-1].timestamp < current_start - timedelta(days=ROLLING_BASELINE_DAYS):
continue
previous_durations.append((user_events[-1].timestamp - user_events[0].timestamp).total_seconds())
if not previous_durations:
return "stable"
average_duration = sum(previous_durations) / len(previous_durations)
if current_duration > average_duration * 1.5:
return "increasing"
if current_duration < average_duration * 0.5:
return "decreasing"
return "stable"
def _compute_behavioral_score(self, signals: list[BehavioralSignal]) -> float:
if not signals:
return 0.0
max_score = max(signal.score for signal in signals)
multi_signal_boost = min(0.2, 0.05 * (len(signals) - 1))
return round(min(1.0, max_score + multi_signal_boost), 2)

View File

@@ -22,6 +22,7 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:

325
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,325 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .behavioral import BehavioralTracker
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
behavioral_score: float = 0.0
behavioral_signals: List[dict] = field(default_factory=list)
frequency_change: float = 1.0
is_late_night: bool = False
session_length_trend: str = "stable"
withdrawal_detected: bool = False
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
self._behavioral_tracker = BehavioralTracker()
self._behavioral_session_id = "current-session"
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
behavioral = self._behavioral_tracker.get_risk_signals(self._behavioral_session_id)
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
behavioral_score=behavioral["behavioral_score"],
behavioral_signals=behavioral["signals"],
frequency_change=behavioral["frequency_change"],
is_late_night=behavioral["is_late_night"],
session_length_trend=behavioral["session_length_trend"],
withdrawal_detected=behavioral["withdrawal_detected"],
)
def record(
self,
detection: CrisisDetectionResult,
*,
timestamp=None,
message_length: int = 0,
role: str = "user",
) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
from datetime import datetime, timezone
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
if role == "user":
if timestamp is None:
timestamp = datetime.now(timezone.utc)
self._behavioral_tracker.record(
self._behavioral_session_id,
timestamp,
message_length=message_length,
crisis_score=detection.score,
role=role,
)
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
notes = []
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
notes.append(
f"User previously reached {s.peak_level} crisis level this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
if s.behavioral_score >= 0.35 and s.behavioral_signals:
signal_names = ", ".join(item["signal_type"] for item in s.behavioral_signals)
notes.append(
f"Behavioral risk signals detected this session: {signal_names}. "
"Use the behavioral context to increase sensitivity and warmth."
)
return " ".join(notes)
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
"behavioral_score": s.behavioral_score,
"is_late_night": s.is_late_night,
"withdrawal_detected": s.withdrawal_detected,
"session_length_trend": s.session_length_trend,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
if s.behavioral_score >= 0.5:
hints["behavioral_warning"] = True
hints.setdefault(
"suggested_action",
"Behavioral risk patterns are active. Keep the response warm, grounded, and alert."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
timestamp=None,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection, timestamp=timestamp, message_length=len(text))
behavioral = {
"frequency_change": session_state.frequency_change,
"is_late_night": session_state.is_late_night,
"session_length_trend": session_state.session_length_trend,
"withdrawal_detected": session_state.withdrawal_detected,
"behavioral_score": session_state.behavioral_score,
"signals": session_state.behavioral_signals,
}
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
"behavioral": behavioral,
},
}

View File

@@ -680,7 +680,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
<a href="/about.html" aria-label="About The Door">about</a>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -808,6 +808,7 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -1050,7 +1051,8 @@ Sovereignty and service always.`;
}
}, 1000);
overlayDismissBtn.focus();
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
}
// Register focus trap on document (always listening, gated by class check)

View File

@@ -1,136 +1,101 @@
"""Tests for behavioral pattern detection."""
"""
Tests for behavioral crisis pattern detection (#133).
"""
import time
import pytest
import os
import sys
import unittest
from datetime import datetime, timedelta, timezone
from behavioral_tracker import BehavioralTracker, BehavioralSignals
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.session_tracker import CrisisSessionTracker, check_crisis_with_session
from crisis.behavioral import BehavioralTracker
class TestBehavioralTracker:
def test_empty_tracker_returns_default_signals(self):
class TestBehavioralTracker(unittest.TestCase):
def _seed_day(self, tracker, *, session_id, day, count, start_hour=10, message_length=48, crisis_score=0.0):
base = datetime(2026, 4, day, start_hour, 0, tzinfo=timezone.utc)
for i in range(count):
tracker.record(
session_id,
base + timedelta(minutes=i * 10),
message_length=message_length,
crisis_score=crisis_score,
)
def test_frequency_change_uses_seven_day_baseline(self):
tracker = BehavioralTracker()
signals = tracker.get_risk_signals("session-1")
assert signals.behavioral_score == 0.0
assert not signals.is_late_night
for day in range(1, 8):
self._seed_day(tracker, session_id=f"baseline-{day}", day=day, count=2)
def test_frequency_spike_detected(self):
burst_base = datetime(2026, 4, 8, 14, 0, tzinfo=timezone.utc)
for i in range(8):
tracker.record(
"current-session",
burst_base + timedelta(minutes=i),
message_length=72,
crisis_score=0.1,
)
summary = tracker.get_risk_signals("current-session")
self.assertGreater(summary["frequency_change"], 2.0)
self.assertTrue(any(sig["signal_type"] == "frequency" for sig in summary["signals"]))
self.assertGreater(summary["behavioral_score"], 0.0)
def test_late_night_messages_raise_flag(self):
tracker = BehavioralTracker()
# Establish baseline: 2 messages per hour
now = time.time()
for i in range(10):
tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)
base = datetime(2026, 4, 10, 2, 15, tzinfo=timezone.utc)
for i in range(3):
tracker.record(
"late-night",
base + timedelta(minutes=i * 7),
message_length=35,
crisis_score=0.0,
)
# Spike: 10 messages in last 10 minutes
for i in range(10):
tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)
summary = tracker.get_risk_signals("late-night")
signals = tracker.get_risk_signals("s1")
assert signals.frequency_change > 0.3 # Significant spike
self.assertTrue(summary["is_late_night"])
self.assertTrue(any(sig["signal_type"] == "time" for sig in summary["signals"]))
def test_frequency_drop_detected(self):
def test_withdrawal_detected_after_large_drop_from_baseline(self):
tracker = BehavioralTracker()
now = time.time()
# Baseline: heavy usage
for i in range(50):
tracker.record("s1", timestamp=now - (50 - i) * 300, message_length=100)
for day in range(1, 8):
self._seed_day(tracker, session_id=f"baseline-{day}", day=day, count=10, message_length=80)
# Then very few messages
tracker.record("s1", timestamp=now - 60, message_length=50)
signals = tracker.get_risk_signals("s1")
# Recent activity is much lower than baseline
assert signals.frequency_change < 0
def test_late_night_detection(self):
tracker = BehavioralTracker()
# 3:00 AM timestamp
import datetime
dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
ts = dt.timestamp()
tracker.record("s1", timestamp=ts, message_length=100)
signals = tracker.get_risk_signals("s1")
assert signals.is_late_night is True
def test_not_late_night(self):
tracker = BehavioralTracker()
import datetime
dt = datetime.datetime(2026, 4, 14, 14, 0, 0) # 2 PM
ts = dt.timestamp()
tracker.record("s1", timestamp=ts, message_length=100)
signals = tracker.get_risk_signals("s1")
assert signals.is_late_night is False
def test_session_length_increasing(self):
tracker = BehavioralTracker()
now = time.time()
# First half: messages spread over 5 minutes
for i in range(10):
tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
# Second half: messages spread over 30 minutes
for i in range(10):
tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)
signals = tracker.get_risk_signals("s1")
assert signals.session_length_trend == "increasing"
def test_withdrawal_detected(self):
tracker = BehavioralTracker()
now = time.time()
# High baseline
for i in range(50):
tracker.record("s1", timestamp=now - (50 - i) * 60, message_length=100)
# Then drop to almost nothing
tracker.record("s1", timestamp=now - 60, message_length=20)
signals = tracker.get_risk_signals("s1")
assert signals.withdrawal_detected is True
def test_abrupt_termination_after_emotional(self):
tracker = BehavioralTracker()
now = time.time()
# Normal messages
for i in range(5):
tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
# Emotional content
tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
# Short abrupt message
tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)
signals = tracker.get_risk_signals("s1")
assert signals.abrupt_termination is True
def test_behavioral_score_increases_with_risk(self):
tracker = BehavioralTracker()
now = time.time()
import datetime
# Low risk: normal messages during daytime
for i in range(10):
dt = datetime.datetime(2026, 4, 14, 14, i, 0) # 2 PM
tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
low_risk = tracker.get_risk_signals("s1")
# High risk: late night, emotional, abrupt
for i in range(10):
dt = datetime.datetime(2026, 4, 14, 3, i, 0) # 3 AM
tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
tracker.record("s2", timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
message_length=10, is_user=True)
high_risk = tracker.get_risk_signals("s2")
assert high_risk.behavioral_score > low_risk.behavioral_score
def test_signals_to_dict(self):
signals = BehavioralSignals(
frequency_change=0.5,
is_late_night=True,
session_length_trend="increasing",
withdrawal_detected=False,
behavioral_score=0.4,
tracker.record(
"withdrawal-session",
datetime(2026, 4, 9, 11, 0, tzinfo=timezone.utc),
message_length=18,
crisis_score=0.0,
)
d = signals.to_dict()
assert d["frequency_change"] == 0.5
assert d["is_late_night"] is True
assert d["session_length_trend"] == "increasing"
summary = tracker.get_risk_signals("withdrawal-session")
self.assertTrue(summary["withdrawal_detected"])
self.assertTrue(any(sig["signal_type"] == "withdrawal" for sig in summary["signals"]))
class TestBehavioralSessionIntegration(unittest.TestCase):
def test_check_crisis_with_session_includes_behavioral_summary(self):
tracker = CrisisSessionTracker()
base = datetime(2026, 4, 20, 2, 0, tzinfo=timezone.utc)
check_crisis_with_session("can't sleep", tracker, timestamp=base)
check_crisis_with_session("still here", tracker, timestamp=base + timedelta(minutes=1))
result = check_crisis_with_session("everything feels loud", tracker, timestamp=base + timedelta(minutes=2))
behavioral = result["session"]["behavioral"]
self.assertIn("frequency_change", behavioral)
self.assertIn("is_late_night", behavioral)
self.assertIn("session_length_trend", behavioral)
self.assertIn("withdrawal_detected", behavioral)
self.assertIn("behavioral_score", behavioral)
self.assertTrue(behavioral["is_late_night"])
self.assertGreater(behavioral["behavioral_score"], 0.0)
if __name__ == '__main__':
unittest.main()

View File

@@ -52,6 +52,34 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -50,6 +50,22 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,277 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()