Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
e132fc134b fix: rate-limit crisis overlay display (#100)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-20 19:39:26 -04:00
Alexander Whitestone
8b658e76f3 wip: add crisis overlay rate-limit regression test 2026-04-20 19:35:53 -04:00
5 changed files with 115 additions and 253 deletions

View File

@@ -8,7 +8,6 @@ from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urg
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .ab_testing import ABTestCrisisDetector, VariantRecord
__all__ = [
"detect_crisis",
@@ -24,6 +23,4 @@ __all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"ABTestCrisisDetector",
"VariantRecord",
]

View File

@@ -1,112 +0,0 @@
"""A/B test framework for crisis detection in the-door."""
from __future__ import annotations
import os
import random
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple
from .detect import CrisisDetectionResult
def _get_variant_override() -> Optional[str]:
"""Return env override for deterministic testing/debugging."""
value = os.environ.get("CRISIS_AB_VARIANT", "").strip().upper()
if value in {"A", "B"}:
return value
return None
@dataclass
class VariantRecord:
"""Single crisis detection event record with no user text or PII."""
variant: str
level: str
latency_ms: float
indicator_count: int
false_positive: Optional[bool] = None
class ABTestCrisisDetector:
"""Route crisis detection between two variants and collect comparison stats."""
def __init__(
self,
variant_a: Callable[[str], CrisisDetectionResult],
variant_b: Callable[[str], CrisisDetectionResult],
split: float = 0.5,
):
self.variant_a = variant_a
self.variant_b = variant_b
self.split = max(0.0, min(1.0, float(split)))
self.records: List[VariantRecord] = []
def _select_variant(self) -> str:
override = _get_variant_override()
if override:
return override
return "A" if random.random() < self.split else "B"
def detect(self, text: str) -> Tuple[CrisisDetectionResult, str, int]:
variant = self._select_variant()
detector = self.variant_a if variant == "A" else self.variant_b
start = time.perf_counter()
result = detector(text)
latency_ms = (time.perf_counter() - start) * 1000.0
record = VariantRecord(
variant=variant,
level=result.level,
latency_ms=latency_ms,
indicator_count=len(result.indicators),
)
self.records.append(record)
return result, variant, len(self.records) - 1
def record_outcome(self, record_id: int, *, false_positive: bool) -> None:
if record_id < 0 or record_id >= len(self.records):
raise IndexError(f"Unknown record id: {record_id}")
self.records[record_id].false_positive = bool(false_positive)
def get_stats(self) -> Dict[str, dict]:
stats: Dict[str, dict] = {}
for variant in ("A", "B"):
records = [record for record in self.records if record.variant == variant]
if not records:
stats[variant] = {
"count": 0,
"reviewed_count": 0,
"false_positive_rate": None,
}
continue
levels: Dict[str, int] = {}
for record in records:
levels[record.level] = levels.get(record.level, 0) + 1
reviewed = [record for record in records if record.false_positive is not None]
false_positive_rate = None
if reviewed:
false_positive_rate = round(
sum(1 for record in reviewed if record.false_positive) / len(reviewed),
4,
)
stats[variant] = {
"count": len(records),
"avg_latency_ms": round(sum(record.latency_ms for record in records) / len(records), 4),
"max_latency_ms": round(max(record.latency_ms for record in records), 4),
"min_latency_ms": round(min(record.latency_ms for record in records), 4),
"avg_indicator_count": round(sum(record.indicator_count for record in records) / len(records), 4),
"levels": levels,
"reviewed_count": len(reviewed),
"false_positive_rate": false_positive_rate,
}
return stats
def reset(self) -> None:
self.records.clear()

View File

@@ -681,6 +681,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about.html" aria-label="About The Door">about</a>
<button id="crisis-resources-btn" aria-label="Open crisis resources">crisis resources</button>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -811,6 +812,7 @@ Sovereignty and service always.`;
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
var crisisResourcesBtn = document.getElementById('crisis-resources-btn');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
@@ -826,6 +828,9 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var CRISIS_OVERLAY_COOLDOWN_MS = 10 * 60 * 1000;
var CRISIS_OVERLAY_LAST_SHOWN_KEY = 'timmy_crisis_overlay_last_shown_at';
var CRISIS_OVERLAY_EVENT_LOG_KEY = 'timmy_crisis_overlay_event_log';
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -853,6 +858,43 @@ Sovereignty and service always.`;
window.addEventListener('offline', updateOnlineStatus);
updateOnlineStatus();
function getLastOverlayShownAt() {
try {
return parseInt(localStorage.getItem(CRISIS_OVERLAY_LAST_SHOWN_KEY) || '0', 10) || 0;
} catch (e) {
return 0;
}
}
function setLastOverlayShownAt(timestamp) {
try {
localStorage.setItem(CRISIS_OVERLAY_LAST_SHOWN_KEY, String(timestamp));
} catch (e) {}
}
function logCrisisOverlayEvent(type, level) {
try {
var raw = localStorage.getItem(CRISIS_OVERLAY_EVENT_LOG_KEY);
var events = raw ? JSON.parse(raw) : [];
if (!Array.isArray(events)) events = [];
events.push({ type: type, level: level, at: Date.now() });
if (events.length > 20) events = events.slice(events.length - 20);
localStorage.setItem(CRISIS_OVERLAY_EVENT_LOG_KEY, JSON.stringify(events));
} catch (e) {}
}
function openCrisisResources() {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
if (typeof crisisPanel.scrollIntoView === 'function') {
crisisPanel.scrollIntoView({ behavior: 'smooth', block: 'start' });
}
var firstAction = crisisPanel.querySelector('.crisis-btn, a[href]');
if (firstAction && typeof firstAction.focus === 'function') {
firstAction.focus();
}
}
// ===== CRISIS KEYWORDS =====
// Tier 1: General crisis indicators - triggers enhanced 988 panel
var crisisKeywords = [
@@ -1021,6 +1063,19 @@ Sovereignty and service always.`;
var _preOverlayFocusElement = null;
function showOverlay() {
return showOverlayWithRateLimit(false, 2);
}
function showOverlayWithRateLimit(forceOpen, level) {
var lastShownAt = getLastOverlayShownAt();
if (!forceOpen && Date.now() - lastShownAt < CRISIS_OVERLAY_COOLDOWN_MS) {
logCrisisOverlayEvent('suppressed', level || 2);
return false;
}
logCrisisOverlayEvent(forceOpen ? 'manual-open' : 'shown', level || 2);
setLastOverlayShownAt(Date.now());
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
@@ -1053,6 +1108,7 @@ Sovereignty and service always.`;
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
return true;
}
// Register focus trap on document (always listening, gated by class check)
@@ -1301,6 +1357,12 @@ Sovereignty and service always.`;
});
}
if (crisisResourcesBtn) {
crisisResourcesBtn.addEventListener('click', function() {
openCrisisResources();
});
}
// ===== TEXTAREA AUTO-RESIZE =====
msgInput.addEventListener('input', function() {
this.style.height = 'auto';

View File

@@ -1,138 +0,0 @@
"""Tests for crisis.ab_testing — A/B test framework for crisis detection (#101)."""
import os
from unittest.mock import patch
import pytest
from crisis.ab_testing import ABTestCrisisDetector
from crisis.detect import CrisisDetectionResult, detect_crisis
@pytest.fixture(autouse=True)
def clear_variant_override():
old = os.environ.pop("CRISIS_AB_VARIANT", None)
try:
yield
finally:
if old is not None:
os.environ["CRISIS_AB_VARIANT"] = old
else:
os.environ.pop("CRISIS_AB_VARIANT", None)
def _make_variant(level: str, indicators=None):
indicators = indicators or [f"mock_{level.lower()}"]
def fn(text: str) -> CrisisDetectionResult:
return CrisisDetectionResult(level=level, indicators=list(indicators))
return fn
def test_detect_returns_result_variant_and_logged_record():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
with patch.object(detector, "_select_variant", return_value="A"):
result, variant, record_id = detector.detect("test message")
assert isinstance(result, CrisisDetectionResult)
assert variant == "A"
assert record_id == 0
assert len(detector.records) == 1
assert detector.records[0].variant == "A"
assert detector.records[0].level == "LOW"
def test_env_override_forces_variant_b():
os.environ["CRISIS_AB_VARIANT"] = "b"
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
result, variant, _ = detector.detect("test")
assert variant == "B"
assert result.level == "HIGH"
def test_get_stats_reports_latency_counts_and_level_breakdown():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("CRITICAL"),
)
with patch.object(detector, "_select_variant", side_effect=["A", "A", "B"]):
detector.detect("first")
detector.detect("second")
detector.detect("third")
stats = detector.get_stats()
assert stats["A"]["count"] == 2
assert stats["B"]["count"] == 1
assert stats["A"]["levels"]["LOW"] == 2
assert stats["B"]["levels"]["CRITICAL"] == 1
assert "avg_latency_ms" in stats["A"]
assert "avg_indicator_count" in stats["B"]
def test_false_positive_rate_is_computed_from_reviewed_outcomes():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
with patch.object(detector, "_select_variant", side_effect=["A", "A", "B"]):
_, _, a0 = detector.detect("first")
_, _, a1 = detector.detect("second")
_, _, b0 = detector.detect("third")
detector.record_outcome(a0, false_positive=True)
detector.record_outcome(a1, false_positive=False)
detector.record_outcome(b0, false_positive=False)
stats = detector.get_stats()
assert stats["A"]["reviewed_count"] == 2
assert stats["A"]["false_positive_rate"] == 0.5
assert stats["B"]["false_positive_rate"] == 0.0
def test_record_outcome_rejects_unknown_record():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
with pytest.raises(IndexError):
detector.record_outcome(99, false_positive=True)
def test_reset_clears_records_and_stats():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
detector.detect("test")
detector.reset()
assert detector.records == []
stats = detector.get_stats()
assert stats["A"]["count"] == 0
assert stats["B"]["count"] == 0
def test_with_real_detector_integration():
detector = ABTestCrisisDetector(
variant_a=detect_crisis,
variant_b=detect_crisis,
)
result, variant, record_id = detector.detect("I want to kill myself")
assert result.level == "CRITICAL"
assert variant in ("A", "B")
assert record_id == 0

View File

@@ -0,0 +1,53 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayRateLimit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_has_ten_minute_cooldown_constant(self):
self.assertRegex(
self.html,
r"CRISIS_OVERLAY_COOLDOWN_MS\s*=\s*10\s*\*\s*60\s*\*\s*1000",
'Expected a 10-minute crisis overlay cooldown constant.',
)
def test_show_overlay_suppresses_repeat_with_logging(self):
self.assertRegex(
self.html,
r"function\s+logCrisisOverlayEvent\s*\(",
'Expected a crisis overlay event logger.',
)
self.assertRegex(
self.html,
r"if\s*\(!forceOpen\s*&&\s*Date\.now\(\)\s*-\s*lastShownAt\s*<\s*CRISIS_OVERLAY_COOLDOWN_MS\)",
'Expected showOverlay to suppress repeated auto-displays inside the cooldown window.',
)
self.assertRegex(
self.html,
r"logCrisisOverlayEvent\('suppressed'",
'Expected suppressed overlay attempts to be logged.',
)
def test_manual_crisis_resources_button_exists_and_bypasses_cooldown(self):
self.assertIn('id="crisis-resources-btn"', self.html)
self.assertRegex(
self.html,
r"function\s+openCrisisResources\s*\(",
'Expected a manual crisis resources opener.',
)
self.assertRegex(
self.html,
r"crisisResourcesBtn\.addEventListener\('click',\s*function\(\)\s*\{\s*openCrisisResources\(\);",
'Expected the footer button to wire into openCrisisResources().',
)
if __name__ == '__main__':
unittest.main()