Compare commits

..

3 Commits

Author SHA1 Message Date
Timmy
7cef18fdcb feat: add crisis ab testing for #101
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 14s
2026-04-20 21:43:37 -04:00
Timmy
706024e11e test: define crisis ab testing for #101 2026-04-20 21:41:31 -04:00
d412939b4f fix: footer /about link to point to static about.html
Fixes #59

The footer links to /about but the repo ships about.html. On a plain static server this results in a 404. Changed to /about.html so the link resolves correctly.
2026-04-17 05:37:40 +00:00
7 changed files with 255 additions and 375 deletions

View File

@@ -12,7 +12,7 @@ VPS := alexanderwhitestone.com
DOMAIN := alexanderwhitestone.com
DEPLOY_DIR := deploy
.PHONY: help deploy deploy-bash check ssl push service metrics
.PHONY: help deploy deploy-bash check ssl push service
help:
@echo "The Door — Deployment Commands"
@@ -23,8 +23,6 @@ help:
@echo " make check Check deployment status"
@echo " make ssl Setup SSL on VPS"
@echo " make service Install/restart hermes-gateway service"
@echo " make metrics View crisis metrics summary"
@echo " make metrics-json Export crisis metrics as JSON"
@echo ""
deploy:
@@ -48,9 +46,3 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
metrics:
python3 -m crisis.metrics --summary
metrics-json:
python3 -m crisis.metrics --json

View File

@@ -8,7 +8,7 @@ from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urg
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .metrics import CrisisMetrics, AggregateMetrics
from .ab_testing import ABTestCrisisDetector, VariantRecord
__all__ = [
"detect_crisis",
@@ -24,6 +24,6 @@ __all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"CrisisMetrics",
"AggregateMetrics",
"ABTestCrisisDetector",
"VariantRecord",
]

112
crisis/ab_testing.py Normal file
View File

@@ -0,0 +1,112 @@
"""A/B test framework for crisis detection in the-door."""
from __future__ import annotations
import os
import random
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple
from .detect import CrisisDetectionResult
def _get_variant_override() -> Optional[str]:
"""Return env override for deterministic testing/debugging."""
value = os.environ.get("CRISIS_AB_VARIANT", "").strip().upper()
if value in {"A", "B"}:
return value
return None
@dataclass
class VariantRecord:
"""Single crisis detection event record with no user text or PII."""
variant: str
level: str
latency_ms: float
indicator_count: int
false_positive: Optional[bool] = None
class ABTestCrisisDetector:
"""Route crisis detection between two variants and collect comparison stats."""
def __init__(
self,
variant_a: Callable[[str], CrisisDetectionResult],
variant_b: Callable[[str], CrisisDetectionResult],
split: float = 0.5,
):
self.variant_a = variant_a
self.variant_b = variant_b
self.split = max(0.0, min(1.0, float(split)))
self.records: List[VariantRecord] = []
def _select_variant(self) -> str:
override = _get_variant_override()
if override:
return override
return "A" if random.random() < self.split else "B"
def detect(self, text: str) -> Tuple[CrisisDetectionResult, str, int]:
variant = self._select_variant()
detector = self.variant_a if variant == "A" else self.variant_b
start = time.perf_counter()
result = detector(text)
latency_ms = (time.perf_counter() - start) * 1000.0
record = VariantRecord(
variant=variant,
level=result.level,
latency_ms=latency_ms,
indicator_count=len(result.indicators),
)
self.records.append(record)
return result, variant, len(self.records) - 1
def record_outcome(self, record_id: int, *, false_positive: bool) -> None:
if record_id < 0 or record_id >= len(self.records):
raise IndexError(f"Unknown record id: {record_id}")
self.records[record_id].false_positive = bool(false_positive)
def get_stats(self) -> Dict[str, dict]:
stats: Dict[str, dict] = {}
for variant in ("A", "B"):
records = [record for record in self.records if record.variant == variant]
if not records:
stats[variant] = {
"count": 0,
"reviewed_count": 0,
"false_positive_rate": None,
}
continue
levels: Dict[str, int] = {}
for record in records:
levels[record.level] = levels.get(record.level, 0) + 1
reviewed = [record for record in records if record.false_positive is not None]
false_positive_rate = None
if reviewed:
false_positive_rate = round(
sum(1 for record in reviewed if record.false_positive) / len(reviewed),
4,
)
stats[variant] = {
"count": len(records),
"avg_latency_ms": round(sum(record.latency_ms for record in records) / len(records), 4),
"max_latency_ms": round(max(record.latency_ms for record in records), 4),
"min_latency_ms": round(min(record.latency_ms for record in records), 4),
"avg_indicator_count": round(sum(record.indicator_count for record in records) / len(records), 4),
"levels": levels,
"reviewed_count": len(reviewed),
"false_positive_rate": false_positive_rate,
}
return stats
def reset(self) -> None:
self.records.clear()

View File

@@ -1,244 +0,0 @@
"""
crisis/metrics.py — Aggregate crisis detection metrics.
Tracks session-level crisis data for aggregate reporting.
Privacy-first: stores only aggregate counts, never user content.
Usage:
from crisis.metrics import CrisisMetrics
metrics = CrisisMetrics()
metrics.record_session(tracker.state)
summary = metrics.get_summary()
"""
import json
import os
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
METRICS_DIR = Path.home() / ".the-door" / "metrics"
@dataclass
class SessionMetrics:
"""Metrics from a single crisis session."""
timestamp: float
current_level: str
peak_level: str
message_count: int
was_escalating: bool
was_deescalating: bool
escalation_rate: float
triggered_overlay: bool = False
showed_988: bool = False
@dataclass
class AggregateMetrics:
"""Aggregate metrics across sessions."""
total_sessions: int = 0
total_messages: int = 0
# Level distribution
level_counts: Dict[str, int] = field(default_factory=lambda: {
"NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0
})
# Escalation tracking
escalating_sessions: int = 0
deescalating_sessions: int = 0
# Safety interventions
overlay_triggers: int = 0
ninety_eight_show: int = 0
# Time window
period_start: Optional[float] = None
period_end: Optional[float] = None
class CrisisMetrics:
"""
Aggregate crisis metrics with local JSON persistence.
Privacy-first: stores only aggregate counts per day.
Never stores user messages, content, or identifying info.
"""
def __init__(self, metrics_dir: Optional[Path] = None):
self.metrics_dir = metrics_dir or METRICS_DIR
self.metrics_dir.mkdir(parents=True, exist_ok=True)
self._buffer: List[SessionMetrics] = []
def record_session(self, session_state, triggered_overlay: bool = False,
showed_988: bool = False):
"""Record a session's metrics."""
from .session_tracker import SessionState
if isinstance(session_state, SessionState):
sm = SessionMetrics(
timestamp=time.time(),
current_level=session_state.current_level,
peak_level=session_state.peak_level,
message_count=session_state.message_count,
was_escalating=session_state.is_escalating,
was_deescalating=session_state.is_deescalating,
escalation_rate=session_state.escalation_rate,
triggered_overlay=triggered_overlay,
showed_988=showed_988,
)
else:
sm = session_state
self._buffer.append(sm)
self._flush()
def _flush(self):
"""Write buffered sessions to daily file."""
if not self._buffer:
return
today = datetime.utcnow().strftime("%Y-%m-%d")
filepath = self.metrics_dir / f"{today}.jsonl"
with open(filepath, 'a') as f:
for sm in self._buffer:
f.write(json.dumps(asdict(sm)) + '\n')
self._buffer.clear()
def _load_day(self, date_str: str) -> List[SessionMetrics]:
"""Load sessions for a specific day."""
filepath = self.metrics_dir / f"{date_str}.jsonl"
if not filepath.exists():
return []
sessions = []
with open(filepath) as f:
for line in f:
if line.strip():
data = json.loads(line)
sessions.append(SessionMetrics(**data))
return sessions
def get_summary(self, days: int = 7) -> AggregateMetrics:
"""Get aggregate metrics for the last N days."""
agg = AggregateMetrics()
now = datetime.utcnow()
for i in range(days):
date = (now - timedelta(days=i)).strftime("%Y-%m-%d")
sessions = self._load_day(date)
for sm in sessions:
agg.total_sessions += 1
agg.total_messages += sm.message_count
# Level counts (use peak level)
level = sm.peak_level
agg.level_counts[level] = agg.level_counts.get(level, 0) + 1
if sm.was_escalating:
agg.escalating_sessions += 1
if sm.was_deescalating:
agg.deescalating_sessions += 1
if sm.triggered_overlay:
agg.overlay_triggers += 1
if sm.showed_988:
agg.ninety_eight_show += 1
# Time window
if agg.period_start is None or sm.timestamp < agg.period_start:
agg.period_start = sm.timestamp
if agg.period_end is None or sm.timestamp > agg.period_end:
agg.period_end = sm.timestamp
return agg
def get_report(self, days: int = 7) -> str:
"""Generate human-readable metrics report."""
agg = self.get_summary(days)
lines = []
lines.append("=" * 50)
lines.append(" CRISIS METRICS REPORT")
lines.append(f" Last {days} days")
if agg.period_start:
start = datetime.fromtimestamp(agg.period_start).strftime("%Y-%m-%d %H:%M")
lines.append(f" Period: {start} → now")
lines.append("=" * 50)
lines.append(f"\n Sessions: {agg.total_sessions}")
lines.append(f" Messages tracked: {agg.total_messages}")
lines.append(f"\n Level Distribution (by peak):")
for level in ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]:
count = agg.level_counts.get(level, 0)
pct = (count / agg.total_sessions * 100) if agg.total_sessions > 0 else 0
bar = "" * int(pct / 5)
lines.append(f" {level:<10} {count:>5} ({pct:>5.1f}%) {bar}")
lines.append(f"\n Escalations: {agg.escalating_sessions}")
lines.append(f" De-escalations: {agg.deescalating_sessions}")
lines.append(f" Overlay triggers: {agg.overlay_triggers}")
lines.append(f" 988 shown: {agg.ninety_eight_show}")
if agg.total_sessions > 0:
escalation_rate = agg.escalating_sessions / agg.total_sessions * 100
lines.append(f"\n Escalation rate: {escalation_rate:.1f}%")
lines.append("=" * 50)
return "\n".join(lines)
def get_json(self, days: int = 7) -> str:
"""Export metrics as JSON."""
agg = self.get_summary(days)
return json.dumps(asdict(agg), indent=2)
def main():
"""CLI entry point for crisis metrics."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Detection Metrics")
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", help="JSON export")
parser.add_argument("--days", type=int, default=7, help="Days to include")
parser.add_argument("--demo", action="store_true", help="Generate demo data")
args = parser.parse_args()
metrics = CrisisMetrics()
if args.demo:
import random
levels = ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]
for i in range(50):
from .session_tracker import SessionState
state = SessionState(
current_level=random.choice(levels),
peak_level=random.choice(levels),
message_count=random.randint(1, 20),
is_escalating=random.random() > 0.7,
is_deescalating=random.random() > 0.8,
escalation_rate=random.random(),
)
metrics.record_session(
state,
triggered_overlay=random.random() > 0.8,
showed_988=random.random() > 0.7,
)
print("Generated 50 demo sessions.")
if args.json:
print(metrics.get_json(args.days))
else:
print(metrics.get_report(args.days))
if __name__ == "__main__":
main()

View File

@@ -680,7 +680,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
<a href="/about.html" aria-label="About The Door">about</a>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>

138
tests/test_ab_testing.py Normal file
View File

@@ -0,0 +1,138 @@
"""Tests for crisis.ab_testing — A/B test framework for crisis detection (#101)."""
import os
from unittest.mock import patch
import pytest
from crisis.ab_testing import ABTestCrisisDetector
from crisis.detect import CrisisDetectionResult, detect_crisis
@pytest.fixture(autouse=True)
def clear_variant_override():
old = os.environ.pop("CRISIS_AB_VARIANT", None)
try:
yield
finally:
if old is not None:
os.environ["CRISIS_AB_VARIANT"] = old
else:
os.environ.pop("CRISIS_AB_VARIANT", None)
def _make_variant(level: str, indicators=None):
indicators = indicators or [f"mock_{level.lower()}"]
def fn(text: str) -> CrisisDetectionResult:
return CrisisDetectionResult(level=level, indicators=list(indicators))
return fn
def test_detect_returns_result_variant_and_logged_record():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
with patch.object(detector, "_select_variant", return_value="A"):
result, variant, record_id = detector.detect("test message")
assert isinstance(result, CrisisDetectionResult)
assert variant == "A"
assert record_id == 0
assert len(detector.records) == 1
assert detector.records[0].variant == "A"
assert detector.records[0].level == "LOW"
def test_env_override_forces_variant_b():
os.environ["CRISIS_AB_VARIANT"] = "b"
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
result, variant, _ = detector.detect("test")
assert variant == "B"
assert result.level == "HIGH"
def test_get_stats_reports_latency_counts_and_level_breakdown():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("CRITICAL"),
)
with patch.object(detector, "_select_variant", side_effect=["A", "A", "B"]):
detector.detect("first")
detector.detect("second")
detector.detect("third")
stats = detector.get_stats()
assert stats["A"]["count"] == 2
assert stats["B"]["count"] == 1
assert stats["A"]["levels"]["LOW"] == 2
assert stats["B"]["levels"]["CRITICAL"] == 1
assert "avg_latency_ms" in stats["A"]
assert "avg_indicator_count" in stats["B"]
def test_false_positive_rate_is_computed_from_reviewed_outcomes():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
with patch.object(detector, "_select_variant", side_effect=["A", "A", "B"]):
_, _, a0 = detector.detect("first")
_, _, a1 = detector.detect("second")
_, _, b0 = detector.detect("third")
detector.record_outcome(a0, false_positive=True)
detector.record_outcome(a1, false_positive=False)
detector.record_outcome(b0, false_positive=False)
stats = detector.get_stats()
assert stats["A"]["reviewed_count"] == 2
assert stats["A"]["false_positive_rate"] == 0.5
assert stats["B"]["false_positive_rate"] == 0.0
def test_record_outcome_rejects_unknown_record():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
with pytest.raises(IndexError):
detector.record_outcome(99, false_positive=True)
def test_reset_clears_records_and_stats():
detector = ABTestCrisisDetector(
variant_a=_make_variant("LOW"),
variant_b=_make_variant("HIGH"),
)
detector.detect("test")
detector.reset()
assert detector.records == []
stats = detector.get_stats()
assert stats["A"]["count"] == 0
assert stats["B"]["count"] == 0
def test_with_real_detector_integration():
detector = ABTestCrisisDetector(
variant_a=detect_crisis,
variant_b=detect_crisis,
)
result, variant, record_id = detector.detect("I want to kill myself")
assert result.level == "CRITICAL"
assert variant in ("A", "B")
assert record_id == 0

View File

@@ -1,118 +0,0 @@
"""
Tests for crisis/metrics.py — Aggregate crisis metrics.
"""
import json
import os
import shutil
import tempfile
import unittest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from crisis.metrics import CrisisMetrics, SessionMetrics, AggregateMetrics
class TestCrisisMetrics(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.metrics = CrisisMetrics(Path(self.tmpdir))
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_record_session_creates_file(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="MEDIUM",
message_count=5,
was_escalating=True,
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
self.assertEqual(len(files), 1)
def test_record_session_writes_jsonl(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="HIGH",
peak_level="CRITICAL",
message_count=10,
was_escalating=True,
was_deescalating=False,
escalation_rate=1.0,
triggered_overlay=True,
showed_988=True,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
with open(files[0]) as f:
data = json.loads(f.readline())
self.assertEqual(data['peak_level'], 'CRITICAL')
self.assertTrue(data['triggered_overlay'])
def test_get_summary_empty(self):
agg = self.metrics.get_summary(days=7)
self.assertEqual(agg.total_sessions, 0)
self.assertEqual(agg.total_messages, 0)
def test_get_summary_with_data(self):
for level in ["LOW", "MEDIUM", "HIGH"]:
sm = SessionMetrics(
timestamp=1700000000,
current_level=level,
peak_level=level,
message_count=3,
was_escalating=level != "LOW",
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
agg = self.metrics.get_summary(days=1)
self.assertEqual(agg.total_sessions, 3)
self.assertEqual(agg.total_messages, 9)
self.assertEqual(agg.escalating_sessions, 2)
def test_get_report_returns_string(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="LOW",
message_count=5,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
report = self.metrics.get_report(days=1)
self.assertIn("CRISIS METRICS REPORT", report)
self.assertIn("Sessions:", report)
def test_get_json_returns_valid(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="MEDIUM",
peak_level="MEDIUM",
message_count=3,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
json_str = self.metrics.get_json(days=1)
data = json.loads(json_str)
self.assertEqual(data['total_sessions'], 1)
if __name__ == "__main__":
unittest.main()