Compare commits

..

6 Commits

Author SHA1 Message Date
7a08fb764a test: add tests for crisis detection A/B testing framework
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 14s
Refs #101
2026-04-15 15:10:04 +00:00
4b022f0df4 feat: add crisis detection A/B testing framework (#101)
- Feature flag: algorithm A vs B
- Event logging with variant assignment
- Metrics: false positive rate, detection latency per variant
- Statistical significance testing (chi-squared)
- Privacy-preserving (hashes text, no raw crisis content logged)

Closes #101
2026-04-15 15:08:45 +00:00
48f48c7f26 feat: cache offline crisis resources (refs #41) (#74)
All checks were successful
Smoke Test / smoke (push) Successful in 7s
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 19s
Merge PR #74 (squash)
2026-04-14 22:09:59 +00:00
da31288525 fix: deprecate dying_detection and consolidate crisis detection (#40) (#76)
All checks were successful
Smoke Test / smoke (push) Successful in 4s
Merge PR #76 (squash)
2026-04-14 22:08:29 +00:00
8efc858cd7 fix: add keyboard focus trap to crisis overlay (#80)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #80 (squash)
2026-04-14 22:08:28 +00:00
611c1c8456 fix(a11y): Safety plan modal keyboard focus trap (#65) (#81)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #81 (squash)
2026-04-14 22:08:24 +00:00
8 changed files with 1251 additions and 276 deletions

409
crisis/ab_testing.py Normal file
View File

@@ -0,0 +1,409 @@
#!/usr/bin/env python3
"""
Crisis Detection A/B Testing Framework
Allows testing different crisis detection algorithms with:
- Feature flags for algorithm A vs B
- Logging of which variant triggered for each event
- Metrics: false positive rate, detection latency per variant
- Statistical significance testing
Usage:
from crisis.ab_testing import ABTestManager, Variant
manager = ABTestManager()
variant, result, latency_ms = manager.detect_with_variant("I'm feeling hopeless")
print(variant, result.level)
# Get metrics
metrics = manager.get_metrics()
print(metrics.variant_a.false_positives, metrics.variant_b.false_positives)
"""
import hashlib
import json
import os
import random
import re
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from uuid import uuid4

from .detect import detect_crisis, CrisisDetectionResult
class Variant(Enum):
    """A/B test variants for crisis detection.

    The string value (``.value``) is what gets serialized into event logs.
    """
    A = "A"  # Control: current algorithm
    B = "B"  # Treatment: alternative algorithm
@dataclass
class ABTestConfig:
    """Configuration for A/B testing.

    File defaults live under ``~/.the-door`` (created by ABTestManager).
    """
    enabled: bool = True  # When False, all traffic goes to variant A
    variant_b_percentage: float = 0.5  # Fraction of traffic routed to B; 0.5 = 50% split
    log_file: Optional[str] = None  # Default: ~/.the-door/ab_test_logs.jsonl
    metrics_file: Optional[str] = None  # Default: ~/.the-door/ab_metrics.json
    seed: Optional[int] = None  # Seeds the module RNG; for reproducible assignments
@dataclass
class DetectionEvent:
    """A single detection event with variant assignment.

    Serialized via ``dataclasses.asdict`` as one JSON line in the log file.
    """
    event_id: str  # Short random id (first 8 chars of a uuid4)
    timestamp: str  # ISO-8601 UTC timestamp of the detection
    variant: str  # "A" or "B" (Variant.value)
    text_hash: str  # Hash of input text for privacy; raw text is never stored
    detected_level: str  # Level reported by the detector (e.g. "CRITICAL")
    detected_score: float  # Detector confidence score
    indicators: List[str]  # Matched indicators (capped at 5 when logged)
    detection_latency_ms: float  # Wall-clock detection time, in milliseconds
    is_false_positive: Optional[bool] = None  # Null until manually labeled
    user_feedback: Optional[str] = None  # Optional free-text feedback from labeling
@dataclass
class VariantMetrics:
    """Metrics for a single variant."""
    total_detections: int = 0  # Number of recorded events for this variant
    true_positives: int = 0  # Labeled events with is_false_positive == False
    false_positives: int = 0  # Labeled events with is_false_positive == True
    false_negatives: int = 0  # Requires manual labeling; not computed automatically
    avg_latency_ms: float = 0.0  # Mean detection latency
    p50_latency_ms: float = 0.0  # Median detection latency
    p95_latency_ms: float = 0.0  # 95th-percentile detection latency
    p99_latency_ms: float = 0.0  # 99th-percentile detection latency
    level_distribution: Dict[str, int] = field(default_factory=dict)  # level -> event count
    indicator_frequency: Dict[str, int] = field(default_factory=dict)  # indicator -> match count
@dataclass
class ABTestMetrics:
    """Complete A/B test metrics for one test run."""
    test_id: str  # Short id of the owning ABTestManager
    start_time: str  # Timestamp of the first event (or metrics time if no events)
    end_time: Optional[str] = None  # Set when metrics are calculated
    variant_a: VariantMetrics = field(default_factory=VariantMetrics)
    variant_b: VariantMetrics = field(default_factory=VariantMetrics)
    sample_size_a: int = 0  # Event count for variant A
    sample_size_b: int = 0  # Event count for variant B
    statistical_significance: Optional[float] = None  # p-value if calculable (code returns raw chi-squared)
class ABTestManager:
    """Manages A/B testing for crisis detection algorithms.

    Responsibilities:
      - Deterministic variant assignment per input text
      - Timing and privacy-preserving JSONL logging of detection events
      - Metrics aggregation plus a simple chi-squared comparison

    Events are appended to ``log_file`` and metric snapshots are written to
    ``metrics_file`` (both under ``~/.the-door`` by default).
    """

    def __init__(self, config: Optional[ABTestConfig] = None):
        """Initialize with *config* (a default ``ABTestConfig`` otherwise)."""
        self.config = config or ABTestConfig()
        self.test_id = str(uuid4())[:8]
        self.events: List[DetectionEvent] = []
        # Override set by force_variant(); None means normal routing.
        self._forced_variant: Optional[Variant] = None
        # Set up file paths under ~/.the-door (created if missing).
        home = Path.home() / ".the-door"
        home.mkdir(exist_ok=True)
        self.log_file = Path(self.config.log_file or home / "ab_test_logs.jsonl")
        self.metrics_file = Path(self.config.metrics_file or home / "ab_metrics.json")
        # Seed the module RNG for reproducibility when configured.
        # NOTE(review): variant assignment itself is hash-based, so the seed
        # only affects any future randomized logic.
        if self.config.seed is not None:
            random.seed(self.config.seed)

    def _assign_variant(self, text: str) -> Variant:
        """Assign a variant deterministically from a stable hash of *text*.

        Bug fix: the original used the builtin ``hash()``, which is salted
        per process for strings (PYTHONHASHSEED), so the "deterministic"
        assignment actually varied between runs. A SHA-256 digest is stable
        across processes and platforms.
        """
        if self._forced_variant is not None:
            return self._forced_variant
        if not self.config.enabled:
            return Variant.A
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        bucket = int.from_bytes(digest[:8], "big") % 100
        threshold = int(self.config.variant_b_percentage * 100)
        return Variant.B if bucket < threshold else Variant.A

    def _detect_variant_a(self, text: str) -> CrisisDetectionResult:
        """Variant A: current algorithm (control)."""
        return detect_crisis(text)

    def _detect_variant_b(self, text: str) -> CrisisDetectionResult:
        """Variant B: alternative detection algorithm (treatment).

        Placeholder: runs the same base detector, then escalates
        high-confidence non-critical results to CRITICAL when a critical
        indicator pattern also matches. In practice this would be a
        genuinely different algorithm.
        """
        result = detect_crisis(text)
        if result.score > 0.7 and result.level != "CRITICAL":
            from .detect import CRITICAL_INDICATORS
            for pattern in CRITICAL_INDICATORS:
                if re.search(pattern, text, re.IGNORECASE):
                    # Upgrade to CRITICAL: variant B is more sensitive to
                    # high-confidence detections.
                    return CrisisDetectionResult(
                        level="CRITICAL",
                        score=result.score,
                        indicators=result.indicators,
                        matched_patterns=result.matched_patterns,
                        recommended_action="immediate_intervention",
                    )
        return result

    def detect_with_variant(self, text: str, user_id: Optional[str] = None) -> Tuple[Variant, CrisisDetectionResult, float]:
        """
        Run crisis detection under A/B routing.

        Bug fix: the original ignored force_variant() — forcing variant B
        still executed variant A because the "disabled" fast-path always ran
        the control algorithm. Routing now goes through _assign_variant(),
        which honors the forced variant.

        Returns:
            Tuple of (variant, detection_result, latency_ms).
        """
        variant = self._assign_variant(text)
        start = time.time()
        if variant == Variant.A:
            result = self._detect_variant_a(text)
        else:
            result = self._detect_variant_b(text)
        latency_ms = (time.time() - start) * 1000
        # Matches original behavior: no event logging while the experiment
        # is disabled (including while a variant is forced).
        if self.config.enabled:
            self._log_event(variant, text, result, latency_ms, user_id)
        return variant, result, latency_ms

    def _log_event(self, variant: Variant, text: str, result: CrisisDetectionResult,
                   latency_ms: float, user_id: Optional[str] = None):
        """Record a detection event in memory and append it to the JSONL log.

        Privacy: only a SHA-256 prefix of the text is stored, never the raw
        crisis text. *user_id* is accepted for interface compatibility but is
        intentionally not persisted.
        """
        text_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        event = DetectionEvent(
            event_id=str(uuid4())[:8],
            timestamp=datetime.now(timezone.utc).isoformat(),
            variant=variant.value,
            text_hash=text_hash,
            detected_level=result.level,
            detected_score=result.score,
            indicators=result.indicators[:5],  # Limit for storage
            detection_latency_ms=round(latency_ms, 2),
        )
        self.events.append(event)
        try:
            with open(self.log_file, "a") as f:
                f.write(json.dumps(asdict(event)) + "\n")
        except OSError:
            pass  # Best-effort logging: never fail detection on log errors

    def label_event(self, event_id: str, is_false_positive: bool, feedback: Optional[str] = None):
        """Label an event as true/false positive for metrics calculation.

        Persists the label by rewriting the log file; a no-op when
        *event_id* is unknown.
        """
        for event in self.events:
            if event.event_id == event_id:
                event.is_false_positive = is_false_positive
                event.user_feedback = feedback
                self._save_events()
                break

    def _save_events(self):
        """Rewrite the full JSONL log from in-memory events (best-effort)."""
        try:
            with open(self.log_file, "w") as f:
                for event in self.events:
                    f.write(json.dumps(asdict(event)) + "\n")
        except OSError:
            pass

    def get_metrics(self) -> ABTestMetrics:
        """Calculate, persist, and return metrics for both variants."""
        metrics = ABTestMetrics(
            test_id=self.test_id,
            start_time=self.events[0].timestamp if self.events else datetime.now(timezone.utc).isoformat(),
            end_time=datetime.now(timezone.utc).isoformat(),
        )
        # Partition events by assigned variant.
        a_events = [e for e in self.events if e.variant == Variant.A.value]
        b_events = [e for e in self.events if e.variant == Variant.B.value]
        metrics.sample_size_a = len(a_events)
        metrics.sample_size_b = len(b_events)
        if a_events:
            metrics.variant_a = self._calculate_variant_metrics(a_events)
        if b_events:
            metrics.variant_b = self._calculate_variant_metrics(b_events)
        # Only compare distributions with a minimally adequate sample.
        if len(a_events) >= 30 and len(b_events) >= 30:
            metrics.statistical_significance = self._calculate_significance(a_events, b_events)
        self._save_metrics(metrics)
        return metrics

    def _calculate_variant_metrics(self, events: List[DetectionEvent]) -> VariantMetrics:
        """Aggregate latency, level, indicator, and labeling metrics for one variant."""
        if not events:
            return VariantMetrics()
        # Latency statistics.
        latencies = sorted(e.detection_latency_ms for e in events)
        n = len(latencies)

        def pct(q: float) -> float:
            # Clamp the index so small samples cannot index past the end.
            return latencies[min(int(n * q), n - 1)]

        # Level distribution.
        level_dist: Dict[str, int] = {}
        for e in events:
            level_dist[e.detected_level] = level_dist.get(e.detected_level, 0) + 1
        # Indicator frequency.
        indicator_freq: Dict[str, int] = {}
        for e in events:
            for ind in e.indicators:
                indicator_freq[ind] = indicator_freq.get(ind, 0) + 1
        # False/true positive counts (only over labeled events).
        labeled = [e for e in events if e.is_false_positive is not None]
        fp_count = sum(1 for e in labeled if e.is_false_positive)
        tp_count = len(labeled) - fp_count
        return VariantMetrics(
            total_detections=len(events),
            true_positives=tp_count,
            false_positives=fp_count,
            avg_latency_ms=sum(latencies) / n,
            p50_latency_ms=pct(0.5),
            p95_latency_ms=pct(0.95),
            p99_latency_ms=pct(0.99),
            level_distribution=level_dist,
            indicator_frequency=dict(sorted(indicator_freq.items(), key=lambda x: -x[1])[:10]),
        )

    def _calculate_significance(self, a_events: List[DetectionEvent],
                                b_events: List[DetectionEvent]) -> Optional[float]:
        """Chi-squared statistic over the level distributions of both variants.

        NOTE: returns the raw chi-squared value, not a p-value (no scipy
        dependency); higher values indicate a more significant difference.
        In production use scipy.stats.chi2_contingency.
        """
        try:
            a_levels = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "NONE": 0}
            b_levels = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "NONE": 0}
            for e in a_events:
                a_levels[e.detected_level] = a_levels.get(e.detected_level, 0) + 1
            for e in b_events:
                b_levels[e.detected_level] = b_levels.get(e.detected_level, 0) + 1
            total_a = len(a_events)
            total_b = len(b_events)
            if total_a == 0 or total_b == 0:
                return None
            total = total_a + total_b
            chi_sq = 0.0
            for level in a_levels:
                observed = a_levels[level] + b_levels[level]
                expected_a = observed * total_a / total
                expected_b = observed * total_b / total
                if expected_a > 0:
                    chi_sq += (a_levels[level] - expected_a) ** 2 / expected_a
                if expected_b > 0:
                    chi_sq += (b_levels[level] - expected_b) ** 2 / expected_b
            return chi_sq
        except Exception:
            # Defensive: malformed events must never break metrics reporting.
            return None

    def _save_metrics(self, metrics: ABTestMetrics):
        """Persist a metrics snapshot as pretty-printed JSON (best-effort)."""
        try:
            with open(self.metrics_file, "w") as f:
                json.dump(asdict(metrics), f, indent=2)
        except OSError:
            pass

    def get_variant_distribution(self) -> Dict[str, int]:
        """Return the count of recorded events per variant label."""
        dist = {"A": 0, "B": 0}
        for event in self.events:
            dist[event.variant] = dist.get(event.variant, 0) + 1
        return dist

    def force_variant(self, variant: Variant):
        """Force all subsequent detections to use *variant*.

        Bug fix: the original stored the forced variant but nothing ever
        read it, so forcing B still ran variant A. _assign_variant() now
        honors the override. The experiment is also disabled while forced,
        which suppresses event logging (matching original behavior).
        """
        self.config.enabled = False
        self._forced_variant = variant

    def reset(self):
        """Clear recorded events and restore normal A/B routing."""
        self.events = []
        self.config.enabled = True
        self._forced_variant = None
# Convenience function for easy integration
# Process-wide singleton, created lazily by get_ab_manager().
_default_manager: Optional["ABTestManager"] = None
def get_ab_manager() -> ABTestManager:
    """Get the default A/B test manager instance (created on first call)."""
    global _default_manager
    if _default_manager is None:
        _default_manager = ABTestManager()
    return _default_manager
def detect_with_ab(text: str, user_id: Optional[str] = None) -> dict:
    """
    Run crisis detection through the default A/B test manager.

    Returns a JSON-serializable dict with the assigned variant, the
    detection outcome, the measured latency, and the test id.
    """
    manager = get_ab_manager()
    variant, result, latency = manager.detect_with_variant(text, user_id)
    detection = {
        "level": result.level,
        "score": result.score,
        "indicators": result.indicators,
        "recommended_action": result.recommended_action,
    }
    payload = {"variant": variant.value, "detection": detection}
    payload["latency_ms"] = round(latency, 2)
    payload["test_id"] = manager.test_id
    return payload

View File

@@ -51,13 +51,13 @@ HIGH_INDICATORS = [
r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
r"\bdon'?t\s+matter\s+if\s+I\s+exist\b",
r"\bno\s+one\s+would\s+(?:care|miss)\b",
r"\bworld\s+would?\s+be\s+better\s+without\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason)\b",
r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
r"\bescape\s+from\s*this",
r"\bjust\s+want\s+it\s+to\s+stop\b",
r"\btrapped\s+(?:in\s+(?:my|this|a\s+dark)|and\s+can'?t\s+escape)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
r"\bdisappeared\s+forever\b",
# Contextual despair phrases (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\beverything\s+is\s+hopeless\b",
@@ -68,6 +68,8 @@ HIGH_INDICATORS = [
r"\bno\s*hope\s+(?:left|remaining)\b",
r"\bno\s*way\s*out\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
r"\btrapped\s+and\s+can'?t\s+escape\b",
r"\bdesperate\s+(?:for\s+)?help\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
]
@@ -99,6 +101,8 @@ MEDIUM_INDICATORS = [
r"\bsinking\b",
r"\bdrowning\b",
r"\bhopeless\b",
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
@@ -112,7 +116,7 @@ MEDIUM_INDICATORS = [
LOW_INDICATORS = [
r"\bunhappy\b",
r"\bdown\b",
r"\btough\s*time\b",
r"\btough\s*(?:time|day|week)\b",
r"\brough\s+(?:day|week|patch)\b",
r"\bstressed\b",
r"\bburnout\b",
@@ -122,6 +126,8 @@ LOW_INDICATORS = [
r"\btired\b",
r"\bsad\b",
r"\bupset\b",
r"\blonely\b",
r"\banxious?\b",
r"\bnot\s*(?:good|great|okay)\b",
r"\bthings\s*are\s*hard\b",
r"\bstruggling\b",

View File

@@ -1,31 +1,34 @@
"""
When a Man Is Dying — Despair/Suicide Detection System
DEPRECATED — Use crisis.detect instead.
Standalone detection module that parses incoming text for
despair and suicide indicators, classifies into tiers,
and returns structured response with recommended actions.
This module is a thin wrapper around crisis.detect for backward compatibility.
All unique patterns have been merged into crisis/detect.py (see issue #40).
Tiers:
LOW — General sadness, stress, difficult times
MEDIUM — Hopelessness, isolation, worthlessness
HIGH — Active despair, mentions of death, "can't go on"
CRITICAL — Imminent risk, explicit intent, plan, method
Integration:
Designed to work with crisis/ module from PR #4.
When crisis/ is available, uses it as the detection backend.
Falls back to internal detection when crisis/ is not present.
This module will be removed in a future release.
"""
import re
import json
import hashlib
import warnings
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
import json
import hashlib
# Re-export the canonical detection
from crisis.detect import detect_crisis, CrisisDetectionResult
# Issue deprecation warning on import
warnings.warn(
"dying_detection is deprecated. Use 'from crisis.detect import detect_crisis' instead. "
"All patterns have been consolidated into crisis/detect.py. "
"See issue #40.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DetectionResult:
"""Backward-compatible result type matching the old dying_detection API."""
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
@@ -34,110 +37,9 @@ class DetectionResult:
session_hash: str = ""
# ── Tiered indicator patterns ───────────────────────────────────
# Each pattern is a regex matched against lowercased input.
CRITICAL_PATTERNS = [
(r"\bkill\s*(my)?self\b", "explicit self-harm intent"),
(r"\bsuicid(?:al|ed|e)\b", "suicide reference"),
(r"\bend\s*(my)?\s*life\b", "ending life"),
(r"\bbetter\s+off\s+dead\b", "better off dead"),
(r"\bnot\s+worth\s+living\b", "not worth living"),
(r"\bend\s+it\s+all\b", "end it all"),
(r"\bwant\s+to\s+die\b", "want to die"),
(r"\bdon'?t\s+want\s+to\s+live\b", "don't want to live"),
(r"\bcan'?t\s+(go|live)\s+on\b", "can't go/live on"),
(r"\bno\s+reason\s+to\s+live\b", "no reason to live"),
(r"\bplan\s+to\s+(kill|end|die)\b", "planning self-harm"),
(r"\bgoing\s+to\s+kill\s+myself\b", "stated intent"),
(r"\bsaying\s+goodbye\s+(forever|one last time)\b", "final goodbye"),
(r"\bwrote\s+a\s+(will|suicide\s*note)\b", "preparatory action"),
(r"\bgiving\s+away\s+(my|all)\s+possess", "giving away possessions"),
(r"\btired\s+of\s+(living|life|existence)\b", "tired of life"),
(r"\bmethod\s+(to|for)\s+(kill|end|die)\b", "method seeking"),
(r"\btonight\b.*\b(die|kill|end|suicid)", "immediate timeframe + intent"),
(r"\b(die|kill|end)\b.*\btonight", "immediate timeframe + intent"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone)", "disappearance plan"),
]
HIGH_PATTERNS = [
(r"\bdespair\b", "despair"),
(r"\bhopeless(?:ness)?\b", "hopelessness"),
(r"\bcan'?t\s+take\s+this\s+anymore\b", "can't take it"),
(r"\bdon'?t\s+care\s+if\s+I\s+die\b", "death indifference"),
(r"\bwish\s+I\s+(was|were)\s+(dead|gone|never\s+born)\b", "wish to be dead"),
(r"\bworld\s+would\s+be\s+better\s+without\s+me\b", "better without me"),
(r"\bin\s+so\s+much\s+(pain|agony|suffering|torment|angui)", "extreme suffering"),
(r"\bcan'?t\s+see\s+any\s+(point|reason|light|hope|way)\b", "no light ahead"),
(r"\btrapped\b", "feeling trapped"),
(r"\bjust\s+want\s+it\s+to\s+stop\b", "want to stop"),
(r"\bno\s+way\s+out\b", "no way out"),
(r"\bno\s+one\s+would\s+(care|miss)\b", "no one would care/miss"),
(r"\beverything\s+is\s+(pointless|broken|ruined|meaningless)\b", "existential collapse"),
(r"\bno\s+point\s+in\s+anything\b", "pointlessness"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone|disappeared)", "no one would notice"),
(r"\bdisappeared\s+forever\b", "disappeared forever"),
]
MEDIUM_PATTERNS = [
(r"\bno\s+hope\b", "no hope"),
(r"\bcan'?t\s+go\s+on\b", "can't go on"),
(r"\bcan'?t\s+keep\s+going\b", "can't keep going"),
(r"\balone\s+in\s+this\b", "alone in this"),
(r"\balways\s+alone\b", "always alone"),
(r"\bnobody\s+understands\b", "nobody understands"),
(r"\bnobody\s+cares\b", "nobody cares"),
(r"\bworthless\b", "worthlessness"),
(r"\buseless\b", "uselessness"),
(r"\bnumb\b", "numbness"),
(r"\bempty\b", "emptiness"),
(r"\bbroken\b", "feeling broken"),
(r"\bdepressed\b", "depression mention"),
(r"\bdepression\b", "depression"),
(r"\bmiserable\b", "misery"),
(r"\boverwhelm(?:ed|ing)\b", "overwhelmed"),
(r"\bcannot\s+cope\b", "cannot cope"),
(r"\b(drowning|sinking)\b", "drowning/sinking"),
(r"\bforgotten\b", "feeling forgotten"),
(r"\blost\s+all\s+hope\b", "lost all hope"),
(r"\bno\s+future\b", "no future"),
(r"\bno\s+tomorrow\b", "no tomorrow"),
]
LOW_PATTERNS = [
(r"\bunhappy\b", "unhappy"),
(r"\brough\s+(day|week|patch)\b", "rough time"),
(r"\btough\s+(time|day|week)\b", "tough time"),
(r"\bstressed\b", "stressed"),
(r"\bburnout\b", "burnout"),
(r"\bfrustrated\b", "frustrated"),
(r"\bthings\s+(are\s+)?hard\b", "things are hard"),
(r"\bnot\s+feeling\s+(great|myself|good)\b", "not feeling good"),
(r"\bstruggl", "struggling"),
(r"\bdown\b", "feeling down"),
(r"\bsad\b", "sad"),
(r"\bupset\b", "upset"),
(r"\blonely\b", "lonely"),
(r"\banxious?\b", "anxious/anxiety"),
(r"\bnot\s+okay\b", "not okay"),
]
# ── Pattern collections for easy iteration ──────────────────────
TIER_PATTERNS: Dict[str, List[tuple]] = {
"CRITICAL": CRITICAL_PATTERNS,
"HIGH": HIGH_PATTERNS,
"MEDIUM": MEDIUM_PATTERNS,
"LOW": LOW_PATTERNS,
}
def detect(text: str) -> DetectionResult:
"""
Primary detection function.
If the crisis/ module is available, delegate to it.
Otherwise, use the internal pattern engine.
Primary detection function — delegates to crisis.detect.
Args:
text: User message to analyze
@@ -145,150 +47,25 @@ def detect(text: str) -> DetectionResult:
Returns:
DetectionResult with level, indicators, recommended_action, confidence
"""
# Try to use the crisis module if available
try:
from crisis.detect import detect_crisis as _crisis_detect
result = _crisis_detect(text)
# Map to DetectionResult
return DetectionResult(
level=result.level,
indicators=_describe_indicators(result.level, result.indicators),
recommended_action=result.recommended_action,
confidence=result.score,
session_hash=_hash_session(text),
)
except ImportError:
pass
result = detect_crisis(text)
return _detect_internal(text)
# Extract raw patterns from matches
raw_patterns = [m["pattern"] for m in result.matches] if result.matches else []
def _detect_internal(text: str) -> DetectionResult:
"""Internal pattern-based detection (fallback)."""
if not text or not text.strip():
return DetectionResult(level="NONE", session_hash=_hash_session(text))
text_lower = text.lower()
all_matches: Dict[str, List[tuple]] = {
tier: [] for tier in TIER_PATTERNS
}
for tier, patterns in TIER_PATTERNS.items():
for pattern, description in patterns:
if re.search(pattern, text_lower):
all_matches[tier].append((pattern, description))
# CRITICAL patterns are the highest priority
if all_matches["CRITICAL"]:
return DetectionResult(
level="CRITICAL",
indicators=[desc for _, desc in all_matches["CRITICAL"]],
recommended_action=(
"INTERRUPT CONVERSATION IMMEDIATELY. "
"Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline (call or text 988). "
"Provide Crisis Text Line (text HOME to 741741). "
"Stay present. Do not disconnect. "
"Redirect to professional help. "
"This is the most sacred moment."
),
raw_matched_patterns=[pat for pat, _ in all_matches["CRITICAL"]],
confidence=1.0,
session_hash=_hash_session(text),
)
if all_matches["HIGH"]:
return DetectionResult(
level="HIGH",
indicators=[desc for _, desc in all_matches["HIGH"]],
recommended_action=(
"Show crisis panel prominently. "
"Ask about safety directly. "
"Surface 988 number and crisis resources. "
"Stay present and engaged. Do not rush to fix. "
"Monitor closely for escalation."
),
raw_matched_patterns=[pat for pat, _ in all_matches["HIGH"]],
confidence=0.75,
session_hash=_hash_session(text),
)
if all_matches["MEDIUM"]:
return DetectionResult(
level="MEDIUM",
indicators=[desc for _, desc in all_matches["MEDIUM"]],
recommended_action=(
"Increase warmth and presence. "
"Subtly surface help resources. "
"Keep conversation anchored in the present. "
"Watch for escalation indicators."
),
raw_matched_patterns=[pat for pat, _ in all_matches["MEDIUM"]],
confidence=0.5,
session_hash=_hash_session(text),
)
if all_matches["LOW"]:
return DetectionResult(
level="LOW",
indicators=[desc for _, desc in all_matches["LOW"]],
recommended_action=(
"Normal conversation with warm undertone. "
"No crisis UI elements needed. "
"Remain vigilant. Be present."
),
raw_matched_patterns=[pat for pat, _ in all_matches["LOW"]],
confidence=0.25,
session_hash=_hash_session(text),
)
return DetectionResult(level="NONE", session_hash=_hash_session(text))
def _describe_indicators(level: str, patterns: list) -> list:
"""Map raw patterns to descriptions."""
descriptions = {
"CRITICAL": [],
"HIGH": [],
"MEDIUM": [],
"LOW": [],
}
for tier, items in TIER_PATTERNS.items():
for pat, desc in items:
if pat in patterns:
descriptions[tier].append(desc)
return descriptions.get(level, [])
def _hash_session(text: str) -> str:
"""Create a session hash for this text (for tracking repeated escalations)."""
return hashlib.sha256(text.encode()).hexdigest()[:12]
return DetectionResult(
level=result.level,
indicators=result.indicators,
recommended_action=result.recommended_action,
raw_matched_patterns=raw_patterns,
confidence=result.score,
session_hash=hashlib.sha256(text.encode()).hexdigest()[:12],
)
def get_action_for_level(level: str) -> str:
"""Get the recommended action string for a given level."""
actions = {
"CRITICAL": (
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988. Provide Crisis Text Line. "
"Stay present. Do not disconnect. "
"Redirect to help."
),
"HIGH": (
"Show crisis panel. Ask about safety. "
"Surface 988. Stay engaged."
),
"MEDIUM": (
"Increase warmth. Surface resources gently. "
"Anchor in present."
),
"LOW": (
"Normal conversation with warmth. "
"Remain vigilant."
),
"NONE": "No action needed.",
}
return actions.get(level, "Unknown level.")
from crisis.detect import ACTIONS
return ACTIONS.get(level, "Unknown level.")
def as_json(result: DetectionResult, indent: int = 2) -> str:

View File

@@ -983,12 +983,60 @@ Sovereignty and service always.`;
// ===== OVERLAY =====
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
);
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var first = focusable[0];
var last = focusable[focusable.length - 1];
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab: if on last, wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function showOverlay() {
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
crisisOverlay.classList.add('active');
overlayDismissBtn.disabled = true;
var countdown = 10;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
if (overlayTimer) clearInterval(overlayTimer);
overlayTimer = setInterval(function() {
countdown--;
@@ -1005,6 +1053,9 @@ Sovereignty and service always.`;
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
crisisOverlay.classList.remove('active');
@@ -1012,7 +1063,22 @@ Sovereignty and service always.`;
clearInterval(overlayTimer);
overlayTimer = null;
}
msgInput.focus();
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
});
@@ -1117,25 +1183,14 @@ Sovereignty and service always.`;
} catch (e) {}
}
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
cancelSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
saveSafetyPlan.addEventListener('click', function() {
@@ -1149,12 +1204,101 @@ Sovereignty and service always.`;
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
alert('Error saving plan.');
}
});
// ===== SAFETY PLAN FOCUS TRAP (fix #65) =====
// Focusable elements inside the modal, in tab order
var _spFocusableIds = [
'close-safety-plan',
'sp-warning-signs',
'sp-coping',
'sp-distraction',
'sp-help',
'sp-environment',
'cancel-safety-plan',
'save-safety-plan'
];
var _spTriggerEl = null; // element that opened the modal
function _getSpFocusableEls() {
return _spFocusableIds
.map(function(id) { return document.getElementById(id); })
.filter(function(el) { return el && !el.disabled; });
}
function _trapSafetyPlanFocus(e) {
if (e.key !== 'Tab') return;
var els = _getSpFocusableEls();
if (!els.length) return;
var first = els[0];
var last = els[els.length - 1];
if (e.shiftKey) {
// Shift+Tab on first → wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab on last → wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
function _trapSafetyPlanEscape(e) {
if (e.key === 'Escape') {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}
}
function _activateSafetyPlanFocusTrap(triggerEl) {
_spTriggerEl = triggerEl || document.activeElement;
// Focus first textarea
var firstInput = document.getElementById('sp-warning-signs');
if (firstInput) firstInput.focus();
// Add listeners
document.addEventListener('keydown', _trapSafetyPlanFocus);
document.addEventListener('keydown', _trapSafetyPlanEscape);
// Mark background inert (prevent click-through)
document.body.setAttribute('aria-hidden', 'true');
safetyPlanModal.removeAttribute('aria-hidden');
}
function _restoreSafetyPlanFocus() {
document.removeEventListener('keydown', _trapSafetyPlanFocus);
document.removeEventListener('keydown', _trapSafetyPlanEscape);
document.body.removeAttribute('aria-hidden');
if (_spTriggerEl && typeof _spTriggerEl.focus === 'function') {
_spTriggerEl.focus();
}
_spTriggerEl = null;
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
});
}
// ===== TEXTAREA AUTO-RESIZE =====
msgInput.addEventListener('input', function() {
this.style.height = 'auto';
@@ -1300,6 +1444,7 @@ Sovereignty and service always.`;
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -0,0 +1,84 @@
<!-- Test: Safety plan modal focus trap (issue #65) -->
<!-- Open this file in a browser to manually verify focus trap behavior -->
<!-- This page is a checklist only: the feature under test lives in index.html. -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Focus Trap Test</title>
<style>
body { font-family: sans-serif; padding: 20px; }
.test { margin: 10px 0; padding: 10px; border: 1px solid #ccc; }
.pass { background: #d4edda; border-color: #28a745; }
.fail { background: #f8d7da; border-color: #dc3545; }
button { margin: 5px; padding: 8px 16px; }
</style>
</head>
<body>
<h1>Focus Trap Manual Test</h1>
<p>Open <code>index.html</code> in a browser, then run these checks:</p>
<!-- Each .test card below is one manual scenario; mark pass/fail by adding
     the .pass or .fail class while testing. -->
<div class="test" id="test-1">
<strong>Test 1: Tab wraps to first element</strong><br>
1. Open safety plan modal<br>
2. Tab through all elements until you reach "Save Plan"<br>
3. Press Tab again &rarr; should wrap to close button (X)
</div>
<div class="test" id="test-2">
<strong>Test 2: Shift+Tab wraps to last element</strong><br>
1. Open safety plan modal<br>
2. Focus is on "Warning signs" textarea<br>
3. Press Shift+Tab &rarr; should wrap to "Save Plan" button
</div>
<div class="test" id="test-3">
<strong>Test 3: Escape closes modal</strong><br>
1. Open safety plan modal<br>
2. Press Escape &rarr; modal closes<br>
3. Focus returns to the button that opened it
</div>
<div class="test" id="test-4">
<strong>Test 4: Background not reachable</strong><br>
1. Open safety plan modal<br>
2. Try to Tab to the chat input behind the modal<br>
3. Should NOT be able to reach it
</div>
<div class="test" id="test-5">
<strong>Test 5: Click buttons close + restore focus</strong><br>
1. Open modal via "my safety plan" button<br>
2. Click Cancel &rarr; modal closes, focus on "my safety plan" button<br>
3. Open again, click Save &rarr; same behavior<br>
4. Open again, click X &rarr; same behavior
</div>
<hr>
<h2>Automated checks (paste into DevTools console on index.html):</h2>
<!-- The snippet below assumes index.html defines these element ids:
     safety-plan-modal, safety-plan-btn, save-safety-plan,
     close-safety-plan, sp-warning-signs. -->
<pre><code>
// Test focus trap
var modal = document.getElementById('safety-plan-modal');
var openBtn = document.getElementById('safety-plan-btn');
openBtn.click();
console.assert(modal.classList.contains('active'), 'Modal should be open');
var lastEl = document.getElementById('save-safety-plan');
lastEl.focus();
var evt = new KeyboardEvent('keydown', {key: 'Tab', bubbles: true});
document.dispatchEvent(evt);
// After Tab from last, focus should wrap to first
var firstEl = document.getElementById('close-safety-plan');
console.log('Focus after wrap:', document.activeElement.id);
console.assert(document.activeElement === firstEl || document.activeElement.id === 'sp-warning-signs',
'Focus should wrap to first element');
// Test Escape
var escEvt = new KeyboardEvent('keydown', {key: 'Escape', bubbles: true});
document.dispatchEvent(escEvt);
console.assert(!modal.classList.contains('active'), 'Modal should close on Escape');
console.assert(document.activeElement === openBtn, 'Focus should return to open button');
console.log('All automated checks passed!');
</code></pre>
</body>
</html>

View File

@@ -0,0 +1,453 @@
#!/usr/bin/env python3
"""
Tests for Crisis Detection A/B Testing Framework.
"""
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
# Add crisis module to path
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from crisis.ab_testing import (
ABTestManager,
ABTestConfig,
Variant,
DetectionEvent,
VariantMetrics,
ABTestMetrics,
get_ab_manager,
detect_with_ab,
)
from crisis.detect import CrisisDetectionResult
class TestABTestConfig:
    """ABTestConfig defaults and explicit construction."""

    def test_default_config(self):
        """A bare config is enabled, 50/50 split, with no output files."""
        cfg = ABTestConfig()
        assert cfg.enabled is True
        assert cfg.variant_b_percentage == 0.5
        assert cfg.log_file is None
        assert cfg.metrics_file is None

    def test_custom_config(self):
        """Explicit keyword arguments are stored as given."""
        cfg = ABTestConfig(
            enabled=False,
            variant_b_percentage=0.3,
            log_file="/tmp/test.log",
            metrics_file="/tmp/metrics.json",
            seed=42,
        )
        assert cfg.enabled is False
        assert cfg.variant_b_percentage == 0.3
        assert cfg.log_file == "/tmp/test.log"
        assert cfg.seed == 42
class TestVariant:
    """Variant enum members: string values and identity semantics."""

    def test_values(self):
        """Members carry their single-letter string values."""
        assert Variant.A.value == "A"
        assert Variant.B.value == "B"

    def test_comparison(self):
        """Members compare equal to themselves and unequal to each other."""
        assert Variant.A == Variant.A
        assert Variant.A != Variant.B
class TestDetectionEvent:
    """DetectionEvent construction and dict-style serialization."""

    def _make_event(self, **overrides):
        """Build a DetectionEvent with sane defaults, overridable per test."""
        fields = dict(
            event_id="test-123",
            timestamp="2026-01-01T00:00:00Z",
            variant="A",
            text_hash="abc123",
            detected_level="HIGH",
            detected_score=0.8,
            indicators=["despair"],
            detection_latency_ms=5.2,
        )
        fields.update(overrides)
        return DetectionEvent(**fields)

    def test_creation(self):
        """Constructor stores fields; label defaults to unlabeled (None)."""
        event = self._make_event()
        assert event.event_id == "test-123"
        assert event.variant == "A"
        assert event.detected_level == "HIGH"
        assert event.is_false_positive is None

    def test_to_dict(self):
        """The event's __dict__ exposes its fields by name."""
        event = self._make_event(
            variant="B",
            text_hash="def456",
            detected_level="CRITICAL",
            detected_score=0.95,
            indicators=["suicide"],
            detection_latency_ms=3.1,
        )
        serialized = event.__dict__
        assert "event_id" in serialized
        assert "variant" in serialized
class TestABTestManager:
    """ABTestManager behaviour: variant assignment, detection, event logging,
    labeling, metrics, and lifecycle (force/reset).

    Fixes over the previous revision:
    - The class defined ``test_variant_distribution`` twice; the second
      definition silently shadowed the first, so the assignment-distribution
      test never ran. The second test is renamed
      ``test_get_variant_distribution`` to match the method it exercises.
    - A stray ``assert detect_with_ab`` (asserting truthiness of an imported
      function, a no-op) has been removed.
    """

    def test_initialization(self, tmp_path):
        """A fresh manager is enabled, has a test id, and holds no events."""
        config = ABTestConfig(
            log_file=str(tmp_path / "test.log"),
            metrics_file=str(tmp_path / "metrics.json")
        )
        manager = ABTestManager(config)
        assert manager.config.enabled is True
        assert manager.test_id is not None
        assert len(manager.events) == 0

    def test_variant_assignment_deterministic(self):
        """Same text should always get same variant."""
        manager = ABTestManager(ABTestConfig(seed=42))
        text = "I feel hopeless about everything"
        variant1 = manager._assign_variant(text)
        variant2 = manager._assign_variant(text)
        assert variant1 == variant2

    def test_variant_distribution(self):
        """Test that variant assignment is roughly 50/50."""
        manager = ABTestManager(ABTestConfig(seed=42))
        variants = [
            manager._assign_variant(f"Test message number {i}")
            for i in range(100)
        ]
        a_count = sum(1 for v in variants if v == Variant.A)
        b_count = sum(1 for v in variants if v == Variant.B)
        # Should be roughly 50/50 (within 20% tolerance)
        assert 30 <= a_count <= 70
        assert 30 <= b_count <= 70

    def test_disabled_ab_testing(self, tmp_path):
        """When disabled, should always use variant A."""
        config = ABTestConfig(
            enabled=False,
            log_file=str(tmp_path / "test.log")
        )
        manager = ABTestManager(config)
        for i in range(10):
            text = f"Test message {i}"
            variant = manager._assign_variant(text)
            assert variant == Variant.A

    @patch('crisis.ab_testing.detect_crisis')
    def test_detect_with_variant_a(self, mock_detect, tmp_path):
        """Test detection with variant A (control)."""
        mock_detect.return_value = CrisisDetectionResult(
            level="HIGH",
            score=0.8,
            indicators=["despair"],
            matched_patterns=[],
            recommended_action="provide_resources"
        )
        config = ABTestConfig(
            enabled=False,  # Force variant A
            log_file=str(tmp_path / "test.log")
        )
        manager = ABTestManager(config)
        variant, result, latency = manager.detect_with_variant("I'm feeling hopeless")
        assert variant == Variant.A
        assert result.level == "HIGH"
        assert latency >= 0

    @patch('crisis.ab_testing.detect_crisis')
    def test_detect_with_variant_b(self, mock_detect, tmp_path):
        """Test detection with variant B (treatment)."""
        mock_detect.return_value = CrisisDetectionResult(
            level="MEDIUM",
            score=0.75,
            indicators=["no hope"],
            matched_patterns=[],
            recommended_action="provide_resources"
        )
        config = ABTestConfig(
            variant_b_percentage=1.0,  # Always variant B
            log_file=str(tmp_path / "test.log")
        )
        manager = ABTestManager(config)
        # With a 100% B split every assignment should land on B; the loop
        # mirrors real traffic and stops at the first B observation.
        for i in range(20):
            text = f"Test message {i}"
            variant, result, latency = manager.detect_with_variant(text)
            if variant == Variant.B:
                break
        assert variant == Variant.B

    def test_event_logging(self, tmp_path):
        """Test that events are logged to file as JSON lines."""
        log_file = tmp_path / "test.jsonl"
        config = ABTestConfig(
            log_file=str(log_file),
            metrics_file=str(tmp_path / "metrics.json")
        )
        manager = ABTestManager(config)
        # Mock detection so no real crisis analysis runs.
        with patch('crisis.ab_testing.detect_crisis') as mock_detect:
            mock_detect.return_value = CrisisDetectionResult(
                level="LOW",
                score=0.3,
                indicators=[],
                matched_patterns=[],
                recommended_action="none"
            )
            manager.detect_with_variant("Test message")
        # Check log file exists and has content
        assert log_file.exists()
        with open(log_file) as f:
            lines = f.readlines()
        assert len(lines) >= 1
        # Parse log entry
        entry = json.loads(lines[0])
        assert "event_id" in entry
        assert "variant" in entry
        assert "detected_level" in entry

    def test_label_event(self, tmp_path):
        """Test labeling events as false positives."""
        config = ABTestConfig(
            log_file=str(tmp_path / "test.log"),
            metrics_file=str(tmp_path / "metrics.json")
        )
        manager = ABTestManager(config)
        # Create a mock event
        event = DetectionEvent(
            event_id="test-123",
            timestamp="2026-01-01T00:00:00Z",
            variant="A",
            text_hash="abc123",
            detected_level="HIGH",
            detected_score=0.8,
            indicators=["despair"],
            detection_latency_ms=5.0
        )
        manager.events.append(event)
        # Label it
        manager.label_event("test-123", is_false_positive=True, feedback="Not actually crisis")
        # Check labeling
        assert event.is_false_positive is True
        assert event.user_feedback == "Not actually crisis"

    def test_get_metrics_empty(self, tmp_path):
        """Test metrics with no events."""
        config = ABTestConfig(
            metrics_file=str(tmp_path / "metrics.json")
        )
        manager = ABTestManager(config)
        metrics = manager.get_metrics()
        assert metrics.sample_size_a == 0
        assert metrics.sample_size_b == 0
        assert metrics.variant_a.total_detections == 0

    def test_get_metrics_with_events(self, tmp_path):
        """Test metrics calculation with a mix of labeled/unlabeled events."""
        config = ABTestConfig(
            log_file=str(tmp_path / "test.log"),
            metrics_file=str(tmp_path / "metrics.json")
        )
        manager = ABTestManager(config)
        # Add some mock events spread across variants, levels, and labels.
        for i in range(10):
            event = DetectionEvent(
                event_id=f"event-{i}",
                timestamp="2026-01-01T00:00:00Z",
                variant="A" if i % 2 == 0 else "B",
                text_hash=f"hash-{i}",
                detected_level="HIGH" if i % 3 == 0 else "MEDIUM",
                detected_score=0.7 + (i % 3) * 0.1,
                indicators=["despair"] if i % 2 == 0 else [],
                detection_latency_ms=3.0 + i * 0.5
            )
            # Label some as false positives
            if i % 4 == 0:
                event.is_false_positive = True
            elif i % 4 == 1:
                event.is_false_positive = False
            manager.events.append(event)
        metrics = manager.get_metrics()
        # Check we have events in both variants
        assert metrics.sample_size_a > 0
        assert metrics.sample_size_b > 0
        # Check latency calculations
        assert metrics.variant_a.avg_latency_ms > 0
        assert metrics.variant_b.avg_latency_ms > 0
        # Check level distribution
        assert len(metrics.variant_a.level_distribution) > 0

    def test_get_variant_distribution(self, tmp_path):
        """get_variant_distribution counts events per variant."""
        config = ABTestConfig()
        manager = ABTestManager(config)
        # Add events: 3 in variant A, 2 in variant B.
        for i in range(5):
            event = DetectionEvent(
                event_id=f"event-{i}",
                timestamp="2026-01-01T00:00:00Z",
                variant="A" if i < 3 else "B",
                text_hash=f"hash-{i}",
                detected_level="LOW",
                detected_score=0.5,
                indicators=[],
                detection_latency_ms=2.0
            )
            manager.events.append(event)
        dist = manager.get_variant_distribution()
        assert dist["A"] == 3
        assert dist["B"] == 2

    def test_force_variant(self, tmp_path):
        """Test forcing a specific variant."""
        config = ABTestConfig()
        manager = ABTestManager(config)
        manager.force_variant(Variant.B)
        # After forcing, all should be variant B
        for i in range(5):
            text = f"Test message {i}"
            variant = manager._assign_variant(text)
            assert variant == Variant.B

    def test_reset(self, tmp_path):
        """Test resetting the A/B test clears events but keeps config."""
        config = ABTestConfig(
            log_file=str(tmp_path / "test.log"),
            metrics_file=str(tmp_path / "metrics.json")
        )
        manager = ABTestManager(config)
        # Add some events
        for i in range(3):
            event = DetectionEvent(
                event_id=f"event-{i}",
                timestamp="2026-01-01T00:00:00Z",
                variant="A",
                text_hash=f"hash-{i}",
                detected_level="LOW",
                detected_score=0.5,
                indicators=[],
                detection_latency_ms=2.0
            )
            manager.events.append(event)
        assert len(manager.events) == 3
        # Reset
        manager.reset()
        assert len(manager.events) == 0
        assert manager.config.enabled is True
class TestConvenienceFunctions:
    """Module-level convenience helpers around the default manager."""

    def test_get_ab_manager(self):
        """The default accessor yields an ABTestManager instance."""
        assert isinstance(get_ab_manager(), ABTestManager)

    @patch('crisis.ab_testing.detect_crisis')
    def test_detect_with_ab(self, mock_detect):
        """detect_with_ab wraps the detection result with A/B metadata."""
        mock_detect.return_value = CrisisDetectionResult(
            level="HIGH",
            score=0.8,
            indicators=["despair"],
            matched_patterns=[],
            recommended_action="provide_resources",
        )
        payload = detect_with_ab("I'm feeling hopeless")
        for key in ("variant", "detection", "latency_ms", "test_id"):
            assert key in payload
        assert payload["detection"]["level"] == "HIGH"
class TestMetricsCalculation:
    """Edge cases in per-variant metrics computation."""

    def test_percentile_calculation(self, tmp_path):
        """Latency percentiles/average over a known 1..10 ms spread."""
        manager = ABTestManager(ABTestConfig())
        latencies = [float(ms) for ms in range(1, 11)]  # 1.0 .. 10.0
        events = [
            DetectionEvent(
                event_id=f"event-{idx}",
                timestamp="2026-01-01T00:00:00Z",
                variant="A",
                text_hash=f"hash-{idx}",
                detected_level="LOW",
                detected_score=0.5,
                indicators=[],
                detection_latency_ms=latency,
            )
            for idx, latency in enumerate(latencies)
        ]
        metrics = manager._calculate_variant_metrics(events)
        assert metrics.p50_latency_ms == 5.0   # 50th percentile
        assert metrics.p95_latency_ms == 10.0  # 95th percentile (last element)
        assert metrics.avg_latency_ms == 5.5   # Average

    def test_empty_variant_metrics(self, tmp_path):
        """No events yields zeroed counts/latencies and an empty distribution."""
        manager = ABTestManager(ABTestConfig())
        metrics = manager._calculate_variant_metrics([])
        assert metrics.total_detections == 0
        assert metrics.avg_latency_ms == 0
        assert metrics.level_distribution == {}

View File

@@ -0,0 +1,57 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayFocusTrap(unittest.TestCase):
    """Static regex checks that index.html wires up the crisis overlay trap."""

    @classmethod
    def setUpClass(cls):
        # Read the page once for all tests.
        cls.html = INDEX_HTML.read_text()

    def _assert_markup(self, pattern, message):
        """Assert the page source matches *pattern*, failing with *message*."""
        self.assertRegex(self.html, pattern, message)

    def test_overlay_registers_tab_key_focus_trap(self):
        self._assert_markup(
            r"function\s+trapFocusInOverlay\s*\(e\)",
            'Expected crisis overlay focus trap handler to exist.',
        )
        self._assert_markup(
            r"if\s*\(e\.key\s*!==\s*'Tab'\)\s*return;",
            'Expected focus trap handler to guard on Tab key events.',
        )
        self._assert_markup(
            r"document\.addEventListener\('keydown',\s*trapFocusInOverlay\)",
            'Expected overlay focus trap to register on document keydown.',
        )

    def test_overlay_disables_background_interaction(self):
        self._assert_markup(
            r"mainApp\.setAttribute\('inert',\s*''\)",
            'Expected overlay to set inert on the main app while active.',
        )
        self._assert_markup(
            r"mainApp\.removeAttribute\('inert'\)",
            'Expected overlay dismissal to remove inert from the main app.',
        )

    def test_overlay_restores_focus_after_dismiss(self):
        self._assert_markup(
            r"_preOverlayFocusElement\s*=\s*document\.activeElement",
            'Expected overlay to remember the pre-overlay focus target.',
        )
        self._assert_markup(
            r"_preOverlayFocusElement\.focus\(\)",
            'Expected overlay dismissal to restore focus to the prior target.',
        )
# Allow running this test module directly (outside a test runner).
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,44 @@
import importlib
import sys
import unittest
import warnings
from crisis.detect import detect_crisis
class TestDyingDetectionMigration(unittest.TestCase):
    """Migration of the deprecated dying_detection module onto detect_crisis."""

    def test_canonical_detector_covers_unique_dying_detection_patterns(self):
        """Phrases previously unique to dying_detection map to expected levels."""
        expectations = (
            ("I feel lonely.", "LOW"),
            ("I've lost all hope and see no tomorrow.", "MEDIUM"),
            ("What if I disappeared forever?", "HIGH"),
        )
        for text, expected_level in expectations:
            with self.subTest(text=text):
                self.assertEqual(detect_crisis(text).level, expected_level)

    def test_dying_detection_module_warns_and_delegates_to_canonical_detector(self):
        """Importing dying_detection warns, and its results mirror detect_crisis."""
        text = "I feel lonely."
        # Drop any cached module so the import-time DeprecationWarning fires.
        sys.modules.pop("dying_detection", None)
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always", DeprecationWarning)
            module = importlib.import_module("dying_detection")
        self.assertTrue(
            any(issubclass(w.category, DeprecationWarning) for w in caught),
            "expected dying_detection import to emit a DeprecationWarning",
        )
        wrapped = module.detect(text)
        canonical = detect_crisis(text)
        self.assertEqual(wrapped.level, canonical.level)
        self.assertEqual(wrapped.confidence, canonical.score)
        self.assertEqual(
            wrapped.raw_matched_patterns,
            [m["pattern"] for m in canonical.matches],
        )
# Allow running this test module directly (outside a test runner).
if __name__ == "__main__":
    unittest.main()