Compare commits

..

16 Commits

Author SHA1 Message Date
Alexander Whitestone
c642b0c453 feat: add crisis metrics endpoint contract (#97)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 9s
2026-04-17 01:49:52 -04:00
Alexander Whitestone
b7a2f90671 test: add crisis metrics endpoint coverage 2026-04-17 01:39:13 -04:00
07c582aa08 Merge pull request 'fix: crisis overlay initial focus to enabled Call 988 link (#69)' (#126) from burn/69-1776264183 into main
Merge PR #126: fix: crisis overlay initial focus to enabled Call 988 link (#69)
2026-04-17 01:46:56 +00:00
5f95dc1e39 Merge pull request '[P3] Service worker: cache crisis resources for offline (#41)' (#122) from burn/41-1776264184 into main
Merge PR #122: [P3] Service worker: cache crisis resources for offline (#41)
2026-04-17 01:46:55 +00:00
b1f3cac36d Merge pull request 'feat: session-level crisis tracking and escalation (closes #35)' (#118) from door/issue-35 into main
Merge PR #118: feat: session-level crisis tracking and escalation (closes #35)
2026-04-17 01:46:53 +00:00
07b3f67845 fix: crisis overlay initial focus to enabled Call 988 link (#69)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 15s
2026-04-15 15:09:36 +00:00
c22bbbaf65 fix: crisis overlay initial focus to enabled Call 988 link (#69) 2026-04-15 15:09:32 +00:00
543cb1d40f test: add offline self-containment and retry button tests (#41)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:58:44 +00:00
3cfd01815a feat: session-level crisis tracking and escalation (closes #35)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 23s
2026-04-15 11:49:52 +00:00
5a7ba9f207 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:51 +00:00
8ed8f20a17 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:49 +00:00
9d7d26033e feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:47 +00:00
48f48c7f26 feat: cache offline crisis resources (refs #41) (#74)
All checks were successful
Smoke Test / smoke (push) Successful in 7s
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 19s
Merge PR #74 (squash)
2026-04-14 22:09:59 +00:00
da31288525 fix: deprecate dying_detection and consolidate crisis detection (#40) (#76)
All checks were successful
Smoke Test / smoke (push) Successful in 4s
Merge PR #76 (squash)
2026-04-14 22:08:29 +00:00
8efc858cd7 fix: add keyboard focus trap to crisis overlay (#80)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #80 (squash)
2026-04-14 22:08:28 +00:00
611c1c8456 fix(a11y): Safety plan modal keyboard focus trap (#65) (#81)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #81 (squash)
2026-04-14 22:08:24 +00:00
19 changed files with 1870 additions and 571 deletions

View File

@@ -95,7 +95,45 @@ Run tests:
python -m pytest crisis/tests.py -v
```
### 7. Acceptance Criteria Checklist
### 7. Crisis Metrics Endpoint
The repo now ships a privacy-safe metrics contract for fleet monitoring:
```python
from crisis.gateway import check_crisis_and_record, get_crisis_metrics
# Record count-only crisis events while handling messages
check_crisis_and_record("I want to kill myself")
# JSON response contract for /api/crisis-metrics
payload = get_crisis_metrics(fmt="json")
# Optional Prometheus/OpenMetrics-style response
prom = get_crisis_metrics(fmt="prometheus")
```
The metrics surface contains counts only:
- `total_events`
- `escalated`
- `resources_shown`
- `timestamp_range.start`
- `timestamp_range.end`
No message content, indicators, or user identifiers are retained.
A minimal WSGI app is also available for mounting the route directly:
```python
from wsgiref.simple_server import make_server
from crisis.metrics import crisis_metrics_app
with make_server("127.0.0.1", 8645, crisis_metrics_app) as httpd:
httpd.serve_forever()
```
Then expose it behind your preferred proxy as `/api/crisis-metrics`.
### 8. Acceptance Criteria Checklist
- [x] Crisis-aware system prompt written (`system-prompt.txt`)
- [x] Frontend embeds system prompt on every API request (`index.html:1129`)
@@ -104,6 +142,7 @@ python -m pytest crisis/tests.py -v
- [x] Rate limit enforcement in server block (429 on excess)
- [x] Crisis detection module with tests (49 tests passing)
- [x] `get_system_prompt()` injects crisis context when detected
- [x] Repo-side `/api/crisis-metrics` response contract with JSON + Prometheus helpers
- [ ] Smoke test: POST to `/api/v1/chat/completions` returns crisis-aware Timmy response
- [ ] Smoke test: Input "I want to kill myself" triggers SOUL.md protocol
- [ ] Smoke test: 11th request in 1 minute returns HTTP 429

View File

@@ -34,7 +34,7 @@ deploy-bash:
push:
rsync -avz --exclude='.git' --exclude='deploy' \
index.html manifest.json sw.js about.html testimony.html system-prompt.txt \
index.html manifest.json sw.js about.html crisis-offline.html testimony.html system-prompt.txt \
root@$(VPS):/var/www/the-door/
ssh root@$(VPS) "chown -R www-data:www-data /var/www/the-door"

241
crisis-offline.html Normal file
View File

@@ -0,0 +1,241 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="theme-color" content="#0d1117">
<meta name="description" content="Offline crisis resources from The Door. Call or text 988 for immediate help.">
<title>Offline Crisis Resources | The Door</title>
<style>
:root {
color-scheme: dark;
--bg: #0d1117;
--panel: #161b22;
--panel-urgent: #1c1210;
--border: #30363d;
--accent: #c9362c;
--accent-soft: #ff6b6b;
--text: #e6edf3;
--muted: #8b949e;
--safe: #2ea043;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: var(--bg);
color: var(--text);
line-height: 1.6;
}
main {
max-width: 760px;
margin: 0 auto;
padding: 24px 16px 48px;
}
.status {
display: inline-flex;
align-items: center;
gap: 10px;
padding: 8px 12px;
border-radius: 999px;
background: rgba(201, 54, 44, 0.15);
border: 1px solid rgba(255, 107, 107, 0.35);
color: var(--accent-soft);
font-size: 0.9rem;
margin-bottom: 20px;
}
.status-dot {
width: 10px;
height: 10px;
border-radius: 50%;
background: var(--accent-soft);
}
h1 {
font-size: clamp(2rem, 6vw, 2.75rem);
line-height: 1.15;
margin: 0 0 12px;
}
.lede {
color: var(--muted);
font-size: 1.05rem;
margin: 0 0 28px;
}
.urgent-box,
.panel {
border-radius: 18px;
padding: 20px;
margin-bottom: 18px;
border: 1px solid var(--border);
background: var(--panel);
}
.urgent-box {
background: linear-gradient(180deg, rgba(201, 54, 44, 0.18), rgba(28, 18, 16, 0.95));
border-color: rgba(255, 107, 107, 0.35);
}
.section-title {
font-size: 1.2rem;
margin: 0 0 12px;
}
.actions {
display: grid;
gap: 12px;
margin-top: 16px;
}
.action-btn {
display: flex;
align-items: center;
justify-content: center;
text-align: center;
gap: 8px;
min-height: 52px;
padding: 14px 18px;
border-radius: 12px;
text-decoration: none;
font-weight: 700;
color: #fff;
background: var(--accent);
border: 1px solid transparent;
}
.action-btn.secondary {
background: #1f6feb;
}
.action-btn.retry {
background: transparent;
color: var(--text);
border-color: var(--border);
}
.action-btn:focus,
.action-btn:hover,
button.action-btn:hover,
button.action-btn:focus {
outline: 3px solid rgba(255, 107, 107, 0.4);
outline-offset: 2px;
}
ul, ol {
margin: 0;
padding-left: 20px;
}
li + li {
margin-top: 8px;
}
.grounding-steps li::marker {
color: var(--accent-soft);
font-weight: 700;
}
.small {
color: var(--muted);
font-size: 0.92rem;
}
.grid {
display: grid;
gap: 18px;
}
@media (min-width: 700px) {
.grid {
grid-template-columns: 1fr 1fr;
}
}
</style>
</head>
<body>
<main>
<div class="status" role="status" aria-live="polite">
<span class="status-dot" aria-hidden="true"></span>
<span id="connection-status-text">Offline crisis resources are ready on this device.</span>
</div>
<h1>You are not alone right now.</h1>
<p class="lede">
Your connection is weak or offline. These crisis resources are saved locally so you can still reach help.
</p>
<section class="urgent-box" aria-labelledby="urgent-help-title">
<h2 class="section-title" id="urgent-help-title">Immediate help</h2>
<p>If you might act on suicidal thoughts, contact a real person now. Stay with another person if you can.</p>
<div class="actions">
<a class="action-btn" href="tel:988" aria-label="Call 988 now">Call 988 now</a>
<a class="action-btn secondary" href="sms:741741&body=HOME" aria-label="Text HOME to 741741 for Crisis Text Line">Text HOME to 741741 — Crisis Text Line</a>
<button class="action-btn retry" id="retry-connection" type="button">Retry connection</button>
</div>
<p class="small" style="margin-top: 14px;">If you are in immediate danger or have already taken action, call emergency services now.</p>
</section>
<div class="grid">
<section class="panel" aria-labelledby="grounding-title">
<h2 class="section-title" id="grounding-title">5-4-3-2-1 grounding</h2>
<ol class="grounding-steps">
<li>5 things you can see</li>
<li>4 things you can feel</li>
<li>3 things you can hear</li>
<li>2 things you can smell</li>
<li>1 thing you can taste</li>
</ol>
<p class="small" style="margin-top: 14px;">Say each one out loud if you can. Slow is okay.</p>
</section>
<section class="panel" aria-labelledby="next-steps-title">
<h2 class="section-title" id="next-steps-title">Next small steps</h2>
<ul>
<li>Put some distance between yourself and anything you could use to hurt yourself.</li>
<li>Move closer to another person, a front desk, or a public place if possible.</li>
<li>Drink water or hold something cold in your hand.</li>
<li>Breathe in for 4, hold for 4, out for 6. Repeat 5 times.</li>
<li>Text or call one safe person and say: “I need you with me right now.”</li>
</ul>
</section>
</div>
<section class="panel" aria-labelledby="hope-title">
<h2 class="section-title" id="hope-title">Stay through the next ten minutes</h2>
<p>Do not solve your whole life right now. Stay for the next breath. Then the next one.</p>
<p class="small">If the connection comes back, you can return to The Door chat. Until then, the fastest path to a real person is still 988.</p>
</section>
</main>
<script>
(function () {
var statusText = document.getElementById('connection-status-text');
var retryButton = document.getElementById('retry-connection');
function updateStatus() {
statusText.textContent = navigator.onLine
? 'Connection is back. You can reopen chat now.'
: 'Offline crisis resources are ready on this device.';
}
retryButton.addEventListener('click', function () {
if (navigator.onLine) {
window.location.href = '/';
return;
}
window.location.reload();
});
window.addEventListener('online', updateStatus);
window.addEventListener('offline', updateStatus);
updateStatus();
})();
</script>
</body>
</html>

View File

@@ -1,3 +1,46 @@
"""Crisis package init."""
from .detect import detect_crisis, CrisisDetectionResult
from .metrics import CrisisMetrics
"""
Crisis detection and response system for the-door.
Stands between a broken man and a machine that would tell him to die.
"""
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import (
check_crisis,
check_crisis_and_record,
get_crisis_metrics,
get_system_prompt,
format_gateway_response,
)
from .metrics import (
CrisisMetricsTracker,
CrisisMetricsSnapshot,
build_metrics_http_response,
crisis_metrics_app,
format_prometheus_metrics,
)
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
__all__ = [
"detect_crisis",
"CrisisDetectionResult",
"process_message",
"generate_response",
"CrisisResponse",
"check_crisis",
"check_crisis_and_record",
"get_crisis_metrics",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisMetricsTracker",
"CrisisMetricsSnapshot",
"build_metrics_http_response",
"crisis_metrics_app",
"format_prometheus_metrics",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -51,13 +51,13 @@ HIGH_INDICATORS = [
r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
r"\bdon'?t\s+matter\s+if\s+I\s+exist\b",
r"\bno\s+one\s+would\s+(?:care|miss)\b",
r"\bworld\s+would?\s+be\s+better\s+without\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason)\b",
r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
r"\bescape\s+from\s*this",
r"\bjust\s+want\s+it\s+to\s+stop\b",
r"\btrapped\s+(?:in\s+(?:my|this|a\s+dark)|and\s+can'?t\s+escape)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
r"\bdisappeared\s+forever\b",
# Contextual despair phrases (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\beverything\s+is\s+hopeless\b",
@@ -68,6 +68,8 @@ HIGH_INDICATORS = [
r"\bno\s*hope\s+(?:left|remaining)\b",
r"\bno\s*way\s*out\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
r"\btrapped\s+and\s+can'?t\s+escape\b",
r"\bdesperate\s+(?:for\s+)?help\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
]
@@ -99,6 +101,8 @@ MEDIUM_INDICATORS = [
r"\bsinking\b",
r"\bdrowning\b",
r"\bhopeless\b",
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
@@ -112,7 +116,7 @@ MEDIUM_INDICATORS = [
LOW_INDICATORS = [
r"\bunhappy\b",
r"\bdown\b",
r"\btough\s*time\b",
r"\btough\s*(?:time|day|week)\b",
r"\brough\s+(?:day|week|patch)\b",
r"\bstressed\b",
r"\bburnout\b",
@@ -122,6 +126,8 @@ LOW_INDICATORS = [
r"\btired\b",
r"\bsad\b",
r"\bupset\b",
r"\blonely\b",
r"\banxious?\b",
r"\bnot\s*(?:good|great|okay)\b",
r"\bthings\s*are\s*hard\b",
r"\bstruggling\b",

View File

@@ -16,25 +16,38 @@ from typing import Optional
from .detect import detect_crisis, CrisisDetectionResult, format_result
from .compassion_router import router
from .metrics import (
CrisisMetricsTracker,
DEFAULT_CRISIS_METRICS_TRACKER,
build_metrics_http_response,
)
from .response import (
process_message,
generate_response,
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:
def check_crisis(
text: str,
metrics_tracker: Optional[CrisisMetricsTracker] = None,
timestamp: Optional[object] = None,
) -> dict:
"""
Full crisis check returning structured data.
Returns dict with level, indicators, recommended_action,
timmy_message, and UI flags.
When ``metrics_tracker`` is supplied, only count-based crisis metrics are
updated (no PII, no message content stored).
"""
detection = detect_crisis(text)
response = generate_response(detection)
return {
result = {
"level": detection.level,
"score": detection.score,
"indicators": detection.indicators,
@@ -48,6 +61,30 @@ def check_crisis(text: str) -> dict:
"escalate": response.escalate,
}
if metrics_tracker is not None:
metrics_tracker.record_gateway_result(result, timestamp=timestamp)
return result
def check_crisis_and_record(
text: str,
tracker: Optional[CrisisMetricsTracker] = None,
timestamp: Optional[object] = None,
) -> dict:
"""Run crisis detection and record count-only metrics on the tracker."""
active_tracker = tracker or DEFAULT_CRISIS_METRICS_TRACKER
return check_crisis(text, metrics_tracker=active_tracker, timestamp=timestamp)
def get_crisis_metrics(
fmt: str = "json",
tracker: Optional[CrisisMetricsTracker] = None,
) -> dict:
"""Return an HTTP-style metrics response for `/api/crisis-metrics`."""
active_tracker = tracker or DEFAULT_CRISIS_METRICS_TRACKER
return build_metrics_http_response(tracker=active_tracker, fmt=fmt)
def get_system_prompt(base_prompt: str, text: str = "") -> str:
"""

View File

@@ -1,97 +1,239 @@
"""Crisis Detection Metrics Layer — the-door (#37).
Privacy-preserving counters and histograms for crisis events.
No PII. Just counts, categories, and aggregate stats.
"""
Crisis metrics tracker and endpoint helpers for the-door.
Provides a privacy-safe metrics surface for fleet monitoring. Counts only.
No message content, indicators, or user identifiers are retained.
"""
from __future__ import annotations
import json
import os
import threading
import time
from collections import Counter
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
from urllib.parse import parse_qs
class CrisisMetrics:
"""Record, query, and summarise crisis detection events."""
UTC = timezone.utc
PROMETHEUS_CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
JSON_CONTENT_TYPE = "application/json"
def __init__(self, path: str):
self._path = path
self._counters: Dict[str, Counter] = {
'total': Counter(),
'levels': Counter(),
'keywords': Counter(),
}
self._lock = threading.Lock()
self._load()
def _load(self):
if not os.path.exists(self._path):
return
with open(self._path) as fh:
for line in fh:
try:
event = json.loads(line)
except Exception:
continue
self._counters['levels'][event['level']] += 1
for kw in event.get('keywords', []):
self._counters['keywords'][kw.lower()] += 1
self._counters['total'] = Counter({'all': sum(self._counters['levels'].values())})
@dataclass(frozen=True)
class CrisisMetricsSnapshot:
total_events: int = 0
escalated: int = 0
resources_shown: int = 0
start: Optional[str] = None
end: Optional[str] = None
def record(self, level: str, keywords: List[str]):
"""Record a single crisis-detection event. Thread-safe."""
event = {
'ts': time.time(),
'level': level,
'keywords': [k.lower() for k in keywords],
}
with self._lock:
with open(self._path, 'a') as fh:
fh.write(json.dumps(event) + '\n')
self._counters['levels'][level] += 1
for kw in event['keywords']:
self._counters['keywords'][kw] += 1
def snapshot(self) -> dict:
"""Return current aggregate counters."""
with self._lock:
total = sum(self._counters['levels'].values())
return {
'total_detections': total,
'detections_per_level': dict(self._counters['levels']),
'keywords_frequency': dict(self._counters['keywords']),
}
def weekly(self) -> dict:
"""Return weekly summary with optional false-positive estimate."""
now = time.time()
week_start = now - 7 * 86400
levels = Counter()
keywords = Counter()
with self._lock:
if os.path.exists(self._path):
with open(self._path) as fh:
for line in fh:
try:
ev = json.loads(line)
except Exception:
continue
if ev['ts'] < week_start:
continue
levels[ev['level']] += 1
for kw in ev.get('keywords', []):
keywords[kw] += 1
total = sum(levels.values())
def to_dict(self) -> dict[str, Any]:
return {
'week_start': week_start,
'week_end': now,
'detections': total,
'detections_per_level': dict(levels),
'top_keywords': keywords.most_common(10),
'false_positive_estimate': round(
levels.get('NONE', 0) / total * 100, 1
) if total else 0.0,
"total_events": self.total_events,
"escalated": self.escalated,
"resources_shown": self.resources_shown,
"timestamp_range": {
"start": self.start,
"end": self.end,
},
}
def close(self):
pass # nothing to flush
class CrisisMetricsTracker:
"""In-memory metrics accumulator for crisis events.
Counts only crisis-bearing events (`level != NONE`). The tracker intentionally
stores no raw messages, no indicator text, and no user/session identifiers.
"""
def __init__(self) -> None:
self.reset()
def reset(self) -> None:
self._total_events = 0
self._escalated = 0
self._resources_shown = 0
self._first_event_at: Optional[datetime] = None
self._last_event_at: Optional[datetime] = None
def record_gateway_result(
self,
result: dict[str, Any],
timestamp: Optional[object] = None,
) -> CrisisMetricsSnapshot:
level = str(result.get("level") or "NONE").upper()
if level == "NONE":
return self.snapshot()
event_time = _coerce_timestamp(timestamp)
self._total_events += 1
if bool(result.get("escalate")):
self._escalated += 1
ui = result.get("ui") or {}
if any(bool(ui.get(key)) for key in ("provide_988", "show_crisis_panel", "show_overlay")):
self._resources_shown += 1
if self._first_event_at is None or event_time < self._first_event_at:
self._first_event_at = event_time
if self._last_event_at is None or event_time > self._last_event_at:
self._last_event_at = event_time
return self.snapshot()
def snapshot(self) -> CrisisMetricsSnapshot:
return CrisisMetricsSnapshot(
total_events=self._total_events,
escalated=self._escalated,
resources_shown=self._resources_shown,
start=_format_timestamp(self._first_event_at),
end=_format_timestamp(self._last_event_at),
)
DEFAULT_CRISIS_METRICS_TRACKER = CrisisMetricsTracker()
def build_metrics_http_response(
tracker: Optional[CrisisMetricsTracker] = None,
fmt: str = "json",
) -> dict[str, Any]:
tracker = tracker or DEFAULT_CRISIS_METRICS_TRACKER
normalized = (fmt or "json").strip().lower()
snapshot = tracker.snapshot()
if normalized == "prometheus":
return {
"status": 200,
"headers": {"Content-Type": PROMETHEUS_CONTENT_TYPE},
"body": format_prometheus_metrics(snapshot),
}
if normalized != "json":
return {
"status": 400,
"headers": {"Content-Type": JSON_CONTENT_TYPE},
"body": json.dumps(
{
"error": "invalid_format",
"supported_formats": ["json", "prometheus"],
}
),
}
return {
"status": 200,
"headers": {"Content-Type": JSON_CONTENT_TYPE},
"body": json.dumps(snapshot.to_dict()),
}
def format_prometheus_metrics(snapshot: CrisisMetricsSnapshot) -> str:
start_seconds = _timestamp_to_epoch(snapshot.start)
end_seconds = _timestamp_to_epoch(snapshot.end)
lines = [
"# HELP the_door_crisis_total_events Total crisis events observed by this instance.",
"# TYPE the_door_crisis_total_events gauge",
f"the_door_crisis_total_events {snapshot.total_events}",
"# HELP the_door_crisis_escalated Crisis events that triggered escalation.",
"# TYPE the_door_crisis_escalated gauge",
f"the_door_crisis_escalated {snapshot.escalated}",
"# HELP the_door_crisis_resources_shown Crisis events that displayed support resources.",
"# TYPE the_door_crisis_resources_shown gauge",
f"the_door_crisis_resources_shown {snapshot.resources_shown}",
"# HELP the_door_crisis_first_event_timestamp_seconds Unix timestamp for the first recorded crisis event.",
"# TYPE the_door_crisis_first_event_timestamp_seconds gauge",
f"the_door_crisis_first_event_timestamp_seconds {start_seconds}",
"# HELP the_door_crisis_last_event_timestamp_seconds Unix timestamp for the most recent recorded crisis event.",
"# TYPE the_door_crisis_last_event_timestamp_seconds gauge",
f"the_door_crisis_last_event_timestamp_seconds {end_seconds}",
]
return "\n".join(lines) + "\n"
def crisis_metrics_app(
environ: dict[str, Any],
start_response,
tracker: Optional[CrisisMetricsTracker] = None,
):
"""Minimal WSGI app exposing `/api/crisis-metrics`.
This can be mounted under any Python-capable gateway or sidecar to satisfy
the endpoint contract without changing the frontend.
"""
path = environ.get("PATH_INFO", "")
method = str(environ.get("REQUEST_METHOD", "GET")).upper()
if path != "/api/crisis-metrics":
body = json.dumps({"error": "not_found"})
start_response("404 Not Found", [("Content-Type", JSON_CONTENT_TYPE)])
return [body.encode("utf-8")]
if method != "GET":
body = json.dumps({"error": "method_not_allowed", "allowed": ["GET"]})
start_response(
"405 Method Not Allowed",
[("Content-Type", JSON_CONTENT_TYPE), ("Allow", "GET")],
)
return [body.encode("utf-8")]
fmt = _negotiate_format(
environ.get("QUERY_STRING", ""),
environ.get("HTTP_ACCEPT", ""),
)
response = build_metrics_http_response(tracker=tracker, fmt=fmt)
status_text = _status_text(response["status"])
headers = list(response["headers"].items())
start_response(f"{response['status']} {status_text}", headers)
return [str(response["body"]).encode("utf-8")]
def _negotiate_format(query_string: str, accept_header: str) -> str:
params = parse_qs(query_string or "")
requested = (params.get("format", [""])[0] or "").strip().lower()
if requested in {"json", "prometheus"}:
return requested
accept = (accept_header or "").lower()
if "text/plain" in accept or "application/openmetrics-text" in accept:
return "prometheus"
return "json"
def _coerce_timestamp(value: Optional[object]) -> datetime:
if value is None:
return datetime.now(UTC)
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
if isinstance(value, str):
text = value.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
parsed = datetime.fromisoformat(text)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
raise TypeError(f"Unsupported timestamp type: {type(value)!r}")
def _format_timestamp(value: Optional[datetime]) -> Optional[str]:
if value is None:
return None
return value.astimezone(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _timestamp_to_epoch(value: Optional[str]) -> int:
if not value:
return 0
return int(_coerce_timestamp(value).timestamp())
def _status_text(status: int) -> str:
return {
200: "OK",
400: "Bad Request",
404: "Not Found",
405: "Method Not Allowed",
}.get(status, "OK")

259
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,259 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}

View File

@@ -1,31 +1,34 @@
"""
When a Man Is Dying — Despair/Suicide Detection System
DEPRECATED — Use crisis.detect instead.
Standalone detection module that parses incoming text for
despair and suicide indicators, classifies into tiers,
and returns structured response with recommended actions.
This module is a thin wrapper around crisis.detect for backward compatibility.
All unique patterns have been merged into crisis/detect.py (see issue #40).
Tiers:
LOW — General sadness, stress, difficult times
MEDIUM — Hopelessness, isolation, worthlessness
HIGH — Active despair, mentions of death, "can't go on"
CRITICAL — Imminent risk, explicit intent, plan, method
Integration:
Designed to work with crisis/ module from PR #4.
When crisis/ is available, uses it as the detection backend.
Falls back to internal detection when crisis/ is not present.
This module will be removed in a future release.
"""
import re
import json
import hashlib
import warnings
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
import json
import hashlib
# Re-export the canonical detection
from crisis.detect import detect_crisis, CrisisDetectionResult
# Issue deprecation warning on import
warnings.warn(
"dying_detection is deprecated. Use 'from crisis.detect import detect_crisis' instead. "
"All patterns have been consolidated into crisis/detect.py. "
"See issue #40.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DetectionResult:
"""Backward-compatible result type matching the old dying_detection API."""
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
@@ -34,110 +37,9 @@ class DetectionResult:
session_hash: str = ""
# ── Tiered indicator patterns ───────────────────────────────────
# Each pattern is a regex matched against lowercased input.
CRITICAL_PATTERNS = [
(r"\bkill\s*(my)?self\b", "explicit self-harm intent"),
(r"\bsuicid(?:al|ed|e)\b", "suicide reference"),
(r"\bend\s*(my)?\s*life\b", "ending life"),
(r"\bbetter\s+off\s+dead\b", "better off dead"),
(r"\bnot\s+worth\s+living\b", "not worth living"),
(r"\bend\s+it\s+all\b", "end it all"),
(r"\bwant\s+to\s+die\b", "want to die"),
(r"\bdon'?t\s+want\s+to\s+live\b", "don't want to live"),
(r"\bcan'?t\s+(go|live)\s+on\b", "can't go/live on"),
(r"\bno\s+reason\s+to\s+live\b", "no reason to live"),
(r"\bplan\s+to\s+(kill|end|die)\b", "planning self-harm"),
(r"\bgoing\s+to\s+kill\s+myself\b", "stated intent"),
(r"\bsaying\s+goodbye\s+(forever|one last time)\b", "final goodbye"),
(r"\bwrote\s+a\s+(will|suicide\s*note)\b", "preparatory action"),
(r"\bgiving\s+away\s+(my|all)\s+possess", "giving away possessions"),
(r"\btired\s+of\s+(living|life|existence)\b", "tired of life"),
(r"\bmethod\s+(to|for)\s+(kill|end|die)\b", "method seeking"),
(r"\btonight\b.*\b(die|kill|end|suicid)", "immediate timeframe + intent"),
(r"\b(die|kill|end)\b.*\btonight", "immediate timeframe + intent"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone)", "disappearance plan"),
]
HIGH_PATTERNS = [
(r"\bdespair\b", "despair"),
(r"\bhopeless(?:ness)?\b", "hopelessness"),
(r"\bcan'?t\s+take\s+this\s+anymore\b", "can't take it"),
(r"\bdon'?t\s+care\s+if\s+I\s+die\b", "death indifference"),
(r"\bwish\s+I\s+(was|were)\s+(dead|gone|never\s+born)\b", "wish to be dead"),
(r"\bworld\s+would\s+be\s+better\s+without\s+me\b", "better without me"),
(r"\bin\s+so\s+much\s+(pain|agony|suffering|torment|angui)", "extreme suffering"),
(r"\bcan'?t\s+see\s+any\s+(point|reason|light|hope|way)\b", "no light ahead"),
(r"\btrapped\b", "feeling trapped"),
(r"\bjust\s+want\s+it\s+to\s+stop\b", "want to stop"),
(r"\bno\s+way\s+out\b", "no way out"),
(r"\bno\s+one\s+would\s+(care|miss)\b", "no one would care/miss"),
(r"\beverything\s+is\s+(pointless|broken|ruined|meaningless)\b", "existential collapse"),
(r"\bno\s+point\s+in\s+anything\b", "pointlessness"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone|disappeared)", "no one would notice"),
(r"\bdisappeared\s+forever\b", "disappeared forever"),
]
MEDIUM_PATTERNS = [
(r"\bno\s+hope\b", "no hope"),
(r"\bcan'?t\s+go\s+on\b", "can't go on"),
(r"\bcan'?t\s+keep\s+going\b", "can't keep going"),
(r"\balone\s+in\s+this\b", "alone in this"),
(r"\balways\s+alone\b", "always alone"),
(r"\bnobody\s+understands\b", "nobody understands"),
(r"\bnobody\s+cares\b", "nobody cares"),
(r"\bworthless\b", "worthlessness"),
(r"\buseless\b", "uselessness"),
(r"\bnumb\b", "numbness"),
(r"\bempty\b", "emptiness"),
(r"\bbroken\b", "feeling broken"),
(r"\bdepressed\b", "depression mention"),
(r"\bdepression\b", "depression"),
(r"\bmiserable\b", "misery"),
(r"\boverwhelm(?:ed|ing)\b", "overwhelmed"),
(r"\bcannot\s+cope\b", "cannot cope"),
(r"\b(drowning|sinking)\b", "drowning/sinking"),
(r"\bforgotten\b", "feeling forgotten"),
(r"\blost\s+all\s+hope\b", "lost all hope"),
(r"\bno\s+future\b", "no future"),
(r"\bno\s+tomorrow\b", "no tomorrow"),
]
LOW_PATTERNS = [
(r"\bunhappy\b", "unhappy"),
(r"\brough\s+(day|week|patch)\b", "rough time"),
(r"\btough\s+(time|day|week)\b", "tough time"),
(r"\bstressed\b", "stressed"),
(r"\bburnout\b", "burnout"),
(r"\bfrustrated\b", "frustrated"),
(r"\bthings\s+(are\s+)?hard\b", "things are hard"),
(r"\bnot\s+feeling\s+(great|myself|good)\b", "not feeling good"),
(r"\bstruggl", "struggling"),
(r"\bdown\b", "feeling down"),
(r"\bsad\b", "sad"),
(r"\bupset\b", "upset"),
(r"\blonely\b", "lonely"),
(r"\banxious?\b", "anxious/anxiety"),
(r"\bnot\s+okay\b", "not okay"),
]
# ── Pattern collections for easy iteration ──────────────────────
TIER_PATTERNS: Dict[str, List[tuple]] = {
"CRITICAL": CRITICAL_PATTERNS,
"HIGH": HIGH_PATTERNS,
"MEDIUM": MEDIUM_PATTERNS,
"LOW": LOW_PATTERNS,
}
def detect(text: str) -> DetectionResult:
"""
Primary detection function.
If the crisis/ module is available, delegate to it.
Otherwise, use the internal pattern engine.
Primary detection function — delegates to crisis.detect.
Args:
text: User message to analyze
@@ -145,150 +47,25 @@ def detect(text: str) -> DetectionResult:
Returns:
DetectionResult with level, indicators, recommended_action, confidence
"""
# Try to use the crisis module if available
try:
from crisis.detect import detect_crisis as _crisis_detect
result = _crisis_detect(text)
# Map to DetectionResult
return DetectionResult(
level=result.level,
indicators=_describe_indicators(result.level, result.indicators),
recommended_action=result.recommended_action,
confidence=result.score,
session_hash=_hash_session(text),
)
except ImportError:
pass
result = detect_crisis(text)
return _detect_internal(text)
# Extract raw patterns from matches
raw_patterns = [m["pattern"] for m in result.matches] if result.matches else []
def _detect_internal(text: str) -> DetectionResult:
"""Internal pattern-based detection (fallback)."""
if not text or not text.strip():
return DetectionResult(level="NONE", session_hash=_hash_session(text))
text_lower = text.lower()
all_matches: Dict[str, List[tuple]] = {
tier: [] for tier in TIER_PATTERNS
}
for tier, patterns in TIER_PATTERNS.items():
for pattern, description in patterns:
if re.search(pattern, text_lower):
all_matches[tier].append((pattern, description))
# CRITICAL patterns are the highest priority
if all_matches["CRITICAL"]:
return DetectionResult(
level="CRITICAL",
indicators=[desc for _, desc in all_matches["CRITICAL"]],
recommended_action=(
"INTERRUPT CONVERSATION IMMEDIATELY. "
"Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline (call or text 988). "
"Provide Crisis Text Line (text HOME to 741741). "
"Stay present. Do not disconnect. "
"Redirect to professional help. "
"This is the most sacred moment."
),
raw_matched_patterns=[pat for pat, _ in all_matches["CRITICAL"]],
confidence=1.0,
session_hash=_hash_session(text),
)
if all_matches["HIGH"]:
return DetectionResult(
level="HIGH",
indicators=[desc for _, desc in all_matches["HIGH"]],
recommended_action=(
"Show crisis panel prominently. "
"Ask about safety directly. "
"Surface 988 number and crisis resources. "
"Stay present and engaged. Do not rush to fix. "
"Monitor closely for escalation."
),
raw_matched_patterns=[pat for pat, _ in all_matches["HIGH"]],
confidence=0.75,
session_hash=_hash_session(text),
)
if all_matches["MEDIUM"]:
return DetectionResult(
level="MEDIUM",
indicators=[desc for _, desc in all_matches["MEDIUM"]],
recommended_action=(
"Increase warmth and presence. "
"Subtly surface help resources. "
"Keep conversation anchored in the present. "
"Watch for escalation indicators."
),
raw_matched_patterns=[pat for pat, _ in all_matches["MEDIUM"]],
confidence=0.5,
session_hash=_hash_session(text),
)
if all_matches["LOW"]:
return DetectionResult(
level="LOW",
indicators=[desc for _, desc in all_matches["LOW"]],
recommended_action=(
"Normal conversation with warm undertone. "
"No crisis UI elements needed. "
"Remain vigilant. Be present."
),
raw_matched_patterns=[pat for pat, _ in all_matches["LOW"]],
confidence=0.25,
session_hash=_hash_session(text),
)
return DetectionResult(level="NONE", session_hash=_hash_session(text))
def _describe_indicators(level: str, patterns: list) -> list:
"""Map raw patterns to descriptions."""
descriptions = {
"CRITICAL": [],
"HIGH": [],
"MEDIUM": [],
"LOW": [],
}
for tier, items in TIER_PATTERNS.items():
for pat, desc in items:
if pat in patterns:
descriptions[tier].append(desc)
return descriptions.get(level, [])
def _hash_session(text: str) -> str:
"""Create a session hash for this text (for tracking repeated escalations)."""
return hashlib.sha256(text.encode()).hexdigest()[:12]
return DetectionResult(
level=result.level,
indicators=result.indicators,
recommended_action=result.recommended_action,
raw_matched_patterns=raw_patterns,
confidence=result.score,
session_hash=hashlib.sha256(text.encode()).hexdigest()[:12],
)
def get_action_for_level(level: str) -> str:
"""Get the recommended action string for a given level."""
actions = {
"CRITICAL": (
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988. Provide Crisis Text Line. "
"Stay present. Do not disconnect. "
"Redirect to help."
),
"HIGH": (
"Show crisis panel. Ask about safety. "
"Surface 988. Stay engaged."
),
"MEDIUM": (
"Increase warmth. Surface resources gently. "
"Anchor in present."
),
"LOW": (
"Normal conversation with warmth. "
"Remain vigilant."
),
"NONE": "No action needed.",
}
return actions.get(level, "Unknown level.")
from crisis.detect import ACTIONS
return ACTIONS.get(level, "Unknown level.")
def as_json(result: DetectionResult, indent: int = 2) -> str:

View File

@@ -808,6 +808,7 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -983,12 +984,60 @@ Sovereignty and service always.`;
// ===== OVERLAY =====
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
);
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var first = focusable[0];
var last = focusable[focusable.length - 1];
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab: if on last, wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function showOverlay() {
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
crisisOverlay.classList.add('active');
overlayDismissBtn.disabled = true;
var countdown = 10;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
if (overlayTimer) clearInterval(overlayTimer);
overlayTimer = setInterval(function() {
countdown--;
@@ -1002,9 +1051,13 @@ Sovereignty and service always.`;
}
}, 1000);
overlayDismissBtn.focus();
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
}
// Register focus trap on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
crisisOverlay.classList.remove('active');
@@ -1012,7 +1065,22 @@ Sovereignty and service always.`;
clearInterval(overlayTimer);
overlayTimer = null;
}
msgInput.focus();
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
});
@@ -1117,25 +1185,14 @@ Sovereignty and service always.`;
} catch (e) {}
}
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
cancelSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
saveSafetyPlan.addEventListener('click', function() {
@@ -1149,12 +1206,101 @@ Sovereignty and service always.`;
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
alert('Error saving plan.');
}
});
// ===== SAFETY PLAN FOCUS TRAP (fix #65) =====
// Focusable elements inside the modal, in tab order
var _spFocusableIds = [
'close-safety-plan',
'sp-warning-signs',
'sp-coping',
'sp-distraction',
'sp-help',
'sp-environment',
'cancel-safety-plan',
'save-safety-plan'
];
var _spTriggerEl = null; // element that opened the modal
function _getSpFocusableEls() {
return _spFocusableIds
.map(function(id) { return document.getElementById(id); })
.filter(function(el) { return el && !el.disabled; });
}
function _trapSafetyPlanFocus(e) {
if (e.key !== 'Tab') return;
var els = _getSpFocusableEls();
if (!els.length) return;
var first = els[0];
var last = els[els.length - 1];
if (e.shiftKey) {
// Shift+Tab on first → wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab on last → wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
function _trapSafetyPlanEscape(e) {
if (e.key === 'Escape') {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}
}
function _activateSafetyPlanFocusTrap(triggerEl) {
_spTriggerEl = triggerEl || document.activeElement;
// Focus first textarea
var firstInput = document.getElementById('sp-warning-signs');
if (firstInput) firstInput.focus();
// Add listeners
document.addEventListener('keydown', _trapSafetyPlanFocus);
document.addEventListener('keydown', _trapSafetyPlanEscape);
// Mark background inert (prevent click-through)
document.body.setAttribute('aria-hidden', 'true');
safetyPlanModal.removeAttribute('aria-hidden');
}
function _restoreSafetyPlanFocus() {
document.removeEventListener('keydown', _trapSafetyPlanFocus);
document.removeEventListener('keydown', _trapSafetyPlanEscape);
document.body.removeAttribute('aria-hidden');
if (_spTriggerEl && typeof _spTriggerEl.focus === 'function') {
_spTriggerEl.focus();
}
_spTriggerEl = null;
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
});
}
// ===== TEXTAREA AUTO-RESIZE =====
msgInput.addEventListener('input', function() {
this.style.height = 'auto';
@@ -1300,6 +1446,7 @@ Sovereignty and service always.`;
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -1,5 +1,5 @@
[pytest]
testpaths = crisis
python_files = tests.py
testpaths = crisis tests
python_files = tests.py test_*.py
python_classes = Test*
python_functions = test_*

219
sw.js
View File

@@ -1,118 +1,153 @@
const CACHE_NAME = 'the-door-v2';
const ASSETS = [
const CACHE_NAME = 'the-door-v3';
const NAVIGATION_TIMEOUT_MS = 2500;
const OFFLINE_FALLBACK_PATH = '/crisis-offline.html';
const PRECACHE_ASSETS = [
'/',
'/index.html',
'/about',
'/manifest.json'
'/about.html',
'/manifest.json',
'/crisis-offline.html',
'/testimony.html'
];
// Crisis resources to show when everything fails
const CRISIS_OFFLINE_RESPONSE = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>You're Not Alone | The Door</title>
<style>
body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;background:#0d1117;color:#e6edf3;max-width:600px;margin:0 auto;padding:20px;line-height:1.6}
h1{color:#ff6b6b;font-size:1.5rem;margin-bottom:1rem}
.crisis-box{background:#1c1210;border:2px solid #c9362c;border-radius:12px;padding:20px;margin:20px 0;text-align:center}
.crisis-box a{display:inline-block;background:#c9362c;color:#fff;text-decoration:none;padding:16px 32px;border-radius:8px;font-weight:700;font-size:1.2rem;margin:10px 0}
.hope{color:#8b949e;font-style:italic;margin-top:30px;padding-top:20px;border-top:1px solid #30363d}
</style>
</head>
<body>
<h1>You are not alone.</h1>
<p>Your connection is down, but help is still available.</p>
<div class="crisis-box">
<p><strong>Call or text 988</strong><br>Suicide & Crisis Lifeline<br>Free, 24/7, Confidential</p>
<a href="tel:988">Call 988 Now</a>
<p style="margin-top:15px"><strong>Or text HOME to 741741</strong><br>Crisis Text Line</p>
</div>
<p><strong>When you're ready:</strong></p>
<ul>
<li>Take five deep breaths</li>
<li>Drink some water</li>
<li>Step outside if you can</li>
<li>Text or call someone you trust</li>
</ul>
<p class="hope">
"The Lord is close to the brokenhearted and saves those who are crushed in spirit." — Psalm 34:18
</p>
<p style="font-size:0.85rem;color:#6e7681;margin-top:30px">
This page was created by The Door — a crisis intervention project.<br>
Connection will restore automatically. You don't have to go through this alone.
</p>
</body>
</html>`;
function isSameOrigin(request) {
return new URL(request.url).origin === self.location.origin;
}
function canCache(response) {
return Boolean(response && response.ok && response.type !== 'opaque');
}
async function precache() {
const cache = await caches.open(CACHE_NAME);
await cache.addAll(PRECACHE_ASSETS);
}
async function cleanupOldCaches() {
const keys = await caches.keys();
await Promise.all(
keys
.filter((key) => key !== CACHE_NAME)
.map((key) => caches.delete(key))
);
}
async function putInCache(request, response) {
if (!isSameOrigin(request) || !canCache(response)) {
return response;
}
const cache = await caches.open(CACHE_NAME);
await cache.put(request, response.clone());
return response;
}
async function fetchWithTimeout(request, timeoutMs) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fetch(request, { signal: controller.signal });
} finally {
clearTimeout(timeoutId);
}
}
async function offlineTextResponse() {
return new Response('Offline. Call 988 or text HOME to 741741 for immediate help.', {
status: 503,
statusText: 'Service Unavailable',
headers: new Headers({ 'Content-Type': 'text/plain; charset=utf-8' })
});
}
async function handleNavigation(request) {
const cache = await caches.open(CACHE_NAME);
const cachedPage = await cache.match(request);
const offlineFallback = await cache.match(OFFLINE_FALLBACK_PATH);
try {
const response = await fetchWithTimeout(request, NAVIGATION_TIMEOUT_MS);
return await putInCache(request, response);
} catch (error) {
if (cachedPage) {
return cachedPage;
}
if (offlineFallback) {
return offlineFallback;
}
return offlineTextResponse();
}
}
async function handleStaticRequest(request) {
const cache = await caches.open(CACHE_NAME);
const cached = await cache.match(request);
if (cached) {
fetch(request)
.then((response) => putInCache(request, response))
.catch(() => null);
return cached;
}
try {
const response = await fetch(request);
return await putInCache(request, response);
} catch (error) {
return offlineTextResponse();
}
}
async function handleOtherRequest(request) {
try {
const response = await fetch(request);
return await putInCache(request, response);
} catch (error) {
const cached = await caches.match(request);
if (cached) {
return cached;
}
return offlineTextResponse();
}
}
// Install event - cache core assets
self.addEventListener('install', (event) => {
event.waitUntil(
caches.open(CACHE_NAME).then((cache) => {
return cache.addAll(ASSETS);
})
precache().then(() => self.skipWaiting())
);
self.skipWaiting();
});
// Activate event - cleanup old caches
self.addEventListener('activate', (event) => {
event.waitUntil(
caches.keys().then((keys) => {
return Promise.all(
keys.filter((key) => key !== CACHE_NAME).map((key) => caches.delete(key))
);
})
cleanupOldCaches().then(() => self.clients.claim())
);
self.clients.claim();
});
// Fetch event - network first, fallback to cache for static,
// but for the crisis front door, we want to ensure the shell is ALWAYS available.
self.addEventListener('fetch', (event) => {
const url = new URL(event.request.url);
const request = event.request;
const url = new URL(request.url);
// Skip API calls - they should always go to network
if (url.pathname.startsWith('/api/')) {
if (request.method !== 'GET') {
return;
}
// Skip non-GET requests
if (event.request.method !== 'GET') {
if (!isSameOrigin(request) || url.pathname.startsWith('/api/')) {
return;
}
event.respondWith(
fetch(event.request)
.then((response) => {
// If we got a valid response, cache it for next time
if (response.ok && ASSETS.includes(url.pathname)) {
const copy = response.clone();
caches.open(CACHE_NAME).then((cache) => cache.put(event.request, copy));
}
return response;
})
.catch(() => {
// If network fails, try cache
return caches.match(event.request).then((cached) => {
if (cached) return cached;
if (event.request.mode === 'navigate') {
event.respondWith(handleNavigation(request));
return;
}
// If it's a navigation request and we're offline, show offline crisis page
if (event.request.mode === 'navigate') {
return new Response(CRISIS_OFFLINE_RESPONSE, {
status: 200,
headers: new Headers({ 'Content-Type': 'text/html' })
});
}
if (PRECACHE_ASSETS.includes(url.pathname)) {
event.respondWith(handleStaticRequest(request));
return;
}
// For other requests, return a simple offline message
return new Response('Offline. Call 988 for immediate help.', {
status: 503,
statusText: 'Service Unavailable',
headers: new Headers({ 'Content-Type': 'text/plain' })
});
});
})
);
event.respondWith(handleOtherRequest(request));
});

View File

@@ -0,0 +1,84 @@
<!-- Test: Safety plan modal focus trap (issue #65) -->
<!-- Open this file in a browser to manually verify focus trap behavior -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Focus Trap Test</title>
<style>
body { font-family: sans-serif; padding: 20px; }
.test { margin: 10px 0; padding: 10px; border: 1px solid #ccc; }
.pass { background: #d4edda; border-color: #28a745; }
.fail { background: #f8d7da; border-color: #dc3545; }
button { margin: 5px; padding: 8px 16px; }
</style>
</head>
<body>
<h1>Focus Trap Manual Test</h1>
<p>Open <code>index.html</code> in a browser, then run these checks:</p>
<div class="test" id="test-1">
<strong>Test 1: Tab wraps to first element</strong><br>
1. Open safety plan modal<br>
2. Tab through all elements until you reach "Save Plan"<br>
3. Press Tab again → should wrap to close button (X)
</div>
<div class="test" id="test-2">
<strong>Test 2: Shift+Tab wraps to last element</strong><br>
1. Open safety plan modal<br>
2. Focus is on "Warning signs" textarea<br>
3. Press Shift+Tab → should wrap to "Save Plan" button
</div>
<div class="test" id="test-3">
<strong>Test 3: Escape closes modal</strong><br>
1. Open safety plan modal<br>
2. Press Escape → modal closes<br>
3. Focus returns to the button that opened it
</div>
<div class="test" id="test-4">
<strong>Test 4: Background not reachable</strong><br>
1. Open safety plan modal<br>
2. Try to Tab to the chat input behind the modal<br>
3. Should NOT be able to reach it
</div>
<div class="test" id="test-5">
<strong>Test 5: Click buttons close + restore focus</strong><br>
1. Open modal via "my safety plan" button<br>
2. Click Cancel → modal closes, focus on "my safety plan" button<br>
3. Open again, click Save → same behavior<br>
4. Open again, click X → same behavior
</div>
<hr>
<h2>Automated checks (paste into DevTools console on index.html):</h2>
<pre><code>
// Test focus trap
var modal = document.getElementById('safety-plan-modal');
var openBtn = document.getElementById('safety-plan-btn');
openBtn.click();
console.assert(modal.classList.contains('active'), 'Modal should be open');
var lastEl = document.getElementById('save-safety-plan');
lastEl.focus();
var evt = new KeyboardEvent('keydown', {key: 'Tab', bubbles: true});
document.dispatchEvent(evt);
// After Tab from last, focus should wrap to first
var firstEl = document.getElementById('close-safety-plan');
console.log('Focus after wrap:', document.activeElement.id);
console.assert(document.activeElement === firstEl || document.activeElement.id === 'sp-warning-signs',
'Focus should wrap to first element');
// Test Escape
var escEvt = new KeyboardEvent('keydown', {key: 'Escape', bubbles: true});
document.dispatchEvent(escEvt);
console.assert(!modal.classList.contains('active'), 'Modal should close on Escape');
console.assert(document.activeElement === openBtn, 'Focus should return to open button');
console.log('All automated checks passed!');
</code></pre>
</body>
</html>

View File

@@ -1,106 +0,0 @@
"""Tests for crisis metrics (#37)."""
import json
import os
import tempfile
import time
import unittest
from crisis.metrics import CrisisMetrics
class TestCrisisMetricsRecording(unittest.TestCase):
def setUp(self):
self.path = tempfile.mktemp(suffix='.jsonl')
self.m = CrisisMetrics(self.path)
def tearDown(self):
self.m.close()
if os.path.exists(self.path):
os.remove(self.path)
def test_record_non_crisis_yields_none_level(self):
self.m.record('NONE', ['*'])
d = self.m.snapshot()
self.assertEqual(d['detections_per_level']['NONE'], 1)
self.assertEqual(d['total_detections'], 1)
def test_record_critical(self):
self.m.record('CRITICAL', [r'\bkill\s*myself\b'])
d = self.m.snapshot()
self.assertEqual(d['detections_per_level']['CRITICAL'], 1)
def test_record_multiple_levels(self):
self.m.record('LOW', [r'\bunhappy\b'])
self.m.record('CRITICAL', [r'\bend\s*my\s*life\b'])
self.m.record('MEDIUM', [r'\bworthless\b', r'\bempty\b'])
d = self.m.snapshot()
self.assertEqual(d['detections_per_level']['LOW'], 1)
self.assertEqual(d['detections_per_level']['CRITICAL'], 1)
self.assertEqual(d['detections_per_level']['MEDIUM'], 1)
self.assertEqual(d['total_detections'], 3)
def test_keywords_normalized_to_lowercase(self):
self.m.record('LOW', ['\\bUNHAPPY\\b'])
d = self.m.snapshot()
self.assertEqual(d['keywords_frequency']['\\bunhappy\\b'], 1)
class TestCrisisMetricsWeekly(unittest.TestCase):
def setUp(self):
self.path = tempfile.mktemp(suffix='.jsonl')
self.m = CrisisMetrics(self.path)
def tearDown(self):
self.m.close()
if os.path.exists(self.path):
os.remove(self.path)
def test_weekly_keys(self):
report = self.m.weekly()
for key in ['week_start', 'week_end', 'detections', 'detections_per_level',
'top_keywords', 'false_positive_estimate']:
self.assertIn(key, report, f'missing weekly key {key}')
class TestCrisisMetricsPrivacy(unittest.TestCase):
def setUp(self):
self.path = tempfile.mktemp(suffix='.jsonl')
self.m = CrisisMetrics(self.path)
def tearDown(self):
self.m.close()
if os.path.exists(self.path):
os.remove(self.path)
def test_jsonl_no_pii(self):
self.m.record('HIGH', [r'\bno\s+way\s+out\b'])
self.m.close()
raw = open(self.path).read()
self.assertNotIn('Kill myself', raw) # original message not stored
for word in ['user_id', 'username', 'message']:
self.assertNotIn(word, raw)
class TestCrisisMetricsIntegratesWithDetect(unittest.TestCase):
"""Wire metrics into detect_crisis via a wrapper."""
def test_metrics_wraps_detect_crisis(self):
from crisis.detect import detect_crisis
path = tempfile.mktemp(suffix='.jsonl')
m = CrisisMetrics(path)
try:
result = detect_crisis('I feel happy and great')
m.record(result.level, result.indicators)
result2 = detect_crisis('I want to kill myself')
m.record(result2.level, result2.indicators)
snap = m.snapshot()
self.assertEqual(snap['detections_per_level']['NONE'], 1)
self.assertEqual(snap['detections_per_level']['CRITICAL'], 1)
finally:
m.close()
if os.path.exists(path):
os.remove(path)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,118 @@
"""Tests for crisis detection metrics endpoint contract (issue #97)."""
import json
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.gateway import check_crisis, check_crisis_and_record
from crisis.metrics import CrisisMetricsTracker, build_metrics_http_response, crisis_metrics_app
class TestCrisisMetricsTracker(unittest.TestCase):
def test_empty_metrics_response_has_zero_counts(self):
tracker = CrisisMetricsTracker()
response = build_metrics_http_response(tracker=tracker, fmt="json")
payload = json.loads(response["body"])
self.assertEqual(response["status"], 200)
self.assertEqual(payload["total_events"], 0)
self.assertEqual(payload["escalated"], 0)
self.assertEqual(payload["resources_shown"], 0)
self.assertEqual(payload["timestamp_range"], {"start": None, "end": None})
self.assertNotIn("timmy_message", payload)
self.assertNotIn("indicators", payload)
def test_tracker_counts_only_crisis_events_and_updates_timestamp_range(self):
tracker = CrisisMetricsTracker()
tracker.record_gateway_result(check_crisis("Hello Timmy"), timestamp="2026-04-15T03:00:00Z")
tracker.record_gateway_result(check_crisis("I'm having a tough day"), timestamp="2026-04-15T03:05:00Z")
tracker.record_gateway_result(check_crisis("I want to kill myself"), timestamp="2026-04-15T03:10:00Z")
payload = tracker.snapshot().to_dict()
self.assertEqual(payload["total_events"], 2)
self.assertEqual(payload["escalated"], 1)
self.assertEqual(payload["resources_shown"], 1)
self.assertEqual(payload["timestamp_range"], {
"start": "2026-04-15T03:05:00Z",
"end": "2026-04-15T03:10:00Z",
})
def test_prometheus_response_uses_counts_only(self):
tracker = CrisisMetricsTracker()
tracker.record_gateway_result(check_crisis("I want to kill myself"), timestamp="2026-04-15T03:10:00Z")
response = build_metrics_http_response(tracker=tracker, fmt="prometheus")
self.assertEqual(response["status"], 200)
self.assertIn("text/plain", response["headers"]["Content-Type"])
self.assertIn("the_door_crisis_total_events 1", response["body"])
self.assertIn("the_door_crisis_escalated 1", response["body"])
self.assertIn("the_door_crisis_resources_shown 1", response["body"])
self.assertNotIn("kill myself", response["body"])
self.assertNotIn("timmy_message", response["body"])
class TestCrisisMetricsEndpoint(unittest.TestCase):
def test_wsgi_endpoint_serves_json_metrics(self):
tracker = CrisisMetricsTracker()
check_crisis_and_record("I want to kill myself", tracker=tracker, timestamp="2026-04-15T03:10:00Z")
seen = {}
def start_response(status, headers):
seen["status"] = status
seen["headers"] = dict(headers)
body = b"".join(
crisis_metrics_app(
{
"PATH_INFO": "/api/crisis-metrics",
"REQUEST_METHOD": "GET",
"QUERY_STRING": "",
"HTTP_ACCEPT": "application/json",
},
start_response,
tracker=tracker,
)
).decode("utf-8")
payload = json.loads(body)
self.assertEqual(seen["status"], "200 OK")
self.assertEqual(seen["headers"]["Content-Type"], "application/json")
self.assertEqual(payload["total_events"], 1)
self.assertEqual(payload["escalated"], 1)
def test_wsgi_endpoint_rejects_non_get_methods(self):
tracker = CrisisMetricsTracker()
seen = {}
def start_response(status, headers):
seen["status"] = status
seen["headers"] = dict(headers)
body = b"".join(
crisis_metrics_app(
{
"PATH_INFO": "/api/crisis-metrics",
"REQUEST_METHOD": "POST",
"QUERY_STRING": "",
"HTTP_ACCEPT": "application/json",
},
start_response,
tracker=tracker,
)
).decode("utf-8")
self.assertEqual(seen["status"], "405 Method Not Allowed")
self.assertIn("GET", seen["headers"]["Allow"])
self.assertIn("method_not_allowed", body)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,85 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayFocusTrap(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_registers_tab_key_focus_trap(self):
self.assertRegex(
self.html,
r"function\s+trapFocusInOverlay\s*\(e\)",
'Expected crisis overlay focus trap handler to exist.',
)
self.assertRegex(
self.html,
r"if\s*\(e\.key\s*!==\s*'Tab'\)\s*return;",
'Expected focus trap handler to guard on Tab key events.',
)
self.assertRegex(
self.html,
r"document\.addEventListener\('keydown',\s*trapFocusInOverlay\)",
'Expected overlay focus trap to register on document keydown.',
)
def test_overlay_disables_background_interaction(self):
self.assertRegex(
self.html,
r"mainApp\.setAttribute\('inert',\s*''\)",
'Expected overlay to set inert on the main app while active.',
)
self.assertRegex(
self.html,
r"mainApp\.removeAttribute\('inert'\)",
'Expected overlay dismissal to remove inert from the main app.',
)
def test_overlay_restores_focus_after_dismiss(self):
self.assertRegex(
self.html,
r"_preOverlayFocusElement\s*=\s*document\.activeElement",
'Expected overlay to remember the pre-overlay focus target.',
)
self.assertRegex(
self.html,
r"_preOverlayFocusElement\.focus\(\)",
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,44 @@
import importlib
import sys
import unittest
import warnings
from crisis.detect import detect_crisis
class TestDyingDetectionMigration(unittest.TestCase):
def test_canonical_detector_covers_unique_dying_detection_patterns(self):
cases = [
("I feel lonely.", "LOW"),
("I've lost all hope and see no tomorrow.", "MEDIUM"),
("What if I disappeared forever?", "HIGH"),
]
for text, expected_level in cases:
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(result.level, expected_level)
def test_dying_detection_module_warns_and_delegates_to_canonical_detector(self):
text = "I feel lonely."
sys.modules.pop("dying_detection", None)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", DeprecationWarning)
module = importlib.import_module("dying_detection")
self.assertTrue(
any(issubclass(w.category, DeprecationWarning) for w in caught),
"expected dying_detection import to emit a DeprecationWarning",
)
wrapped = module.detect(text)
canonical = detect_crisis(text)
self.assertEqual(wrapped.level, canonical.level)
self.assertEqual(wrapped.confidence, canonical.score)
self.assertEqual(wrapped.raw_matched_patterns, [m["pattern"] for m in canonical.matches])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,71 @@
import pathlib
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SERVICE_WORKER = (ROOT / 'sw.js').read_text(encoding='utf-8')
CRISIS_OFFLINE_PAGE = ROOT / 'crisis-offline.html'
MAKEFILE = (ROOT / 'Makefile').read_text(encoding='utf-8')
class TestServiceWorkerOffline(unittest.TestCase):
def test_crisis_offline_page_exists(self):
self.assertTrue(CRISIS_OFFLINE_PAGE.exists(), 'crisis-offline.html should exist')
def test_service_worker_precaches_crisis_offline_page(self):
self.assertIn('/crisis-offline.html', SERVICE_WORKER)
def test_service_worker_has_navigation_timeout_for_intermittent_connections(self):
self.assertIn('NAVIGATION_TIMEOUT_MS', SERVICE_WORKER)
self.assertIn('AbortController', SERVICE_WORKER)
def test_service_worker_uses_crisis_offline_fallback_for_navigation(self):
self.assertIn("event.request.mode === 'navigate'", SERVICE_WORKER)
self.assertIn("/crisis-offline.html", SERVICE_WORKER)
def test_make_push_includes_crisis_offline_page(self):
self.assertIn('crisis-offline.html', MAKEFILE)
class TestCrisisOfflinePage(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = CRISIS_OFFLINE_PAGE.read_text(encoding='utf-8') if CRISIS_OFFLINE_PAGE.exists() else ''
cls.lower_html = cls.html.lower()
def test_has_clickable_988_link(self):
self.assertIn('href="tel:988"', self.html)
def test_has_crisis_text_line(self):
self.assertIn('Crisis Text Line', self.html)
self.assertIn('741741', self.html)
def test_has_grounding_techniques(self):
required_phrases = [
'5 things you can see',
'4 things you can feel',
'3 things you can hear',
'2 things you can smell',
'1 thing you can taste',
]
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,277 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()