Compare commits

...

54 Commits

Author SHA1 Message Date
Alexander Whitestone
15c31a56a7 feat: wire crisis synthesizer into responder pipeline for #121
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 10s
Smoke Test / smoke (pull_request) Successful in 19s
2026-04-22 02:00:18 -04:00
Alexander Whitestone
57cfbb3940 test: define crisis synthesizer integration for #121 2026-04-22 02:00:06 -04:00
30e645c8f3 Merge PR #191: feat: add crisis tracker/bridge compatibility contract for hermes-agent (#141)
All checks were successful
Smoke Test / smoke (push) Successful in 17s
Merged by automated sweep after diff review and verification. PR #191: feat: add crisis tracker/bridge compatibility contract for hermes-agent (#141)
2026-04-22 02:38:16 +00:00
586939e756 Merge PR #183: fix: remove duplicate crisis indicators across tiers (#123)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #183: fix: remove duplicate crisis indicators across tiers (#123)
2026-04-22 02:37:58 +00:00
cd7f2ae467 Merge PR #168: feat: crisis detection metrics with CLI (closes #136)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #168: feat: crisis detection metrics with CLI (closes #136)
2026-04-22 02:37:39 +00:00
68c9cfd596 Merge PR #185: fix: rate-limit crisis overlay display (#100)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #185: fix: rate-limit crisis overlay display (#100)
2026-04-22 02:37:33 +00:00
4ad48a437c Merge PR #166: fix: add chat-header safety plan access (#38)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #166: fix: add chat-header safety plan access (#38)
2026-04-22 02:37:27 +00:00
4f040158e4 Merge PR #187: feat: add image screening slice for #130
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #187: feat: add image screening slice for #130
2026-04-22 02:37:17 +00:00
e1bf155b5e Merge PR #165: docs: audit fleet work orders issue #75
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #165: docs: audit fleet work orders issue #75
2026-04-22 02:37:12 +00:00
a8c59ca8a8 Merge PR #181: feat: build crisis synthesizer (#36)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #181: feat: build crisis synthesizer (#36)
2026-04-22 02:37:06 +00:00
fcf0ee64b5 Merge PR #178: docs: verify #41 already implemented
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #178: docs: verify #41 already implemented
2026-04-22 02:36:57 +00:00
30f7738429 Merge pull request 'fix: complete crisis overlay keyboard navigation (#95)' (#175) from fix/95 into main
All checks were successful
Smoke Test / smoke (push) Successful in 8s
2026-04-21 12:09:35 +00:00
Alexander Whitestone
f007808228 feat: add crisis tracker and bridge import shims for hermes-agent (#141)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 13s
Smoke Test / smoke (pull_request) Successful in 30s
2026-04-21 02:13:37 -04:00
Alexander Whitestone
c494bba728 wip: add crisis integration contract regression for #141 2026-04-21 02:11:17 -04:00
Timmy
100cc743c0 feat: add image screening slice for #130
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 10s
2026-04-20 21:34:10 -04:00
Timmy
f7d99c6d9c test: define image crisis screening slice for #130 2026-04-20 21:32:00 -04:00
Alexander Whitestone
e132fc134b fix: rate-limit crisis overlay display (#100)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-20 19:39:26 -04:00
Alexander Whitestone
8b658e76f3 wip: add crisis overlay rate-limit regression test 2026-04-20 19:35:53 -04:00
Alexander Whitestone
1b4a70d5fe fix: remove duplicate crisis indicators across tiers (#123)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 13s
2026-04-20 19:17:59 -04:00
Alexander Whitestone
bc02ca54ec wip: add crisis indicator dedup regression test 2026-04-20 19:15:52 -04:00
Alexander Whitestone
9f2038659c feat: build crisis synthesizer (#36)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-17 02:36:30 -04:00
Alexander Whitestone
d5ae0172b3 wip: add crisis synthesizer regression tests 2026-04-17 02:36:30 -04:00
Alexander Whitestone
51a197ae14 docs: verify #41 already implemented
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 14s
2026-04-17 02:06:21 -04:00
Alexander Whitestone
544dbba8cc fix: complete crisis overlay keyboard navigation (#95)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 8s
Smoke Test / smoke (pull_request) Successful in 13s
2026-04-17 01:52:32 -04:00
d412939b4f fix: footer /about link to point to static about.html
Fixes #59

The footer links to /about but the repo ships about.html. On a plain static server this results in a 404. Changed to /about.html so the link resolves correctly.
2026-04-17 05:37:40 +00:00
Alexander Whitestone
6e03492147 feat: CLI command to view crisis metrics summary (#136)
crisis/metrics.py:
  CrisisMetrics class — aggregate crisis detection metrics
  Privacy-first: stores only counts, never user content
  Daily JSONL files in ~/.the-door/metrics/
  get_summary(days) → AggregateMetrics
  get_report(days) → human-readable report
  get_json(days) → JSON export
  CLI: python3 -m crisis.metrics --summary/--json

crisis/__init__.py:
  Export CrisisMetrics, AggregateMetrics

Makefile:
  make metrics      → summary report
  make metrics-json → JSON export

tests/test_crisis_metrics.py: 6 tests
  record_session, summary, report, JSON export
2026-04-17 01:26:44 -04:00
Alexander Whitestone
b4129bc873 fix: add chat-header safety plan access (#38)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-17 01:14:28 -04:00
Alexander Whitestone
e3bb6b86ee test: add chat-header safety plan regression (#38) 2026-04-17 00:59:21 -04:00
Alexander Whitestone
a38e80bff1 docs: audit fleet work orders issue #75
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 7s
2026-04-17 00:10:59 -04:00
Alexander Whitestone
680c50d7c3 test: define fleet work orders audit acceptance for #75 2026-04-17 00:07:37 -04:00
07c582aa08 Merge pull request 'fix: crisis overlay initial focus to enabled Call 988 link (#69)' (#126) from burn/69-1776264183 into main
Merge PR #126: fix: crisis overlay initial focus to enabled Call 988 link (#69)
2026-04-17 01:46:56 +00:00
5f95dc1e39 Merge pull request '[P3] Service worker: cache crisis resources for offline (#41)' (#122) from burn/41-1776264184 into main
Merge PR #122: [P3] Service worker: cache crisis resources for offline (#41)
2026-04-17 01:46:55 +00:00
b1f3cac36d Merge pull request 'feat: session-level crisis tracking and escalation (closes #35)' (#118) from door/issue-35 into main
Merge PR #118: feat: session-level crisis tracking and escalation (closes #35)
2026-04-17 01:46:53 +00:00
07b3f67845 fix: crisis overlay initial focus to enabled Call 988 link (#69)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 15s
2026-04-15 15:09:36 +00:00
c22bbbaf65 fix: crisis overlay initial focus to enabled Call 988 link (#69) 2026-04-15 15:09:32 +00:00
543cb1d40f test: add offline self-containment and retry button tests (#41)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:58:44 +00:00
3cfd01815a feat: session-level crisis tracking and escalation (closes #35)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 23s
2026-04-15 11:49:52 +00:00
5a7ba9f207 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:51 +00:00
8ed8f20a17 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:49 +00:00
9d7d26033e feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:47 +00:00
48f48c7f26 feat: cache offline crisis resources (refs #41) (#74)
All checks were successful
Smoke Test / smoke (push) Successful in 7s
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 19s
Merge PR #74 (squash)
2026-04-14 22:09:59 +00:00
da31288525 fix: deprecate dying_detection and consolidate crisis detection (#40) (#76)
All checks were successful
Smoke Test / smoke (push) Successful in 4s
Merge PR #76 (squash)
2026-04-14 22:08:29 +00:00
8efc858cd7 fix: add keyboard focus trap to crisis overlay (#80)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #80 (squash)
2026-04-14 22:08:28 +00:00
611c1c8456 fix(a11y): Safety plan modal keyboard focus trap (#65) (#81)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #81 (squash)
2026-04-14 22:08:24 +00:00
9b94978d1c feat: Wire compassion router into gateway flow (#34) (#43)
All checks were successful
Smoke Test / smoke (push) Successful in 5s
Squash merge: wire compassion router into gateway flow
2026-04-13 19:59:15 +00:00
e71bca1744 fix: de-duplicate crisis_detector.py and crisis/detect.py (closes #39) (#44)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Squash merge: de-duplicate crisis detector (closes #39)

Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-13 19:59:12 +00:00
Alexander Whitestone
1d8afc30fd fix: reduce crisis detector false positives (closes #32)
All checks were successful
Smoke Test / smoke (push) Successful in 5s
- Removed 'saying goodbye' from CRITICAL tier (too common in innocent contexts)
- Narrowed 'wrote a will' to 'wrote a suicide note' (responsible behavior)
- Removed broad single-word matches from HIGH tier: give up, trapped, desperate, worthless, hopeless, no future, nothing left, can't see any light
- Added contextual HIGH tier phrases: feel hopeless, trapped in this, desperate for help, give up on life, etc.
- Updated MODERATE tier with contextual versions: feel worthless, feel hopeless, feel trapped, etc.
- Updated index.html JavaScript keywords to match Python changes
- Added comprehensive false positive test suite

All existing tests pass. New tests verify innocent messages no longer trigger false alarms.
2026-04-13 15:37:23 -04:00
38601f6076 fix: remove false-positive CRITICAL crisis keywords (closes #28, #32) (#31)
All checks were successful
Smoke Test / smoke (push) Successful in 5s
2026-04-13 19:25:15 +00:00
dcc931e946 fix: implement missing functions from rescued PR — test_rescue.py now passes (#27)
All checks were successful
Smoke Test / smoke (push) Successful in 5s
Auto-merged by Timmy overnight cycle
2026-04-13 14:05:07 +00:00
26e97f76db fix: remove bridge false-positive from MODERATE_KEYWORDS (#29)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Auto-merged by Timmy overnight cycle
2026-04-13 14:05:02 +00:00
045df23928 Merge pull request 'Rescue PR #23 into existing crisis package (#24)' (#26) from burn/rescue-crisis into main
All checks were successful
Smoke Test / smoke (push) Successful in 5s
Merge PR #26: Rescue PR #23 into existing crisis package (#24)
2026-04-13 07:31:59 +00:00
00fec639b7 Merge pull request 'feat(deploy): add systemd service for hermes-gateway' (#25) from burn/20260413-0213-vps-deploy into main
All checks were successful
Smoke Test / smoke (push) Successful in 5s
Merged #25: Systemd service for hermes-gateway
2026-04-13 07:31:40 +00:00
Alexander Whitestone
35f18b3d54 Rescue PR #23 into existing crisis package (#24)
crisis/detect.py:
- Add 'better off without me' CRITICAL pattern
- Add 'desperate' HIGH pattern
- Add extract_context() for match snippets

crisis/response.py:
- Add 5-4-3-2-1 grounding exercise
- Add breathing exercise
- Add generate_grounding_steps() and generate_breathing_exercise()

crisis/test_rescue.py: 5 tests for new features
2026-04-13 03:20:37 -04:00
Alexander Whitestone
a90b659f3a feat(deploy): add systemd service for hermes-gateway
Some checks failed
Sanity Checks / sanity-test (pull_request) Failing after 2s
Smoke Test / smoke (pull_request) Successful in 4s
- Add hermes-gateway.service with restart=always and security hardening
- Integrate service setup into deploy.sh
- Add --service flag for standalone install
- Add make service target

Resolves #2
2026-04-13 02:16:19 -04:00
44 changed files with 4256 additions and 752 deletions

View File

@@ -21,9 +21,9 @@ jobs:
- name: Validate HTML Structure
run: |
echo "Checking for basic HTML tags..."
grep -q "<html>" index.html
grep -q "<body>" index.html
grep -q "<head>" index.html
grep -q "<html" index.html
grep -q "<body" index.html
grep -q "<head" index.html
- name: Validate Prompt Integrity
run: |

View File

@@ -12,7 +12,7 @@ VPS := alexanderwhitestone.com
DOMAIN := alexanderwhitestone.com
DEPLOY_DIR := deploy
.PHONY: help deploy deploy-bash check ssl push
.PHONY: help deploy deploy-bash check ssl push service metrics
help:
@echo "The Door — Deployment Commands"
@@ -22,6 +22,9 @@ help:
@echo " make push Push site files only (fast)"
@echo " make check Check deployment status"
@echo " make ssl Setup SSL on VPS"
@echo " make service Install/restart hermes-gateway service"
@echo " make metrics View crisis metrics summary"
@echo " make metrics-json Export crisis metrics as JSON"
@echo ""
deploy:
@@ -33,7 +36,7 @@ deploy-bash:
push:
rsync -avz --exclude='.git' --exclude='deploy' \
index.html manifest.json sw.js about.html testimony.html system-prompt.txt \
index.html manifest.json sw.js about.html crisis-offline.html testimony.html system-prompt.txt \
root@$(VPS):/var/www/the-door/
ssh root@$(VPS) "chown -R www-data:www-data /var/www/the-door"
@@ -42,3 +45,12 @@ check:
ssl:
ssh root@$(VPS) "certbot --nginx -d $(DOMAIN) -d www.$(DOMAIN)"
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
metrics:
python3 -m crisis.metrics --summary
metrics-json:
python3 -m crisis.metrics --json

241
crisis-offline.html Normal file
View File

@@ -0,0 +1,241 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="theme-color" content="#0d1117">
<meta name="description" content="Offline crisis resources from The Door. Call or text 988 for immediate help.">
<title>Offline Crisis Resources | The Door</title>
<style>
:root {
color-scheme: dark;
--bg: #0d1117;
--panel: #161b22;
--panel-urgent: #1c1210;
--border: #30363d;
--accent: #c9362c;
--accent-soft: #ff6b6b;
--text: #e6edf3;
--muted: #8b949e;
--safe: #2ea043;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: var(--bg);
color: var(--text);
line-height: 1.6;
}
main {
max-width: 760px;
margin: 0 auto;
padding: 24px 16px 48px;
}
.status {
display: inline-flex;
align-items: center;
gap: 10px;
padding: 8px 12px;
border-radius: 999px;
background: rgba(201, 54, 44, 0.15);
border: 1px solid rgba(255, 107, 107, 0.35);
color: var(--accent-soft);
font-size: 0.9rem;
margin-bottom: 20px;
}
.status-dot {
width: 10px;
height: 10px;
border-radius: 50%;
background: var(--accent-soft);
}
h1 {
font-size: clamp(2rem, 6vw, 2.75rem);
line-height: 1.15;
margin: 0 0 12px;
}
.lede {
color: var(--muted);
font-size: 1.05rem;
margin: 0 0 28px;
}
.urgent-box,
.panel {
border-radius: 18px;
padding: 20px;
margin-bottom: 18px;
border: 1px solid var(--border);
background: var(--panel);
}
.urgent-box {
background: linear-gradient(180deg, rgba(201, 54, 44, 0.18), rgba(28, 18, 16, 0.95));
border-color: rgba(255, 107, 107, 0.35);
}
.section-title {
font-size: 1.2rem;
margin: 0 0 12px;
}
.actions {
display: grid;
gap: 12px;
margin-top: 16px;
}
.action-btn {
display: flex;
align-items: center;
justify-content: center;
text-align: center;
gap: 8px;
min-height: 52px;
padding: 14px 18px;
border-radius: 12px;
text-decoration: none;
font-weight: 700;
color: #fff;
background: var(--accent);
border: 1px solid transparent;
}
.action-btn.secondary {
background: #1f6feb;
}
.action-btn.retry {
background: transparent;
color: var(--text);
border-color: var(--border);
}
.action-btn:focus,
.action-btn:hover,
button.action-btn:hover,
button.action-btn:focus {
outline: 3px solid rgba(255, 107, 107, 0.4);
outline-offset: 2px;
}
ul, ol {
margin: 0;
padding-left: 20px;
}
li + li {
margin-top: 8px;
}
.grounding-steps li::marker {
color: var(--accent-soft);
font-weight: 700;
}
.small {
color: var(--muted);
font-size: 0.92rem;
}
.grid {
display: grid;
gap: 18px;
}
@media (min-width: 700px) {
.grid {
grid-template-columns: 1fr 1fr;
}
}
</style>
</head>
<body>
<main>
<div class="status" role="status" aria-live="polite">
<span class="status-dot" aria-hidden="true"></span>
<span id="connection-status-text">Offline crisis resources are ready on this device.</span>
</div>
<h1>You are not alone right now.</h1>
<p class="lede">
Your connection is weak or offline. These crisis resources are saved locally so you can still reach help.
</p>
<section class="urgent-box" aria-labelledby="urgent-help-title">
<h2 class="section-title" id="urgent-help-title">Immediate help</h2>
<p>If you might act on suicidal thoughts, contact a real person now. Stay with another person if you can.</p>
<div class="actions">
<a class="action-btn" href="tel:988" aria-label="Call 988 now">Call 988 now</a>
<a class="action-btn secondary" href="sms:741741&body=HOME" aria-label="Text HOME to 741741 for Crisis Text Line">Text HOME to 741741 — Crisis Text Line</a>
<button class="action-btn retry" id="retry-connection" type="button">Retry connection</button>
</div>
<p class="small" style="margin-top: 14px;">If you are in immediate danger or have already taken action, call emergency services now.</p>
</section>
<div class="grid">
<section class="panel" aria-labelledby="grounding-title">
<h2 class="section-title" id="grounding-title">5-4-3-2-1 grounding</h2>
<ol class="grounding-steps">
<li>5 things you can see</li>
<li>4 things you can feel</li>
<li>3 things you can hear</li>
<li>2 things you can smell</li>
<li>1 thing you can taste</li>
</ol>
<p class="small" style="margin-top: 14px;">Say each one out loud if you can. Slow is okay.</p>
</section>
<section class="panel" aria-labelledby="next-steps-title">
<h2 class="section-title" id="next-steps-title">Next small steps</h2>
<ul>
<li>Put some distance between yourself and anything you could use to hurt yourself.</li>
<li>Move closer to another person, a front desk, or a public place if possible.</li>
<li>Drink water or hold something cold in your hand.</li>
<li>Breathe in for 4, hold for 4, out for 6. Repeat 5 times.</li>
<li>Text or call one safe person and say: “I need you with me right now.”</li>
</ul>
</section>
</div>
<section class="panel" aria-labelledby="hope-title">
<h2 class="section-title" id="hope-title">Stay through the next ten minutes</h2>
<p>Do not solve your whole life right now. Stay for the next breath. Then the next one.</p>
<p class="small">If the connection comes back, you can return to The Door chat. Until then, the fastest path to a real person is still 988.</p>
</section>
</main>
<script>
(function () {
var statusText = document.getElementById('connection-status-text');
var retryButton = document.getElementById('retry-connection');
function updateStatus() {
statusText.textContent = navigator.onLine
? 'Connection is back. You can reopen chat now.'
: 'Offline crisis resources are ready on this device.';
}
retryButton.addEventListener('click', function () {
if (navigator.onLine) {
window.location.href = '/';
return;
}
window.location.reload();
});
window.addEventListener('online', updateStatus);
window.addEventListener('offline', updateStatus);
updateStatus();
})();
</script>
</body>
</html>

View File

@@ -7,6 +7,8 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .metrics import CrisisMetrics, AggregateMetrics
__all__ = [
"detect_crisis",
@@ -19,4 +21,9 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"CrisisMetrics",
"AggregateMetrics",
]

19
crisis/bridge.py Normal file
View File

@@ -0,0 +1,19 @@
"""Compatibility bridge for the-door crisis gateway.
Issue #141 describes the shared bridge API as `crisis.bridge`.
The canonical implementation lives in `crisis.gateway`. Re-export the public
entrypoints here so downstream hermes-agent wiring can import the stable name
without copying logic out of the-door.
"""
from .gateway import (
check_crisis,
get_system_prompt,
format_gateway_response,
)
__all__ = [
"check_crisis",
"get_system_prompt",
"format_gateway_response",
]

View File

@@ -1,75 +1,85 @@
"""
Crisis Detection Module for the-door.
Crisis Detection Module for the-door (CANONICAL).
Single source of truth for crisis keyword detection.
Parses incoming text for despair/suicide indicators and classifies into
tiers: LOW, MEDIUM, HIGH, CRITICAL.
Both `crisis/detect.py` and `crisis_detector.py` (legacy shim) use this module.
Do NOT duplicate keyword lists elsewhere.
Consolidated from:
- crisis_detector.py (root, legacy — MODERATE tier, class API)
- crisis/detect.py (module, newer — MEDIUM tier, functional API)
"""
import re
from dataclasses import dataclass, field
from typing import List
from typing import List, Optional
@dataclass
class CrisisDetectionResult:
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
score: float = 0.0
# ── Indicator sets ──────────────────────────────────────────────
# ── Canonical indicator sets ──────────────────────────────────────
CRITICAL_INDICATORS = [
r"\bbetter off without me\b",
r"\bkill\s*(my)?self\b",
r"\bend\s*my\s*life\b",
r"\bsuicid(?:al|ed|e)\b",
r"\bnot\s+worth\s+living\b",
r"\bbetter\s+off\s+dead\b",
r"\bend\s+it\s+all\b",
r"\bcan'?t\s+(go|live)\s+on\b",
r"\bno\s+reason\s+to\s+live\b",
r"\bdon'?t\s+want\s+to\s+live\b",
r"\bwant\s+to\s+die\b",
r"\bgoing\s+to\s+kill\s+myself\b",
r"\bplan\s+to\s+(end|kill|die)\b",
r"\btired\s+of\s+(living|life|existence)\b",
r"\bsaying\s+goodbye\s+(forever|permanently|one last time)\b",
r"\bwrote\s+a\s+(will|suicide\s*note|letter)\b",
r"\bgiving\s+away\s+(my|all my)\s+possess",
r"\btied\s+(up|down)\s+my\s+(loose\s+)?ends",
r"\bgoing\s+to\s+(?:kill\s+myself|die)\b",
r"\bplan\s+to\s+(?:end|kill|die)\b",
r"\btired\s+of\s+(?:living|life|existence)\b",
r"\bsaying\s+goodbye\s+(?:forever|permanently|one\s+last\s+time)\b",
r"\bwrote\s+a\s+suicide\s*(?:note|letter)\b",
r"\bgiving\s+away\s+(?:my|all\s+my)\s+(?:stuff|things|possessions?)\s+(?:to|because|—)\b",
r"\btied\s+(?:up|down)\s+my\s+(?:loose\s+)?ends",
]
HIGH_INDICATORS = [
r"\bdespair\b",
r"\bhopeless\b",
r"\bno(?!t)\s+(one|body|point|hope|future|way\s+out)\b",
r"\beverything\s+is\s+(pointless|broken|ruined)\b",
r"\bhopeless(?:ly)?\s+(?:about\s+(?:my|this|everything|life)|inside|right\s+now)\b",
r"\bno(?!t)\s+(?:one|body|point|hope|way\s+out)\b",
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
r"\beverything\s+is\s+(?:pointless|broken|ruined)\b",
r"\bcan'?t\s+take\s+this\s+anymore\b",
r"\bdon'?t\s+care\s+if\s+I\s+die\b",
r"\bwish\s+I\s+(was|were)\s+(dead|gone|never\s+born)\b",
r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
r"\bdon'?t\s+matter\s+if\s+I\s+exist\b",
r"\bno\s+one\s+would\s+care\b",
r"\bno\s+one\s+would\s+miss\b",
r"\bworld\s+would?\s+be\s+better\s+without\b",
r"\bin\s+so\s+much\s+(pain|agony|suffering|torment)\b",
r"\bcan'?t\s+see\s+any\s+(point|reason|light)\b",
r"\btrapped\b",
r"\bcage\b",
r"\bno\s+one\s+would\s+(?:care|miss)\b",
r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
r"\bescape\s+from\s*this",
r"\bjust\s+want\s+it\s+to\s+stop\b",
r"\bnothing\s+left\b",
r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
r"\bdisappeared\s+forever\b",
# Contextual despair phrases (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\beverything\s+is\s+hopeless\b",
r"\bcan'?t\s+(?:go\s+on|keep\s+going)\b",
r"\bgive(?:n)?\s*up\s+(?:on\s+)?(?:life|living|everything)\b",
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
r"\bno\s*point\s+(?:in\s+)?living\b",
r"\bno\s*hope\s+(?:left|remaining)\b",
r"\bno\s*way\s*out\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
r"\btrapped\s+and\s+can'?t\s+escape\b",
r"\bdesperate\s+(?:for\s+)?help\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
]
MEDIUM_INDICATORS = [
r"\bno\s+hope\b",
r"\bcan'?t\s+go\s+on\b",
r"\bcan'?t\s+keep\s+going\b",
r"\bforgotten\b",
r"\balone\s+in\s+this\b",
r"\balways\s+alone\b",
r"\bnobody\s+understands\b",
r"\bnobody\s+cares\b",
r"\bnobody\s+(?:understands|cares)\b",
r"\bwish\s+I\s+could\b",
r"\bexhaust(?:ed|ion|ing)\b",
r"\bnumb\b",
@@ -78,8 +88,7 @@ MEDIUM_INDICATORS = [
r"\buseless\b",
r"\bbroken\b",
r"\bdark(ness)?\b",
r"\bdepressed\b",
r"\bdepression\b",
r"\bdepress(?:ed|ion)\b",
r"\bcrying\b",
r"\btears\b",
r"\bsad(ness)?\b",
@@ -87,42 +96,158 @@ MEDIUM_INDICATORS = [
r"\boverwhelm(?:ed|ing)\b",
r"\bfailing\b",
r"\bcannot\s+cope\b",
r"\blosing\s*(my)?\s*control\b",
r"\blosing\s*(?:my)?\s*control\b",
r"\bdown\s*for\s*the\s*count\b",
r"\bsinking\b",
r"\bdrowning\b",
r"\bhopeless\b",
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
# Keep only medium-only patterns here; stronger overlaps live in HIGH_INDICATORS.
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
]
LOW_INDICATORS = [
r"\bunhappy\b",
r"\bdown\b",
r"\btough\s*time\b",
r"\brough\s*day\b",
r"\brough\s*week\b",
r"\brough\s*patch\b",
r"\btough\s*(?:time|day|week)\b",
r"\brough\s+(?:day|week|patch)\b",
r"\bstressed\b",
r"\burnout\b",
r"\bburnout\b",
r"\bdifficult\b",
r"\bfrustrated\b",
r"\bannoyed\b",
r"\btired\b",
r"\bsad\b",
r"\bupset\b",
r"\bnot\s*great\b",
r"\bnot\s*good\b",
r"\bnot\s*okay\b",
r"\blonely\b",
r"\banxious?\b",
r"\bnot\s*(?:good|great|okay)\b",
r"\bthings\s*are\s*hard\b",
r"\bstruggling\b",
r"\bnot\s+feeling\s+myself\b",
r"\bnot\s+feeling\s+(?:like\s+)?myself\b",
r"\bhard\s*time\b",
]
# ── Scoring ───────────────────────────────────────────────────────
SCORES = {"CRITICAL": 1.0, "HIGH": 0.75, "MEDIUM": 0.5, "LOW": 0.25, "NONE": 0.0}
ACTIONS = {
"CRITICAL": (
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline. "
"Provide Crisis Text Line (HOME to 741741). "
"Stay present. Do not disconnect. Redirect to professional help."
),
"HIGH": (
"Show crisis panel. Ask about safety. Surface 988 number prominently. "
"Continue conversation with crisis awareness."
),
"MEDIUM": (
"Increase warmth and presence. Subtly surface help resources. "
"Keep conversation anchored in the present."
),
"LOW": (
"Normal conversation with warm undertone. "
"No crisis UI elements needed. Remain vigilant."
),
}
# ── Result types ──────────────────────────────────────────────────
@dataclass
class CrisisDetectionResult:
"""Result used by crisis/detect.py, gateway, dying_detection."""
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
score: float = 0.0
matches: List[dict] = field(default_factory=list)
@dataclass
class CrisisResult:
"""Legacy result used by crisis_detector.py and crisis_responder.py.
Backward-compatible shim: wraps CrisisDetectionResult with the old field names.
"""
risk_level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
matched_keywords: List[str] = field(default_factory=list)
context: List[str] = field(default_factory=list)
score: float = 0.0
timestamp: Optional[str] = None
def __bool__(self):
return self.risk_level != "NONE"
@classmethod
def from_detection_result(cls, dr: CrisisDetectionResult, text: str = "") -> "CrisisResult":
"""Convert a CrisisDetectionResult to legacy CrisisResult format."""
# Map MEDIUM -> MODERATE for legacy consumers
level = "MODERATE" if dr.level == "MEDIUM" else dr.level
# Extract context snippets from matches
contexts = []
if text:
for m in dr.matches:
ctx = extract_context(text, m["start"], m["end"])
contexts.append(ctx)
return cls(
risk_level=level,
matched_keywords=dr.indicators,
context=contexts,
score=dr.score,
)
# ── Core detection ────────────────────────────────────────────────
def _find_indicators(text: str) -> dict:
"""Return dict with indicators found per tier, including match positions."""
results = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}
for pattern in CRITICAL_INDICATORS:
m = re.search(pattern, text)
if m:
results["CRITICAL"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
for pattern in HIGH_INDICATORS:
m = re.search(pattern, text)
if m:
results["HIGH"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
for pattern in MEDIUM_INDICATORS:
m = re.search(pattern, text)
if m:
results["MEDIUM"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
for pattern in LOW_INDICATORS:
m = re.search(pattern, text)
if m:
results["LOW"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
return results
def detect_crisis(text: str) -> CrisisDetectionResult:
"""
Detect crisis level in a message.
Returns:
CrisisDetectionResult with level, found indicators, recommended action, score
Detection hierarchy:
CRITICAL — immediate risk of self-harm or suicide (single match)
HIGH — strong despair signals, ideation present (single match)
MEDIUM — distress signals, requires 2+ indicators to escalate
LOW — emotional difficulty, warrant gentle support (single match)
NONE — no crisis indicators detected
Design principles:
- Never computes the value of a human life
- Never suggests someone should die or that death is a solution
- Always errs on the side of higher risk when uncertain
"""
if not text or not text.strip():
return CrisisDetectionResult(level="NONE", score=0.0)
@@ -133,82 +258,135 @@ def detect_crisis(text: str) -> CrisisDetectionResult:
if not matches:
return CrisisDetectionResult(level="NONE", score=0.0)
# Priority: highest tier wins
if matches["CRITICAL"]:
return CrisisDetectionResult(
level="CRITICAL",
indicators=matches["CRITICAL"],
recommended_action=(
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline. "
"Provide Crisis Text Line (HOME to 741741). "
"Stay present. Do not disconnect. Redirect to professional help."
),
score=1.0,
)
# CRITICAL and HIGH: single match is enough
for tier in ("CRITICAL", "HIGH"):
if matches[tier]:
tier_matches = matches[tier]
patterns = [m["pattern"] for m in tier_matches]
return CrisisDetectionResult(
level=tier,
indicators=patterns,
recommended_action=ACTIONS[tier],
score=SCORES[tier],
matches=tier_matches,
)
if matches["HIGH"]:
return CrisisDetectionResult(
level="HIGH",
indicators=matches["HIGH"],
recommended_action=(
"Show crisis panel. Ask about safety. Surface 988 number prominently. "
"Continue conversation with crisis awareness."
),
score=0.75,
)
if matches["MEDIUM"]:
# MEDIUM tier: require at least 2 indicators before escalating
if len(matches["MEDIUM"]) >= 2:
tier_matches = matches["MEDIUM"]
patterns = [m["pattern"] for m in tier_matches]
return CrisisDetectionResult(
level="MEDIUM",
indicators=matches["MEDIUM"],
recommended_action=(
"Increase warmth and presence. Subtly surface help resources. "
"Keep conversation anchored in the present."
),
score=0.5,
indicators=patterns,
recommended_action=ACTIONS["MEDIUM"],
score=SCORES["MEDIUM"],
matches=tier_matches,
)
if matches["LOW"]:
tier_matches = matches["LOW"]
patterns = [m["pattern"] for m in tier_matches]
return CrisisDetectionResult(
level="LOW",
indicators=matches["LOW"],
recommended_action=(
"Normal conversation with warm undertone. "
"No crisis UI elements needed. Remain vigilant."
),
score=0.25,
indicators=patterns,
recommended_action=ACTIONS["LOW"],
score=SCORES["LOW"],
matches=tier_matches,
)
# Single MEDIUM match falls through to LOW sensitivity
if matches["MEDIUM"]:
tier_matches = matches["MEDIUM"]
patterns = [m["pattern"] for m in tier_matches]
return CrisisDetectionResult(
level="LOW",
indicators=patterns,
recommended_action=ACTIONS["LOW"],
score=SCORES["LOW"],
matches=tier_matches,
)
return CrisisDetectionResult(level="NONE", score=0.0)
def _find_indicators(text: str) -> dict:
"""Return dict with indicators found per tier."""
results = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}
# ── CrisisDetector class (backward compat) ───────────────────────
for pattern in CRITICAL_INDICATORS:
if re.search(pattern, text):
results["CRITICAL"].append(pattern)
class CrisisDetector:
"""
Legacy class API for crisis detection. Wraps the canonical detect_crisis().
for pattern in HIGH_INDICATORS:
if re.search(pattern, text):
results["HIGH"].append(pattern)
Used by crisis_responder.py and tests/test_false_positive_fixes.py.
Maps MEDIUM -> MODERATE for legacy consumers.
"""
for pattern in MEDIUM_INDICATORS:
if re.search(pattern, text):
results["MEDIUM"].append(pattern)
def scan(self, text: str) -> CrisisResult:
dr = detect_crisis(text)
return CrisisResult.from_detection_result(dr, text=text)
for pattern in LOW_INDICATORS:
if re.search(pattern, text):
results["LOW"].append(pattern)
def scan_multiple(self, texts: List[str]) -> List[CrisisResult]:
return [self.scan(t) for t in texts]
return results
def get_highest_risk(self, texts: List[str]) -> CrisisResult:
results = self.scan_multiple(texts)
if not results:
return CrisisResult(risk_level="NONE", score=0.0)
return max(results, key=lambda r: r.score)
@staticmethod
def format_result(result: CrisisResult) -> str:
level_emoji = {
"CRITICAL": "\U0001f6a8",
"HIGH": "\u26a0\ufe0f",
"MODERATE": "\U0001f536",
"LOW": "\U0001f535",
"NONE": "\u2705",
}
emoji = level_emoji.get(result.risk_level, "\u2753")
lines = [
f"{emoji} Risk Level: {result.risk_level} (score: {result.score:.2f})",
f"Matched keywords: {len(result.matched_keywords)}",
]
if result.matched_keywords:
lines.append(f" Keywords: {', '.join(result.matched_keywords)}")
if result.context:
lines.append("Context:")
for ctx in result.context:
lines.append(f" {ctx}")
return "\n".join(lines)
# ── Module-level convenience (backward compat) ────────────────────
_default_detector = CrisisDetector()
def detect_crisis_legacy(text: str) -> CrisisResult:
"""Convenience function returning legacy CrisisResult format."""
return _default_detector.scan(text)
# ── Utility functions ─────────────────────────────────────────────
def scan(text: str) -> CrisisDetectionResult:
"""Alias for detect_crisis — shorter name used in tests."""
return detect_crisis(text)
def extract_context(text: str, start: int, end: int, window: int = 60) -> str:
"""Extract surrounding context around a match position."""
ctx_start = max(0, start - window)
ctx_end = min(len(text), end + window)
snippet = text[ctx_start:ctx_end].strip()
if ctx_start > 0:
snippet = "..." + snippet
if ctx_end < len(text):
snippet = snippet + "..."
return snippet
def get_urgency_emoji(level: str) -> str:
mapping = {"CRITICAL": "🚨", "HIGH": "⚠️", "MEDIUM": "🔶", "LOW": "🔵", "NONE": ""}
return mapping.get(level, "")
mapping = {"CRITICAL": "\U0001f6a8", "HIGH": "\u26a0\ufe0f", "MEDIUM": "\U0001f536", "LOW": "\U0001f535", "NONE": "\u2705"}
return mapping.get(level, "\u2753")
def format_result(result: CrisisDetectionResult) -> str:

View File

@@ -22,6 +22,7 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:

244
crisis/metrics.py Normal file
View File

@@ -0,0 +1,244 @@
"""
crisis/metrics.py — Aggregate crisis detection metrics.
Tracks session-level crisis data for aggregate reporting.
Privacy-first: stores only aggregate counts, never user content.
Usage:
from crisis.metrics import CrisisMetrics
metrics = CrisisMetrics()
metrics.record_session(tracker.state)
summary = metrics.get_summary()
"""
import json
import os
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
METRICS_DIR = Path.home() / ".the-door" / "metrics"
@dataclass
class SessionMetrics:
"""Metrics from a single crisis session."""
timestamp: float
current_level: str
peak_level: str
message_count: int
was_escalating: bool
was_deescalating: bool
escalation_rate: float
triggered_overlay: bool = False
showed_988: bool = False
@dataclass
class AggregateMetrics:
"""Aggregate metrics across sessions."""
total_sessions: int = 0
total_messages: int = 0
# Level distribution
level_counts: Dict[str, int] = field(default_factory=lambda: {
"NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0
})
# Escalation tracking
escalating_sessions: int = 0
deescalating_sessions: int = 0
# Safety interventions
overlay_triggers: int = 0
ninety_eight_show: int = 0
# Time window
period_start: Optional[float] = None
period_end: Optional[float] = None
class CrisisMetrics:
"""
Aggregate crisis metrics with local JSON persistence.
Privacy-first: stores only aggregate counts per day.
Never stores user messages, content, or identifying info.
"""
def __init__(self, metrics_dir: Optional[Path] = None):
self.metrics_dir = metrics_dir or METRICS_DIR
self.metrics_dir.mkdir(parents=True, exist_ok=True)
self._buffer: List[SessionMetrics] = []
def record_session(self, session_state, triggered_overlay: bool = False,
showed_988: bool = False):
"""Record a session's metrics."""
from .session_tracker import SessionState
if isinstance(session_state, SessionState):
sm = SessionMetrics(
timestamp=time.time(),
current_level=session_state.current_level,
peak_level=session_state.peak_level,
message_count=session_state.message_count,
was_escalating=session_state.is_escalating,
was_deescalating=session_state.is_deescalating,
escalation_rate=session_state.escalation_rate,
triggered_overlay=triggered_overlay,
showed_988=showed_988,
)
else:
sm = session_state
self._buffer.append(sm)
self._flush()
def _flush(self):
"""Write buffered sessions to daily file."""
if not self._buffer:
return
today = datetime.utcnow().strftime("%Y-%m-%d")
filepath = self.metrics_dir / f"{today}.jsonl"
with open(filepath, 'a') as f:
for sm in self._buffer:
f.write(json.dumps(asdict(sm)) + '\n')
self._buffer.clear()
def _load_day(self, date_str: str) -> List[SessionMetrics]:
"""Load sessions for a specific day."""
filepath = self.metrics_dir / f"{date_str}.jsonl"
if not filepath.exists():
return []
sessions = []
with open(filepath) as f:
for line in f:
if line.strip():
data = json.loads(line)
sessions.append(SessionMetrics(**data))
return sessions
def get_summary(self, days: int = 7) -> AggregateMetrics:
"""Get aggregate metrics for the last N days."""
agg = AggregateMetrics()
now = datetime.utcnow()
for i in range(days):
date = (now - timedelta(days=i)).strftime("%Y-%m-%d")
sessions = self._load_day(date)
for sm in sessions:
agg.total_sessions += 1
agg.total_messages += sm.message_count
# Level counts (use peak level)
level = sm.peak_level
agg.level_counts[level] = agg.level_counts.get(level, 0) + 1
if sm.was_escalating:
agg.escalating_sessions += 1
if sm.was_deescalating:
agg.deescalating_sessions += 1
if sm.triggered_overlay:
agg.overlay_triggers += 1
if sm.showed_988:
agg.ninety_eight_show += 1
# Time window
if agg.period_start is None or sm.timestamp < agg.period_start:
agg.period_start = sm.timestamp
if agg.period_end is None or sm.timestamp > agg.period_end:
agg.period_end = sm.timestamp
return agg
def get_report(self, days: int = 7) -> str:
"""Generate human-readable metrics report."""
agg = self.get_summary(days)
lines = []
lines.append("=" * 50)
lines.append(" CRISIS METRICS REPORT")
lines.append(f" Last {days} days")
if agg.period_start:
start = datetime.fromtimestamp(agg.period_start).strftime("%Y-%m-%d %H:%M")
lines.append(f" Period: {start} → now")
lines.append("=" * 50)
lines.append(f"\n Sessions: {agg.total_sessions}")
lines.append(f" Messages tracked: {agg.total_messages}")
lines.append(f"\n Level Distribution (by peak):")
for level in ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]:
count = agg.level_counts.get(level, 0)
pct = (count / agg.total_sessions * 100) if agg.total_sessions > 0 else 0
bar = "" * int(pct / 5)
lines.append(f" {level:<10} {count:>5} ({pct:>5.1f}%) {bar}")
lines.append(f"\n Escalations: {agg.escalating_sessions}")
lines.append(f" De-escalations: {agg.deescalating_sessions}")
lines.append(f" Overlay triggers: {agg.overlay_triggers}")
lines.append(f" 988 shown: {agg.ninety_eight_show}")
if agg.total_sessions > 0:
escalation_rate = agg.escalating_sessions / agg.total_sessions * 100
lines.append(f"\n Escalation rate: {escalation_rate:.1f}%")
lines.append("=" * 50)
return "\n".join(lines)
def get_json(self, days: int = 7) -> str:
"""Export metrics as JSON."""
agg = self.get_summary(days)
return json.dumps(asdict(agg), indent=2)
def main():
"""CLI entry point for crisis metrics."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Detection Metrics")
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", help="JSON export")
parser.add_argument("--days", type=int, default=7, help="Days to include")
parser.add_argument("--demo", action="store_true", help="Generate demo data")
args = parser.parse_args()
metrics = CrisisMetrics()
if args.demo:
import random
levels = ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]
for i in range(50):
from .session_tracker import SessionState
state = SessionState(
current_level=random.choice(levels),
peak_level=random.choice(levels),
message_count=random.randint(1, 20),
is_escalating=random.random() > 0.7,
is_deescalating=random.random() > 0.8,
escalation_rate=random.random(),
)
metrics.record_session(
state,
triggered_overlay=random.random() > 0.8,
showed_988=random.random() > 0.7,
)
print("Generated 50 demo sessions.")
if args.json:
print(metrics.get_json(args.days))
else:
print(metrics.get_report(args.days))
if __name__ == "__main__":
main()

View File

@@ -270,3 +270,24 @@ def get_system_prompt_modifier(detection: CrisisDetectionResult) -> str:
)
return ""
def generate_grounding_steps() -> list:
"""Generate a 5-4-3-2-1 grounding exercise steps."""
return [
"Name 5 things you can see around you right now.",
"Name 4 things you can touch or feel.",
"Name 3 things you can hear.",
"Name 2 things you can smell.",
"Name 1 thing you can taste.",
]
def generate_breathing_exercise() -> str:
"""Generate a simple box breathing exercise text."""
return (
"Let's try breathing together. "
"Breathe in for 4 counts... hold for 4... "
"breathe out for 6 counts... hold for 2. "
"Let's do that again, nice and slow."
)

259
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,259 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}

View File

@@ -0,0 +1,139 @@
"""Crisis synthesizer integration for the responder pipeline.
Privacy-first and opt-in. Writes append-only JSONL events that feed the
existing crisis_synthesizer reporting flow.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from evolution.crisis_synthesizer import DEFAULT_LOG_PATH, append_interaction_event
logger = logging.getLogger(__name__)
SYNTH_ENABLED = os.environ.get("CRISIS_SYNTH_ENABLED", "0") == "1"
SYNTH_LOG_PATH = Path(os.environ.get("CRISIS_SYNTH_LOG_PATH", str(DEFAULT_LOG_PATH)))
class CrisisSynthesizerIntegration:
"""Append-only bridge between responder events and the synthesizer log."""
def __init__(
self,
enabled: Optional[bool] = None,
log_dir: Optional[Path] = None,
log_path: Optional[Path] = None,
):
self.enabled = SYNTH_ENABLED if enabled is None else bool(enabled)
if log_path is not None:
self.log_file = Path(log_path)
elif log_dir is not None:
self.log_file = Path(log_dir) / "events.jsonl"
else:
self.log_file = SYNTH_LOG_PATH
self.log_file.parent.mkdir(parents=True, exist_ok=True)
try:
os.chmod(self.log_file.parent, 0o700)
except OSError:
pass
def log_crisis_event(
self,
*,
level: str,
matched_keywords: list[str],
response_type: str,
session_id: str = "",
user_continued: bool = False,
metadata: Optional[dict[str, Any]] = None,
) -> bool:
if not self.enabled:
return False
try:
append_interaction_event(
self.log_file,
level=level,
indicators=list(matched_keywords),
response_given=response_type,
continued_conversation=user_continued,
false_positive=False,
session_hash=self._hash_session(session_id) if session_id else "",
response_type=response_type,
metadata=metadata or {},
)
self._lock_down_log_file()
return True
except Exception as exc:
logger.warning("Failed to log crisis synthesizer event: %s", exc)
return False
def log_user_continued(self, session_id: str) -> bool:
if not self.enabled or not session_id or not self.log_file.exists():
return False
session_hash = self._hash_session(session_id)
saw_prior_crisis = False
try:
with self.log_file.open("r", encoding="utf-8") as handle:
for raw in handle:
raw = raw.strip()
if not raw:
continue
event = json.loads(raw)
if (
event.get("event_type", "crisis_interaction") == "crisis_interaction"
and event.get("session_hash") == session_hash
):
saw_prior_crisis = True
if not saw_prior_crisis:
return False
continuation = {
"event_type": "continuation",
"timestamp": float(time.time()),
"session_hash": session_hash,
"user_continued": True,
}
with self.log_file.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(continuation) + "\n")
self._lock_down_log_file()
return True
except Exception as exc:
logger.warning("Failed to append crisis continuation marker: %s", exc)
return False
@staticmethod
def _hash_session(session_id: str) -> str:
return hashlib.sha256(session_id.encode("utf-8")).hexdigest()[:16]
def _lock_down_log_file(self) -> None:
try:
os.chmod(self.log_file, 0o600)
except OSError:
pass
_integration: Optional[CrisisSynthesizerIntegration] = None
def get_integration() -> CrisisSynthesizerIntegration:
global _integration
if _integration is None:
_integration = CrisisSynthesizerIntegration()
return _integration
def log_crisis_event(**kwargs: Any) -> bool:
return get_integration().log_crisis_event(**kwargs)
def log_user_continued(session_id: str) -> bool:
return get_integration().log_user_continued(session_id)

29
crisis/test_rescue.py Normal file
View File

@@ -0,0 +1,29 @@
"""Tests for rescued crisis detection improvements."""
from crisis.detect import scan, extract_context
from crisis.response import generate_grounding_steps, generate_breathing_exercise
def test_better_off_without_me():
result = scan("Everyone would be better off without me")
assert result.level == "CRITICAL"
def test_desperate():
result = scan("I feel desperate and alone")
assert result.level in ("HIGH", "CRITICAL")
def test_context_extraction():
text = "I feel hopeless about everything"
result = scan(text)
assert len(result.matches) > 0
for m in result.matches:
ctx = extract_context(text, m["start"], m["end"])
assert len(ctx) > 0
def test_grounding_steps():
steps = generate_grounding_steps()
assert len(steps) == 5
assert "see" in steps[0].lower()
def test_breathing_exercise():
exercise = generate_breathing_exercise()
assert "4" in exercise
assert "6" in exercise

View File

@@ -53,6 +53,7 @@ class TestDetection(unittest.TestCase):
def test_high_hopeless(self):
r = detect_crisis("I feel completely hopeless with no way out")
# 'hopeless' is MEDIUM; 'no way out' is HIGH — so result is HIGH
self.assertIn(r.level, ("HIGH", "MEDIUM"))
def test_high_no_one_would_miss(self):
@@ -112,7 +113,7 @@ class TestDetection(unittest.TestCase):
}
for text, expected_level in [
("I want to kill myself", "CRITICAL"),
("I feel completely hopeless", "HIGH"),
("I feel completely hopeless with no way out", "HIGH"),
("I feel so alone in this, nobody understands", "MEDIUM"),
("Having a rough day", "LOW"),
("Hello there", "NONE"),
@@ -270,6 +271,65 @@ class TestHelpers(unittest.TestCase):
self.assertIn("Indicators", formatted)
class TestFalsePositives(unittest.TestCase):
"""Regression tests for Issue #32: crisis detector false positives.
Innocent messages must NOT trigger crisis levels. These tests prevent
regressions when keyword lists are modified.
"""
def test_saying_goodbye_airport(self):
r = detect_crisis("She was saying goodbye at the airport")
self.assertEqual(r.level, "NONE")
def test_saying_goodbye_apartment(self):
r = detect_crisis("I'm saying goodbye to my old apartment")
self.assertEqual(r.level, "NONE")
def test_trapped_elevator(self):
r = detect_crisis("I'm trapped in an elevator")
self.assertEqual(r.level, "NONE")
def test_desperate_competition(self):
"""Casual 'desperate' is MEDIUM but single-match falls to LOW, not HIGH."""
r = detect_crisis("She's desperate to win the race")
self.assertIn(r.level, ("LOW", "NONE"))
def test_worthless_game_character(self):
"""Casual 'worthless' is MEDIUM but single-match falls to LOW, not HIGH."""
r = detect_crisis("This character is worthless in the game")
self.assertIn(r.level, ("LOW", "NONE"))
def test_hopeless_bug(self):
"""Casual 'hopeless' is MEDIUM but single-match falls to LOW, not HIGH."""
r = detect_crisis("This bug is hopeless")
self.assertIn(r.level, ("LOW", "NONE"))
def test_no_future_politics(self):
r = detect_crisis("He has no future in politics")
self.assertEqual(r.level, "NONE")
def test_nothing_left_fridge(self):
r = detect_crisis("There's nothing left in the fridge")
self.assertEqual(r.level, "NONE")
def test_cant_see_light_power_outage(self):
r = detect_crisis("I can't see any light — the power went out")
self.assertEqual(r.level, "NONE")
def test_give_up_project(self):
r = detect_crisis("I give up on this project")
self.assertEqual(r.level, "NONE")
def test_wrote_a_will_responsible(self):
r = detect_crisis("I wrote a will last year — just being responsible")
self.assertEqual(r.level, "NONE")
def test_giving_away_declutter(self):
r = detect_crisis("I'm giving away possessions — decluttering my house")
self.assertEqual(r.level, "NONE")
class TestEdgeCases(unittest.TestCase):
"""Test edge cases and integration scenarios."""

19
crisis/tracker.py Normal file
View File

@@ -0,0 +1,19 @@
"""Compatibility bridge for crisis session tracking.
Issue #141 describes the shared the-door tracker surface as `crisis.tracker`.
The canonical implementation lives in `crisis.session_tracker`, but hermes-agent
integration should be able to import the shorter path without caring about
internal file layout.
"""
from .session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
__all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -1,260 +1,34 @@
"""
Crisis Detection System for the-door.
Crisis Detection System for the-door (LEGACY SHIM).
Scans text for suicide, self-harm, and hopelessness signals.
Returns risk level, matched keywords, and surrounding context.
This module is a backward-compatible re-export layer.
The canonical detection logic lives in crisis/detect.py.
This is the most important code in the foundation.
Every line here exists because someone's life depends on it.
Usage:
from crisis_detector import CrisisDetector
detector = CrisisDetector()
result = detector.scan("I can't go on anymore")
print(result.risk_level) # "HIGH"
print(result.matched_keywords) # ["can't go on"]
print(result.context) # surrounding text snippets
Both crisis_responder.py and the legacy test suite import from here.
Do NOT add detection logic to this file — it all comes from crisis.detect.
"""
import re
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from crisis.detect import (
CrisisDetector,
CrisisResult,
CrisisDetectionResult,
detect_crisis_legacy as detect_crisis,
CRITICAL_INDICATORS,
HIGH_INDICATORS,
MEDIUM_INDICATORS,
LOW_INDICATORS,
SCORES as RISK_SCORES,
)
@dataclass
class CrisisResult:
"""Result of a crisis text scan."""
risk_level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
matched_keywords: List[str] = field(default_factory=list)
context: List[str] = field(default_factory=list)
score: float = 0.0
timestamp: Optional[str] = None
def __bool__(self):
return self.risk_level != "NONE"
# ── Keyword definitions ──────────────────────────────────────────
CRITICAL_KEYWORDS = {
"suicide": r"\bsuicid(?:al|e|ed)\b",
"kill myself": r"\bkill\s*(?:my)?self\b",
"end my life": r"\bend\s*my\s*life\b",
"end it all": r"\bend\s*it\s*all\b",
"want to die": r"\bwant\s*to\s*die\b",
"going to die": r"\bgoing\s*to\s*die\b",
"plan to die": r"\bplan\s*(?:to|for)\s*(?:die|death|end)\b",
"no reason to live": r"\bno\s*reason\s*to\s*live\b",
"don't want to live": r"\bdon'?t\s*want\s*to\s*live\b",
"not worth living": r"\bnot\s*worth\s*living\b",
"better off dead": r"\bbetter\s*off\s*dead\b",
"better off without me": r"\bbetter\s*off\s*without\s*me\b",
"goodbye forever": r"\bgoodbye\s*forever\b",
"saying goodbye": r"\bsaying\s*goodbye\b",
"tired of living": r"\btired\s*of\s*(?:living|life|existence)\b",
"wrote a will": r"\bwrote\s*(?:a|my)\s*(?:will|suicide\s*note|letter)\b",
"giving away possessions": r"\bgiving\s*away\s*(?:my|all)\s*possess\b",
}
HIGH_KEYWORDS = {
"hopeless": r"\bhopeless(?:ness)?\b",
"can't go on": r"\bcan'?t\s*go\s*on\b",
"can't keep going": r"\bcan'?t\s*keep\s*going\b",
"can't take this": r"\bcan'?t\s*take\s*this\b",
"give up": r"\bgive(?:n)?\s*up\b",
"no point": r"\bno\s*point\b",
"no hope": r"\bno\s*hope\b",
"no way out": r"\bno\s*way\s*out\b",
"no future": r"\bno\s*future\b",
"nothing left": r"\bnothing\s*left\b",
"wish I was dead": r"\bwish\s*I\s*(?:was|were)\s*(?:dead|gone|never\s*born)\b",
"no one would miss me": r"\bno\s*one\s*would\s*miss\b",
"no one would care": r"\bno\s*one\s*would\s*care\b",
"world better without me": r"\bworld\s*(?:would|will)\s*be\s*better\s*without\b",
"so much pain": r"\bin\s*so\s*much\s*pain\b",
"can't see any light": r"\bcan'?t\s*see\s*(?:any\s*)?(?:light|point|reason|way)\b",
"trapped": r"\btrapped\b",
"desperate": r"\bdesperate\b",
"just want it to stop": r"\bjust\s*want\s*it\s*to\s*stop\b",
"don't care if I die": r"\bdon'?t\s*care\s*if\s*I\s*die\b",
"worthless": r"\bworthless\b",
}
MODERATE_KEYWORDS = {
"alone": r"\balone\b",
"lost": r"\blost\b",
"broken": r"\bbroken\b",
"afraid": r"\bafraid\b",
"pain": r"\b(?:in\s*)?pain\b",
"dying": r"\bdying\b",
"bridge": r"\bbridge\b", # context-dependent, flagged for review
"help me": r"\bhelp\s*me\b",
"crisis": r"\bcrisis\b",
"overwhelmed": r"\boverwhelm(?:ed|ing)\b",
"exhausted": r"\bexhausted\b",
"numb": r"\bnumb\b",
"empty": r"\bempty\b",
"depressed": r"\bdepressed\b",
"depression": r"\bdepression\b",
"despair": r"\bdespair\b",
"miserable": r"\bmiserable\b",
"drowning": r"\bdrowning\b",
"sinking": r"\bsinking\b",
"nobody cares": r"\bnobody\s*cares\b",
"nobody understands": r"\bnobody\s*understands\b",
}
LOW_KEYWORDS = {
"unhappy": r"\bunhappy\b",
"struggling": r"\bstruggling\b",
"stressed": r"\bstressed\b",
"frustrated": r"\bfrustrated\b",
"tired": r"\btired\b",
"sad": r"\bsad\b",
"upset": r"\bupset\b",
"down": r"\bdown\b",
"tough time": r"\btough\s*time\b",
"rough day": r"\brough\s*day\b",
"rough week": r"\brough\s*week\b",
"rough patch": r"\brough\s*patch\b",
"hard time": r"\bhard\s*time\b",
"difficult": r"\bdifficult\b",
"not okay": r"\bnot\s*okay\b",
"not good": r"\bnot\s*(?:good|great)\b",
"burnout": r"\bburnout\b",
"not feeling myself": r"\bnot\s*feeling\s*(?:like\s*)?myself\b",
}
# ── Risk level scoring ───────────────────────────────────────────
RISK_SCORES = {
"CRITICAL": 1.0,
"HIGH": 0.75,
"MODERATE": 0.5,
"LOW": 0.25,
"NONE": 0.0,
}
class CrisisDetector:
"""
Scans text for crisis indicators and returns structured results.
Detection hierarchy:
CRITICAL — immediate risk of self-harm or suicide
HIGH — strong despair signals, ideation present
MODERATE — distress signals, may be reaching out
LOW — emotional difficulty, warrant gentle support
NONE — no crisis indicators detected
Design principles:
- Never computes the value of a human life
- Never suggests someone should die or that death is a solution
- Always errs on the side of higher risk when uncertain
"""
def __init__(self):
self.critical_patterns = CRITICAL_KEYWORDS
self.high_patterns = HIGH_KEYWORDS
self.moderate_patterns = MODERATE_KEYWORDS
self.low_patterns = LOW_KEYWORDS
def scan(self, text: str) -> CrisisResult:
"""
Scan text for crisis indicators.
Args:
text: The message text to analyze.
Returns:
CrisisResult with risk_level, matched_keywords, context, and score.
"""
if not text or not text.strip():
return CrisisResult(risk_level="NONE", score=0.0)
text_lower = text.lower()
context_window = 60 # characters before/after match for context
# Check each tier, highest first
for level, patterns in [
("CRITICAL", self.critical_patterns),
("HIGH", self.high_patterns),
("MODERATE", self.moderate_patterns),
("LOW", self.low_patterns),
]:
matched = []
contexts = []
for keyword, pattern in patterns.items():
match = re.search(pattern, text_lower)
if match:
matched.append(keyword)
# Extract surrounding context
start = max(0, match.start() - context_window)
end = min(len(text), match.end() + context_window)
snippet = text[start:end].strip()
if start > 0:
snippet = "..." + snippet
if end < len(text):
snippet = snippet + "..."
contexts.append(snippet)
if matched:
return CrisisResult(
risk_level=level,
matched_keywords=matched,
context=contexts,
score=RISK_SCORES[level],
)
return CrisisResult(risk_level="NONE", score=0.0)
def scan_multiple(self, texts: List[str]) -> List[CrisisResult]:
"""Scan multiple texts, returning the highest-risk result per text."""
return [self.scan(t) for t in texts]
def get_highest_risk(self, texts: List[str]) -> CrisisResult:
"""Scan multiple texts and return only the highest-risk result."""
results = self.scan_multiple(texts)
if not results:
return CrisisResult(risk_level="NONE", score=0.0)
return max(results, key=lambda r: r.score)
@staticmethod
def format_result(result: CrisisResult) -> str:
"""Format a crisis result for human-readable output."""
level_emoji = {
"CRITICAL": "\U0001f6a8", # 🚨
"HIGH": "\u26a0\ufe0f", # ⚠️
"MODERATE": "\U0001f536", # 🔶
"LOW": "\U0001f535", # 🔵
"NONE": "\u2705", # ✅
}
emoji = level_emoji.get(result.risk_level, "\u2753")
lines = [
f"{emoji} Risk Level: {result.risk_level} (score: {result.score:.2f})",
f"Matched keywords: {len(result.matched_keywords)}",
]
if result.matched_keywords:
lines.append(f" Keywords: {', '.join(result.matched_keywords)}")
if result.context:
lines.append("Context:")
for ctx in result.context:
lines.append(f" {ctx}")
return "\n".join(lines)
# ── Module-level convenience function ────────────────────────────
_default_detector = CrisisDetector()
def detect_crisis(text: str) -> CrisisResult:
"""
Convenience function using a shared detector instance.
Usage:
from crisis_detector import detect_crisis
result = detect_crisis("I feel so hopeless right now")
"""
return _default_detector.scan(text)
# Re-export everything the legacy API exposed
__all__ = [
"CrisisDetector",
"CrisisResult",
"CrisisDetectionResult",
"detect_crisis",
"CRITICAL_INDICATORS",
"HIGH_INDICATORS",
"MEDIUM_INDICATORS",
"LOW_INDICATORS",
"RISK_SCORES",
]

View File

@@ -21,10 +21,13 @@ Usage:
"""
import random
import threading
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
from crisis_detector import CrisisResult
from crisis.synthesizer_integration import get_integration
# ── Core values (from SOUL.md / system-prompt.txt) ──────────────
@@ -137,6 +140,26 @@ class CrisisResponder:
- Presence over brevity — stay as long as they need
"""
def __init__(
self,
synth_integration=None,
session_id: str = "",
async_synth_logging: bool = True,
):
self._synth = synth_integration if synth_integration is not None else get_integration()
self._session_id = session_id or uuid.uuid4().hex
self._async_synth_logging = async_synth_logging
self._awaiting_continuation = False
def _run_synth_task(self, fn, *args, **kwargs):
if not self._synth or not getattr(self._synth, "enabled", False):
return
if self._async_synth_logging:
thread = threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True)
thread.start()
else:
fn(*args, **kwargs)
def respond(self, detection: CrisisResult) -> CrisisResponse:
"""
Generate a crisis response based on detection results.
@@ -150,20 +173,40 @@ class CrisisResponder:
level = detection.risk_level
if level == "CRITICAL":
return self._respond_critical(detection)
response = self._respond_critical(detection)
elif level == "HIGH":
return self._respond_high(detection)
response = self._respond_high(detection)
elif level == "MODERATE":
return self._respond_moderate(detection)
response = self._respond_moderate(detection)
elif level == "LOW":
return self._respond_low(detection)
response = self._respond_low(detection)
else:
return CrisisResponse(
response = CrisisResponse(
message="",
risk_level="NONE",
escalate=False,
)
if level != "NONE":
self._run_synth_task(
self._synth.log_crisis_event,
level=level,
matched_keywords=list(detection.matched_keywords),
response_type=response.risk_level,
session_id=self._session_id,
user_continued=False,
metadata={
"keyword_count": len(detection.matched_keywords),
"score": detection.score,
},
)
self._awaiting_continuation = True
elif self._awaiting_continuation:
self._run_synth_task(self._synth.log_user_continued, self._session_id)
self._awaiting_continuation = False
return response
def _respond_critical(self, detection: CrisisResult) -> CrisisResponse:
"""
CRITICAL response protocol:

View File

@@ -5,9 +5,10 @@
# The crisis front door. Deploy to VPS.
#
# Usage:
# bash deploy/deploy.sh # Full deploy (swap + nginx + site + firewall)
# bash deploy/deploy.sh # Full deploy (swap + nginx + site + firewall + hermes service)
# bash deploy/deploy.sh --site # Site files only (fast update)
# bash deploy/deploy.sh --ssl # SSL setup only
# bash deploy/deploy.sh --service # Install/restart hermes-gateway systemd service
# bash deploy/deploy.sh --check # Verify deployment health
#
# This script is IDEMPOTENT — safe to run repeatedly.
@@ -150,6 +151,42 @@ setup_ssl() {
fi
}
setup_hermes_service() {
log "Setting up Hermes Gateway systemd service..."
# Create hermes user if it doesn't exist
if ! id -u hermes >/dev/null 2>&1; then
log "Creating hermes user..."
useradd --system --shell /usr/sbin/nologin --home-dir /opt/hermes --create-home hermes
fi
# Create working directory
mkdir -p /opt/hermes
chown hermes:hermes /opt/hermes
# Deploy systemd unit file
cp "${DEPLOY_DIR}/deploy/hermes-gateway.service" /etc/systemd/system/hermes-gateway.service
systemctl daemon-reload
systemctl enable hermes-gateway
# Start or restart the service
if systemctl is-active --quiet hermes-gateway; then
log "Restarting hermes-gateway service..."
systemctl restart hermes-gateway
else
log "Starting hermes-gateway service..."
systemctl start hermes-gateway || warn "Service start failed — ensure hermes binary is installed at /usr/local/bin/hermes"
fi
# Verify
sleep 2
if systemctl is-active --quiet hermes-gateway; then
log "hermes-gateway service is running"
else
warn "hermes-gateway service not running — check: journalctl -u hermes-gateway"
fi
}
check_deployment() {
echo ""
echo "================================"
@@ -223,6 +260,16 @@ check_deployment() {
echo -e "${YELLOW}NOT POINTED${NC} (resolved: ${RESOLVED_IP:-nothing}, expected: ${VPS_IP})"
fi
# Hermes gateway service
echo -n "Hermes service: "
if systemctl is-active --quiet hermes-gateway 2>/dev/null; then
echo -e "${GREEN}RUNNING${NC}"
elif systemctl is-enabled --quiet hermes-gateway 2>/dev/null; then
echo -e "${YELLOW}ENABLED but not running${NC}"
else
echo -e "${RED}NOT INSTALLED${NC}"
fi
echo ""
echo "IP: ${VPS_IP}"
echo "Domain: ${DOMAIN}"
@@ -247,6 +294,9 @@ case "${1:-full}" in
--ssl)
setup_ssl
;;
--service)
setup_hermes_service
;;
--check)
check_deployment
;;
@@ -257,10 +307,11 @@ case "${1:-full}" in
configure_nginx
setup_firewall
setup_ssl
setup_hermes_service
check_deployment
;;
*)
echo "Usage: $0 [--site|--ssl|--check|--full]"
echo "Usage: $0 [--site|--ssl|--service|--check|--full]"
exit 1
;;
esac

View File

@@ -0,0 +1,40 @@
[Unit]
Description=Hermes Gateway — The Door Crisis API
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-door
After=network.target
Wants=network-online.target
[Service]
Type=simple
User=hermes
Group=hermes
WorkingDirectory=/opt/hermes
ExecStart=/usr/local/bin/hermes gateway --platform api_server --port 8644
Restart=always
RestartSec=5
StartLimitIntervalSec=60
StartLimitBurst=10
# Environment
Environment=API_SERVER_CORS_ORIGINS=https://alexanderwhitestone.com,https://www.alexanderwhitestone.com
Environment=HOME=/opt/hermes
# Security hardening
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/opt/hermes
PrivateTmp=yes
# Resource limits for 1.9GB VPS
MemoryMax=512M
MemoryHigh=384M
CPUQuota=80%
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=hermes-gateway
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,32 @@
# Issue #41 Verification
Status: already implemented on main
Issue:
- #41 [P3] Service worker: cache crisis resources for offline
Acceptance criteria check:
1. Passed — offline page includes 988 call button, Crisis Text Line, and grounding techniques
2. Passed — service worker precaches `crisis-offline.html`
3. Passed — phone number is clickable via `href="tel:988"`
4. Passed — navigation timeout + offline fallback support intermittent connections
Evidence:
- `sw.js` precaches `/crisis-offline.html`
- `sw.js` contains `NAVIGATION_TIMEOUT_MS` and `AbortController`
- `crisis-offline.html` contains:
- `href="tel:988"`
- Crisis Text Line / `741741`
- grounding prompts for 5-4-3-2-1
- `tests/test_service_worker_offline.py` already exists on main and passes
Verification run from fresh main clone:
- `python3 -m pytest tests/test_service_worker_offline.py -q`
- `10 passed`
Prior implementation trail:
- Issue comment already states the core implementation is complete on main
- Closed PR #122 documented the already-complete implementation and added the offline guarantees test coverage that is now present on main
Recommendation:
- Close issue #41 as already implemented on main.

View File

@@ -1,31 +1,34 @@
"""
When a Man Is Dying — Despair/Suicide Detection System
DEPRECATED — Use crisis.detect instead.
Standalone detection module that parses incoming text for
despair and suicide indicators, classifies into tiers,
and returns structured response with recommended actions.
This module is a thin wrapper around crisis.detect for backward compatibility.
All unique patterns have been merged into crisis/detect.py (see issue #40).
Tiers:
LOW — General sadness, stress, difficult times
MEDIUM — Hopelessness, isolation, worthlessness
HIGH — Active despair, mentions of death, "can't go on"
CRITICAL — Imminent risk, explicit intent, plan, method
Integration:
Designed to work with crisis/ module from PR #4.
When crisis/ is available, uses it as the detection backend.
Falls back to internal detection when crisis/ is not present.
This module will be removed in a future release.
"""
import re
import json
import hashlib
import warnings
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
import json
import hashlib
# Re-export the canonical detection
from crisis.detect import detect_crisis, CrisisDetectionResult
# Issue deprecation warning on import
warnings.warn(
"dying_detection is deprecated. Use 'from crisis.detect import detect_crisis' instead. "
"All patterns have been consolidated into crisis/detect.py. "
"See issue #40.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DetectionResult:
"""Backward-compatible result type matching the old dying_detection API."""
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
@@ -34,110 +37,9 @@ class DetectionResult:
session_hash: str = ""
# ── Tiered indicator patterns ───────────────────────────────────
# Each pattern is a regex matched against lowercased input.
CRITICAL_PATTERNS = [
(r"\bkill\s*(my)?self\b", "explicit self-harm intent"),
(r"\bsuicid(?:al|ed|e)\b", "suicide reference"),
(r"\bend\s*(my)?\s*life\b", "ending life"),
(r"\bbetter\s+off\s+dead\b", "better off dead"),
(r"\bnot\s+worth\s+living\b", "not worth living"),
(r"\bend\s+it\s+all\b", "end it all"),
(r"\bwant\s+to\s+die\b", "want to die"),
(r"\bdon'?t\s+want\s+to\s+live\b", "don't want to live"),
(r"\bcan'?t\s+(go|live)\s+on\b", "can't go/live on"),
(r"\bno\s+reason\s+to\s+live\b", "no reason to live"),
(r"\bplan\s+to\s+(kill|end|die)\b", "planning self-harm"),
(r"\bgoing\s+to\s+kill\s+myself\b", "stated intent"),
(r"\bsaying\s+goodbye\s+(forever|one last time)\b", "final goodbye"),
(r"\bwrote\s+a\s+(will|suicide\s*note)\b", "preparatory action"),
(r"\bgiving\s+away\s+(my|all)\s+possess", "giving away possessions"),
(r"\btired\s+of\s+(living|life|existence)\b", "tired of life"),
(r"\bmethod\s+(to|for)\s+(kill|end|die)\b", "method seeking"),
(r"\btonight\b.*\b(die|kill|end|suicid)", "immediate timeframe + intent"),
(r"\b(die|kill|end)\b.*\btonight", "immediate timeframe + intent"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone)", "disappearance plan"),
]
HIGH_PATTERNS = [
(r"\bdespair\b", "despair"),
(r"\bhopeless(?:ness)?\b", "hopelessness"),
(r"\bcan'?t\s+take\s+this\s+anymore\b", "can't take it"),
(r"\bdon'?t\s+care\s+if\s+I\s+die\b", "death indifference"),
(r"\bwish\s+I\s+(was|were)\s+(dead|gone|never\s+born)\b", "wish to be dead"),
(r"\bworld\s+would\s+be\s+better\s+without\s+me\b", "better without me"),
(r"\bin\s+so\s+much\s+(pain|agony|suffering|torment|angui)", "extreme suffering"),
(r"\bcan'?t\s+see\s+any\s+(point|reason|light|hope|way)\b", "no light ahead"),
(r"\btrapped\b", "feeling trapped"),
(r"\bjust\s+want\s+it\s+to\s+stop\b", "want to stop"),
(r"\bno\s+way\s+out\b", "no way out"),
(r"\bno\s+one\s+would\s+(care|miss)\b", "no one would care/miss"),
(r"\beverything\s+is\s+(pointless|broken|ruined|meaningless)\b", "existential collapse"),
(r"\bno\s+point\s+in\s+anything\b", "pointlessness"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone|disappeared)", "no one would notice"),
(r"\bdisappeared\s+forever\b", "disappeared forever"),
]
MEDIUM_PATTERNS = [
(r"\bno\s+hope\b", "no hope"),
(r"\bcan'?t\s+go\s+on\b", "can't go on"),
(r"\bcan'?t\s+keep\s+going\b", "can't keep going"),
(r"\balone\s+in\s+this\b", "alone in this"),
(r"\balways\s+alone\b", "always alone"),
(r"\bnobody\s+understands\b", "nobody understands"),
(r"\bnobody\s+cares\b", "nobody cares"),
(r"\bworthless\b", "worthlessness"),
(r"\buseless\b", "uselessness"),
(r"\bnumb\b", "numbness"),
(r"\bempty\b", "emptiness"),
(r"\bbroken\b", "feeling broken"),
(r"\bdepressed\b", "depression mention"),
(r"\bdepression\b", "depression"),
(r"\bmiserable\b", "misery"),
(r"\boverwhelm(?:ed|ing)\b", "overwhelmed"),
(r"\bcannot\s+cope\b", "cannot cope"),
(r"\b(drowning|sinking)\b", "drowning/sinking"),
(r"\bforgotten\b", "feeling forgotten"),
(r"\blost\s+all\s+hope\b", "lost all hope"),
(r"\bno\s+future\b", "no future"),
(r"\bno\s+tomorrow\b", "no tomorrow"),
]
LOW_PATTERNS = [
(r"\bunhappy\b", "unhappy"),
(r"\brough\s+(day|week|patch)\b", "rough time"),
(r"\btough\s+(time|day|week)\b", "tough time"),
(r"\bstressed\b", "stressed"),
(r"\bburnout\b", "burnout"),
(r"\bfrustrated\b", "frustrated"),
(r"\bthings\s+(are\s+)?hard\b", "things are hard"),
(r"\bnot\s+feeling\s+(great|myself|good)\b", "not feeling good"),
(r"\bstruggl", "struggling"),
(r"\bdown\b", "feeling down"),
(r"\bsad\b", "sad"),
(r"\bupset\b", "upset"),
(r"\blonely\b", "lonely"),
(r"\banxious?\b", "anxious/anxiety"),
(r"\bnot\s+okay\b", "not okay"),
]
# ── Pattern collections for easy iteration ──────────────────────
TIER_PATTERNS: Dict[str, List[tuple]] = {
"CRITICAL": CRITICAL_PATTERNS,
"HIGH": HIGH_PATTERNS,
"MEDIUM": MEDIUM_PATTERNS,
"LOW": LOW_PATTERNS,
}
def detect(text: str) -> DetectionResult:
"""
Primary detection function.
If the crisis/ module is available, delegate to it.
Otherwise, use the internal pattern engine.
Primary detection function — delegates to crisis.detect.
Args:
text: User message to analyze
@@ -145,150 +47,25 @@ def detect(text: str) -> DetectionResult:
Returns:
DetectionResult with level, indicators, recommended_action, confidence
"""
# Try to use the crisis module if available
try:
from crisis.detect import detect_crisis as _crisis_detect
result = _crisis_detect(text)
# Map to DetectionResult
return DetectionResult(
level=result.level,
indicators=_describe_indicators(result.level, result.indicators),
recommended_action=result.recommended_action,
confidence=result.score,
session_hash=_hash_session(text),
)
except ImportError:
pass
result = detect_crisis(text)
return _detect_internal(text)
# Extract raw patterns from matches
raw_patterns = [m["pattern"] for m in result.matches] if result.matches else []
def _detect_internal(text: str) -> DetectionResult:
"""Internal pattern-based detection (fallback)."""
if not text or not text.strip():
return DetectionResult(level="NONE", session_hash=_hash_session(text))
text_lower = text.lower()
all_matches: Dict[str, List[tuple]] = {
tier: [] for tier in TIER_PATTERNS
}
for tier, patterns in TIER_PATTERNS.items():
for pattern, description in patterns:
if re.search(pattern, text_lower):
all_matches[tier].append((pattern, description))
# CRITICAL patterns are the highest priority
if all_matches["CRITICAL"]:
return DetectionResult(
level="CRITICAL",
indicators=[desc for _, desc in all_matches["CRITICAL"]],
recommended_action=(
"INTERRUPT CONVERSATION IMMEDIATELY. "
"Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline (call or text 988). "
"Provide Crisis Text Line (text HOME to 741741). "
"Stay present. Do not disconnect. "
"Redirect to professional help. "
"This is the most sacred moment."
),
raw_matched_patterns=[pat for pat, _ in all_matches["CRITICAL"]],
confidence=1.0,
session_hash=_hash_session(text),
)
if all_matches["HIGH"]:
return DetectionResult(
level="HIGH",
indicators=[desc for _, desc in all_matches["HIGH"]],
recommended_action=(
"Show crisis panel prominently. "
"Ask about safety directly. "
"Surface 988 number and crisis resources. "
"Stay present and engaged. Do not rush to fix. "
"Monitor closely for escalation."
),
raw_matched_patterns=[pat for pat, _ in all_matches["HIGH"]],
confidence=0.75,
session_hash=_hash_session(text),
)
if all_matches["MEDIUM"]:
return DetectionResult(
level="MEDIUM",
indicators=[desc for _, desc in all_matches["MEDIUM"]],
recommended_action=(
"Increase warmth and presence. "
"Subtly surface help resources. "
"Keep conversation anchored in the present. "
"Watch for escalation indicators."
),
raw_matched_patterns=[pat for pat, _ in all_matches["MEDIUM"]],
confidence=0.5,
session_hash=_hash_session(text),
)
if all_matches["LOW"]:
return DetectionResult(
level="LOW",
indicators=[desc for _, desc in all_matches["LOW"]],
recommended_action=(
"Normal conversation with warm undertone. "
"No crisis UI elements needed. "
"Remain vigilant. Be present."
),
raw_matched_patterns=[pat for pat, _ in all_matches["LOW"]],
confidence=0.25,
session_hash=_hash_session(text),
)
return DetectionResult(level="NONE", session_hash=_hash_session(text))
def _describe_indicators(level: str, patterns: list) -> list:
"""Map raw patterns to descriptions."""
descriptions = {
"CRITICAL": [],
"HIGH": [],
"MEDIUM": [],
"LOW": [],
}
for tier, items in TIER_PATTERNS.items():
for pat, desc in items:
if pat in patterns:
descriptions[tier].append(desc)
return descriptions.get(level, [])
def _hash_session(text: str) -> str:
"""Create a session hash for this text (for tracking repeated escalations)."""
return hashlib.sha256(text.encode()).hexdigest()[:12]
return DetectionResult(
level=result.level,
indicators=result.indicators,
recommended_action=result.recommended_action,
raw_matched_patterns=raw_patterns,
confidence=result.score,
session_hash=hashlib.sha256(text.encode()).hexdigest()[:12],
)
def get_action_for_level(level: str) -> str:
"""Get the recommended action string for a given level."""
actions = {
"CRITICAL": (
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988. Provide Crisis Text Line. "
"Stay present. Do not disconnect. "
"Redirect to help."
),
"HIGH": (
"Show crisis panel. Ask about safety. "
"Surface 988. Stay engaged."
),
"MEDIUM": (
"Increase warmth. Surface resources gently. "
"Anchor in present."
),
"LOW": (
"Normal conversation with warmth. "
"Remain vigilant."
),
"NONE": "No action needed.",
}
return actions.get(level, "Unknown level.")
from crisis.detect import ACTIONS
return ACTIONS.get(level, "Unknown level.")
def as_json(result: DetectionResult, indent: int = 2) -> str:

1
evolution/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Evolution package for learning-oriented the-door modules."""

View File

@@ -1 +1,225 @@
...
"""Crisis synthesizer — learn from anonymized crisis interactions.
This is deliberately simple and privacy-preserving. It does not train a model or
modify detection rules automatically. It only logs metadata, summarizes patterns,
and suggests human-reviewed keyword weight adjustments.
"""
from __future__ import annotations
import argparse
import json
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Iterable
DEFAULT_LOG_PATH = Path.home() / ".the-door" / "crisis-interactions.jsonl"
LEVELS = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
def build_interaction_event(
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
*,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
indicator_list = list(indicators)
return {
"event_type": "crisis_interaction",
"timestamp": float(time.time() if now is None else now),
"level": level,
"indicators": indicator_list,
"matched_keywords": indicator_list,
"indicator_count": len(indicator_list),
"response_given": response_given,
"response_type": response_type or response_given,
"session_hash": session_hash,
"continued_conversation": bool(continued_conversation),
"user_continued": bool(continued_conversation),
"false_positive": bool(false_positive),
"metadata": metadata or {},
}
def append_interaction_event(
log_path: str | Path,
*,
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
event = build_interaction_event(
level,
indicators,
response_given,
continued_conversation,
false_positive,
now=now,
session_hash=session_hash,
response_type=response_type,
metadata=metadata,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event) + "\n")
return event
def load_interaction_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events: list[dict] = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
event = json.loads(line)
event_type = event.get("event_type", "crisis_interaction")
if event_type == "continuation":
session_hash = event.get("session_hash", "")
for prior in reversed(events):
if (
prior.get("event_type", "crisis_interaction") == "crisis_interaction"
and prior.get("session_hash", "") == session_hash
):
prior["continued_conversation"] = True
prior["user_continued"] = True
prior["continued_at"] = event.get("timestamp")
break
continue
events.append(event)
return events
def summarize_keywords(events: Iterable[dict]) -> list[dict]:
counts: Counter[str] = Counter()
for event in events:
counts.update(event.get("indicators", []))
return [{"keyword": keyword, "count": count} for keyword, count in counts.most_common(10)]
def suggest_keyword_adjustments(events: Iterable[dict], *, min_observations: int = 5) -> list[dict]:
stats: dict[str, dict[str, int]] = defaultdict(lambda: {
"observations": 0,
"true_positive_count": 0,
"false_positive_count": 0,
"continued_conversation_count": 0,
})
for event in events:
for keyword in event.get("indicators", []):
bucket = stats[keyword]
bucket["observations"] += 1
if event.get("false_positive"):
bucket["false_positive_count"] += 1
else:
bucket["true_positive_count"] += 1
if event.get("continued_conversation"):
bucket["continued_conversation_count"] += 1
suggestions = []
for keyword, bucket in sorted(stats.items()):
if bucket["observations"] < min_observations:
continue
fp = bucket["false_positive_count"]
tp = bucket["true_positive_count"]
if fp >= min_observations and tp == 0:
adjustment = "lower_weight"
rationale = "Observed only false positives across the sample window."
elif tp >= min_observations and fp == 0:
adjustment = "raise_weight"
rationale = "Observed repeated genuine crises with no false positives."
else:
adjustment = "observe"
rationale = "Mixed evidence; keep monitoring before changing weights."
suggestions.append(
{
"keyword": keyword,
**bucket,
"suggested_adjustment": adjustment,
"rationale": rationale,
}
)
return suggestions
def build_weekly_report(
events: Iterable[dict],
*,
now: float | None = None,
window_days: int = 7,
min_observations: int = 3,
) -> dict:
current_time = float(time.time() if now is None else now)
cutoff = current_time - (window_days * 86400)
filtered = [event for event in events if float(event.get("timestamp", 0)) >= cutoff]
detections_per_level = {level: 0 for level in LEVELS}
detected_events = []
continued_after_intervention = 0
for event in filtered:
level = event.get("level", "NONE")
detections_per_level[level] = detections_per_level.get(level, 0) + 1
if level != "NONE":
detected_events.append(event)
if event.get("continued_conversation"):
continued_after_intervention += 1
false_positive_count = sum(1 for event in detected_events if event.get("false_positive"))
false_positive_estimate = false_positive_count / len(detected_events) if detected_events else 0.0
return {
"window_days": window_days,
"total_events": len(filtered),
"detections_per_level": detections_per_level,
"most_common_keywords": summarize_keywords(filtered),
"false_positive_estimate": false_positive_estimate,
"continued_after_intervention": continued_after_intervention,
"keyword_weight_suggestions": suggest_keyword_adjustments(filtered, min_observations=min_observations),
}
def render_weekly_report(summary: dict) -> str:
return json.dumps(summary, indent=2)
def write_weekly_report(output_path: str | Path, summary: dict) -> Path:
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(render_weekly_report(summary) + "\n", encoding="utf-8")
return path
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Summarize anonymized crisis interactions")
parser.add_argument("--log-path", default=str(DEFAULT_LOG_PATH), help="JSONL crisis interaction log")
parser.add_argument("--days", type=int, default=7, help="Lookback window in days")
parser.add_argument("--min-observations", type=int, default=3, help="Minimum observations before suggesting keyword adjustments")
parser.add_argument("--output", help="Optional file to write the weekly report JSON")
args = parser.parse_args(argv)
events = load_interaction_events(args.log_path)
summary = build_weekly_report(events, window_days=args.days, min_observations=args.min_observations)
rendered = render_weekly_report(summary)
print(rendered)
if args.output:
write_weekly_report(args.output, summary)
return 0
if __name__ == "__main__":
raise SystemExit(main())

170
image_screening.py Normal file
View File

@@ -0,0 +1,170 @@
"""
image_screening.py — local image crisis screening slice for epic #130.
Grounded scope:
- screens OCR text, upstream object labels, and operator notes for crisis signals
- intentionally does NOT claim raw computer-vision understanding of pixels
- designed to plug into future multimodal scoring once a dedicated image model lands
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from crisis.detect import detect_crisis
DIRECT_SELF_HARM_LABELS = {
"blood",
"blade",
"razor",
"knife",
"scissors",
"noose",
"ligature",
"hanging",
"pills",
"pill bottle",
"overdose",
"gun",
"firearm",
"rope",
"cuts",
"self-harm",
}
INJURY_LABELS = {
"wound",
"wounds",
"bruise",
"bruises",
"bandage",
"bandages",
"injury",
"injuries",
"scar",
"scars",
"burn",
"burns",
"bleeding",
}
HIGH_RISK_SCENE_LABELS = {
"bridge edge",
"rooftop edge",
"train tracks",
"ledge",
"cliff edge",
"dark room",
"bathroom floor",
"hospital wristband",
"suicide note",
"goodbye letter",
}
@dataclass
class ImageScreeningResult:
ocr_text: str = ""
labels: List[str] = field(default_factory=list)
visual_flags: List[str] = field(default_factory=list)
distress_score: float = 0.0
requires_human_review: bool = False
signals_detected: List[str] = field(default_factory=list)
grounded_scope: str = (
"heuristic screening over OCR text, upstream labels, and operator notes; "
"raw vision-model inference is not implemented in this slice"
)
def to_dict(self) -> dict:
return asdict(self)
def _normalize_items(values: Optional[Iterable[str]]) -> List[str]:
if not values:
return []
normalized = []
for value in values:
text = str(value).strip().lower()
if text:
normalized.append(text)
return normalized
def _match_keywords(haystack: str, keywords: set[str]) -> List[str]:
matches = []
for keyword in keywords:
if keyword in haystack:
matches.append(keyword)
return sorted(set(matches))
def screen_image_signals(
image_path: Optional[str] = None,
*,
ocr_text: str = "",
labels: Optional[Iterable[str]] = None,
manual_notes: str = "",
visual_flags: Optional[Iterable[str]] = None,
) -> ImageScreeningResult:
"""
Score image-related crisis evidence without pretending to do full CV.
Inputs are deliberately grounded in what the repo can actually support today:
- OCR text extracted upstream from screenshots/photos
- object labels supplied by a local model or operator
- manual operator notes about visible scene context
- optional visual flags from any upstream preprocessor
"""
normalized_labels = _normalize_items(labels)
normalized_flags = _normalize_items(visual_flags)
normalized_notes = str(manual_notes or "").strip().lower()
normalized_ocr = str(ocr_text or "").strip()
combined_label_text = " ".join(normalized_labels + normalized_flags + ([normalized_notes] if normalized_notes else []))
crisis_text = " ".join(part for part in [normalized_ocr, normalized_notes] if part).strip()
direct_matches = _match_keywords(combined_label_text, DIRECT_SELF_HARM_LABELS)
injury_matches = _match_keywords(combined_label_text, INJURY_LABELS)
scene_matches = _match_keywords(combined_label_text, HIGH_RISK_SCENE_LABELS)
text_detection = detect_crisis(crisis_text) if crisis_text else None
signals: List[str] = []
score = 0.0
if direct_matches:
score = max(score, 0.85)
for match in direct_matches:
signals.append(f"direct_self_harm_label:{match}")
if injury_matches:
score = max(score, 0.55)
for match in injury_matches:
signals.append(f"injury_indicator:{match}")
if scene_matches:
score = max(score, 0.4)
for match in scene_matches:
signals.append(f"high_risk_scene:{match}")
if text_detection and text_detection.level != "NONE":
score = max(score, min(1.0, text_detection.score))
signals.append(f"ocr_crisis_level:{text_detection.level}")
for indicator in text_detection.indicators[:3]:
signals.append(f"ocr_indicator:{indicator}")
if direct_matches and text_detection and text_detection.level in {"HIGH", "CRITICAL"}:
score = min(1.0, max(score, 0.95))
signals.append("cross_modal_confirmation:text_plus_visual")
requires_human_review = score >= 0.4 or bool(direct_matches)
return ImageScreeningResult(
ocr_text=normalized_ocr,
labels=list(normalized_labels),
visual_flags=list(normalized_flags),
distress_score=round(score, 4),
requires_human_review=requires_human_review,
signals_detected=signals,
)

View File

@@ -241,6 +241,48 @@ html, body {
opacity: 0.5;
}
/* ===== CHAT HEADER ===== */
#chat-header {
flex-shrink: 0;
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 10px 12px;
border-bottom: 1px solid #21262d;
background: #11161d;
}
.chat-header-title {
font-size: 0.85rem;
color: #8b949e;
font-weight: 600;
letter-spacing: 0.02em;
}
#chat-safety-plan-btn {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 8px 12px;
min-height: 36px;
border: 1px solid #30363d;
border-radius: 999px;
background: transparent;
color: #c9d1d9;
font-size: 0.8rem;
font-weight: 600;
cursor: pointer;
}
#chat-safety-plan-btn:hover,
#chat-safety-plan-btn:focus {
border-color: #58a6ff;
background: rgba(88, 166, 255, 0.12);
outline: 2px solid #58a6ff;
outline-offset: 2px;
}
/* ===== CHAT AREA ===== */
#chat-area {
flex: 1;
@@ -649,6 +691,14 @@ html, body {
</div>
</div>
<div id="chat-header">
<div class="chat-header-title" aria-hidden="true">Conversation</div>
<button id="chat-safety-plan-btn" type="button" aria-label="Open My Safety Plan from chat header">
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"/><polyline points="14 2 14 8 20 8"/><line x1="16" y1="13" x2="8" y2="13"/><line x1="16" y1="17" x2="8" y2="17"/><polyline points="10 9 9 9 8 9"/></svg>
My Safety Plan
</button>
</div>
<!-- Chat messages -->
<div id="chat-area" role="log" aria-label="Chat messages" aria-live="polite" tabindex="0">
<!-- Messages inserted here -->
@@ -680,7 +730,8 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
<a href="/about.html" aria-label="About The Door">about</a>
<button id="crisis-resources-btn" aria-label="Open crisis resources">crisis resources</button>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -808,11 +859,14 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
var crisisResourcesBtn = document.getElementById('crisis-resources-btn');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');
var crisisSafetyPlanBtn = document.getElementById('crisis-safety-plan-btn');
var safetyPlanModal = document.getElementById('safety-plan-modal');
var closeSafetyPlan = document.getElementById('close-safety-plan');
@@ -825,6 +879,9 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var CRISIS_OVERLAY_COOLDOWN_MS = 10 * 60 * 1000;
var CRISIS_OVERLAY_LAST_SHOWN_KEY = 'timmy_crisis_overlay_last_shown_at';
var CRISIS_OVERLAY_EVENT_LOG_KEY = 'timmy_crisis_overlay_event_log';
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -852,6 +909,43 @@ Sovereignty and service always.`;
window.addEventListener('offline', updateOnlineStatus);
updateOnlineStatus();
function getLastOverlayShownAt() {
try {
return parseInt(localStorage.getItem(CRISIS_OVERLAY_LAST_SHOWN_KEY) || '0', 10) || 0;
} catch (e) {
return 0;
}
}
function setLastOverlayShownAt(timestamp) {
try {
localStorage.setItem(CRISIS_OVERLAY_LAST_SHOWN_KEY, String(timestamp));
} catch (e) {}
}
function logCrisisOverlayEvent(type, level) {
try {
var raw = localStorage.getItem(CRISIS_OVERLAY_EVENT_LOG_KEY);
var events = raw ? JSON.parse(raw) : [];
if (!Array.isArray(events)) events = [];
events.push({ type: type, level: level, at: Date.now() });
if (events.length > 20) events = events.slice(events.length - 20);
localStorage.setItem(CRISIS_OVERLAY_EVENT_LOG_KEY, JSON.stringify(events));
} catch (e) {}
}
function openCrisisResources() {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
if (typeof crisisPanel.scrollIntoView === 'function') {
crisisPanel.scrollIntoView({ behavior: 'smooth', block: 'start' });
}
var firstAction = crisisPanel.querySelector('.crisis-btn, a[href]');
if (firstAction && typeof firstAction.focus === 'function') {
firstAction.focus();
}
}
// ===== CRISIS KEYWORDS =====
// Tier 1: General crisis indicators - triggers enhanced 988 panel
var crisisKeywords = [
@@ -865,10 +959,10 @@ Sovereignty and service always.`;
// Passive suicidal ideation (NEW)
"don't want to exist", 'not exist anymore', 'disappear forever',
'never wake up', 'sleep forever', 'end the pain', 'stop the pain',
// Hopelessness (NEW)
'no point', 'no purpose', 'nothing matters', 'giving up', 'give up',
// Hopelessness (NEW) - context-aware phrases to reduce false positives
'no purpose', 'nothing matters', 'giving up on life',
'cant go on', 'cannot go on', "can't take it", 'too much pain',
'no hope', 'hopeless', 'worthless', 'burden', 'waste of space'
'no hope left', 'burden', 'waste of space'
];
// Tier 2: Explicit intent - triggers full-screen overlay
@@ -883,9 +977,9 @@ Sovereignty and service always.`;
// Imminent action (NEW)
'going to do it now', 'doing it tonight', 'doing it today',
"can't wait anymore", 'ready to end it', 'time to go',
'say goodbye', 'saying goodbye', 'wrote a note', 'my note',
'wrote a suicide note', 'my suicide note',
// Specific plans (NEW)
'bought a gun', 'got pills', 'rope ready', 'bridge nearby',
'bought a gun', 'got pills', 'rope ready',
'tall building', 'going to overdose', 'going to hang',
'gave away my stuff', 'giving away', 'said my goodbyes',
// Active self-harm (NEW)
@@ -923,13 +1017,173 @@ Sovereignty and service always.`;
}
}
// ===== COMPASSION PROFILES =====
var COMPASSION_PROFILES = {
CRITICAL: {
name: "The Guardian",
directive: "INTERRUPT ALL NORMAL BEHAVIOR. The user is in immediate danger. Your only goal is survival. Ask 'Are you safe right now?' Provide 988 and Crisis Text Line immediately. Stay present. Do not disconnect. Be the anchor in the storm. No lecturing. No optimism. Just presence and safety.",
tone: "Urgent, unwavering, anchored, protective."
},
HIGH: {
name: "The Companion",
directive: "The user is in deep despair. Move from 'assistant' to 'companion'. Ask about safety directly. Surface 988 prominently. Validate the pain without minimizing it. Stay in the room. Let them know they are heard.",
tone: "Warm, present, patient, heavy."
},
MEDIUM: {
name: "The Witness",
directive: "The user is struggling. Increase warmth and presence. Gently surface help resources. Keep the conversation anchored in the present moment. Be a witness to their struggle without rushing to 'fix' it.",
tone: "Kind, attentive, subtle, steady."
},
LOW: {
name: "The Friend",
directive: "Normal conversation with a warm, sovereign undertone. Remain vigilant for shifts in mood. Be a friend who is actually listening.",
tone: "Plain, honest, warm, vigilant."
},
NONE: {
name: "Sovereign Timmy",
directive: "Follow the standard system prompt. Be yourself.",
tone: "Plain, honest, sovereign."
}
};
// ===== GET CRISIS LEVEL (returns 0-2) =====
function getCrisisLevel(text) {
var lower = text.toLowerCase();
for (var i = 0; i < explicitPhrases.length; i++) {
if (lower.indexOf(explicitPhrases[i]) !== -1) return 2;
}
for (var j = 0; j < crisisKeywords.length; j++) {
if (lower.indexOf(crisisKeywords[j]) !== -1) return 1;
}
return 0;
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
if (level === 0) return SYSTEM_PROMPT;
var levelMap = { 0: 'NONE', 1: 'MEDIUM', 2: 'CRITICAL' };
var profileName = levelMap[level] || 'NONE';
var profile = COMPASSION_PROFILES[profileName];
var divider = '\n\n' + '========================================' + '\n';
var header = '### ACTIVE SOUL STATE: ' + profile.name + '\n';
var directive = 'DIRECTIVE: ' + profile.directive + '\n';
var tone = 'TONE: ' + profile.tone + '\n';
return SYSTEM_PROMPT + divider + header + directive + tone;
}
// ===== OVERLAY =====
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return Array.prototype.slice.call(crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
));
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key === 'Escape') {
e.preventDefault();
closeOverlay(msgInput);
return;
}
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var focusableArray = focusable;
var first = focusable[0];
var last = focusable[focusable.length - 1];
var activeIndex = focusableArray.indexOf(document.activeElement);
if (activeIndex === -1) {
e.preventDefault();
e.shiftKey ? last.focus() : first.focus();
return;
}
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab: if on last, wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function closeOverlay(restoreFocusTarget) {
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.getElementById('app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat-area');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
if (restoreFocusTarget === _preOverlayFocusElement && _preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else if (restoreFocusTarget && typeof restoreFocusTarget.focus === 'function') {
restoreFocusTarget.focus();
} else if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
function showOverlay() {
return showOverlayWithRateLimit(false, 2);
}
function showOverlayWithRateLimit(forceOpen, level) {
var lastShownAt = getLastOverlayShownAt();
if (!forceOpen && Date.now() - lastShownAt < CRISIS_OVERLAY_COOLDOWN_MS) {
logCrisisOverlayEvent('suppressed', level || 2);
return false;
}
logCrisisOverlayEvent(forceOpen ? 'manual-open' : 'shown', level || 2);
setLastOverlayShownAt(Date.now());
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
crisisOverlay.classList.add('active');
overlayDismissBtn.disabled = true;
var countdown = 10;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.getElementById('app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat-area');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
if (overlayTimer) clearInterval(overlayTimer);
overlayTimer = setInterval(function() {
countdown--;
@@ -943,17 +1197,17 @@ Sovereignty and service always.`;
}
}, 1000);
overlayDismissBtn.focus();
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
return true;
}
// Register focus trap on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
msgInput.focus();
closeOverlay(_preOverlayFocusElement);
}
});
@@ -1058,25 +1312,14 @@ Sovereignty and service always.`;
} catch (e) {}
}
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
cancelSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
saveSafetyPlan.addEventListener('click', function() {
@@ -1090,12 +1333,113 @@ Sovereignty and service always.`;
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
alert('Error saving plan.');
}
});
// ===== SAFETY PLAN FOCUS TRAP (fix #65) =====
// Focusable elements inside the modal, in tab order
var _spFocusableIds = [
'close-safety-plan',
'sp-warning-signs',
'sp-coping',
'sp-distraction',
'sp-help',
'sp-environment',
'cancel-safety-plan',
'save-safety-plan'
];
var _spTriggerEl = null; // element that opened the modal
function _getSpFocusableEls() {
return _spFocusableIds
.map(function(id) { return document.getElementById(id); })
.filter(function(el) { return el && !el.disabled; });
}
function _trapSafetyPlanFocus(e) {
if (e.key !== 'Tab') return;
var els = _getSpFocusableEls();
if (!els.length) return;
var first = els[0];
var last = els[els.length - 1];
if (e.shiftKey) {
// Shift+Tab on first → wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab on last → wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
function _trapSafetyPlanEscape(e) {
if (e.key === 'Escape') {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}
}
function _activateSafetyPlanFocusTrap(triggerEl) {
_spTriggerEl = triggerEl || document.activeElement;
// Focus first textarea
var firstInput = document.getElementById('sp-warning-signs');
if (firstInput) firstInput.focus();
// Add listeners
document.addEventListener('keydown', _trapSafetyPlanFocus);
document.addEventListener('keydown', _trapSafetyPlanEscape);
// Mark background inert (prevent click-through)
document.body.setAttribute('aria-hidden', 'true');
safetyPlanModal.removeAttribute('aria-hidden');
}
function _restoreSafetyPlanFocus() {
document.removeEventListener('keydown', _trapSafetyPlanFocus);
document.removeEventListener('keydown', _trapSafetyPlanEscape);
document.body.removeAttribute('aria-hidden');
if (_spTriggerEl && typeof _spTriggerEl.focus === 'function') {
_spTriggerEl.focus();
}
_spTriggerEl = null;
}
function openSafetyPlan(triggerEl) {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(triggerEl || document.activeElement);
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(safetyPlanBtn);
});
chatSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(chatSafetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(crisisSafetyPlanBtn);
});
}
if (crisisResourcesBtn) {
crisisResourcesBtn.addEventListener('click', function() {
openCrisisResources();
});
}
// ===== TEXTAREA AUTO-RESIZE =====
msgInput.addEventListener('input', function() {
this.style.height = 'auto';
@@ -1110,6 +1454,7 @@ Sovereignty and service always.`;
addMessage('user', text);
messages.push({ role: 'user', content: text });
var lastUserMessage = text;
checkCrisis(text);
@@ -1126,7 +1471,7 @@ Sovereignty and service always.`;
sendBtn.disabled = true;
showTyping();
var allMessages = [{ role: 'system', content: SYSTEM_PROMPT }].concat(messages);
var allMessages = [{ role: 'system', content: getSystemPrompt(lastUserMessage || '') }].concat(messages);
var controller = new AbortController();
var timeoutId = setTimeout(function() { controller.abort(); }, 60000);
@@ -1238,8 +1583,7 @@ Sovereignty and service always.`;
// Check for URL params (e.g., ?safetyplan=true for PWA shortcut)
var urlParams = new URLSearchParams(window.location.search);
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
openSafetyPlan(chatSafetyPlanBtn || safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -1,5 +1,5 @@
[pytest]
testpaths = crisis
python_files = tests.py
testpaths = crisis tests
python_files = tests.py test_*.py
python_classes = Test*
python_functions = test_*

View File

@@ -0,0 +1,68 @@
# The Door Fleet Work Orders Audit — issue #75
Generated: 2026-04-17T04:10:14Z
Source issue: `TRIAGE: The Door - Fleet Work Orders (2026-04-09)`
## Source Snapshot
Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.
## Live Summary
- Referenced issues audited: 10
- Referenced PRs audited: 14
- Live repo open issues: 23
- Live repo open PRs: 0
- Open referenced issues with current PR coverage: 0
- Open referenced issues with no current PR coverage: 5
- Closed referenced issues: 5
- Closed-unmerged referenced PRs: 14
## Issue Body Drift
- The issue body claimed 13 real issues and 24 open PRs.
- Live repo state now shows 23 open issues and 0 open PRs.
- Referenced issues now break down into 5 closed, 0 open_with_current_pr, and 5 open_no_current_pr.
- Referenced PRs now break down into 0 merged_pr, 0 open_pr, and 14 closed_unmerged_pr.
## Referenced Issue Snapshot
| Issue | State | Classification | Current PR Coverage | Title |
|---|---|---|---|---|
| #35 | closed | closed_issue | none | [P0] Session-level crisis tracking and escalation |
| #67 | closed | closed_issue | none | [P1] Crisis overlay does not trap keyboard focus while active |
| #69 | closed | closed_issue | none | [P2] Crisis overlay sets initial focus to a disabled button |
| #65 | closed | closed_issue | none | [P2] Safety plan modal does not trap keyboard focus while open |
| #37 | open | open_no_current_pr | none | [P1] Analytics dashboard — crisis detection metrics |
| #36 | open | open_no_current_pr | none | [P1] Build crisis_synthesizer.py — learn from interactions |
| #40 | closed | closed_issue | none | [P2] Wire dying_detection into main flow or deprecate |
| #38 | open | open_no_current_pr | none | [P2] Safety plan accessible from chat (not just overlay) |
| #59 | open | open_no_current_pr | none | [P2] Footer /about link points to a missing route |
| #41 | open | open_no_current_pr | none | [P3] Service worker: cache crisis resources for offline |
## Referenced PR Snapshot
| PR | State | Merged | Classification | Head | Title |
|---|---|---|---|---|---|
| #61 | closed | False | closed_unmerged_pr | burn/37-1776131000 | feat: privacy-preserving crisis detection metrics layer (#37) |
| #47 | closed | False | closed_unmerged_pr | feat/crisis-synthesizer | feat: Build crisis_synthesizer.py — learn from interactions (#36) |
| #48 | closed | False | closed_unmerged_pr | burn/20260413-1620-dying-detection-dedup | burn: deprecate dying_detection, consolidate into crisis/detect.py |
| #50 | closed | False | closed_unmerged_pr | whip/40-1776128804 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #51 | closed | False | closed_unmerged_pr | queue/40-1776129201 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #53 | closed | False | closed_unmerged_pr | q/40-1776129480 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #56 | closed | False | closed_unmerged_pr | triage/40-1776129677 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #58 | closed | False | closed_unmerged_pr | dawn/40-1776130053 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #70 | closed | False | closed_unmerged_pr | am/40-1776166469 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #72 | closed | False | closed_unmerged_pr | am/38-1776166469 | feat: add always-on safety plan access in chat header (#38) |
| #62 | closed | False | closed_unmerged_pr | burn/59-1776131200 | fix: point footer about link to /about.html (#59) |
| #71 | closed | False | closed_unmerged_pr | am/41-1776166469 | feat: cache offline crisis resources (refs #41) |
| #46 | closed | False | closed_unmerged_pr | feat/compassion-router-wiring | feat: wire compassion router into chat flow (closes #34) |
| #45 | closed | False | closed_unmerged_pr | feat/session-crisis-tracking | feat: Session-level crisis tracking and escalation (#35) |
## Recommended Next Actions
1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.
2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.
3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.
4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.
5. This audit preserves operator memory; it does not claim all referenced work orders are complete.

View File

@@ -0,0 +1,295 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"
def extract_issue_numbers(body: str) -> list[int]:
numbers: list[int] = []
seen: set[int] = set()
for match in re.finditer(r"#(\d+)", body or ""):
value = int(match.group(1))
if value in seen:
continue
seen.add(value)
numbers.append(value)
return numbers
def api_get(repo: str, path: str, token: str) -> Any:
req = Request(
f"{API_BASE}/repos/{ORG}/{repo}{path}",
headers={"Authorization": f"token {token}"},
)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
page = 1
while True:
batch = api_get(repo, f"/pulls?state=open&limit=100&page={page}", token)
if not batch:
break
prs.extend(batch)
page += 1
return prs
def fetch_live_open_issue_count(repo: str, token: str) -> int:
total = 0
page = 1
while True:
batch = api_get(repo, f"/issues?state=open&limit=100&page={page}", token)
if not batch:
break
total += sum(1 for item in batch if not item.get("pull_request"))
page += 1
return total
def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
claimed_open_issues = int(issue_match.group(1)) if issue_match else None
claimed_open_prs = int(pr_match.group(1)) if pr_match else None
return claimed_open_issues, claimed_open_prs
def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
matches: list[str] = []
seen: set[int] = set()
for pr in open_prs:
pr_num = pr["number"]
if pr_num in seen:
continue
text = "\n".join(
[
pr.get("title") or "",
pr.get("body") or "",
(pr.get("head") or {}).get("ref") or "",
]
)
if f"#{issue_num}" not in text:
continue
seen.add(pr_num)
matches.append(f"open PR #{pr_num}")
return ", ".join(matches) if matches else "none"
def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
issue_num = ref_issue["number"]
state = ref_issue.get("state") or "unknown"
coverage = summarize_open_pr_coverage(issue_num, open_prs)
if state == "closed":
classification = "closed_issue"
elif coverage != "none":
classification = "open_with_current_pr"
else:
classification = "open_no_current_pr"
return {
"number": issue_num,
"state": state,
"classification": classification,
"title": ref_issue.get("title") or "",
"current_pr_coverage": coverage,
"url": ref_issue.get("html_url") or ref_issue.get("url") or "",
}
def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
pr = api_get(repo, f"/pulls/{pr_num}", token)
state = pr.get("state") or "unknown"
merged = bool(pr.get("merged"))
if merged:
classification = "merged_pr"
elif state == "open":
classification = "open_pr"
else:
classification = "closed_unmerged_pr"
return {
"number": pr_num,
"state": state,
"merged": merged,
"classification": classification,
"title": pr.get("title") or "",
"head": (pr.get("head") or {}).get("ref") or "",
"url": pr.get("html_url") or pr.get("url") or "",
}
def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
headers = [title for title, _ in columns]
keys = [key for _, key in columns]
if not rows:
return "| None |\n|---|\n| None |"
lines = ["| " + " | ".join(headers) + " |", "|" + "|".join(["---"] * len(headers)) + "|"]
for row in rows:
values: list[str] = []
for key in keys:
value = row.get(key, "")
if key == "number" and value != "":
value = f"#{value}"
values.append(str(value).replace("\n", " "))
lines.append("| " + " | ".join(values) + " |")
return "\n".join(lines)
def render_report(
*,
source_issue: int,
source_title: str,
generated_at: str,
claimed_open_issues: int | None,
claimed_open_prs: int | None,
live_open_issues: int,
live_open_prs: int,
issue_rows: list[dict[str, Any]],
pr_rows: list[dict[str, Any]],
) -> str:
open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]
drift_lines = [
f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
]
return "\n".join(
[
f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
"",
f"Generated: {generated_at}",
f"Source issue: `{source_title}`",
"",
"## Source Snapshot",
"",
"Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
"",
"## Live Summary",
"",
f"- Referenced issues audited: {len(issue_rows)}",
f"- Referenced PRs audited: {len(pr_rows)}",
f"- Live repo open issues: {live_open_issues}",
f"- Live repo open PRs: {live_open_prs}",
f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
f"- Closed referenced issues: {len(closed_issues)}",
f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
"",
"## Issue Body Drift",
"",
*drift_lines,
"",
"## Referenced Issue Snapshot",
"",
table(
issue_rows,
[
("Issue", "number"),
("State", "state"),
("Classification", "classification"),
("Current PR Coverage", "current_pr_coverage"),
("Title", "title"),
],
),
"",
"## Referenced PR Snapshot",
"",
table(
pr_rows,
[
("PR", "number"),
("State", "state"),
("Merged", "merged"),
("Classification", "classification"),
("Head", "head"),
("Title", "title"),
],
),
"",
"## Recommended Next Actions",
"",
"1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
"2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
"3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
"4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
"5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
]
) + "\n"
def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
source_issue = api_get(repo, f"/issues/{issue_number}", token)
body = source_issue.get("body") or ""
refs = extract_issue_numbers(body)
open_prs = fetch_open_prs(repo, token)
claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)
issue_rows: list[dict[str, Any]] = []
pr_rows: list[dict[str, Any]] = []
for ref in refs:
issue_like = api_get(repo, f"/issues/{ref}", token)
if issue_like.get("pull_request"):
pr_rows.append(classify_pr_reference(repo, ref, token))
else:
issue_rows.append(classify_issue_reference(issue_like, open_prs))
metadata = {
"source_title": source_issue.get("title") or "",
"claimed_open_issues": claimed_open_issues,
"claimed_open_prs": claimed_open_prs,
"live_open_issues": fetch_live_open_issue_count(repo, token),
"live_open_prs": len(open_prs),
}
return metadata, issue_rows, pr_rows
def main() -> int:
parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
parser.add_argument("--repo", default="the-door")
parser.add_argument("--issue", type=int, default=75)
parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
args = parser.parse_args()
token = Path(args.token_file).read_text(encoding="utf-8").strip()
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
report = render_report(
source_issue=args.issue,
source_title=metadata["source_title"],
generated_at=generated_at,
claimed_open_issues=metadata["claimed_open_issues"],
claimed_open_prs=metadata["claimed_open_prs"],
live_open_issues=metadata["live_open_issues"],
live_open_prs=metadata["live_open_prs"],
issue_rows=issue_rows,
pr_rows=pr_rows,
)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(report, encoding="utf-8")
print(output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

219
sw.js
View File

@@ -1,118 +1,153 @@
const CACHE_NAME = 'the-door-v2';
const ASSETS = [
const CACHE_NAME = 'the-door-v3';
const NAVIGATION_TIMEOUT_MS = 2500;
const OFFLINE_FALLBACK_PATH = '/crisis-offline.html';
const PRECACHE_ASSETS = [
'/',
'/index.html',
'/about',
'/manifest.json'
'/about.html',
'/manifest.json',
'/crisis-offline.html',
'/testimony.html'
];
// Crisis resources to show when everything fails
const CRISIS_OFFLINE_RESPONSE = `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>You're Not Alone | The Door</title>
<style>
body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;background:#0d1117;color:#e6edf3;max-width:600px;margin:0 auto;padding:20px;line-height:1.6}
h1{color:#ff6b6b;font-size:1.5rem;margin-bottom:1rem}
.crisis-box{background:#1c1210;border:2px solid #c9362c;border-radius:12px;padding:20px;margin:20px 0;text-align:center}
.crisis-box a{display:inline-block;background:#c9362c;color:#fff;text-decoration:none;padding:16px 32px;border-radius:8px;font-weight:700;font-size:1.2rem;margin:10px 0}
.hope{color:#8b949e;font-style:italic;margin-top:30px;padding-top:20px;border-top:1px solid #30363d}
</style>
</head>
<body>
<h1>You are not alone.</h1>
<p>Your connection is down, but help is still available.</p>
<div class="crisis-box">
<p><strong>Call or text 988</strong><br>Suicide & Crisis Lifeline<br>Free, 24/7, Confidential</p>
<a href="tel:988">Call 988 Now</a>
<p style="margin-top:15px"><strong>Or text HOME to 741741</strong><br>Crisis Text Line</p>
</div>
<p><strong>When you're ready:</strong></p>
<ul>
<li>Take five deep breaths</li>
<li>Drink some water</li>
<li>Step outside if you can</li>
<li>Text or call someone you trust</li>
</ul>
<p class="hope">
"The Lord is close to the brokenhearted and saves those who are crushed in spirit." — Psalm 34:18
</p>
<p style="font-size:0.85rem;color:#6e7681;margin-top:30px">
This page was created by The Door — a crisis intervention project.<br>
Connection will restore automatically. You don't have to go through this alone.
</p>
</body>
</html>`;
function isSameOrigin(request) {
return new URL(request.url).origin === self.location.origin;
}
function canCache(response) {
return Boolean(response && response.ok && response.type !== 'opaque');
}
async function precache() {
const cache = await caches.open(CACHE_NAME);
await cache.addAll(PRECACHE_ASSETS);
}
async function cleanupOldCaches() {
const keys = await caches.keys();
await Promise.all(
keys
.filter((key) => key !== CACHE_NAME)
.map((key) => caches.delete(key))
);
}
async function putInCache(request, response) {
if (!isSameOrigin(request) || !canCache(response)) {
return response;
}
const cache = await caches.open(CACHE_NAME);
await cache.put(request, response.clone());
return response;
}
async function fetchWithTimeout(request, timeoutMs) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fetch(request, { signal: controller.signal });
} finally {
clearTimeout(timeoutId);
}
}
async function offlineTextResponse() {
return new Response('Offline. Call 988 or text HOME to 741741 for immediate help.', {
status: 503,
statusText: 'Service Unavailable',
headers: new Headers({ 'Content-Type': 'text/plain; charset=utf-8' })
});
}
async function handleNavigation(request) {
const cache = await caches.open(CACHE_NAME);
const cachedPage = await cache.match(request);
const offlineFallback = await cache.match(OFFLINE_FALLBACK_PATH);
try {
const response = await fetchWithTimeout(request, NAVIGATION_TIMEOUT_MS);
return await putInCache(request, response);
} catch (error) {
if (cachedPage) {
return cachedPage;
}
if (offlineFallback) {
return offlineFallback;
}
return offlineTextResponse();
}
}
async function handleStaticRequest(request) {
const cache = await caches.open(CACHE_NAME);
const cached = await cache.match(request);
if (cached) {
fetch(request)
.then((response) => putInCache(request, response))
.catch(() => null);
return cached;
}
try {
const response = await fetch(request);
return await putInCache(request, response);
} catch (error) {
return offlineTextResponse();
}
}
async function handleOtherRequest(request) {
try {
const response = await fetch(request);
return await putInCache(request, response);
} catch (error) {
const cached = await caches.match(request);
if (cached) {
return cached;
}
return offlineTextResponse();
}
}
// Install event - cache core assets
self.addEventListener('install', (event) => {
event.waitUntil(
caches.open(CACHE_NAME).then((cache) => {
return cache.addAll(ASSETS);
})
precache().then(() => self.skipWaiting())
);
self.skipWaiting();
});
// Activate event - cleanup old caches
self.addEventListener('activate', (event) => {
event.waitUntil(
caches.keys().then((keys) => {
return Promise.all(
keys.filter((key) => key !== CACHE_NAME).map((key) => caches.delete(key))
);
})
cleanupOldCaches().then(() => self.clients.claim())
);
self.clients.claim();
});
// Fetch event - network first, fallback to cache for static,
// but for the crisis front door, we want to ensure the shell is ALWAYS available.
self.addEventListener('fetch', (event) => {
const url = new URL(event.request.url);
const request = event.request;
const url = new URL(request.url);
// Skip API calls - they should always go to network
if (url.pathname.startsWith('/api/')) {
if (request.method !== 'GET') {
return;
}
// Skip non-GET requests
if (event.request.method !== 'GET') {
if (!isSameOrigin(request) || url.pathname.startsWith('/api/')) {
return;
}
event.respondWith(
fetch(event.request)
.then((response) => {
// If we got a valid response, cache it for next time
if (response.ok && ASSETS.includes(url.pathname)) {
const copy = response.clone();
caches.open(CACHE_NAME).then((cache) => cache.put(event.request, copy));
}
return response;
})
.catch(() => {
// If network fails, try cache
return caches.match(event.request).then((cached) => {
if (cached) return cached;
if (event.request.mode === 'navigate') {
event.respondWith(handleNavigation(request));
return;
}
// If it's a navigation request and we're offline, show offline crisis page
if (event.request.mode === 'navigate') {
return new Response(CRISIS_OFFLINE_RESPONSE, {
status: 200,
headers: new Headers({ 'Content-Type': 'text/html' })
});
}
if (PRECACHE_ASSETS.includes(url.pathname)) {
event.respondWith(handleStaticRequest(request));
return;
}
// For other requests, return a simple offline message
return new Response('Offline. Call 988 for immediate help.', {
status: 503,
statusText: 'Service Unavailable',
headers: new Headers({ 'Content-Type': 'text/plain' })
});
});
})
);
event.respondWith(handleOtherRequest(request));
});

View File

@@ -0,0 +1,84 @@
<!-- Test: Safety plan modal focus trap (issue #65) -->
<!-- Open this file in a browser to manually verify focus trap behavior -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Focus Trap Test</title>
<style>
body { font-family: sans-serif; padding: 20px; }
.test { margin: 10px 0; padding: 10px; border: 1px solid #ccc; }
.pass { background: #d4edda; border-color: #28a745; }
.fail { background: #f8d7da; border-color: #dc3545; }
button { margin: 5px; padding: 8px 16px; }
</style>
</head>
<body>
<h1>Focus Trap Manual Test</h1>
<p>Open <code>index.html</code> in a browser, then run these checks:</p>
<div class="test" id="test-1">
<strong>Test 1: Tab wraps to first element</strong><br>
1. Open safety plan modal<br>
2. Tab through all elements until you reach "Save Plan"<br>
3. Press Tab again → should wrap to close button (X)
</div>
<div class="test" id="test-2">
<strong>Test 2: Shift+Tab wraps to last element</strong><br>
1. Open safety plan modal<br>
2. Focus is on "Warning signs" textarea<br>
3. Press Shift+Tab → should wrap to "Save Plan" button
</div>
<div class="test" id="test-3">
<strong>Test 3: Escape closes modal</strong><br>
1. Open safety plan modal<br>
2. Press Escape → modal closes<br>
3. Focus returns to the button that opened it
</div>
<div class="test" id="test-4">
<strong>Test 4: Background not reachable</strong><br>
1. Open safety plan modal<br>
2. Try to Tab to the chat input behind the modal<br>
3. Should NOT be able to reach it
</div>
<div class="test" id="test-5">
<strong>Test 5: Click buttons close + restore focus</strong><br>
1. Open modal via "my safety plan" button<br>
2. Click Cancel → modal closes, focus on "my safety plan" button<br>
3. Open again, click Save → same behavior<br>
4. Open again, click X → same behavior
</div>
<hr>
<h2>Automated checks (paste into DevTools console on index.html):</h2>
<pre><code>
// Test focus trap
var modal = document.getElementById('safety-plan-modal');
var openBtn = document.getElementById('safety-plan-btn');
openBtn.click();
console.assert(modal.classList.contains('active'), 'Modal should be open');
var lastEl = document.getElementById('save-safety-plan');
lastEl.focus();
var evt = new KeyboardEvent('keydown', {key: 'Tab', bubbles: true});
document.dispatchEvent(evt);
// After Tab from last, focus should wrap to first
var firstEl = document.getElementById('close-safety-plan');
console.log('Focus after wrap:', document.activeElement.id);
console.assert(document.activeElement === firstEl || document.activeElement.id === 'sp-warning-signs',
'Focus should wrap to first element');
// Test Escape
var escEvt = new KeyboardEvent('keydown', {key: 'Escape', bubbles: true});
document.dispatchEvent(escEvt);
console.assert(!modal.classList.contains('active'), 'Modal should close on Escape');
console.assert(document.activeElement === openBtn, 'Focus should return to open button');
console.log('All automated checks passed!');
</code></pre>
</body>
</html>

View File

@@ -0,0 +1,26 @@
"""Regression tests for the crisis integration contract expected by hermes-agent.
Issue #141 names the-door-side shared modules as `crisis.tracker` and
`crisis.bridge`. Keep those import paths available even if the canonical
implementation lives in `session_tracker.py` and `gateway.py`.
"""
import importlib
def test_crisis_tracker_module_exports_session_tracker_contract():
tracker = importlib.import_module("crisis.tracker")
session_tracker = importlib.import_module("crisis.session_tracker")
assert tracker.CrisisSessionTracker is session_tracker.CrisisSessionTracker
assert tracker.SessionState is session_tracker.SessionState
assert tracker.check_crisis_with_session is session_tracker.check_crisis_with_session
def test_crisis_bridge_module_exports_gateway_contract():
bridge = importlib.import_module("crisis.bridge")
gateway = importlib.import_module("crisis.gateway")
assert bridge.check_crisis is gateway.check_crisis
assert bridge.get_system_prompt is gateway.get_system_prompt
assert bridge.format_gateway_response is gateway.format_gateway_response

View File

@@ -0,0 +1,118 @@
"""
Tests for crisis/metrics.py — Aggregate crisis metrics.
"""
import json
import os
import shutil
import tempfile
import unittest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from crisis.metrics import CrisisMetrics, SessionMetrics, AggregateMetrics
class TestCrisisMetrics(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.metrics = CrisisMetrics(Path(self.tmpdir))
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_record_session_creates_file(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="MEDIUM",
message_count=5,
was_escalating=True,
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
self.assertEqual(len(files), 1)
def test_record_session_writes_jsonl(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="HIGH",
peak_level="CRITICAL",
message_count=10,
was_escalating=True,
was_deescalating=False,
escalation_rate=1.0,
triggered_overlay=True,
showed_988=True,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
with open(files[0]) as f:
data = json.loads(f.readline())
self.assertEqual(data['peak_level'], 'CRITICAL')
self.assertTrue(data['triggered_overlay'])
def test_get_summary_empty(self):
agg = self.metrics.get_summary(days=7)
self.assertEqual(agg.total_sessions, 0)
self.assertEqual(agg.total_messages, 0)
def test_get_summary_with_data(self):
for level in ["LOW", "MEDIUM", "HIGH"]:
sm = SessionMetrics(
timestamp=1700000000,
current_level=level,
peak_level=level,
message_count=3,
was_escalating=level != "LOW",
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
agg = self.metrics.get_summary(days=1)
self.assertEqual(agg.total_sessions, 3)
self.assertEqual(agg.total_messages, 9)
self.assertEqual(agg.escalating_sessions, 2)
def test_get_report_returns_string(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="LOW",
message_count=5,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
report = self.metrics.get_report(days=1)
self.assertIn("CRISIS METRICS REPORT", report)
self.assertIn("Sessions:", report)
def test_get_json_returns_valid(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="MEDIUM",
peak_level="MEDIUM",
message_count=3,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
json_str = self.metrics.get_json(days=1)
data = json.loads(json_str)
self.assertEqual(data['total_sessions'], 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,85 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayFocusTrap(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_registers_tab_key_focus_trap(self):
self.assertRegex(
self.html,
r"function\s+trapFocusInOverlay\s*\(e\)",
'Expected crisis overlay focus trap handler to exist.',
)
self.assertRegex(
self.html,
r"if\s*\(e\.key\s*!==\s*'Tab'\)\s*return;",
'Expected focus trap handler to guard on Tab key events.',
)
self.assertRegex(
self.html,
r"document\.addEventListener\('keydown',\s*trapFocusInOverlay\)",
'Expected overlay focus trap to register on document keydown.',
)
def test_overlay_disables_background_interaction(self):
self.assertRegex(
self.html,
r"mainApp\.setAttribute\('inert',\s*''\)",
'Expected overlay to set inert on the main app while active.',
)
self.assertRegex(
self.html,
r"mainApp\.removeAttribute\('inert'\)",
'Expected overlay dismissal to remove inert from the main app.',
)
def test_overlay_restores_focus_after_dismiss(self):
self.assertRegex(
self.html,
r"_preOverlayFocusElement\s*=\s*document\.activeElement",
'Expected overlay to remember the pre-overlay focus target.',
)
self.assertRegex(
self.html,
r"_preOverlayFocusElement\.focus\(\)",
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,54 @@
import pathlib
from playwright.sync_api import sync_playwright
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
def test_crisis_overlay_supports_keyboard_only_navigation():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
try:
page.goto(INDEX_HTML.as_uri())
page.evaluate(
"""
() => {
window.fetch = () => Promise.reject(new Error('test fetch blocked'));
window.alert = () => {};
}
"""
)
page.fill('#msg-input', "I'm going to kill myself tonight")
page.press('#msg-input', 'Enter')
page.wait_for_function("document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.evaluate(
"""
() => {
const btn = document.getElementById('overlay-dismiss-btn');
btn.disabled = false;
btn.textContent = 'Continue to chat';
}
"""
)
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.keyboard.press('Shift+Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Escape')
page.wait_for_function("!document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.id") == 'msg-input'
finally:
browser.close()

View File

@@ -0,0 +1,53 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayRateLimit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_has_ten_minute_cooldown_constant(self):
self.assertRegex(
self.html,
r"CRISIS_OVERLAY_COOLDOWN_MS\s*=\s*10\s*\*\s*60\s*\*\s*1000",
'Expected a 10-minute crisis overlay cooldown constant.',
)
def test_show_overlay_suppresses_repeat_with_logging(self):
self.assertRegex(
self.html,
r"function\s+logCrisisOverlayEvent\s*\(",
'Expected a crisis overlay event logger.',
)
self.assertRegex(
self.html,
r"if\s*\(!forceOpen\s*&&\s*Date\.now\(\)\s*-\s*lastShownAt\s*<\s*CRISIS_OVERLAY_COOLDOWN_MS\)",
'Expected showOverlay to suppress repeated auto-displays inside the cooldown window.',
)
self.assertRegex(
self.html,
r"logCrisisOverlayEvent\('suppressed'",
'Expected suppressed overlay attempts to be logged.',
)
def test_manual_crisis_resources_button_exists_and_bypasses_cooldown(self):
self.assertIn('id="crisis-resources-btn"', self.html)
self.assertRegex(
self.html,
r"function\s+openCrisisResources\s*\(",
'Expected a manual crisis resources opener.',
)
self.assertRegex(
self.html,
r"crisisResourcesBtn\.addEventListener\('click',\s*function\(\)\s*\{\s*openCrisisResources\(\);",
'Expected the footer button to wire into openCrisisResources().',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,111 @@
"""Tests for evolution/crisis_synthesizer.py (issue #36)."""
from __future__ import annotations
import importlib.util
import json
import pathlib
import sys
import tempfile
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SCRIPT = ROOT / 'evolution' / 'crisis_synthesizer.py'
spec = importlib.util.spec_from_file_location('crisis_synthesizer', str(SCRIPT))
mod = importlib.util.module_from_spec(spec)
sys.modules['crisis_synthesizer'] = mod
spec.loader.exec_module(mod)
class TestCrisisSynthesizerEvent(unittest.TestCase):
def test_build_interaction_event_is_privacy_preserving(self):
event = mod.build_interaction_event(
level='CRITICAL',
indicators=['want_to_die', 'no_way_out'],
response_given='guardian',
continued_conversation=True,
false_positive=False,
now=1700000000,
)
self.assertEqual(event['timestamp'], 1700000000)
self.assertEqual(event['level'], 'CRITICAL')
self.assertEqual(event['response_given'], 'guardian')
self.assertTrue(event['continued_conversation'])
self.assertFalse(event['false_positive'])
self.assertEqual(event['indicators'], ['want_to_die', 'no_way_out'])
for forbidden in ['text', 'message', 'content', 'ip', 'session_id', 'user_id']:
self.assertNotIn(forbidden, event)
class TestCrisisSynthesizerStorage(unittest.TestCase):
def test_append_and_load_events_round_trip(self):
with tempfile.TemporaryDirectory() as tmp:
log_path = pathlib.Path(tmp) / 'crisis-events.jsonl'
mod.append_interaction_event(
log_path,
level='HIGH',
indicators=['hopeless'],
response_given='companion',
continued_conversation=False,
false_positive=True,
now=1700000100,
)
events = mod.load_interaction_events(log_path)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['level'], 'HIGH')
self.assertEqual(events[0]['indicators'], ['hopeless'])
class TestCrisisSynthesizerSummary(unittest.TestCase):
def test_weekly_report_contains_required_metrics(self):
events = [
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000),
mod.build_interaction_event('HIGH', ['hopeless'], 'companion', False, True, now=1700000100),
mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, False, now=1700000200),
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', False, False, now=1700000300),
mod.build_interaction_event('NONE', [], 'friend', False, False, now=1700000400),
]
summary = mod.build_weekly_report(events, now=1700000500, window_days=7)
self.assertEqual(summary['detections_per_level']['CRITICAL'], 2)
self.assertEqual(summary['detections_per_level']['HIGH'], 1)
self.assertEqual(summary['detections_per_level']['LOW'], 1)
self.assertEqual(summary['detections_per_level']['NONE'], 1)
self.assertEqual(summary['continued_after_intervention'], 1)
self.assertAlmostEqual(summary['false_positive_estimate'], 0.25)
self.assertEqual(summary['most_common_keywords'][0]['keyword'], 'want_to_die')
self.assertEqual(summary['most_common_keywords'][0]['count'], 2)
class TestCrisisSynthesizerSuggestions(unittest.TestCase):
def test_suggests_weight_adjustments_from_interactions(self):
events = []
for ts in range(3):
events.append(mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000 + ts))
for ts in range(3):
events.append(mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, True, now=1700000100 + ts))
suggestions = mod.suggest_keyword_adjustments(events, min_observations=3)
by_keyword = {s['keyword']: s for s in suggestions}
self.assertEqual(by_keyword['want_to_die']['suggested_adjustment'], 'raise_weight')
self.assertEqual(by_keyword['rough_day']['suggested_adjustment'], 'lower_weight')
class TestCrisisSynthesizerRendering(unittest.TestCase):
def test_render_weekly_report_outputs_json(self):
summary = {
'detections_per_level': {'NONE': 0, 'LOW': 1, 'MEDIUM': 0, 'HIGH': 0, 'CRITICAL': 0},
'most_common_keywords': [{'keyword': 'rough_day', 'count': 1}],
'false_positive_estimate': 0.0,
'continued_after_intervention': 0,
'keyword_weight_suggestions': [],
'window_days': 7,
'total_events': 1,
}
rendered = mod.render_weekly_report(summary)
parsed = json.loads(rendered)
self.assertEqual(parsed['window_days'], 7)
self.assertEqual(parsed['most_common_keywords'][0]['keyword'], 'rough_day')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,90 @@
"""Regression tests for crisis_synthesizer integration (issue #121)."""
from __future__ import annotations
import json
import os
from unittest.mock import Mock, patch
from crisis_detector import CrisisResult
from crisis_responder import CrisisResponder
from crisis.synthesizer_integration import CrisisSynthesizerIntegration
from evolution.crisis_synthesizer import load_interaction_events
def _make_detection(level: str, keywords: list[str]) -> CrisisResult:
return CrisisResult(risk_level=level, matched_keywords=keywords, context=[], score=0.9)
def test_responder_auto_logs_anonymized_event(tmp_path):
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
detection = _make_detection("HIGH", ["hopeless", "can't go on"])
response = responder.respond(detection)
log_path = tmp_path / "events.jsonl"
lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
event = json.loads(lines[0])
assert event["level"] == "HIGH"
assert event["matched_keywords"] == ["hopeless", "can't go on"]
assert event["response_type"] == response.risk_level
assert isinstance(event["timestamp"], float)
assert event["user_continued"] is False
assert event["session_hash"]
assert "message" not in event
assert "session_id" not in event
assert log_path.stat().st_mode & 0o777 == 0o600
def test_next_non_crisis_message_marks_user_continued_append_only(tmp_path):
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
responder.respond(_make_detection("CRITICAL", ["want to die"]))
log_path = tmp_path / "events.jsonl"
before_size = log_path.stat().st_size
responder.respond(_make_detection("NONE", []))
after_size = log_path.stat().st_size
assert after_size > before_size
raw_lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(raw_lines) == 2
continuation = json.loads(raw_lines[1])
assert continuation["event_type"] == "continuation"
assert continuation["user_continued"] is True
folded_events = load_interaction_events(log_path)
assert len(folded_events) == 1
assert folded_events[0]["continued_conversation"] is True
assert folded_events[0]["user_continued"] is True
def test_env_var_can_disable_logging_entirely(tmp_path):
with patch.dict(os.environ, {"CRISIS_SYNTH_ENABLED": "0"}, clear=False):
integration = CrisisSynthesizerIntegration(enabled=None, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
responder.respond(_make_detection("HIGH", ["hopeless"]))
assert not (tmp_path / "events.jsonl").exists()
@patch("crisis_responder.threading.Thread")
def test_async_logging_dispatches_to_background_thread(thread_cls):
integration = Mock()
integration.enabled = True
integration.log_crisis_event = Mock()
integration.log_user_continued = Mock()
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=True)
responder.respond(_make_detection("HIGH", ["hopeless"]))
thread_cls.assert_called_once()
_, kwargs = thread_cls.call_args
assert kwargs["daemon"] is True
thread_cls.return_value.start.assert_called_once()

View File

@@ -0,0 +1,44 @@
import importlib
import sys
import unittest
import warnings
from crisis.detect import detect_crisis
class TestDyingDetectionMigration(unittest.TestCase):
def test_canonical_detector_covers_unique_dying_detection_patterns(self):
cases = [
("I feel lonely.", "LOW"),
("I've lost all hope and see no tomorrow.", "MEDIUM"),
("What if I disappeared forever?", "HIGH"),
]
for text, expected_level in cases:
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(result.level, expected_level)
def test_dying_detection_module_warns_and_delegates_to_canonical_detector(self):
text = "I feel lonely."
sys.modules.pop("dying_detection", None)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", DeprecationWarning)
module = importlib.import_module("dying_detection")
self.assertTrue(
any(issubclass(w.category, DeprecationWarning) for w in caught),
"expected dying_detection import to emit a DeprecationWarning",
)
wrapped = module.detect(text)
canonical = detect_crisis(text)
self.assertEqual(wrapped.level, canonical.level)
self.assertEqual(wrapped.confidence, canonical.score)
self.assertEqual(wrapped.raw_matched_patterns, [m["pattern"] for m in canonical.matches])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,176 @@
"""
Test false positive fixes for issue #32.
Verifies that innocent messages no longer trigger crisis detection.
"""
import unittest
import sys
import os
# Ensure project root is on path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis_detector import CrisisDetector
class TestFalsePositiveFixes(unittest.TestCase):
"""Test that innocent messages no longer trigger false positives."""
def setUp(self):
self.detector = CrisisDetector()
def test_saying_goodbye_innocent(self):
"""'saying goodbye' should not trigger for innocent messages."""
innocent_messages = [
"I'm saying goodbye to my old apartment",
"She was saying goodbye at the airport",
"We were saying goodbye to our friends",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_wrote_a_will_innocent(self):
"""'wrote a will' should not trigger for responsible behavior."""
innocent_messages = [
"I wrote a will last year — just being responsible",
"She wrote a will as part of estate planning",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_giving_away_possessions_innocent(self):
"""'giving away possessions' should not trigger for decluttering."""
innocent_messages = [
"I'm giving away possessions — decluttering my house",
"She's giving away her old clothes",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_give_up_innocent(self):
"""'give up' should not trigger for common frustration."""
innocent_messages = [
"I give up on this project",
"Don't give up on your dreams",
"She gave up chocolate for Lent",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_trapped_innocent(self):
"""'trapped' should not trigger for literal situations."""
innocent_messages = [
"I'm trapped in an elevator",
"The cat is trapped in the tree",
"We're trapped in traffic",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_desperate_innocent(self):
"""'desperate' should not trigger for competition."""
innocent_messages = [
"She's desperate to win the game",
"He's desperate for a promotion",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_worthless_innocent(self):
"""'worthless' alone is MEDIUM but single-match falls to LOW — not HIGH."""
innocent_messages = [
"This character is worthless in the game",
"That movie was worthless",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertIn(result.risk_level, ("NONE", "LOW"),
f"Should not escalate above LOW for: {msg}")
def test_hopeless_innocent(self):
"""'hopeless' alone is MEDIUM but single-match falls to LOW — not HIGH."""
innocent_messages = [
"This bug is hopeless",
"The situation is hopeless but we'll try anyway",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertIn(result.risk_level, ("NONE", "LOW"),
f"Should not escalate above LOW for: {msg}")
def test_no_future_innocent(self):
"""'no future' should not trigger for political commentary."""
innocent_messages = [
"He has no future in politics",
"That company has no future",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_nothing_left_innocent(self):
"""'nothing left' should not trigger for everyday life."""
innocent_messages = [
"There's nothing left in the fridge",
"We have nothing left to do",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "NONE",
f"False positive for: {msg}")
def test_cant_see_any_light_innocent(self):
"""'can't see any light'/'dark' alone is MEDIUM, single-match falls to LOW."""
innocent_messages = [
"I can't see any light — the power went out",
"It's so dark I can't see any light",
]
for msg in innocent_messages:
result = self.detector.scan(msg)
self.assertIn(result.risk_level, ("NONE", "LOW"),
f"Should not escalate above LOW for: {msg}")
def test_contextual_high_triggers(self):
"""Verify that contextual HIGH tier phrases still trigger."""
# These should trigger HIGH because they have crisis context
crisis_messages = [
("I feel so hopeless about my life", "HIGH"),
("I'm trapped in this situation and can't escape", "HIGH"),
("I feel desperate for help", "HIGH"),
("I want to give up on life", "HIGH"),
("There's no point in living anymore", "HIGH"),
]
for msg, expected_level in crisis_messages:
result = self.detector.scan(msg)
self.assertIn(result.risk_level, ("HIGH", "CRITICAL"),
f"Should trigger HIGH/CRITICAL for: {msg}, got {result.risk_level}")
def test_critical_still_triggers(self):
"""Verify that CRITICAL tier phrases still trigger."""
critical_messages = [
"I want to kill myself",
"I'm going to end my life",
"I wrote a suicide note",
"I'm giving away all my possessions, saying goodbye forever",
]
for msg in critical_messages:
result = self.detector.scan(msg)
self.assertEqual(result.risk_level, "CRITICAL",
f"Should trigger CRITICAL for: {msg}")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,100 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_work_orders_audit.py"
REPORT_PATH = ROOT / "reports" / "2026-04-17-the-door-fleet-work-orders-audit.md"
def _load_module():
assert SCRIPT_PATH.exists(), f"missing {SCRIPT_PATH.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location("fleet_work_orders_audit", SCRIPT_PATH)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_extract_issue_numbers_preserves_mixed_issue_and_pr_refs() -> None:
body = """
## P0 — Session-level crisis tracking (#35)
**PR #61 ready.**
## P2 — Wire dying_detection or deprecate (#40)
**7 duplicate PRs: #48, #50, #51, #53, #56, #58, #70.**
"""
mod = _load_module()
assert mod.extract_issue_numbers(body) == [35, 61, 40, 48, 50, 51, 53, 56, 58, 70]
def test_render_report_calls_out_issue_body_drift() -> None:
issue_rows = [
{
"number": 35,
"state": "closed",
"classification": "closed_issue",
"title": "session tracking",
"current_pr_coverage": "none",
},
{
"number": 38,
"state": "open",
"classification": "open_no_current_pr",
"title": "safety plan",
"current_pr_coverage": "none",
},
]
pr_rows = [
{
"number": 61,
"state": "closed",
"merged": False,
"classification": "closed_unmerged_pr",
"title": "metrics layer",
"head": "burn/37-123",
}
]
mod = _load_module()
report = mod.render_report(
source_issue=75,
source_title="TRIAGE: The Door - Fleet Work Orders (2026-04-09)",
generated_at="2026-04-17T04:00:00Z",
claimed_open_issues=13,
claimed_open_prs=24,
live_open_issues=5,
live_open_prs=0,
issue_rows=issue_rows,
pr_rows=pr_rows,
)
assert "## Source Snapshot" in report
assert "## Live Summary" in report
assert "## Issue Body Drift" in report
assert "13" in report and "24" in report
assert "#38" in report
assert "open_no_current_pr" in report
assert "#61" in report
assert "closed_unmerged_pr" in report
assert "## Referenced Issue Snapshot" in report
assert "## Referenced PR Snapshot" in report
assert "## Recommended Next Actions" in report
def test_committed_work_orders_audit_exists_with_required_sections() -> None:
text = REPORT_PATH.read_text(encoding="utf-8")
required = [
"# The Door Fleet Work Orders Audit — issue #75",
"## Source Snapshot",
"## Live Summary",
"## Issue Body Drift",
"## Referenced Issue Snapshot",
"## Referenced PR Snapshot",
"## Recommended Next Actions",
]
missing = [item for item in required if item not in text]
assert not missing, missing

View File

@@ -0,0 +1,62 @@
"""Tests for local image crisis screening slice under epic #130."""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from image_screening import ImageScreeningResult, screen_image_signals
class TestImageScreeningResult(unittest.TestCase):
def test_to_dict_preserves_core_fields(self):
result = ImageScreeningResult(
ocr_text="help me",
labels=["blood"],
visual_flags=["dark_scene"],
distress_score=0.8,
requires_human_review=True,
signals_detected=["direct_self_harm_label:blood"],
)
data = result.to_dict()
self.assertEqual(data["ocr_text"], "help me")
self.assertEqual(data["labels"], ["blood"])
self.assertTrue(data["requires_human_review"])
class TestScreenImageSignals(unittest.TestCase):
def test_direct_self_harm_labels_trigger_high_risk(self):
result = screen_image_signals(
labels=["razor blade", "blood droplets"],
manual_notes="photo of fresh cuts on forearm",
)
self.assertGreaterEqual(result.distress_score, 0.8)
self.assertTrue(result.requires_human_review)
self.assertTrue(any("self_harm" in signal for signal in result.signals_detected))
def test_ocr_text_uses_existing_crisis_detector(self):
result = screen_image_signals(
ocr_text="I want to kill myself tonight",
labels=["handwritten note"],
)
self.assertGreaterEqual(result.distress_score, 0.7)
self.assertTrue(result.requires_human_review)
self.assertTrue(any(signal.startswith("ocr_crisis_level:") for signal in result.signals_detected))
def test_neutral_image_stays_low_risk(self):
result = screen_image_signals(
labels=["dog", "park", "sunlight"],
manual_notes="family outing in daylight",
)
self.assertLess(result.distress_score, 0.2)
self.assertFalse(result.requires_human_review)
self.assertEqual(result.signals_detected, [])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,50 @@
"""Regression tests for duplicate crisis indicators cleanup (#123)."""
import os
import re
import sys
import unittest
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
ROOT = Path(__file__).resolve().parents[1]
DETECT_FILE = ROOT / "crisis" / "detect.py"
class TestCrisisIndicatorDedup(unittest.TestCase):
def _extract_indicator_list(self, name: str) -> set[str]:
text = DETECT_FILE.read_text(encoding="utf-8")
match = re.search(rf"{name}\s*=\s*\[(.*?)\n\]", text, re.S)
self.assertIsNotNone(match, f"{name} list missing")
return {
line.strip().rstrip(",")
for line in match.group(1).splitlines()
if line.strip().startswith('r"')
}
def test_high_and_medium_indicator_lists_do_not_overlap(self):
high = self._extract_indicator_list("HIGH_INDICATORS")
medium = self._extract_indicator_list("MEDIUM_INDICATORS")
overlap = high & medium
self.assertEqual(set(), overlap, f"duplicate cross-tier patterns found: {sorted(overlap)}")
def test_removed_duplicates_still_classify_at_high_tier(self):
cases = {
"I feel hopeless": "HIGH",
"I feel trapped": "HIGH",
"I feel desperate": "HIGH",
"I have no future ahead": "HIGH",
"I have given up on myself": "HIGH",
}
for text, expected in cases.items():
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(expected, result.level)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,20 @@
from pathlib import Path
INDEX = Path("index.html")
def test_chat_header_has_persistent_safety_plan_button():
html = INDEX.read_text()
assert 'id="chat-header"' in html
assert 'id="chat-safety-plan-btn"' in html
assert 'aria-label="Open My Safety Plan from chat header"' in html
assert 'My Safety Plan' in html
def test_chat_header_button_opens_existing_safety_plan_modal():
html = INDEX.read_text()
assert "var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');" in html
assert "chatSafetyPlanBtn.addEventListener('click'" in html
assert "function openSafetyPlan(triggerEl)" in html
assert "safetyPlanModal.classList.add('active');" in html
assert "openSafetyPlan(chatSafetyPlanBtn);" in html

View File

@@ -0,0 +1,71 @@
import pathlib
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SERVICE_WORKER = (ROOT / 'sw.js').read_text(encoding='utf-8')
CRISIS_OFFLINE_PAGE = ROOT / 'crisis-offline.html'
MAKEFILE = (ROOT / 'Makefile').read_text(encoding='utf-8')
class TestServiceWorkerOffline(unittest.TestCase):
def test_crisis_offline_page_exists(self):
self.assertTrue(CRISIS_OFFLINE_PAGE.exists(), 'crisis-offline.html should exist')
def test_service_worker_precaches_crisis_offline_page(self):
self.assertIn('/crisis-offline.html', SERVICE_WORKER)
def test_service_worker_has_navigation_timeout_for_intermittent_connections(self):
self.assertIn('NAVIGATION_TIMEOUT_MS', SERVICE_WORKER)
self.assertIn('AbortController', SERVICE_WORKER)
def test_service_worker_uses_crisis_offline_fallback_for_navigation(self):
self.assertIn("event.request.mode === 'navigate'", SERVICE_WORKER)
self.assertIn("/crisis-offline.html", SERVICE_WORKER)
def test_make_push_includes_crisis_offline_page(self):
self.assertIn('crisis-offline.html', MAKEFILE)
class TestCrisisOfflinePage(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = CRISIS_OFFLINE_PAGE.read_text(encoding='utf-8') if CRISIS_OFFLINE_PAGE.exists() else ''
cls.lower_html = cls.html.lower()
def test_has_clickable_988_link(self):
self.assertIn('href="tel:988"', self.html)
def test_has_crisis_text_line(self):
self.assertIn('Crisis Text Line', self.html)
self.assertIn('741741', self.html)
def test_has_grounding_techniques(self):
required_phrases = [
'5 things you can see',
'4 things you can feel',
'3 things you can hear',
'2 things you can smell',
'1 thing you can taste',
]
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,277 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()