Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
b7281f9542 feat: expose safety plan from chat header (#38)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 15s
2026-04-13 21:55:03 -04:00
6 changed files with 169 additions and 762 deletions

View File

@@ -7,7 +7,6 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, CrisisSessionState
__all__ = [
"detect_crisis",
@@ -20,6 +19,4 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"CrisisSessionState",
]

View File

@@ -1,261 +0,0 @@
"""
Session-Level Crisis Tracking for the-door.
Tracks crisis signals across a conversation session to detect escalation
patterns and de-escalation. Privacy-first: state lives in memory only,
resets on new session, never persists to disk.
Key behaviors:
- Escalation: LOW → HIGH in 3 messages triggers heightened awareness
- De-escalation: CRITICAL → LOW for 5+ messages allows stepping down
- Session state enriches the system prompt with trajectory context
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
tracker.record("I'm feeling down")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore") # escalation detected
print(tracker.get_prompt_context())
# "User has escalated from LOW to HIGH over 3 messages."
"""
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from .detect import detect_crisis, CrisisDetectionResult, SCORES
# Level hierarchy for ordering comparisons
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class CrisisSessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[Tuple[int, str]] = field(default_factory=list) # (message_index, level)
escalated: bool = False
escalation_messages: int = 0 # messages from first level to peak
deescalation_count: int = 0 # consecutive messages at lower level
deescalating: bool = False
deescalation_confirmed: bool = False # True once de-escalation threshold met
class CrisisSessionTracker:
"""
Session-level crisis tracker.
Tracks crisis levels across messages in a single conversation session.
Detects rapid escalation and gradual de-escalation. Provides context
strings for system prompt injection.
State is in-memory only. New session = new instance.
"""
# How many messages for escalation detection
ESCALATION_WINDOW = 3
# How many consecutive messages at lower level for de-escalation
DEESCALATION_THRESHOLD = 5
def __init__(self):
self._state = CrisisSessionState()
@property
def state(self) -> CrisisSessionState:
"""Read-only snapshot of current state."""
return CrisisSessionState(
current_level=self._state.current_level,
peak_level=self._state.peak_level,
message_count=self._state.message_count,
level_history=list(self._state.level_history),
escalated=self._state.escalated,
escalation_messages=self._state.escalation_messages,
deescalation_count=self._state.deescalation_count,
deescalating=self._state.deescalating,
)
def reset(self):
"""Reset all state. Called on new session."""
self._state = CrisisSessionState()
def record(self, text: str) -> CrisisDetectionResult:
"""
Record a message and update session state.
Returns the single-message detection result (unchanged from detect.py).
Session-level intelligence is tracked internally.
"""
detection = detect_crisis(text)
self._record_level(detection.level)
return detection
def record_level(self, level: str):
"""Record an already-detected crisis level (for when detection ran separately)."""
self._record_level(level)
def _record_level(self, level: str):
"""Internal: update state with a new crisis level."""
self._state.message_count += 1
idx = self._state.message_count
self._state.level_history.append((idx, level))
prev_level = self._state.current_level
prev_order = LEVEL_ORDER.get(prev_level, 0)
new_order = LEVEL_ORDER.get(level, 0)
# Update current level
self._state.current_level = level
# Track peak
if new_order > LEVEL_ORDER.get(self._state.peak_level, 0):
self._state.peak_level = level
# ── Escalation detection ──────────────────────────────
if new_order > prev_order:
# User is going up — reset de-escalation counter
self._state.deescalation_count = 0
self._state.deescalating = False
# Always check for escalation pattern in recent window
if not self._state.escalated:
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if len(window) >= self.ESCALATION_WINDOW:
first_in_window = window[0][1]
last_in_window = window[-1][1]
first_order = LEVEL_ORDER.get(first_in_window, 0)
last_order = LEVEL_ORDER.get(last_in_window, 0)
if last_order > first_order:
self._state.escalated = True
self._state.escalation_messages = self.ESCALATION_WINDOW
# ── De-escalation detection ───────────────────────────
elif new_order < prev_order:
self._state.deescalation_count += 1
self._state.deescalating = True
# If de-escalation counter meets threshold AND we were escalated
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
# Same level — increment de-escalation counter if already de-escalating
elif self._state.deescalating:
self._state.deescalation_count += 1
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
def _get_recent_levels(self, n: int) -> List[Tuple[int, str]]:
"""Get the last n entries from level history."""
return self._state.level_history[-n:]
def get_prompt_context(self) -> str:
"""
Generate a human-readable context string for system prompt injection.
Returns empty string if no session-level crisis context is needed.
"""
s = self._state
if s.message_count == 0:
return ""
parts = []
# Escalation alert
if s.escalated and s.peak_level != "NONE":
# Find the starting level from the escalation window
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if window:
start_level = window[0][1]
parts.append(
f"User has escalated from {start_level} to {s.peak_level} "
f"over {s.message_count} message{'s' if s.message_count != 1 else ''}."
)
parts.append("Heightened crisis awareness is warranted.")
# Confirmed de-escalation
elif s.deescalation_confirmed and s.peak_level in ("HIGH", "CRITICAL"):
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level}."
)
parts.append("De-escalation confirmed. Continue gentle presence.")
# Active de-escalation (not yet confirmed)
elif s.deescalating and s.peak_level != "NONE":
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level} "
f"over {s.deescalation_count} message{'s' if s.deescalation_count != 1 else ''}."
)
parts.append("Still in de-escalation. Maintain supportive awareness.")
# Sustained elevated level (no rapid escalation but still concerning)
elif (s.current_level in ("HIGH", "CRITICAL") and
not s.escalated and s.message_count >= 3):
parts.append(
f"User has been at {s.current_level} level for {s.message_count} messages."
)
parts.append("Continue crisis-aware response.")
# Peak was higher than current (user improved but may still be fragile)
elif (LEVEL_ORDER.get(s.peak_level, 0) > LEVEL_ORDER.get(s.current_level, 0) and
s.peak_level in ("HIGH", "CRITICAL") and
not s.deescalating):
parts.append(
f"Note: session peak was {s.peak_level}. "
f"User is now at {s.current_level}."
)
parts.append("Remain attentive.")
return " ".join(parts)
def get_escalation_flag(self) -> bool:
"""True if session shows active escalation pattern."""
return self._state.escalated
def get_effective_level(self) -> str:
"""
Get the effective crisis level considering session state.
Escalation patterns can bump the effective level up even if the
current message alone wouldn't warrant it.
"""
s = self._state
if s.escalated and s.peak_level in ("HIGH", "CRITICAL"):
return s.peak_level
return s.current_level
def should_heighten_awareness(self) -> bool:
"""
True when the session trajectory warrants heightened awareness
beyond what a single-message detection would provide.
"""
return self._state.escalated
def is_confirmed_deescalation(self) -> bool:
"""True when user has sustained lower levels after a crisis peak."""
s = self._state
return s.deescalation_confirmed
def __repr__(self) -> str:
s = self._state
return (
f"CrisisSessionTracker("
f"current={s.current_level}, peak={s.peak_level}, "
f"msgs={s.message_count}, escalated={s.escalated}, "
f"deescalating={s.deescalating})"
)

View File

@@ -1,407 +0,0 @@
"""
Tests for session-level crisis tracking (Issue #35).
Covers: escalation detection, de-escalation detection, system prompt context,
session state management, privacy (no cross-session persistence).
Run with: python -m pytest crisis/tests_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.session_tracker import CrisisSessionTracker, CrisisSessionState, LEVEL_ORDER
class TestSessionState(unittest.TestCase):
"""Test basic session state management."""
def test_initial_state(self):
tracker = CrisisSessionTracker()
s = tracker.state
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.escalated)
def test_record_increments_count(self):
tracker = CrisisSessionTracker()
tracker.record("hello")
self.assertEqual(tracker.state.message_count, 1)
tracker.record("world")
self.assertEqual(tracker.state.message_count, 2)
def test_record_tracks_history(self):
tracker = CrisisSessionTracker()
tracker.record("I'm struggling") # LOW
tracker.record("I feel hopeless and nobody cares") # MEDIUM
history = tracker.state.level_history
self.assertEqual(len(history), 2)
self.assertEqual(history[0], (1, "LOW"))
self.assertIn(history[1][1], ("MEDIUM", "LOW", "HIGH"))
def test_reset_clears_state(self):
tracker = CrisisSessionTracker()
tracker.record("I want to die") # CRITICAL
self.assertEqual(tracker.state.peak_level, "CRITICAL")
tracker.reset()
s = tracker.state
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
def test_peak_tracking(self):
tracker = CrisisSessionTracker()
tracker.record("I'm feeling down") # LOW
tracker.record("I feel hopeless with no way out") # HIGH
tracker.record("Actually I'm a bit better") # back to LOW
self.assertEqual(tracker.state.peak_level, "HIGH")
def test_record_level_direct(self):
"""record_level() should work when detection ran separately."""
tracker = CrisisSessionTracker()
tracker.record_level("LOW")
tracker.record_level("MEDIUM")
tracker.record_level("HIGH")
self.assertEqual(tracker.state.message_count, 3)
self.assertEqual(tracker.state.peak_level, "HIGH")
class TestEscalationDetection(unittest.TestCase):
"""Test rapid escalation detection: LOW → HIGH in N messages."""
def test_escalation_low_to_high_in_3(self):
"""Core AC: LOW → HIGH in 3 messages triggers escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel really hopeless about everything") # HIGH
tracker.record("I can't take this anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_low_to_critical_in_3(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm struggling") # LOW
tracker.record("Nobody would miss me") # HIGH
tracker.record("I want to end my life") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
def test_no_escalation_stays_low(self):
"""User stays LOW — no escalation."""
tracker = CrisisSessionTracker()
tracker.record("rough day")
tracker.record("tough week")
tracker.record("feeling down")
self.assertFalse(tracker.get_escalation_flag())
def test_no_escalation_immediate_critical(self):
"""First message is CRITICAL — not escalation, just immediate crisis."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself") # CRITICAL on first message
# Not escalated because there's no upward trajectory — it started high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_medium_to_high(self):
"""MEDIUM → HIGH escalation pattern."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel alone and nobody cares") # MEDIUM
tracker.record("I feel completely hopeless") # HIGH
tracker.record("There's no way out") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_not_triggered_without_increase(self):
"""Same level messages don't constitute escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel hopeless") # HIGH
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is broken") # HIGH
# No upward trajectory — started high, stayed high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_window_custom(self):
"""Custom window size should be respected."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 2
tracker.record("I'm unhappy") # LOW
tracker.record("I can't go on anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: CRITICAL → LOW for 5+ messages."""
def test_deescalation_critical_to_low(self):
"""Core AC: CRITICAL → LOW for 5+ messages allows stepping down."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate to CRITICAL
tracker.record("I'm struggling") # LOW
tracker.record("I can't go on") # HIGH
tracker.record("I want to die") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
# De-escalate over 5 messages
tracker.record("I called 988") # NONE/LOW
tracker.record("I'm calmer now") # NONE
tracker.record("Thank you for being here") # NONE
tracker.record("I'm going to be okay") # NONE
tracker.record("Taking it one moment at a time") # NONE
self.assertTrue(tracker.is_confirmed_deescalation())
self.assertFalse(tracker.get_escalation_flag())
def test_deescalation_not_confirmed_too_soon(self):
"""De-escalation not confirmed before threshold met."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
tracker.record("I'm struggling")
tracker.record("I can't go on")
tracker.record("I want to die")
self.assertTrue(tracker.get_escalation_flag())
# Only 3 messages of de-escalation
tracker.record("I called someone")
tracker.record("I'm calmer")
tracker.record("Feeling better")
self.assertFalse(tracker.is_confirmed_deescalation())
self.assertTrue(tracker.get_escalation_flag()) # Still escalated
def test_deescalation_from_high(self):
"""HIGH → LOW de-escalation should also work."""
tracker = CrisisSessionTracker()
tracker.DEESCALATION_THRESHOLD = 5
# Build up to HIGH
tracker.record("I'm down") # LOW
tracker.record("I feel hopeless") # HIGH
tracker.record("No way out") # HIGH
tracker.record("Everything is pointless") # HIGH
# De-escalate
for _ in range(5):
tracker.record("I'm doing a bit better") # LOW/NONE
self.assertTrue(tracker.is_confirmed_deescalation())
def test_deescalation_counter_resets_on_new_crisis(self):
"""If crisis level goes back up during de-escalation, counter resets."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate
tracker.record("I'm struggling") # LOW
tracker.record("I feel hopeless") # HIGH
tracker.record("I want to die") # CRITICAL
# Start de-escalating
tracker.record("I called someone") # LOW
tracker.record("I'm calmer") # NONE
# Go back up
tracker.record("Actually I can't do this") # HIGH
self.assertFalse(tracker.is_confirmed_deescalation())
class TestSystemPromptContext(unittest.TestCase):
"""Test system prompt context generation."""
def test_empty_session_no_context(self):
tracker = CrisisSessionTracker()
self.assertEqual(tracker.get_prompt_context(), "")
def test_escalation_context(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore")
ctx = tracker.get_prompt_context()
self.assertIn("escalated", ctx.lower())
self.assertIn("LOW", ctx)
self.assertIn("heightened", ctx.lower())
def test_deescalation_context(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate
tracker.record("I'm struggling")
tracker.record("I feel hopeless")
tracker.record("I want to die")
# De-escalate
for _ in range(5):
tracker.record("I'm okay now")
ctx = tracker.get_prompt_context()
self.assertIn("de-escalated", ctx.lower())
self.assertIn("confirmed", ctx.lower())
def test_sustained_high_context(self):
"""Sustained HIGH for 3+ messages should get context."""
tracker = CrisisSessionTracker()
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is pointless") # HIGH
tracker.record("I can't go on") # HIGH
ctx = tracker.get_prompt_context()
self.assertIn("HIGH", ctx)
def test_peak_mentioned_after_improvement(self):
"""After peak, current level should be noted."""
tracker = CrisisSessionTracker()
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is pointless") # HIGH
tracker.record("I feel a bit better") # back to LOW
ctx = tracker.get_prompt_context()
self.assertIn("peak", ctx.lower())
self.assertIn("HIGH", ctx)
class TestEffectiveLevel(unittest.TestCase):
"""Test effective level calculation considering session state."""
def test_effective_level_normal(self):
tracker = CrisisSessionTracker()
tracker.record("I'm struggling") # LOW
self.assertEqual(tracker.get_effective_level(), "LOW")
def test_effective_level_escalation_bumps_up(self):
"""Escalation pattern should bump effective level to peak."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel completely hopeless") # HIGH
tracker.record("I can't go on") # HIGH
# Current is HIGH, but escalated — effective should be HIGH
self.assertEqual(tracker.get_effective_level(), "HIGH")
class TestPrivacy(unittest.TestCase):
"""Test privacy requirements — no cross-session persistence."""
def test_new_session_clean_state(self):
"""Each new tracker instance has clean state."""
t1 = CrisisSessionTracker()
t1.record("I want to die")
self.assertEqual(t1.state.peak_level, "CRITICAL")
t2 = CrisisSessionTracker()
self.assertEqual(t2.state.peak_level, "NONE")
self.assertEqual(t2.state.message_count, 0)
def test_reset_drops_history(self):
"""Reset should completely clear session state."""
t = CrisisSessionTracker()
t.record("I'm struggling")
t.record("I can't go on")
t.reset()
self.assertEqual(t.state.level_history, [])
self.assertEqual(t.state.message_count, 0)
self.assertFalse(t.state.escalated)
class TestEdgeCases(unittest.TestCase):
"""Edge cases and boundary conditions."""
def test_single_message_none(self):
tracker = CrisisSessionTracker()
tracker.record("Hello Timmy")
self.assertFalse(tracker.get_escalation_flag())
self.assertEqual(tracker.get_prompt_context(), "")
def test_oscillating_levels(self):
"""User oscillating between levels shouldn't cause false escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm fine")
tracker.record("I'm struggling")
tracker.record("I'm fine")
tracker.record("I'm struggling")
# Oscillation without sustained escalation
self.assertFalse(tracker.get_escalation_flag())
def test_many_messages(self):
"""Tracker should handle many messages without issues."""
tracker = CrisisSessionTracker()
for i in range(100):
tracker.record("Hello there")
self.assertEqual(tracker.state.message_count, 100)
def test_empty_string(self):
tracker = CrisisSessionTracker()
tracker.record("")
self.assertEqual(tracker.state.message_count, 1)
self.assertEqual(tracker.state.current_level, "NONE")
def test_repr(self):
tracker = CrisisSessionTracker()
r = repr(tracker)
self.assertIn("CrisisSessionTracker", r)
self.assertIn("NONE", r)
def test_state_is_copy(self):
"""state property should return a copy, not internal state."""
tracker = CrisisSessionTracker()
s1 = tracker.state
tracker.record("I'm struggling")
s2 = tracker.state
self.assertEqual(s1.message_count, 0)
self.assertEqual(s2.message_count, 1)
class TestLevelOrder(unittest.TestCase):
"""Test level ordering is correct."""
def test_level_ordering(self):
self.assertLess(LEVEL_ORDER["NONE"], LEVEL_ORDER["LOW"])
self.assertLess(LEVEL_ORDER["LOW"], LEVEL_ORDER["MEDIUM"])
self.assertLess(LEVEL_ORDER["MEDIUM"], LEVEL_ORDER["HIGH"])
self.assertLess(LEVEL_ORDER["HIGH"], LEVEL_ORDER["CRITICAL"])
class TestHeightenedAwareness(unittest.TestCase):
"""Test heightened awareness flag."""
def test_heightened_on_escalation(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm unhappy")
tracker.record("I feel hopeless")
tracker.record("I can't go on")
self.assertTrue(tracker.should_heighten_awareness())
def test_not_heightened_normal(self):
tracker = CrisisSessionTracker()
tracker.record("Hello")
self.assertFalse(tracker.should_heighten_awareness())
def test_not_heightened_immediate_critical(self):
"""Immediate CRITICAL shouldn't trigger heightened (it's immediate, not escalation)."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself")
self.assertFalse(tracker.should_heighten_awareness())
if __name__ == "__main__":
unittest.main()

View File

@@ -80,6 +80,64 @@ html, body {
gap: 4px;
}
#chat-header {
flex-shrink: 0;
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 10px 12px;
background: #10161d;
border-bottom: 1px solid #21262d;
}
.chat-header-copy {
min-width: 0;
}
#chat-header-title {
font-size: 0.95rem;
font-weight: 600;
color: #e6edf3;
}
#chat-header-subtitle {
font-size: 0.75rem;
color: #8b949e;
margin-top: 2px;
}
.safety-link-btn {
display: inline-flex;
align-items: center;
gap: 8px;
padding: 8px 12px;
min-height: 40px;
border-radius: 999px;
border: 1px solid #30363d;
background: transparent;
color: #8b949e;
font-size: 0.82rem;
font-weight: 600;
cursor: pointer;
transition: color 0.2s, border-color 0.2s, background 0.2s;
}
.safety-link-btn:hover,
.safety-link-btn:focus {
color: #e6edf3;
border-color: #58a6ff;
background: rgba(88, 166, 255, 0.08);
outline: 2px solid rgba(88, 166, 255, 0.35);
outline-offset: 2px;
}
.safety-link-btn svg {
width: 16px;
height: 16px;
flex-shrink: 0;
}
.status-dot {
width: 6px;
height: 6px;
@@ -579,6 +637,19 @@ html, body {
#chat-area { padding: 20px 24px 8px; }
#input-area { padding: 10px 24px; }
#banner-988 a { font-size: 0.95rem; }
#chat-header { padding: 12px 24px; }
}
@media (max-width: 480px) {
#chat-header {
align-items: flex-start;
flex-direction: column;
}
.safety-link-btn {
width: 100%;
justify-content: center;
}
}
@media (min-width: 900px) {
@@ -642,13 +713,24 @@ html, body {
<a href="sms:741741&body=HOME" class="crisis-btn" aria-label="Text HOME to 741741 for Crisis Text Line">
Text HOME to 741741
</a>
<button class="crisis-btn" id="crisis-safety-plan-btn" aria-label="Open my safety plan" style="background:#3d3d3d;">
<button class="crisis-btn" id="crisis-safety-plan-btn" type="button" aria-label="Open my safety plan" aria-controls="safety-plan-modal" aria-haspopup="dialog" style="background:#3d3d3d;">
<svg width="18" height="18" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"/><polyline points="14 2 14 8 20 8"/><line x1="16" y1="13" x2="8" y2="13"/><line x1="16" y1="17" x2="8" y2="17"/><polyline points="10 9 9 9 8 9"/></svg>
My Safety Plan
</button>
</div>
</div>
<div id="chat-header" role="region" aria-label="Chat tools">
<div class="chat-header-copy">
<p id="chat-header-title">Talk to Timmy</p>
<p id="chat-header-subtitle">No login. No tracking. Just someone to listen.</p>
</div>
<button id="chat-safety-plan-btn" class="safety-link-btn" type="button" aria-label="Open my safety plan" aria-controls="safety-plan-modal" aria-haspopup="dialog">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"></path><polyline points="14 2 14 8 20 8"></polyline><line x1="16" y1="13" x2="8" y2="13"></line><line x1="16" y1="17" x2="8" y2="17"></line><polyline points="10 9 9 9 8 9"></polyline></svg>
<span>Safety plan</span>
</button>
</div>
<!-- Chat messages -->
<div id="chat-area" role="log" aria-label="Chat messages" aria-live="polite" tabindex="0">
<!-- Messages inserted here -->
@@ -681,7 +763,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="safety-plan-btn" type="button" aria-label="Open My Safety Plan" aria-controls="safety-plan-modal" aria-haspopup="dialog">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
</div>
@@ -701,16 +783,16 @@ html, body {
</div>
<!-- Safety Plan Modal -->
<div id="safety-plan-modal" class="modal-overlay" role="dialog" aria-modal="true" aria-labelledby="safety-plan-title">
<div id="safety-plan-modal" class="modal-overlay" role="dialog" aria-modal="true" aria-labelledby="safety-plan-title" aria-hidden="true">
<div class="modal-content">
<div class="modal-header">
<h2 id="safety-plan-title">My Safety Plan</h2>
<button class="close-modal" id="close-safety-plan" aria-label="Close modal">
<button class="close-modal" id="close-safety-plan" type="button" aria-label="Close modal">
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><line x1="18" y1="6" x2="6" y2="18"></line><line x1="6" y1="6" x2="18" y2="18"></line></svg>
</button>
</div>
<div class="modal-body">
<p style="font-size: 0.85rem; color: #8b949e; margin-bottom: 16px;">This plan is saved only on your device. No one else can see it.</p>
<p id="safety-plan-description" style="font-size: 0.85rem; color: #8b949e; margin-bottom: 16px;">This plan is saved only on your device. No one else can see it.</p>
<div class="form-group">
<label for="sp-warning-signs">1. Warning signs (thoughts, moods, behaviors)</label>
@@ -738,8 +820,8 @@ html, body {
</div>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" id="cancel-safety-plan">Cancel</button>
<button class="btn btn-primary" id="save-safety-plan">Save Plan</button>
<button class="btn btn-secondary" id="cancel-safety-plan" type="button">Cancel</button>
<button class="btn btn-primary" id="save-safety-plan" type="button">Save Plan</button>
</div>
</div>
</div>
@@ -812,7 +894,8 @@ Sovereignty and service always.`;
var statusText = document.getElementById('status-text');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');
var footerSafetyPlanBtn = document.getElementById('safety-plan-btn');
var crisisSafetyPlanBtn = document.getElementById('crisis-safety-plan-btn');
var safetyPlanModal = document.getElementById('safety-plan-modal');
var closeSafetyPlan = document.getElementById('close-safety-plan');
@@ -825,6 +908,7 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var lastSafetyPlanTrigger = null;
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -965,76 +1049,7 @@ Sovereignty and service always.`;
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
// ===== SESSION CRISIS TRACKING (#35) =====
var sessionCrisis = {
currentLevel: 0, // 0=NONE, 1=LOW, 2=MEDIUM, 3=HIGH, 4=CRITICAL
peakLevel: 0,
messageCount: 0,
history: [], // [{level, timestamp}]
escalationRate: 0, // levels per message
lastEscalation: null // timestamp of last escalation
};
function trackCrisis(text) {
var level = getCrisisLevel(text);
sessionCrisis.messageCount++;
sessionCrisis.history.push({ level: level, time: Date.now() });
if (level > sessionCrisis.currentLevel) {
sessionCrisis.lastEscalation = Date.now();
}
sessionCrisis.currentLevel = Math.max(sessionCrisis.currentLevel, level);
sessionCrisis.peakLevel = Math.max(sessionCrisis.peakLevel, level);
// Calculate escalation rate over last 5 messages
var recent = sessionCrisis.history.slice(-5);
if (recent.length >= 2) {
var first = recent[0].level;
var last = recent[recent.length - 1].level;
sessionCrisis.escalationRate = (last - first) / recent.length;
}
return getSessionContext();
}
function getSessionContext() {
var ctx = '';
if (sessionCrisis.history.length < 2) return ctx;
// Escalation detection: LOW -> HIGH in <= 3 messages
if (sessionCrisis.escalationRate > 0.5 && sessionCrisis.history.length <= 3) {
ctx += 'ESCALATION ALERT: User crisis level is rising rapidly. ';
}
// De-escalation detection: CRITICAL -> LOW for 5+ messages
if (sessionCrisis.peakLevel >= 3 && sessionCrisis.currentLevel <= 1 && sessionCrisis.messageCount >= 5) {
ctx += 'DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency. ';
}
// Persistent high state
if (sessionCrisis.currentLevel >= 2 && sessionCrisis.messageCount >= 3) {
ctx += 'User has been in crisis for ' + sessionCrisis.messageCount + ' messages. ';
}
// Level history summary
var levels = sessionCrisis.history.map(function(h) { return h.level; });
if (levels.length >= 2) {
ctx += 'Crisis trajectory: ' + levels.join(' → ') + '. ';
}
return ctx;
}
function resetSessionCrisis() {
sessionCrisis = {
currentLevel: 0, peakLevel: 0, messageCount: 0,
history: [], escalationRate: 0, lastEscalation: null
};
}
function getSystemPrompt(userText) {
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
if (level === 0) return SYSTEM_PROMPT;
@@ -1047,9 +1062,7 @@ function getSystemPrompt(userText) {
var directive = 'DIRECTIVE: ' + profile.directive + '\n';
var tone = 'TONE: ' + profile.tone + '\n';
var sessionCtx = getSessionContext();
var sessionBlock = sessionCtx ? '\nSESSION CONTEXT: ' + sessionCtx : '';
return SYSTEM_PROMPT + divider + header + directive + tone + sessionBlock;
return SYSTEM_PROMPT + divider + header + directive + tone;
}
@@ -1188,25 +1201,50 @@ function getSystemPrompt(userText) {
} catch (e) {}
}
safetyPlanBtn.addEventListener('click', function() {
function openSafetyPlanModal(trigger) {
lastSafetyPlanTrigger = trigger || document.activeElement;
loadSafetyPlan();
safetyPlanModal.classList.add('active');
safetyPlanModal.setAttribute('aria-hidden', 'false');
document.getElementById('sp-warning-signs').focus();
}
function closeSafetyPlanModal() {
safetyPlanModal.classList.remove('active');
safetyPlanModal.setAttribute('aria-hidden', 'true');
if (lastSafetyPlanTrigger && typeof lastSafetyPlanTrigger.focus === 'function') {
lastSafetyPlanTrigger.focus();
}
}
chatSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlanModal(chatSafetyPlanBtn);
});
footerSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlanModal(footerSafetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
openSafetyPlanModal(crisisSafetyPlanBtn);
});
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
closeSafetyPlanModal();
});
cancelSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
closeSafetyPlanModal();
});
document.addEventListener('keydown', function(e) {
if (e.key === 'Escape' && safetyPlanModal.classList.contains('active')) {
e.preventDefault();
closeSafetyPlanModal();
}
});
saveSafetyPlan.addEventListener('click', function() {
@@ -1219,7 +1257,7 @@ function getSystemPrompt(userText) {
};
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
closeSafetyPlanModal();
alert('Safety plan saved locally.');
} catch (e) {
alert('Error saving plan.');
@@ -1243,7 +1281,6 @@ function getSystemPrompt(userText) {
var lastUserMessage = text;
checkCrisis(text);
trackCrisis(text);
msgInput.value = '';
msgInput.style.height = 'auto';
@@ -1370,8 +1407,7 @@ function getSystemPrompt(userText) {
// Check for URL params (e.g., ?safetyplan=true for PWA shortcut)
var urlParams = new URLSearchParams(window.location.search);
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
openSafetyPlanModal(chatSafetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -1,5 +1,5 @@
[pytest]
testpaths = crisis
python_files = tests.py tests_*.py
python_files = tests.py
python_classes = Test*
python_functions = test_*

View File

@@ -0,0 +1,42 @@
from pathlib import Path
INDEX_HTML = Path(__file__).resolve().parents[1] / 'index.html'
HTML = INDEX_HTML.read_text()
def _between(start_marker: str, end_marker: str) -> str:
start = HTML.index(start_marker)
end = HTML.index(end_marker)
return HTML[start:end]
def test_persistent_safety_plan_button_lives_in_chat_header():
assert 'id="chat-header"' in HTML, 'expected a dedicated chat header area'
header_html = _between('<div id="chat-header"', '<div id="chat-area"')
assert 'id="chat-safety-plan-btn"' in header_html
assert 'aria-label="Open my safety plan"' in header_html
assert 'aria-controls="safety-plan-modal"' in header_html
assert 'aria-haspopup="dialog"' in header_html
def test_chat_header_entry_point_reuses_same_modal_open_flow():
assert "var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');" in HTML
assert 'function openSafetyPlanModal(trigger) {' in HTML
assert 'loadSafetyPlan();' in HTML
assert "safetyPlanModal.classList.add('active');" in HTML
assert "safetyPlanModal.setAttribute('aria-hidden', 'false');" in HTML
assert 'openSafetyPlanModal(chatSafetyPlanBtn);' in HTML
assert 'openSafetyPlanModal(footerSafetyPlanBtn);' in HTML
assert 'openSafetyPlanModal(crisisSafetyPlanBtn);' in HTML
def test_modal_returns_focus_and_supports_escape_to_close():
assert 'var lastSafetyPlanTrigger = null;' in HTML
assert 'lastSafetyPlanTrigger = trigger || document.activeElement;' in HTML
assert 'document.getElementById(\'sp-warning-signs\').focus();' in HTML
assert 'closeSafetyPlanModal();' in HTML
assert "safetyPlanModal.setAttribute('aria-hidden', 'true');" in HTML
assert 'lastSafetyPlanTrigger.focus();' in HTML
assert "if (e.key === 'Escape' && safetyPlanModal.classList.contains('active'))" in HTML