Compare commits

..

4 Commits

5 changed files with 552 additions and 76 deletions

View File

@@ -1,75 +0,0 @@
# GENOME.md — the-door
**Generated:** 2026-04-14
**Repo:** Timmy_Foundation/the-door
**Description:** Crisis Front Door — a single URL where a man at 3am can talk to Timmy. No login, no signup. 988 always visible.
---
## Project Overview
The-door is a crisis intervention web application — the most sacred surface in the Timmy Foundation. When a man at 3am reaches the end of his road, this is where he lands. No login, no signup, no barriers. 988 Suicide and Crisis Lifeline always visible. The "When a Man Is Dying" protocol active on every page.
## Architecture
```
the-door/
├── index.html # Main crisis page (PWA-capable)
├── crisis-offline.html # Offline fallback (service worker cached)
├── about.html # About page
├── testimony.html # Testimony/stories page
├── sw.js # Service worker (offline-first)
├── manifest.json # PWA manifest
├── crisis/ # Core crisis detection + response
│ ├── detect.py # Keyword/pattern detection (4 tiers)
│ ├── gateway.py # API endpoints, prompt injection
│ ├── response.py # Response generation, 988 routing
│ ├── compassion_router.py # Profile-based response routing
│ ├── profiles.py # Compassion profiles
│ └── PROTOCOL.md # The protocol (SOUL.md reference)
├── crisis_detector.py # Legacy shim → crisis/detect.py
├── crisis_responder.py # Legacy responder
├── dying_detection/ # Deprecated module
├── evolution/ # Crisis synthesizer (creative)
├── tests/ # Safety-critical tests
│ ├── test_crisis_overlay_focus_trap.py
│ ├── test_dying_detection_deprecation.py
│ └── test_false_positive_fixes.py
└── deploy/ # Deployment docs
```
## Key Abstractions
| Module | Purpose |
|---|---|
| `crisis/detect.py` | 4-tier detection: LOW/MEDIUM/HIGH/CRITICAL via regex patterns |
| `crisis/gateway.py` | HTTP API, Sovereign Heart prompt injection |
| `crisis/response.py` | Response generation, 988 integration, escalation |
| `crisis/compassion_router.py` | Profile-based routing (different crisis types) |
| `sw.js` | Service worker for offline-first PWA |
## Safety Constraints
- **The-door never auto-closes PRs** (in fleet-ops exempt list)
- **988 always visible** on every page, even offline
- **When a Man Is Dying protocol** active on every interaction
- **No login/signup** — zero barriers to crisis support
- **Offline-first** — service worker caches critical pages
## Test Coverage
| Test | Coverage |
|---|---|
| Crisis overlay focus trap | ✅ |
| Dying detection deprecation | ✅ |
| False positive fixes | ✅ |
| Crisis detection tiers | ❌ (in crisis/tests.py) |
| Response generation | ❌ |
| Offline service worker | ❌ |
## Security
- No user data stored (crisis intervention is stateless by design)
- No cookies, no tracking, no analytics
- Service worker only caches static assets
- Crisis detection runs client-side where possible

View File

@@ -7,6 +7,13 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import (
CrisisSessionTracker,
CrisisSession,
CrisisLevel,
EscalationEvent,
get_tracker,
)
__all__ = [
"detect_crisis",
@@ -19,4 +26,9 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"CrisisSession",
"CrisisLevel",
"EscalationEvent",
"get_tracker",
]

251
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,251 @@
"""
Crisis Session Tracker for the-door <-> hermes-agent integration.
Tracks crisis session state, escalation history, and provides
a bridge between the-door's web UI and hermes-agent sessions.
Each browser session gets a unique crisis_session_id that is sent
with every API request. This allows hermes-agent to:
1. Receive crisis context from the-door's client-side detection
2. Report escalations/de-escalations back to the-door's UI
3. Maintain unified escalation history across web and CLI
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
tracker.record("session-abc", level="HIGH", source="the-door", detail="keyword: 'can't go on'")
history = tracker.get_history("session-abc")
"""
import json
import os
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
class CrisisLevel(Enum):
"""Crisis severity levels — aligned with both the-door and hermes-agent."""
NONE = "NONE"
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
@classmethod
def from_string(cls, s: str) -> "CrisisLevel":
try:
return cls[s.upper()]
except KeyError:
return cls.NONE
def __ge__(self, other: "CrisisLevel") -> bool:
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
return order.index(self) >= order.index(other)
def __gt__(self, other: "CrisisLevel") -> bool:
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
return order.index(self) > order.index(other)
@dataclass
class EscalationEvent:
"""A single crisis escalation or de-escalation event."""
timestamp: float
level: str
previous_level: str
source: str # "the-door" or "hermes-agent"
detail: str = ""
session_id: str = ""
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class CrisisSession:
"""State for a single crisis session."""
session_id: str
created_at: float = field(default_factory=time.time)
current_level: str = "NONE"
events: List[EscalationEvent] = field(default_factory=list)
# Per-source levels (what each source last reported)
door_level: str = "NONE"
hermes_level: str = "NONE"
hermes_last_update: float = 0.0
# Whether the UI has acknowledged the current level
ui_acknowledged: bool = False
def record(self, level: str, source: str, detail: str = "") -> EscalationEvent:
"""Record an escalation or de-escalation event."""
event = EscalationEvent(
timestamp=time.time(),
level=level,
previous_level=self.current_level,
source=source,
detail=detail,
session_id=self.session_id,
)
self.events.append(event)
# Update the per-source level
if source == "hermes-agent":
self.hermes_level = level
self.hermes_last_update = time.time()
else:
self.door_level = level
# Merged level = max of both sources (safety-first: higher wins)
merged = max(
CrisisLevel.from_string(self.door_level),
CrisisLevel.from_string(self.hermes_level),
)
self.current_level = merged.value
self.ui_acknowledged = False
return event
def to_dict(self) -> dict:
return {
"session_id": self.session_id,
"created_at": self.created_at,
"current_level": self.current_level,
"door_level": self.door_level,
"hermes_level": self.hermes_level,
"hermes_last_update": self.hermes_last_update,
"ui_acknowledged": self.ui_acknowledged,
"event_count": len(self.events),
"events": [e.to_dict() for e in self.events[-20:]], # Last 20 events
}
class CrisisSessionTracker:
"""
Tracks crisis sessions across the-door web UI and hermes-agent.
In-memory by default. Optionally persists to JSON for crash recovery.
"""
def __init__(self, persist_path: Optional[str] = None):
self._sessions: Dict[str, CrisisSession] = {}
self._persist_path = persist_path
if persist_path and os.path.exists(persist_path):
self._load()
def get_or_create(self, session_id: str) -> CrisisSession:
"""Get existing session or create a new one."""
if session_id not in self._sessions:
self._sessions[session_id] = CrisisSession(session_id=session_id)
return self._sessions[session_id]
def record(
self,
session_id: str,
level: str,
source: str,
detail: str = "",
) -> dict:
"""
Record a crisis event. Returns the updated session state.
Args:
session_id: Browser/hermes session identifier
level: NONE, LOW, MEDIUM, HIGH, CRITICAL
source: "the-door" or "hermes-agent"
detail: Human-readable description of what triggered this
"""
session = self.get_or_create(session_id)
event = session.record(level, source, detail)
if self._persist_path:
self._save()
return {
"session": session.to_dict(),
"event": event.to_dict(),
"escalated": CrisisLevel.from_string(level) > CrisisLevel.from_string(event.previous_level),
"de_escalated": CrisisLevel.from_string(level) < CrisisLevel.from_string(event.previous_level),
}
def get_history(self, session_id: str) -> Optional[dict]:
"""Get full escalation history for a session."""
session = self._sessions.get(session_id)
if not session:
return None
return session.to_dict()
def get_unacknowledged(self, session_id: str) -> Optional[dict]:
"""Get current unacknowledged crisis state for UI display."""
session = self._sessions.get(session_id)
if not session or session.ui_acknowledged:
return None
return {
"session_id": session_id,
"current_level": session.current_level,
"hermes_level": session.hermes_level,
"event_count": len(session.events),
"latest_event": session.events[-1].to_dict() if session.events else None,
}
def acknowledge(self, session_id: str) -> bool:
"""Mark the current crisis state as acknowledged by the UI."""
session = self._sessions.get(session_id)
if not session:
return False
session.ui_acknowledged = True
return True
def cleanup(self, max_age_seconds: float = 86400) -> int:
"""Remove sessions older than max_age_seconds. Returns count removed."""
cutoff = time.time() - max_age_seconds
stale = [sid for sid, s in self._sessions.items() if s.created_at < cutoff]
for sid in stale:
del self._sessions[sid]
if stale and self._persist_path:
self._save()
return len(stale)
def _save(self):
"""Persist sessions to JSON."""
if not self._persist_path:
return
data = {sid: s.to_dict() for sid, s in self._sessions.items()}
os.makedirs(os.path.dirname(self._persist_path) or ".", exist_ok=True)
with open(self._persist_path, "w") as f:
json.dump(data, f, indent=2)
def _load(self):
"""Load sessions from JSON (best-effort)."""
try:
with open(self._persist_path) as f:
data = json.load(f)
for sid, sdata in data.items():
session = CrisisSession(
session_id=sdata["session_id"],
created_at=sdata.get("created_at", time.time()),
current_level=sdata.get("current_level", "NONE"),
door_level=sdata.get("door_level", "NONE"),
hermes_level=sdata.get("hermes_level", "NONE"),
hermes_last_update=sdata.get("hermes_last_update", 0),
ui_acknowledged=sdata.get("ui_acknowledged", False),
)
for edata in sdata.get("events", []):
session.events.append(EscalationEvent(**edata))
self._sessions[sid] = session
except (FileNotFoundError, json.JSONDecodeError, KeyError):
pass
# Module-level singleton for convenience
_tracker: Optional[CrisisSessionTracker] = None
def get_tracker(persist_path: Optional[str] = None) -> CrisisSessionTracker:
"""Get or create the module-level tracker singleton."""
global _tracker
if _tracker is None:
_tracker = CrisisSessionTracker(persist_path=persist_path)
return _tracker

View File

@@ -678,6 +678,9 @@ html, body {
</div>
</div>
<!-- Escalation history (crisis session tracking, issue #99) -->
<div id="escalation-history" style="display:none; padding:4px 16px; font-size:0.75rem; color:#8b949e; border-top:1px solid #21262d;"></div>
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
@@ -825,6 +828,62 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var currentCrisisLevel = 'NONE';
// ===== CRISIS SESSION TRACKING =====
// Unique session ID sent with every API request for cross-source tracking
var crisisSessionId = (function() {
var stored = null;
try { stored = sessionStorage.getItem('timmy_crisis_session_id'); } catch(e) {}
if (stored) return stored;
var id = 'door-' + Date.now().toString(36) + '-' + Math.random().toString(36).substr(2, 8);
try { sessionStorage.setItem('timmy_crisis_session_id', id); } catch(e) {}
return id;
})();
// Escalation history for this session (mirrors server-side tracker)
var escalationHistory = [];
function recordEscalation(level, source, detail) {
var event = {
timestamp: Date.now(),
level: level,
source: source || 'the-door',
detail: detail || ''
};
escalationHistory.push(event);
// Persist for session recovery
try { sessionStorage.setItem('timmy_escalation_history', JSON.stringify(escalationHistory.slice(-50))); } catch(e) {}
renderEscalationHistory();
}
// Load persisted history
try {
var saved = sessionStorage.getItem('timmy_escalation_history');
if (saved) escalationHistory = JSON.parse(saved);
} catch(e) {}
function renderEscalationHistory() {
var el = document.getElementById('escalation-history');
if (!el) return;
if (escalationHistory.length === 0) {
el.style.display = 'none';
return;
}
el.style.display = 'block';
var html = '<strong>Escalation History</strong><ul>';
var recent = escalationHistory.slice(-5).reverse();
for (var i = 0; i < recent.length; i++) {
var e = recent[i];
var time = new Date(e.timestamp).toLocaleTimeString();
var badge = e.level === 'CRITICAL' ? '🔴' : e.level === 'HIGH' ? '🟠' : e.level === 'MEDIUM' ? '🟡' : '🟢';
html += '<li>' + badge + ' <strong>' + e.level + '</strong> via ' + e.source + ' at ' + time;
if (e.detail) html += ' — ' + e.detail;
html += '</li>';
}
html += '</ul>';
el.innerHTML = html;
}
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -913,13 +972,19 @@ Sovereignty and service always.`;
}
}
// Update current crisis level for session tracking
var levelMap = { 0: 'NONE', 1: 'MEDIUM', 2: 'CRITICAL' };
currentCrisisLevel = levelMap[level] || 'NONE';
if (level >= 1 && !crisisPanelShown) {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
recordEscalation(level === 2 ? 'CRITICAL' : 'MEDIUM', 'the-door', 'keyword detected');
}
if (level === 2) {
showOverlay();
recordEscalation('CRITICAL', 'the-door', 'explicit phrase — overlay shown');
}
}
@@ -1337,7 +1402,11 @@ Sovereignty and service always.`;
fetch('/api/v1/chat/completions', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'X-Crisis-Session-ID': crisisSessionId,
'X-Crisis-Level': currentCrisisLevel || 'NONE'
},
body: JSON.stringify({
model: 'timmy',
messages: allMessages,
@@ -1347,6 +1416,23 @@ Sovereignty and service always.`;
}).then(function(response) {
clearTimeout(timeoutId);
// Check for hermes-agent crisis escalation headers
var hermesLevel = response.headers.get('X-Crisis-Escalation');
if (hermesLevel && hermesLevel !== 'NONE') {
recordEscalation(hermesLevel, 'hermes-agent', 'from response header');
// Update UI if escalated beyond current
if (hermesLevel === 'CRITICAL' && !crisisOverlay.classList.contains('active')) {
showOverlay();
} else if (hermesLevel === 'HIGH' && !crisisPanelShown) {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
}
}
var hermesAck = response.headers.get('X-Crisis-Deescalation');
if (hermesAck) {
recordEscalation(hermesAck, 'hermes-agent', 'de-escalation');
}
if (!response.ok) {
throw new Error('HTTP ' + response.status);
}

View File

@@ -0,0 +1,202 @@
"""
Tests for crisis session tracker — the-door <-> hermes-agent integration.
Verifies:
- Session creation and tracking
- Escalation and de-escalation events
- Merged level from the-door + hermes-agent
- History and unacknowledged state
- Persistence round-trip
- Cleanup of stale sessions
"""
import json
import os
import tempfile
import time
import unittest
from crisis.session_tracker import (
CrisisLevel,
CrisisSession,
CrisisSessionTracker,
EscalationEvent,
)
class TestCrisisLevel(unittest.TestCase):
"""CrisisLevel enum ordering and conversion."""
def test_from_string(self):
self.assertEqual(CrisisLevel.from_string("HIGH"), CrisisLevel.HIGH)
self.assertEqual(CrisisLevel.from_string("critical"), CrisisLevel.CRITICAL)
self.assertEqual(CrisisLevel.from_string("garbage"), CrisisLevel.NONE)
def test_ordering(self):
self.assertTrue(CrisisLevel.HIGH > CrisisLevel.MEDIUM)
self.assertTrue(CrisisLevel.CRITICAL >= CrisisLevel.HIGH)
self.assertTrue(CrisisLevel.LOW < CrisisLevel.HIGH)
self.assertEqual(CrisisLevel.NONE, CrisisLevel.NONE)
class TestCrisisSession(unittest.TestCase):
"""Single session state management."""
def test_new_session_is_none(self):
session = CrisisSession(session_id="test-1")
self.assertEqual(session.current_level, "NONE")
self.assertEqual(session.hermes_level, "NONE")
self.assertEqual(len(session.events), 0)
def test_record_escalation(self):
session = CrisisSession(session_id="test-2")
event = session.record("HIGH", "the-door", "keyword: despair")
self.assertEqual(event.level, "HIGH")
self.assertEqual(event.previous_level, "NONE")
self.assertEqual(event.source, "the-door")
self.assertEqual(session.current_level, "HIGH")
def test_merged_level_takes_highest(self):
session = CrisisSession(session_id="test-3")
session.record("MEDIUM", "the-door")
session.record("CRITICAL", "hermes-agent")
# Merged level should be CRITICAL (the higher of MEDIUM and CRITICAL)
self.assertEqual(session.current_level, "CRITICAL")
self.assertEqual(session.hermes_level, "CRITICAL")
def test_de_escalation(self):
session = CrisisSession(session_id="test-4")
session.record("HIGH", "the-door")
session.record("MEDIUM", "the-door")
self.assertEqual(session.current_level, "MEDIUM")
def test_to_dict(self):
session = CrisisSession(session_id="test-5")
session.record("HIGH", "the-door", "test")
d = session.to_dict()
self.assertEqual(d["session_id"], "test-5")
self.assertEqual(d["current_level"], "HIGH")
self.assertEqual(d["event_count"], 1)
self.assertEqual(len(d["events"]), 1)
class TestCrisisSessionTracker(unittest.TestCase):
"""Full tracker with multiple sessions."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_get_or_create(self):
s1 = self.tracker.get_or_create("s1")
s2 = self.tracker.get_or_create("s1")
self.assertIs(s1, s2) # Same session object
def test_record_returns_result(self):
result = self.tracker.record("s1", "HIGH", "the-door", "keyword: help")
self.assertIn("session", result)
self.assertIn("event", result)
self.assertTrue(result["escalated"])
self.assertFalse(result["de_escalated"])
def test_de_escalation_detected(self):
self.tracker.record("s1", "HIGH", "the-door")
result = self.tracker.record("s1", "LOW", "the-door", "user calmed down")
self.assertFalse(result["escalated"])
self.assertTrue(result["de_escalated"])
def test_get_history(self):
self.tracker.record("s1", "HIGH", "the-door")
self.tracker.record("s1", "CRITICAL", "hermes-agent")
history = self.tracker.get_history("s1")
self.assertEqual(history["event_count"], 2)
self.assertEqual(history["current_level"], "CRITICAL")
self.assertEqual(history["hermes_level"], "CRITICAL")
def test_get_history_missing_session(self):
self.assertIsNone(self.tracker.get_history("nonexistent"))
def test_unacknowledged_state(self):
self.tracker.record("s1", "HIGH", "the-door")
unacked = self.tracker.get_unacknowledged("s1")
self.assertIsNotNone(unacked)
self.assertEqual(unacked["current_level"], "HIGH")
def test_acknowledge(self):
self.tracker.record("s1", "HIGH", "the-door")
self.assertTrue(self.tracker.acknowledge("s1"))
self.assertIsNone(self.tracker.get_unacknowledged("s1"))
def test_acknowledge_missing(self):
self.assertFalse(self.tracker.acknowledge("nonexistent"))
def test_cleanup(self):
self.tracker.record("old-session", "LOW", "the-door")
self.tracker._sessions["old-session"].created_at = time.time() - 100000
self.tracker.record("new-session", "LOW", "the-door")
removed = self.tracker.cleanup(max_age_seconds=50000)
self.assertEqual(removed, 1)
self.assertNotIn("old-session", self.tracker._sessions)
self.assertIn("new-session", self.tracker._sessions)
class TestPersistence(unittest.TestCase):
"""JSON persistence round-trip."""
def test_save_and_load(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
path = f.name
try:
t1 = CrisisSessionTracker(persist_path=path)
t1.record("s1", "HIGH", "the-door", "test persist")
t1.record("s1", "CRITICAL", "hermes-agent", "escalated")
# Load into new tracker
t2 = CrisisSessionTracker(persist_path=path)
history = t2.get_history("s1")
self.assertIsNotNone(history)
self.assertEqual(history["current_level"], "CRITICAL")
self.assertEqual(history["event_count"], 2)
finally:
os.unlink(path)
def test_load_missing_file(self):
t = CrisisSessionTracker(persist_path="/tmp/nonexistent-tracker.json")
self.assertEqual(len(t._sessions), 0)
class TestCrossSourceIntegration(unittest.TestCase):
"""Simulate the-door and hermes-agent reporting to the same session."""
def test_unified_escalation_history(self):
tracker = CrisisSessionTracker()
sid = "web-session-abc"
# the-door detects something
r1 = tracker.record(sid, "MEDIUM", "the-door", "keyword: 'alone in this'")
self.assertEqual(r1["session"]["current_level"], "MEDIUM")
# hermes-agent escalates based on conversation context
r2 = tracker.record(sid, "HIGH", "hermes-agent", "user repeated 'no way out' 3x")
self.assertEqual(r2["session"]["current_level"], "HIGH")
self.assertEqual(r2["session"]["hermes_level"], "HIGH")
# hermes-agent de-escalates, but the-door hasn't changed
# Merged level stays HIGH (max of the-door's MEDIUM and hermes's MEDIUM)
# The-door's last assessment was MEDIUM, hermes now says MEDIUM
# But current_level was HIGH from hermes — so max(MEDIUM, MEDIUM) = MEDIUM
# Actually: the-door's implicit level stays at what it last reported (MEDIUM)
# hermes now reports MEDIUM → max(MEDIUM, MEDIUM) = MEDIUM
r3 = tracker.record(sid, "MEDIUM", "hermes-agent", "user said 'feeling a bit better'")
self.assertEqual(r3["session"]["current_level"], "MEDIUM")
self.assertTrue(r3["de_escalated"])
# Verify full history
history = tracker.get_history(sid)
self.assertEqual(history["event_count"], 3)
sources = [e["source"] for e in history["events"]]
self.assertIn("the-door", sources)
self.assertIn("hermes-agent", sources)
if __name__ == "__main__":
unittest.main()