Compare commits

..

4 Commits

8 changed files with 536 additions and 1112 deletions

View File

@@ -7,7 +7,13 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .session_tracker import (
CrisisSessionTracker,
CrisisSession,
CrisisLevel,
EscalationEvent,
get_tracker,
)
__all__ = [
"detect_crisis",
@@ -21,6 +27,8 @@ __all__ = [
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"CrisisSession",
"CrisisLevel",
"EscalationEvent",
"get_tracker",
]

View File

@@ -22,7 +22,6 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:

View File

@@ -1,259 +1,251 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Crisis Session Tracker for the-door <-> hermes-agent integration.
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Tracks crisis session state, escalation history, and provides
a bridge between the-door's web UI and hermes-agent sessions.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Each browser session gets a unique crisis_session_id that is sent
with every API request. This allows hermes-agent to:
1. Receive crisis context from the-door's client-side detection
2. Report escalations/de-escalations back to the-door's UI
3. Maintain unified escalation history across web and CLI
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
tracker.record("session-abc", level="HIGH", source="the-door", detail="keyword: 'can't go on'")
history = tracker.get_history("session-abc")
"""
import json
import os
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
class CrisisLevel(Enum):
"""Crisis severity levels — aligned with both the-door and hermes-agent."""
NONE = "NONE"
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@classmethod
def from_string(cls, s: str) -> "CrisisLevel":
try:
return cls[s.upper()]
except KeyError:
return cls.NONE
def __ge__(self, other: "CrisisLevel") -> bool:
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
return order.index(self) >= order.index(other)
def __gt__(self, other: "CrisisLevel") -> bool:
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
return order.index(self) > order.index(other)
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
class EscalationEvent:
"""A single crisis escalation or de-escalation event."""
timestamp: float
level: str
previous_level: str
source: str # "the-door" or "hermes-agent"
detail: str = ""
session_id: str = ""
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class CrisisSession:
"""State for a single crisis session."""
session_id: str
created_at: float = field(default_factory=time.time)
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
events: List[EscalationEvent] = field(default_factory=list)
# Per-source levels (what each source last reported)
door_level: str = "NONE"
hermes_level: str = "NONE"
hermes_last_update: float = 0.0
# Whether the UI has acknowledged the current level
ui_acknowledged: bool = False
def record(self, level: str, source: str, detail: str = "") -> EscalationEvent:
"""Record an escalation or de-escalation event."""
event = EscalationEvent(
timestamp=time.time(),
level=level,
previous_level=self.current_level,
source=source,
detail=detail,
session_id=self.session_id,
)
self.events.append(event)
# Update the per-source level
if source == "hermes-agent":
self.hermes_level = level
self.hermes_last_update = time.time()
else:
self.door_level = level
# Merged level = max of both sources (safety-first: higher wins)
merged = max(
CrisisLevel.from_string(self.door_level),
CrisisLevel.from_string(self.hermes_level),
)
self.current_level = merged.value
self.ui_acknowledged = False
return event
def to_dict(self) -> dict:
return {
"session_id": self.session_id,
"created_at": self.created_at,
"current_level": self.current_level,
"door_level": self.door_level,
"hermes_level": self.hermes_level,
"hermes_last_update": self.hermes_last_update,
"ui_acknowledged": self.ui_acknowledged,
"event_count": len(self.events),
"events": [e.to_dict() for e in self.events[-20:]], # Last 20 events
}
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Tracks crisis sessions across the-door web UI and hermes-agent.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
In-memory by default. Optionally persists to JSON for crash recovery.
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self, persist_path: Optional[str] = None):
self._sessions: Dict[str, CrisisSession] = {}
self._persist_path = persist_path
if persist_path and os.path.exists(persist_path):
self._load()
def __init__(self):
self.reset()
def get_or_create(self, session_id: str) -> CrisisSession:
"""Get existing session or create a new one."""
if session_id not in self._sessions:
self._sessions[session_id] = CrisisSession(session_id=session_id)
return self._sessions[session_id]
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
def record(
self,
session_id: str,
level: str,
source: str,
detail: str = "",
) -> dict:
"""
Record a crisis detection result for the current message.
Record a crisis event. Returns the updated session state.
Returns updated SessionState.
Args:
session_id: Browser/hermes session identifier
level: NONE, LOW, MEDIUM, HIGH, CRITICAL
source: "the-door" or "hermes-agent"
detail: Human-readable description of what triggered this
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
session = self.get_or_create(session_id)
event = session.record(level, source, detail)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
if self._persist_path:
self._save()
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
return {
"session": session.to_dict(),
"event": event.to_dict(),
"escalated": CrisisLevel.from_string(level) > CrisisLevel.from_string(event.previous_level),
"de_escalated": CrisisLevel.from_string(level) < CrisisLevel.from_string(event.previous_level),
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
def get_history(self, session_id: str) -> Optional[dict]:
"""Get full escalation history for a session."""
session = self._sessions.get(session_id)
if not session:
return None
return session.to_dict()
return hints
def get_unacknowledged(self, session_id: str) -> Optional[dict]:
"""Get current unacknowledged crisis state for UI display."""
session = self._sessions.get(session_id)
if not session or session.ui_acknowledged:
return None
return {
"session_id": session_id,
"current_level": session.current_level,
"hermes_level": session.hermes_level,
"event_count": len(session.events),
"latest_event": session.events[-1].to_dict() if session.events else None,
}
def acknowledge(self, session_id: str) -> bool:
"""Mark the current crisis state as acknowledged by the UI."""
session = self._sessions.get(session_id)
if not session:
return False
session.ui_acknowledged = True
return True
def cleanup(self, max_age_seconds: float = 86400) -> int:
"""Remove sessions older than max_age_seconds. Returns count removed."""
cutoff = time.time() - max_age_seconds
stale = [sid for sid, s in self._sessions.items() if s.created_at < cutoff]
for sid in stale:
del self._sessions[sid]
if stale and self._persist_path:
self._save()
return len(stale)
def _save(self):
"""Persist sessions to JSON."""
if not self._persist_path:
return
data = {sid: s.to_dict() for sid, s in self._sessions.items()}
os.makedirs(os.path.dirname(self._persist_path) or ".", exist_ok=True)
with open(self._persist_path, "w") as f:
json.dump(data, f, indent=2)
def _load(self):
"""Load sessions from JSON (best-effort)."""
try:
with open(self._persist_path) as f:
data = json.load(f)
for sid, sdata in data.items():
session = CrisisSession(
session_id=sdata["session_id"],
created_at=sdata.get("created_at", time.time()),
current_level=sdata.get("current_level", "NONE"),
door_level=sdata.get("door_level", "NONE"),
hermes_level=sdata.get("hermes_level", "NONE"),
hermes_last_update=sdata.get("hermes_last_update", 0),
ui_acknowledged=sdata.get("ui_acknowledged", False),
)
for edata in sdata.get("events", []):
session.events.append(EscalationEvent(**edata))
self._sessions[sid] = session
except (FileNotFoundError, json.JSONDecodeError, KeyError):
pass
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
# Module-level singleton for convenience
_tracker: Optional[CrisisSessionTracker] = None
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}
def get_tracker(persist_path: Optional[str] = None) -> CrisisSessionTracker:
"""Get or create the module-level tracker singleton."""
global _tracker
if _tracker is None:
_tracker = CrisisSessionTracker(persist_path=persist_path)
return _tracker

View File

@@ -500,184 +500,6 @@ html, body {
min-height: 60px;
}
.safety-plan-status {
min-height: 20px;
margin: 4px 0 18px;
font-size: 0.85rem;
color: #8b949e;
}
.safety-plan-status.success {
color: #3fb950;
}
.safety-plan-status.error {
color: #ff7b72;
}
.safety-plan-versioning {
margin-top: 24px;
padding-top: 20px;
border-top: 1px solid #30363d;
}
.safety-plan-versioning-grid {
display: grid;
gap: 16px;
}
.safety-plan-history-panel,
.safety-plan-diff-panel {
background: #0d1117;
border: 1px solid #30363d;
border-radius: 12px;
padding: 16px;
}
.safety-plan-section-header h3 {
font-size: 0.95rem;
margin-bottom: 4px;
}
.safety-plan-section-header p {
font-size: 0.8rem;
color: #8b949e;
margin-bottom: 12px;
}
.safety-plan-history {
display: flex;
flex-direction: column;
gap: 10px;
}
.safety-plan-history-item {
border: 1px solid #30363d;
border-radius: 10px;
padding: 12px;
background: #161b22;
}
.safety-plan-history-item.active {
border-color: #58a6ff;
box-shadow: 0 0 0 1px rgba(88, 166, 255, 0.35);
}
.safety-plan-history-meta {
display: flex;
justify-content: space-between;
gap: 12px;
align-items: baseline;
margin-bottom: 8px;
flex-wrap: wrap;
}
.safety-plan-history-title {
font-size: 0.88rem;
font-weight: 600;
color: #e6edf3;
}
.safety-plan-history-note {
font-size: 0.78rem;
color: #8b949e;
margin-bottom: 10px;
}
.safety-plan-history-actions {
display: flex;
gap: 8px;
flex-wrap: wrap;
}
.safety-plan-history-button,
.safety-plan-restore-button {
border-radius: 8px;
border: 1px solid #30363d;
background: transparent;
color: #e6edf3;
padding: 8px 12px;
font-size: 0.82rem;
cursor: pointer;
}
.safety-plan-history-button:hover,
.safety-plan-restore-button:hover,
.safety-plan-history-button:focus,
.safety-plan-restore-button:focus {
border-color: #58a6ff;
color: #58a6ff;
outline: none;
}
.safety-plan-restore-button {
background: rgba(35, 134, 54, 0.14);
}
.safety-plan-diff {
display: flex;
flex-direction: column;
gap: 12px;
}
.safety-plan-diff-meta {
font-size: 0.78rem;
color: #8b949e;
}
.safety-plan-diff-field {
border-top: 1px solid #21262d;
padding-top: 12px;
}
.safety-plan-diff-field:first-child {
border-top: none;
padding-top: 0;
}
.safety-plan-diff-field h4 {
font-size: 0.84rem;
margin-bottom: 8px;
color: #c9d1d9;
}
.safety-plan-diff-block {
white-space: pre-wrap;
line-height: 1.5;
border-radius: 8px;
padding: 10px;
font-size: 0.88rem;
margin-bottom: 8px;
}
.diff-unchanged {
background: #161b22;
color: #c9d1d9;
}
.diff-added {
background: rgba(46, 160, 67, 0.16);
border-left: 3px solid #2ea043;
color: #d2f4d3;
}
.diff-removed {
background: rgba(248, 81, 73, 0.16);
border-left: 3px solid #f85149;
color: #ffd8d3;
}
.safety-plan-empty {
color: #8b949e;
font-size: 0.85rem;
}
@media (min-width: 840px) {
.safety-plan-versioning-grid {
grid-template-columns: minmax(0, 0.9fr) minmax(0, 1.1fr);
}
}
.modal-footer {
display: flex;
justify-content: flex-end;
@@ -856,9 +678,12 @@ html, body {
</div>
</div>
<!-- Escalation history (crisis session tracking, issue #99) -->
<div id="escalation-history" style="display:none; padding:4px 16px; font-size:0.75rem; color:#8b949e; border-top:1px solid #21262d;"></div>
<!-- Footer -->
<footer id="footer">
<a href="/about.html" aria-label="About The Door">about</a>
<a href="/about" aria-label="About The Door">about</a>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -914,28 +739,6 @@ html, body {
<label for="sp-environment">5. Making my environment safe</label>
<textarea id="sp-environment" placeholder="e.g., Giving my car keys to a friend, locking away meds..."></textarea>
</div>
<div id="safety-plan-status" class="safety-plan-status" role="status" aria-live="polite"></div>
<section class="safety-plan-versioning" aria-labelledby="safety-plan-history-title">
<div class="safety-plan-versioning-grid">
<div class="safety-plan-history-panel">
<div class="safety-plan-section-header">
<h3 id="safety-plan-history-title">Version History</h3>
<p>Each save stays on this device so you can review changes and restore an earlier plan.</p>
</div>
<div id="safety-plan-history" class="safety-plan-history"></div>
</div>
<div class="safety-plan-diff-panel">
<div class="safety-plan-section-header">
<h3>Diff View</h3>
<p>Compare the selected version against the version immediately before it.</p>
</div>
<div id="safety-plan-diff" class="safety-plan-diff"></div>
</div>
</div>
</section>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" id="cancel-safety-plan">Cancel</button>
@@ -1008,7 +811,6 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -1020,26 +822,68 @@ Sovereignty and service always.`;
var cancelSafetyPlan = document.getElementById('cancel-safety-plan');
var saveSafetyPlan = document.getElementById('save-safety-plan');
var clearChatBtn = document.getElementById('clear-chat-btn');
var safetyPlanHistory = document.getElementById('safety-plan-history');
var safetyPlanDiff = document.getElementById('safety-plan-diff');
var safetyPlanStatus = document.getElementById('safety-plan-status');
var safetyPlanFields = {
warningSigns: document.getElementById('sp-warning-signs'),
coping: document.getElementById('sp-coping'),
distraction: document.getElementById('sp-distraction'),
help: document.getElementById('sp-help'),
environment: document.getElementById('sp-environment')
};
var SAFETY_PLAN_STORAGE_KEY = 'timmy_safety_plan';
var SAFETY_PLAN_VERSIONS_KEY = 'timmy_safety_plan_versions';
var MAX_SAFETY_PLAN_VERSIONS = 20;
// ===== STATE =====
var messages = [];
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var selectedSafetyPlanVersionId = null;
var currentCrisisLevel = 'NONE';
// ===== CRISIS SESSION TRACKING =====
// Unique session ID sent with every API request for cross-source tracking
var crisisSessionId = (function() {
var stored = null;
try { stored = sessionStorage.getItem('timmy_crisis_session_id'); } catch(e) {}
if (stored) return stored;
var id = 'door-' + Date.now().toString(36) + '-' + Math.random().toString(36).substr(2, 8);
try { sessionStorage.setItem('timmy_crisis_session_id', id); } catch(e) {}
return id;
})();
// Escalation history for this session (mirrors server-side tracker)
var escalationHistory = [];
function recordEscalation(level, source, detail) {
var event = {
timestamp: Date.now(),
level: level,
source: source || 'the-door',
detail: detail || ''
};
escalationHistory.push(event);
// Persist for session recovery
try { sessionStorage.setItem('timmy_escalation_history', JSON.stringify(escalationHistory.slice(-50))); } catch(e) {}
renderEscalationHistory();
}
// Load persisted history
try {
var saved = sessionStorage.getItem('timmy_escalation_history');
if (saved) escalationHistory = JSON.parse(saved);
} catch(e) {}
function renderEscalationHistory() {
var el = document.getElementById('escalation-history');
if (!el) return;
if (escalationHistory.length === 0) {
el.style.display = 'none';
return;
}
el.style.display = 'block';
var html = '<strong>Escalation History</strong><ul>';
var recent = escalationHistory.slice(-5).reverse();
for (var i = 0; i < recent.length; i++) {
var e = recent[i];
var time = new Date(e.timestamp).toLocaleTimeString();
var badge = e.level === 'CRITICAL' ? '🔴' : e.level === 'HIGH' ? '🟠' : e.level === 'MEDIUM' ? '🟡' : '🟢';
html += '<li>' + badge + ' <strong>' + e.level + '</strong> via ' + e.source + ' at ' + time;
if (e.detail) html += ' — ' + e.detail;
html += '</li>';
}
html += '</ul>';
el.innerHTML = html;
}
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -1128,13 +972,19 @@ Sovereignty and service always.`;
}
}
// Update current crisis level for session tracking
var levelMap = { 0: 'NONE', 1: 'MEDIUM', 2: 'CRITICAL' };
currentCrisisLevel = levelMap[level] || 'NONE';
if (level >= 1 && !crisisPanelShown) {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
recordEscalation(level === 2 ? 'CRITICAL' : 'MEDIUM', 'the-door', 'keyword detected');
}
if (level === 2) {
showOverlay();
recordEscalation('CRITICAL', 'the-door', 'explicit phrase — overlay shown');
}
}
@@ -1265,8 +1115,7 @@ Sovereignty and service always.`;
}
}, 1000);
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
@@ -1385,347 +1234,18 @@ Sovereignty and service always.`;
});
// ===== SAFETY PLAN LOGIC =====
function emptySafetyPlan() {
return {
warningSigns: '',
coping: '',
distraction: '',
help: '',
environment: ''
};
}
function cloneSafetyPlan(plan) {
var normalized = emptySafetyPlan();
var source = plan || {};
Object.keys(normalized).forEach(function(key) {
normalized[key] = typeof source[key] === 'string' ? source[key] : '';
});
return normalized;
}
function applySafetyPlan(plan) {
var nextPlan = cloneSafetyPlan(plan);
Object.keys(safetyPlanFields).forEach(function(key) {
safetyPlanFields[key].value = nextPlan[key];
});
}
function getSafetyPlanFormData() {
return {
warningSigns: safetyPlanFields.warningSigns.value,
coping: safetyPlanFields.coping.value,
distraction: safetyPlanFields.distraction.value,
help: safetyPlanFields.help.value,
environment: safetyPlanFields.environment.value
};
}
function escapeHtml(value) {
return String(value || '')
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&#39;');
}
function setSafetyPlanStatus(message, tone) {
if (!safetyPlanStatus) return;
safetyPlanStatus.textContent = message || '';
safetyPlanStatus.className = 'safety-plan-status' + (tone ? ' ' + tone : '');
}
function getSafetyPlanVersions() {
try {
var saved = localStorage.getItem(SAFETY_PLAN_VERSIONS_KEY);
if (!saved) return [];
var parsed = JSON.parse(saved);
if (!Array.isArray(parsed)) return [];
return parsed.filter(function(version) {
return version && version.id && version.plan;
});
} catch (e) {
return [];
}
}
function setSafetyPlanVersions(versions) {
localStorage.setItem(
SAFETY_PLAN_VERSIONS_KEY,
JSON.stringify((versions || []).slice(0, MAX_SAFETY_PLAN_VERSIONS))
);
}
function buildSafetyPlanVersion(plan, meta) {
return {
id: 'spv_' + Date.now() + '_' + Math.random().toString(36).slice(2, 8),
savedAt: new Date().toISOString(),
source: (meta && meta.source) || 'save',
restoredFrom: meta && meta.restoredFrom ? meta.restoredFrom : null,
plan: cloneSafetyPlan(plan)
};
}
function ensureSafetyPlanVersionHistory() {
var versions = getSafetyPlanVersions();
if (versions.length) {
return versions;
}
try {
var saved = localStorage.getItem(SAFETY_PLAN_STORAGE_KEY);
if (!saved) {
return [];
}
var parsed = JSON.parse(saved);
var migrated = [buildSafetyPlanVersion(parsed, { source: 'legacy' })];
setSafetyPlanVersions(migrated);
return migrated;
} catch (e) {
return [];
}
}
function formatSafetyPlanTimestamp(iso) {
var date = new Date(iso || '');
if (isNaN(date.getTime())) {
return 'Saved just now';
}
return date.toLocaleString([], {
year: 'numeric',
month: 'short',
day: 'numeric',
hour: 'numeric',
minute: '2-digit'
});
}
function getSafetyPlanVersionById(versionId) {
var versions = getSafetyPlanVersions();
for (var i = 0; i < versions.length; i++) {
if (versions[i].id === versionId) {
return { version: versions[i], index: i, versions: versions };
}
}
return null;
}
function calculateSafetyPlanDiff(previousText, currentText) {
var previous = String(previousText || '');
var current = String(currentText || '');
var start = 0;
while (start < previous.length && start < current.length && previous.charAt(start) === current.charAt(start)) {
start += 1;
}
var previousEnd = previous.length - 1;
var currentEnd = current.length - 1;
while (previousEnd >= start && currentEnd >= start && previous.charAt(previousEnd) === current.charAt(currentEnd)) {
previousEnd -= 1;
currentEnd -= 1;
}
return {
before: current.slice(0, start),
removed: previous.slice(start, previousEnd + 1),
added: current.slice(start, currentEnd + 1),
after: current.slice(currentEnd + 1)
};
}
function renderDiffSegmentHtml(previousText, currentText) {
var previous = String(previousText || '');
var current = String(currentText || '');
if (!previous && !current) {
return '<p class="safety-plan-empty">No content saved for this section yet.</p>';
}
if (previous === current) {
return '<div class="safety-plan-diff-block diff-unchanged">' + escapeHtml(current || 'No changes in this version.') + '</div>';
}
var diff = calculateSafetyPlanDiff(previous, current);
var blocks = [];
if (diff.before) {
blocks.push('<div class="safety-plan-diff-block diff-unchanged">' + escapeHtml(diff.before) + '</div>');
}
if (diff.removed) {
blocks.push('<div class="safety-plan-diff-block diff-removed">Removed<br>' + escapeHtml(diff.removed) + '</div>');
}
if (diff.added) {
blocks.push('<div class="safety-plan-diff-block diff-added">Added<br>' + escapeHtml(diff.added) + '</div>');
}
if (diff.after) {
blocks.push('<div class="safety-plan-diff-block diff-unchanged">' + escapeHtml(diff.after) + '</div>');
}
return blocks.join('');
}
function renderSafetyPlanDiff(versionId) {
if (!safetyPlanDiff) return;
var versions = getSafetyPlanVersions();
if (!versions.length) {
safetyPlanDiff.innerHTML = '<p class="safety-plan-empty">Save your plan to start tracking changes.</p>';
return;
}
var selected = getSafetyPlanVersionById(versionId || selectedSafetyPlanVersionId || versions[0].id);
if (!selected) {
selected = { version: versions[0], index: 0, versions: versions };
selectedSafetyPlanVersionId = versions[0].id;
}
var baseline = selected.versions[selected.index + 1]
? cloneSafetyPlan(selected.versions[selected.index + 1].plan)
: emptySafetyPlan();
var currentPlan = cloneSafetyPlan(selected.version.plan);
var baselineLabel = selected.versions[selected.index + 1]
? formatSafetyPlanTimestamp(selected.versions[selected.index + 1].savedAt)
: 'an empty plan';
var fields = [
['Warning signs', 'warningSigns'],
['Internal coping strategies', 'coping'],
['People/Places for distraction', 'distraction'],
['People I can ask for help', 'help'],
['Making my environment safe', 'environment']
];
var html = [
'<div class="safety-plan-diff-meta">Comparing ' +
escapeHtml(formatSafetyPlanTimestamp(selected.version.savedAt)) +
' against ' + escapeHtml(baselineLabel) + '.</div>'
];
fields.forEach(function(field) {
html.push(
'<div class="safety-plan-diff-field">' +
'<h4>' + escapeHtml(field[0]) + '</h4>' +
renderDiffSegmentHtml(baseline[field[1]], currentPlan[field[1]]) +
'</div>'
);
});
safetyPlanDiff.innerHTML = html.join('');
}
function renderSafetyPlanVersionHistory() {
if (!safetyPlanHistory) return;
var versions = getSafetyPlanVersions();
if (!versions.length) {
safetyPlanHistory.innerHTML = '<p class="safety-plan-empty">No saved versions yet. Each save creates a new local version.</p>';
renderSafetyPlanDiff(null);
return;
}
if (!selectedSafetyPlanVersionId || !getSafetyPlanVersionById(selectedSafetyPlanVersionId)) {
selectedSafetyPlanVersionId = versions[0].id;
}
var html = [];
versions.forEach(function(version, index) {
var note = 'Saved locally';
if (version.source === 'restore') {
note = 'Restored from an earlier version';
} else if (version.source === 'legacy') {
note = 'Imported from your previous saved plan';
}
html.push(
'<div class="safety-plan-history-item' + (selectedSafetyPlanVersionId === version.id ? ' active' : '') + '">' +
'<div class="safety-plan-history-meta">' +
'<span class="safety-plan-history-title">' + escapeHtml(index === 0 ? 'Current version' : 'Version ' + (versions.length - index)) + '</span>' +
'<span class="safety-plan-empty">' + escapeHtml(formatSafetyPlanTimestamp(version.savedAt)) + '</span>' +
'</div>' +
'<div class="safety-plan-history-note">' + escapeHtml(note) + '</div>' +
'<div class="safety-plan-history-actions">' +
'<button type="button" class="safety-plan-history-button" data-version-id="' + escapeHtml(version.id) + '">View diff</button>' +
'<button type="button" class="safety-plan-restore-button" data-restore-version-id="' + escapeHtml(version.id) + '">Restore this version</button>' +
'</div>' +
'</div>'
);
});
safetyPlanHistory.innerHTML = html.join('');
renderSafetyPlanDiff(selectedSafetyPlanVersionId);
}
function saveSafetyPlanVersion(plan, meta) {
var snapshot = buildSafetyPlanVersion(plan, meta);
var versions = getSafetyPlanVersions();
versions.unshift(snapshot);
setSafetyPlanVersions(versions);
localStorage.setItem(SAFETY_PLAN_STORAGE_KEY, JSON.stringify(snapshot.plan));
selectedSafetyPlanVersionId = snapshot.id;
renderSafetyPlanVersionHistory();
return snapshot;
}
function restoreSafetyPlanVersion(versionId) {
var selected = getSafetyPlanVersionById(versionId);
if (!selected) {
setSafetyPlanStatus('That version could not be restored.', 'error');
return;
}
applySafetyPlan(selected.version.plan);
var restored = saveSafetyPlanVersion(selected.version.plan, {
source: 'restore',
restoredFrom: selected.version.id
});
setSafetyPlanStatus(
'Restored version from ' + formatSafetyPlanTimestamp(selected.version.savedAt) + ' as the current plan.',
'success'
);
return restored;
}
function loadSafetyPlan() {
var versions = ensureSafetyPlanVersionHistory();
var latestPlan = versions.length ? versions[0].plan : null;
if (!latestPlan) {
try {
var saved = localStorage.getItem(SAFETY_PLAN_STORAGE_KEY);
latestPlan = saved ? JSON.parse(saved) : null;
} catch (e) {
latestPlan = null;
try {
var saved = localStorage.getItem('timmy_safety_plan');
if (saved) {
var plan = JSON.parse(saved);
document.getElementById('sp-warning-signs').value = plan.warningSigns || '';
document.getElementById('sp-coping').value = plan.coping || '';
document.getElementById('sp-distraction').value = plan.distraction || '';
document.getElementById('sp-help').value = plan.help || '';
document.getElementById('sp-environment').value = plan.environment || '';
}
}
applySafetyPlan(latestPlan || emptySafetyPlan());
renderSafetyPlanVersionHistory();
if (versions.length) {
setSafetyPlanStatus('Version history stays on this device only.', '');
} else {
setSafetyPlanStatus('Every save creates a local version you can diff and restore.', '');
}
}
function openSafetyPlanModal(triggerEl) {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(triggerEl);
}
if (safetyPlanHistory) {
safetyPlanHistory.addEventListener('click', function(event) {
var restoreButton = event.target.closest('[data-restore-version-id]');
if (restoreButton) {
restoreSafetyPlanVersion(restoreButton.getAttribute('data-restore-version-id'));
return;
}
var diffButton = event.target.closest('[data-version-id]');
if (diffButton) {
selectedSafetyPlanVersionId = diffButton.getAttribute('data-version-id');
renderSafetyPlanVersionHistory();
}
});
} catch (e) {}
}
closeSafetyPlan.addEventListener('click', function() {
@@ -1739,23 +1259,41 @@ Sovereignty and service always.`;
});
saveSafetyPlan.addEventListener('click', function() {
var plan = {
warningSigns: document.getElementById('sp-warning-signs').value,
coping: document.getElementById('sp-coping').value,
distraction: document.getElementById('sp-distraction').value,
help: document.getElementById('sp-help').value,
environment: document.getElementById('sp-environment').value
};
try {
var snapshot = saveSafetyPlanVersion(getSafetyPlanFormData(), { source: 'save' });
setSafetyPlanStatus('Saved locally as a new version at ' + formatSafetyPlanTimestamp(snapshot.savedAt) + '.', 'success');
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
setSafetyPlanStatus('Error saving plan.', 'error');
alert('Error saving plan.');
}
});
// ===== SAFETY PLAN FOCUS TRAP (fix #65) =====
// Focusable elements inside the modal, in tab order
var _spFocusableIds = [
'close-safety-plan',
'sp-warning-signs',
'sp-coping',
'sp-distraction',
'sp-help',
'sp-environment',
'cancel-safety-plan',
'save-safety-plan'
];
var _spTriggerEl = null; // element that opened the modal
function _getSpFocusableEls() {
return Array.prototype.slice.call(
safetyPlanModal.querySelectorAll('button:not([disabled]), textarea:not([disabled]), a[href], input:not([disabled]), select:not([disabled]), [tabindex]:not([tabindex="-1"])')
).filter(function(el) {
return !!(el.offsetWidth || el.offsetHeight || el.getClientRects().length);
});
return _spFocusableIds
.map(function(id) { return document.getElementById(id); })
.filter(function(el) { return el && !el.disabled; });
}
function _trapSafetyPlanFocus(e) {
@@ -1812,13 +1350,17 @@ Sovereignty and service always.`;
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
openSafetyPlanModal(safetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlanModal(crisisSafetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
});
}
@@ -1860,7 +1402,11 @@ Sovereignty and service always.`;
fetch('/api/v1/chat/completions', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'X-Crisis-Session-ID': crisisSessionId,
'X-Crisis-Level': currentCrisisLevel || 'NONE'
},
body: JSON.stringify({
model: 'timmy',
messages: allMessages,
@@ -1870,6 +1416,23 @@ Sovereignty and service always.`;
}).then(function(response) {
clearTimeout(timeoutId);
// Check for hermes-agent crisis escalation headers
var hermesLevel = response.headers.get('X-Crisis-Escalation');
if (hermesLevel && hermesLevel !== 'NONE') {
recordEscalation(hermesLevel, 'hermes-agent', 'from response header');
// Update UI if escalated beyond current
if (hermesLevel === 'CRITICAL' && !crisisOverlay.classList.contains('active')) {
showOverlay();
} else if (hermesLevel === 'HIGH' && !crisisPanelShown) {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
}
}
var hermesAck = response.headers.get('X-Crisis-Deescalation');
if (hermesAck) {
recordEscalation(hermesAck, 'hermes-agent', 'de-escalation');
}
if (!response.ok) {
throw new Error('HTTP ' + response.status);
}
@@ -1965,7 +1528,9 @@ Sovereignty and service always.`;
// Check for URL params (e.g., ?safetyplan=true for PWA shortcut)
var urlParams = new URLSearchParams(window.location.search);
if (urlParams.get('safetyplan') === 'true') {
openSafetyPlanModal(safetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -52,34 +52,6 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,21 +0,0 @@
from pathlib import Path
def test_safety_plan_version_history_contract_present() -> None:
html = Path("index.html").read_text(encoding="utf-8")
required_snippets = [
'id="safety-plan-history"',
'id="safety-plan-diff"',
'id="safety-plan-status"',
'Version History',
'timmy_safety_plan_versions',
'function renderSafetyPlanVersionHistory()',
'function renderSafetyPlanDiff(',
'function restoreSafetyPlanVersion(',
'diff-added',
'diff-removed',
]
for snippet in required_snippets:
assert snippet in html, f"missing safety plan versioning contract: {snippet}"

View File

@@ -50,22 +50,6 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,277 +1,202 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Tests for crisis session tracker — the-door <-> hermes-agent integration.
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
Verifies:
- Session creation and tracking
- Escalation and de-escalation events
- Merged level from the-door + hermes-agent
- History and unacknowledged state
- Persistence round-trip
- Cleanup of stale sessions
"""
import unittest
import sys
import json
import os
import tempfile
import time
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisLevel,
CrisisSession,
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
EscalationEvent,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
class TestCrisisLevel(unittest.TestCase):
"""CrisisLevel enum ordering and conversion."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
def test_from_string(self):
self.assertEqual(CrisisLevel.from_string("HIGH"), CrisisLevel.HIGH)
self.assertEqual(CrisisLevel.from_string("critical"), CrisisLevel.CRITICAL)
self.assertEqual(CrisisLevel.from_string("garbage"), CrisisLevel.NONE)
def test_ordering(self):
self.assertTrue(CrisisLevel.HIGH > CrisisLevel.MEDIUM)
self.assertTrue(CrisisLevel.CRITICAL >= CrisisLevel.HIGH)
self.assertTrue(CrisisLevel.LOW < CrisisLevel.HIGH)
self.assertEqual(CrisisLevel.NONE, CrisisLevel.NONE)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
class TestCrisisSession(unittest.TestCase):
"""Single session state management."""
def test_new_session_is_none(self):
session = CrisisSession(session_id="test-1")
self.assertEqual(session.current_level, "NONE")
self.assertEqual(session.hermes_level, "NONE")
self.assertEqual(len(session.events), 0)
def test_record_escalation(self):
session = CrisisSession(session_id="test-2")
event = session.record("HIGH", "the-door", "keyword: despair")
self.assertEqual(event.level, "HIGH")
self.assertEqual(event.previous_level, "NONE")
self.assertEqual(event.source, "the-door")
self.assertEqual(session.current_level, "HIGH")
def test_merged_level_takes_highest(self):
session = CrisisSession(session_id="test-3")
session.record("MEDIUM", "the-door")
session.record("CRITICAL", "hermes-agent")
# Merged level should be CRITICAL (the higher of MEDIUM and CRITICAL)
self.assertEqual(session.current_level, "CRITICAL")
self.assertEqual(session.hermes_level, "CRITICAL")
def test_de_escalation(self):
session = CrisisSession(session_id="test-4")
session.record("HIGH", "the-door")
session.record("MEDIUM", "the-door")
self.assertEqual(session.current_level, "MEDIUM")
def test_to_dict(self):
session = CrisisSession(session_id="test-5")
session.record("HIGH", "the-door", "test")
d = session.to_dict()
self.assertEqual(d["session_id"], "test-5")
self.assertEqual(d["current_level"], "HIGH")
self.assertEqual(d["event_count"], 1)
self.assertEqual(len(d["events"]), 1)
class TestCrisisSessionTracker(unittest.TestCase):
"""Full tracker with multiple sessions."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_get_or_create(self):
s1 = self.tracker.get_or_create("s1")
s2 = self.tracker.get_or_create("s1")
self.assertIs(s1, s2) # Same session object
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
def test_record_returns_result(self):
result = self.tracker.record("s1", "HIGH", "the-door", "keyword: help")
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
self.assertIn("event", result)
self.assertTrue(result["escalated"])
self.assertFalse(result["de_escalated"])
def test_session_updates_across_calls(self):
def test_de_escalation_detected(self):
self.tracker.record("s1", "HIGH", "the-door")
result = self.tracker.record("s1", "LOW", "the-door", "user calmed down")
self.assertFalse(result["escalated"])
self.assertTrue(result["de_escalated"])
def test_get_history(self):
self.tracker.record("s1", "HIGH", "the-door")
self.tracker.record("s1", "CRITICAL", "hermes-agent")
history = self.tracker.get_history("s1")
self.assertEqual(history["event_count"], 2)
self.assertEqual(history["current_level"], "CRITICAL")
self.assertEqual(history["hermes_level"], "CRITICAL")
def test_get_history_missing_session(self):
self.assertIsNone(self.tracker.get_history("nonexistent"))
def test_unacknowledged_state(self):
self.tracker.record("s1", "HIGH", "the-door")
unacked = self.tracker.get_unacknowledged("s1")
self.assertIsNotNone(unacked)
self.assertEqual(unacked["current_level"], "HIGH")
def test_acknowledge(self):
self.tracker.record("s1", "HIGH", "the-door")
self.assertTrue(self.tracker.acknowledge("s1"))
self.assertIsNone(self.tracker.get_unacknowledged("s1"))
def test_acknowledge_missing(self):
self.assertFalse(self.tracker.acknowledge("nonexistent"))
def test_cleanup(self):
self.tracker.record("old-session", "LOW", "the-door")
self.tracker._sessions["old-session"].created_at = time.time() - 100000
self.tracker.record("new-session", "LOW", "the-door")
removed = self.tracker.cleanup(max_age_seconds=50000)
self.assertEqual(removed, 1)
self.assertNotIn("old-session", self.tracker._sessions)
self.assertIn("new-session", self.tracker._sessions)
class TestPersistence(unittest.TestCase):
"""JSON persistence round-trip."""
def test_save_and_load(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
path = f.name
try:
t1 = CrisisSessionTracker(persist_path=path)
t1.record("s1", "HIGH", "the-door", "test persist")
t1.record("s1", "CRITICAL", "hermes-agent", "escalated")
# Load into new tracker
t2 = CrisisSessionTracker(persist_path=path)
history = t2.get_history("s1")
self.assertIsNotNone(history)
self.assertEqual(history["current_level"], "CRITICAL")
self.assertEqual(history["event_count"], 2)
finally:
os.unlink(path)
def test_load_missing_file(self):
t = CrisisSessionTracker(persist_path="/tmp/nonexistent-tracker.json")
self.assertEqual(len(t._sessions), 0)
class TestCrossSourceIntegration(unittest.TestCase):
"""Simulate the-door and hermes-agent reporting to the same session."""
def test_unified_escalation_history(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
sid = "web-session-abc"
# the-door detects something
r1 = tracker.record(sid, "MEDIUM", "the-door", "keyword: 'alone in this'")
self.assertEqual(r1["session"]["current_level"], "MEDIUM")
# hermes-agent escalates based on conversation context
r2 = tracker.record(sid, "HIGH", "hermes-agent", "user repeated 'no way out' 3x")
self.assertEqual(r2["session"]["current_level"], "HIGH")
self.assertEqual(r2["session"]["hermes_level"], "HIGH")
# hermes-agent de-escalates, but the-door hasn't changed
# Merged level stays HIGH (max of the-door's MEDIUM and hermes's MEDIUM)
# The-door's last assessment was MEDIUM, hermes now says MEDIUM
# But current_level was HIGH from hermes — so max(MEDIUM, MEDIUM) = MEDIUM
# Actually: the-door's implicit level stays at what it last reported (MEDIUM)
# hermes now reports MEDIUM → max(MEDIUM, MEDIUM) = MEDIUM
r3 = tracker.record(sid, "MEDIUM", "hermes-agent", "user said 'feeling a bit better'")
self.assertEqual(r3["session"]["current_level"], "MEDIUM")
self.assertTrue(r3["de_escalated"])
# Verify full history
history = tracker.get_history(sid)
self.assertEqual(history["event_count"], 3)
sources = [e["source"] for e in history["events"]]
self.assertIn("the-door", sources)
self.assertIn("hermes-agent", sources)
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()