Compare commits

..

2 Commits

Author SHA1 Message Date
48e093fe98 fix: replace safety plan alert() with inline live feedback (closes #94, refs #73)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 15s
Smoke Test / smoke (pull_request) Successful in 16s
2026-04-14 23:45:18 -04:00
Alexander Whitestone
cd50b3d767 feat: add session-level crisis tracking and escalation (#35)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-14 22:24:09 -04:00
7 changed files with 894 additions and 150 deletions

View File

@@ -7,6 +7,7 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, CrisisSessionState
__all__ = [
"detect_crisis",
@@ -19,4 +20,6 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"CrisisSessionState",
]

261
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,261 @@
"""
Session-Level Crisis Tracking for the-door.
Tracks crisis signals across a conversation session to detect escalation
patterns and de-escalation. Privacy-first: state lives in memory only,
resets on new session, never persists to disk.
Key behaviors:
- Escalation: LOW → HIGH in 3 messages triggers heightened awareness
- De-escalation: CRITICAL → LOW for 5+ messages allows stepping down
- Session state enriches the system prompt with trajectory context
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
tracker.record("I'm feeling down")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore") # escalation detected
print(tracker.get_prompt_context())
# "User has escalated from LOW to HIGH over 3 messages."
"""
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from .detect import detect_crisis, CrisisDetectionResult, SCORES
# Level hierarchy for ordering comparisons
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class CrisisSessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[Tuple[int, str]] = field(default_factory=list) # (message_index, level)
escalated: bool = False
escalation_messages: int = 0 # messages from first level to peak
deescalation_count: int = 0 # consecutive messages at lower level
deescalating: bool = False
deescalation_confirmed: bool = False # True once de-escalation threshold met
class CrisisSessionTracker:
"""
Session-level crisis tracker.
Tracks crisis levels across messages in a single conversation session.
Detects rapid escalation and gradual de-escalation. Provides context
strings for system prompt injection.
State is in-memory only. New session = new instance.
"""
# How many messages for escalation detection
ESCALATION_WINDOW = 3
# How many consecutive messages at lower level for de-escalation
DEESCALATION_THRESHOLD = 5
def __init__(self):
self._state = CrisisSessionState()
@property
def state(self) -> CrisisSessionState:
"""Read-only snapshot of current state."""
return CrisisSessionState(
current_level=self._state.current_level,
peak_level=self._state.peak_level,
message_count=self._state.message_count,
level_history=list(self._state.level_history),
escalated=self._state.escalated,
escalation_messages=self._state.escalation_messages,
deescalation_count=self._state.deescalation_count,
deescalating=self._state.deescalating,
)
def reset(self):
"""Reset all state. Called on new session."""
self._state = CrisisSessionState()
def record(self, text: str) -> CrisisDetectionResult:
"""
Record a message and update session state.
Returns the single-message detection result (unchanged from detect.py).
Session-level intelligence is tracked internally.
"""
detection = detect_crisis(text)
self._record_level(detection.level)
return detection
def record_level(self, level: str):
"""Record an already-detected crisis level (for when detection ran separately)."""
self._record_level(level)
def _record_level(self, level: str):
"""Internal: update state with a new crisis level."""
self._state.message_count += 1
idx = self._state.message_count
self._state.level_history.append((idx, level))
prev_level = self._state.current_level
prev_order = LEVEL_ORDER.get(prev_level, 0)
new_order = LEVEL_ORDER.get(level, 0)
# Update current level
self._state.current_level = level
# Track peak
if new_order > LEVEL_ORDER.get(self._state.peak_level, 0):
self._state.peak_level = level
# ── Escalation detection ──────────────────────────────
if new_order > prev_order:
# User is going up — reset de-escalation counter
self._state.deescalation_count = 0
self._state.deescalating = False
# Always check for escalation pattern in recent window
if not self._state.escalated:
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if len(window) >= self.ESCALATION_WINDOW:
first_in_window = window[0][1]
last_in_window = window[-1][1]
first_order = LEVEL_ORDER.get(first_in_window, 0)
last_order = LEVEL_ORDER.get(last_in_window, 0)
if last_order > first_order:
self._state.escalated = True
self._state.escalation_messages = self.ESCALATION_WINDOW
# ── De-escalation detection ───────────────────────────
elif new_order < prev_order:
self._state.deescalation_count += 1
self._state.deescalating = True
# If de-escalation counter meets threshold AND we were escalated
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
# Same level — increment de-escalation counter if already de-escalating
elif self._state.deescalating:
self._state.deescalation_count += 1
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
def _get_recent_levels(self, n: int) -> List[Tuple[int, str]]:
"""Get the last n entries from level history."""
return self._state.level_history[-n:]
def get_prompt_context(self) -> str:
"""
Generate a human-readable context string for system prompt injection.
Returns empty string if no session-level crisis context is needed.
"""
s = self._state
if s.message_count == 0:
return ""
parts = []
# Escalation alert
if s.escalated and s.peak_level != "NONE":
# Find the starting level from the escalation window
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if window:
start_level = window[0][1]
parts.append(
f"User has escalated from {start_level} to {s.peak_level} "
f"over {s.message_count} message{'s' if s.message_count != 1 else ''}."
)
parts.append("Heightened crisis awareness is warranted.")
# Confirmed de-escalation
elif s.deescalation_confirmed and s.peak_level in ("HIGH", "CRITICAL"):
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level}."
)
parts.append("De-escalation confirmed. Continue gentle presence.")
# Active de-escalation (not yet confirmed)
elif s.deescalating and s.peak_level != "NONE":
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level} "
f"over {s.deescalation_count} message{'s' if s.deescalation_count != 1 else ''}."
)
parts.append("Still in de-escalation. Maintain supportive awareness.")
# Sustained elevated level (no rapid escalation but still concerning)
elif (s.current_level in ("HIGH", "CRITICAL") and
not s.escalated and s.message_count >= 3):
parts.append(
f"User has been at {s.current_level} level for {s.message_count} messages."
)
parts.append("Continue crisis-aware response.")
# Peak was higher than current (user improved but may still be fragile)
elif (LEVEL_ORDER.get(s.peak_level, 0) > LEVEL_ORDER.get(s.current_level, 0) and
s.peak_level in ("HIGH", "CRITICAL") and
not s.deescalating):
parts.append(
f"Note: session peak was {s.peak_level}. "
f"User is now at {s.current_level}."
)
parts.append("Remain attentive.")
return " ".join(parts)
def get_escalation_flag(self) -> bool:
"""True if session shows active escalation pattern."""
return self._state.escalated
def get_effective_level(self) -> str:
"""
Get the effective crisis level considering session state.
Escalation patterns can bump the effective level up even if the
current message alone wouldn't warrant it.
"""
s = self._state
if s.escalated and s.peak_level in ("HIGH", "CRITICAL"):
return s.peak_level
return s.current_level
def should_heighten_awareness(self) -> bool:
"""
True when the session trajectory warrants heightened awareness
beyond what a single-message detection would provide.
"""
return self._state.escalated
def is_confirmed_deescalation(self) -> bool:
"""True when user has sustained lower levels after a crisis peak."""
s = self._state
return s.deescalation_confirmed
def __repr__(self) -> str:
s = self._state
return (
f"CrisisSessionTracker("
f"current={s.current_level}, peak={s.peak_level}, "
f"msgs={s.message_count}, escalated={s.escalated}, "
f"deescalating={s.deescalating})"
)

View File

@@ -0,0 +1,407 @@
"""
Tests for session-level crisis tracking (Issue #35).
Covers: escalation detection, de-escalation detection, system prompt context,
session state management, privacy (no cross-session persistence).
Run with: python -m pytest crisis/tests_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.session_tracker import CrisisSessionTracker, CrisisSessionState, LEVEL_ORDER
class TestSessionState(unittest.TestCase):
"""Test basic session state management."""
def test_initial_state(self):
tracker = CrisisSessionTracker()
s = tracker.state
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.escalated)
def test_record_increments_count(self):
tracker = CrisisSessionTracker()
tracker.record("hello")
self.assertEqual(tracker.state.message_count, 1)
tracker.record("world")
self.assertEqual(tracker.state.message_count, 2)
def test_record_tracks_history(self):
tracker = CrisisSessionTracker()
tracker.record("I'm struggling") # LOW
tracker.record("I feel hopeless and nobody cares") # MEDIUM
history = tracker.state.level_history
self.assertEqual(len(history), 2)
self.assertEqual(history[0], (1, "LOW"))
self.assertIn(history[1][1], ("MEDIUM", "LOW", "HIGH"))
def test_reset_clears_state(self):
tracker = CrisisSessionTracker()
tracker.record("I want to die") # CRITICAL
self.assertEqual(tracker.state.peak_level, "CRITICAL")
tracker.reset()
s = tracker.state
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
def test_peak_tracking(self):
tracker = CrisisSessionTracker()
tracker.record("I'm feeling down") # LOW
tracker.record("I feel hopeless with no way out") # HIGH
tracker.record("Actually I'm a bit better") # back to LOW
self.assertEqual(tracker.state.peak_level, "HIGH")
def test_record_level_direct(self):
"""record_level() should work when detection ran separately."""
tracker = CrisisSessionTracker()
tracker.record_level("LOW")
tracker.record_level("MEDIUM")
tracker.record_level("HIGH")
self.assertEqual(tracker.state.message_count, 3)
self.assertEqual(tracker.state.peak_level, "HIGH")
class TestEscalationDetection(unittest.TestCase):
"""Test rapid escalation detection: LOW → HIGH in N messages."""
def test_escalation_low_to_high_in_3(self):
"""Core AC: LOW → HIGH in 3 messages triggers escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel really hopeless about everything") # HIGH
tracker.record("I can't take this anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_low_to_critical_in_3(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm struggling") # LOW
tracker.record("Nobody would miss me") # HIGH
tracker.record("I want to end my life") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
def test_no_escalation_stays_low(self):
"""User stays LOW — no escalation."""
tracker = CrisisSessionTracker()
tracker.record("rough day")
tracker.record("tough week")
tracker.record("feeling down")
self.assertFalse(tracker.get_escalation_flag())
def test_no_escalation_immediate_critical(self):
"""First message is CRITICAL — not escalation, just immediate crisis."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself") # CRITICAL on first message
# Not escalated because there's no upward trajectory — it started high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_medium_to_high(self):
"""MEDIUM → HIGH escalation pattern."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel alone and nobody cares") # MEDIUM
tracker.record("I feel completely hopeless") # HIGH
tracker.record("There's no way out") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_not_triggered_without_increase(self):
"""Same level messages don't constitute escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel hopeless") # HIGH
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is broken") # HIGH
# No upward trajectory — started high, stayed high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_window_custom(self):
"""Custom window size should be respected."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 2
tracker.record("I'm unhappy") # LOW
tracker.record("I can't go on anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: CRITICAL → LOW for 5+ messages."""
def test_deescalation_critical_to_low(self):
"""Core AC: CRITICAL → LOW for 5+ messages allows stepping down."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate to CRITICAL
tracker.record("I'm struggling") # LOW
tracker.record("I can't go on") # HIGH
tracker.record("I want to die") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
# De-escalate over 5 messages
tracker.record("I called 988") # NONE/LOW
tracker.record("I'm calmer now") # NONE
tracker.record("Thank you for being here") # NONE
tracker.record("I'm going to be okay") # NONE
tracker.record("Taking it one moment at a time") # NONE
self.assertTrue(tracker.is_confirmed_deescalation())
self.assertFalse(tracker.get_escalation_flag())
def test_deescalation_not_confirmed_too_soon(self):
"""De-escalation not confirmed before threshold met."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
tracker.record("I'm struggling")
tracker.record("I can't go on")
tracker.record("I want to die")
self.assertTrue(tracker.get_escalation_flag())
# Only 3 messages of de-escalation
tracker.record("I called someone")
tracker.record("I'm calmer")
tracker.record("Feeling better")
self.assertFalse(tracker.is_confirmed_deescalation())
self.assertTrue(tracker.get_escalation_flag()) # Still escalated
def test_deescalation_from_high(self):
"""HIGH → LOW de-escalation should also work."""
tracker = CrisisSessionTracker()
tracker.DEESCALATION_THRESHOLD = 5
# Build up to HIGH
tracker.record("I'm down") # LOW
tracker.record("I feel hopeless") # HIGH
tracker.record("No way out") # HIGH
tracker.record("Everything is pointless") # HIGH
# De-escalate
for _ in range(5):
tracker.record("I'm doing a bit better") # LOW/NONE
self.assertTrue(tracker.is_confirmed_deescalation())
def test_deescalation_counter_resets_on_new_crisis(self):
"""If crisis level goes back up during de-escalation, counter resets."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate
tracker.record("I'm struggling") # LOW
tracker.record("I feel hopeless") # HIGH
tracker.record("I want to die") # CRITICAL
# Start de-escalating
tracker.record("I called someone") # LOW
tracker.record("I'm calmer") # NONE
# Go back up
tracker.record("Actually I can't do this") # HIGH
self.assertFalse(tracker.is_confirmed_deescalation())
class TestSystemPromptContext(unittest.TestCase):
"""Test system prompt context generation."""
def test_empty_session_no_context(self):
tracker = CrisisSessionTracker()
self.assertEqual(tracker.get_prompt_context(), "")
def test_escalation_context(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore")
ctx = tracker.get_prompt_context()
self.assertIn("escalated", ctx.lower())
self.assertIn("LOW", ctx)
self.assertIn("heightened", ctx.lower())
def test_deescalation_context(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate
tracker.record("I'm struggling")
tracker.record("I feel hopeless")
tracker.record("I want to die")
# De-escalate
for _ in range(5):
tracker.record("I'm okay now")
ctx = tracker.get_prompt_context()
self.assertIn("de-escalated", ctx.lower())
self.assertIn("confirmed", ctx.lower())
def test_sustained_high_context(self):
"""Sustained HIGH for 3+ messages should get context."""
tracker = CrisisSessionTracker()
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is pointless") # HIGH
tracker.record("I can't go on") # HIGH
ctx = tracker.get_prompt_context()
self.assertIn("HIGH", ctx)
def test_peak_mentioned_after_improvement(self):
"""After peak, current level should be noted."""
tracker = CrisisSessionTracker()
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is pointless") # HIGH
tracker.record("I feel a bit better") # back to LOW
ctx = tracker.get_prompt_context()
self.assertIn("peak", ctx.lower())
self.assertIn("HIGH", ctx)
class TestEffectiveLevel(unittest.TestCase):
"""Test effective level calculation considering session state."""
def test_effective_level_normal(self):
tracker = CrisisSessionTracker()
tracker.record("I'm struggling") # LOW
self.assertEqual(tracker.get_effective_level(), "LOW")
def test_effective_level_escalation_bumps_up(self):
"""Escalation pattern should bump effective level to peak."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel completely hopeless") # HIGH
tracker.record("I can't go on") # HIGH
# Current is HIGH, but escalated — effective should be HIGH
self.assertEqual(tracker.get_effective_level(), "HIGH")
class TestPrivacy(unittest.TestCase):
"""Test privacy requirements — no cross-session persistence."""
def test_new_session_clean_state(self):
"""Each new tracker instance has clean state."""
t1 = CrisisSessionTracker()
t1.record("I want to die")
self.assertEqual(t1.state.peak_level, "CRITICAL")
t2 = CrisisSessionTracker()
self.assertEqual(t2.state.peak_level, "NONE")
self.assertEqual(t2.state.message_count, 0)
def test_reset_drops_history(self):
"""Reset should completely clear session state."""
t = CrisisSessionTracker()
t.record("I'm struggling")
t.record("I can't go on")
t.reset()
self.assertEqual(t.state.level_history, [])
self.assertEqual(t.state.message_count, 0)
self.assertFalse(t.state.escalated)
class TestEdgeCases(unittest.TestCase):
"""Edge cases and boundary conditions."""
def test_single_message_none(self):
tracker = CrisisSessionTracker()
tracker.record("Hello Timmy")
self.assertFalse(tracker.get_escalation_flag())
self.assertEqual(tracker.get_prompt_context(), "")
def test_oscillating_levels(self):
"""User oscillating between levels shouldn't cause false escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm fine")
tracker.record("I'm struggling")
tracker.record("I'm fine")
tracker.record("I'm struggling")
# Oscillation without sustained escalation
self.assertFalse(tracker.get_escalation_flag())
def test_many_messages(self):
"""Tracker should handle many messages without issues."""
tracker = CrisisSessionTracker()
for i in range(100):
tracker.record("Hello there")
self.assertEqual(tracker.state.message_count, 100)
def test_empty_string(self):
tracker = CrisisSessionTracker()
tracker.record("")
self.assertEqual(tracker.state.message_count, 1)
self.assertEqual(tracker.state.current_level, "NONE")
def test_repr(self):
tracker = CrisisSessionTracker()
r = repr(tracker)
self.assertIn("CrisisSessionTracker", r)
self.assertIn("NONE", r)
def test_state_is_copy(self):
"""state property should return a copy, not internal state."""
tracker = CrisisSessionTracker()
s1 = tracker.state
tracker.record("I'm struggling")
s2 = tracker.state
self.assertEqual(s1.message_count, 0)
self.assertEqual(s2.message_count, 1)
class TestLevelOrder(unittest.TestCase):
"""Test level ordering is correct."""
def test_level_ordering(self):
self.assertLess(LEVEL_ORDER["NONE"], LEVEL_ORDER["LOW"])
self.assertLess(LEVEL_ORDER["LOW"], LEVEL_ORDER["MEDIUM"])
self.assertLess(LEVEL_ORDER["MEDIUM"], LEVEL_ORDER["HIGH"])
self.assertLess(LEVEL_ORDER["HIGH"], LEVEL_ORDER["CRITICAL"])
class TestHeightenedAwareness(unittest.TestCase):
"""Test heightened awareness flag."""
def test_heightened_on_escalation(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm unhappy")
tracker.record("I feel hopeless")
tracker.record("I can't go on")
self.assertTrue(tracker.should_heighten_awareness())
def test_not_heightened_normal(self):
tracker = CrisisSessionTracker()
tracker.record("Hello")
self.assertFalse(tracker.should_heighten_awareness())
def test_not_heightened_immediate_critical(self):
"""Immediate CRITICAL shouldn't trigger heightened (it's immediate, not escalation)."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself")
self.assertFalse(tracker.should_heighten_awareness())
if __name__ == "__main__":
unittest.main()

View File

@@ -423,35 +423,6 @@ html, body {
fill: currentColor;
}
/* Chat safety plan button — always visible, subtle */
#chat-safety-plan-btn {
flex-shrink: 0;
width: 44px;
height: 44px;
background: transparent;
color: #8b949e;
border: 1px solid #30363d;
border-radius: 12px;
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
transition: background 0.2s, color 0.2s, border-color 0.2s;
-webkit-appearance: none;
}
#chat-safety-plan-btn:hover,
#chat-safety-plan-btn:focus {
background: #161b22;
color: #58a6ff;
border-color: #58a6ff;
outline: 2px solid #58a6ff;
outline-offset: 2px;
}
#chat-safety-plan-btn svg {
width: 20px;
height: 20px;
}
/* ===== MODALS ===== */
.modal-overlay {
position: fixed;
@@ -642,6 +613,21 @@ html, body {
top: 8px;
outline: 2px solid #58a6ff;
}
/* Safety plan inline status feedback */
#sp-status {
font-size: 0.85rem;
opacity: 0;
transition: opacity 0.3s ease;
margin-right: auto;
}
#sp-status.success {
color: #3fb950;
opacity: 1;
}
#sp-status.error {
color: #f85149;
opacity: 1;
}
</style>
</head>
<body>
@@ -704,9 +690,6 @@ html, body {
<button id="send-btn" type="button" aria-label="Send message" disabled>
<svg viewBox="0 0 24 24" aria-hidden="true"><path d="M2.01 21L23 12 2.01 3 2 10l15 2-15 2z"/></svg>
</button>
<button id="chat-safety-plan-btn" type="button" aria-label="Open My Safety Plan" title="My Safety Plan">
<svg viewBox="0 0 24 24" aria-hidden="true" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M12 22s8-4 8-10V5l-8-3-8 3v7c0 6 8 10 8 10z"/></svg>
</button>
</div>
</div>
@@ -770,6 +753,7 @@ html, body {
</div>
</div>
<div class="modal-footer">
<span id="sp-status" role="status" aria-live="polite"></span>
<button class="btn btn-secondary" id="cancel-safety-plan">Cancel</button>
<button class="btn btn-primary" id="save-safety-plan">Save Plan</button>
</div>
@@ -845,7 +829,6 @@ Sovereignty and service always.`;
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');
var crisisSafetyPlanBtn = document.getElementById('crisis-safety-plan-btn');
var safetyPlanModal = document.getElementById('safety-plan-modal');
var closeSafetyPlan = document.getElementById('close-safety-plan');
@@ -997,6 +980,70 @@ Sovereignty and service always.`;
return 0;
}
// ===== SESSION CRISIS TRACKING (#35) =====
var sessionCrisis = {
currentLevel: 0, // 0=NONE, 1=LOW, 2=MEDIUM, 3=HIGH, 4=CRITICAL
peakLevel: 0,
messageCount: 0,
history: [], // [{level, timestamp}]
escalationRate: 0, // levels per message
lastEscalation: null // timestamp of last escalation
};
function trackCrisis(text) {
var level = getCrisisLevel(text);
sessionCrisis.messageCount++;
sessionCrisis.history.push({ level: level, time: Date.now() });
if (level > sessionCrisis.currentLevel) {
sessionCrisis.lastEscalation = Date.now();
}
sessionCrisis.currentLevel = level;
sessionCrisis.peakLevel = Math.max(sessionCrisis.peakLevel, level);
var recent = sessionCrisis.history.slice(-5);
if (recent.length >= 2) {
var first = recent[0].level;
var last = recent[recent.length - 1].level;
sessionCrisis.escalationRate = (last - first) / recent.length;
}
return getSessionContext();
}
function getSessionContext() {
var ctx = '';
if (sessionCrisis.history.length < 2) return ctx;
if (sessionCrisis.escalationRate > 0.5 && sessionCrisis.history.length <= 3) {
ctx += 'ESCALATION ALERT: User crisis level is rising rapidly. ';
}
if (sessionCrisis.peakLevel >= 3 && sessionCrisis.currentLevel <= 1 && sessionCrisis.messageCount >= 5) {
ctx += 'DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency. ';
}
if (sessionCrisis.currentLevel >= 2 && sessionCrisis.messageCount >= 3) {
ctx += 'User has been in crisis for ' + sessionCrisis.messageCount + ' messages. ';
}
var levels = sessionCrisis.history.map(function(h) { return h.level; });
if (levels.length >= 2) {
ctx += 'Crisis trajectory: ' + levels.join(' → ') + '. ';
}
return ctx;
}
function resetSessionCrisis() {
sessionCrisis = {
currentLevel: 0, peakLevel: 0, messageCount: 0,
history: [], escalationRate: 0, lastEscalation: null
};
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
@@ -1011,7 +1058,9 @@ Sovereignty and service always.`;
var directive = 'DIRECTIVE: ' + profile.directive + '\n';
var tone = 'TONE: ' + profile.tone + '\n';
return SYSTEM_PROMPT + divider + header + directive + tone;
var sessionCtx = getSessionContext();
var sessionBlock = sessionCtx ? '\nSESSION CONTEXT: ' + sessionCtx : '';
return SYSTEM_PROMPT + divider + header + directive + tone + sessionBlock;
}
@@ -1086,8 +1135,36 @@ Sovereignty and service always.`;
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
// Crisis overlay Escape key handler
function trapCrisisOverlayEscape(e) {
if (e.key !== 'Escape') return;
if (!crisisOverlay.classList.contains('active')) return;
if (overlayDismissBtn.disabled) return; // Don't escape during countdown
// Dismiss the overlay
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to chat input
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
// Register focus trap and Escape handler on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
document.addEventListener('keydown', trapCrisisOverlayEscape);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
@@ -1197,6 +1274,7 @@ Sovereignty and service always.`;
clearChatBtn.addEventListener('click', function() {
if (confirm('Clear all chat history?')) {
localStorage.removeItem('timmy_chat_history');
resetSessionCrisis();
window.location.reload();
}
});
@@ -1236,11 +1314,23 @@ Sovereignty and service always.`;
};
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
var spStatus = document.getElementById('sp-status');
spStatus.textContent = '\u2713 Safety plan saved locally.';
spStatus.className = 'success';
setTimeout(function() {
spStatus.className = '';
spStatus.textContent = '';
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}, 2000);
} catch (e) {
alert('Error saving plan.');
var spStatusErr = document.getElementById('sp-status');
spStatusErr.textContent = '\u2717 Error saving plan.';
spStatusErr.className = 'error';
setTimeout(function() {
spStatusErr.className = '';
spStatusErr.textContent = '';
}, 4000);
}
});
@@ -1323,15 +1413,6 @@ Sovereignty and service always.`;
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Chat input area safety plan button — always visible (#38)
if (chatSafetyPlanBtn) {
chatSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(chatSafetyPlanBtn);
});
}
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
@@ -1358,6 +1439,7 @@ Sovereignty and service always.`;
var lastUserMessage = text;
checkCrisis(text);
trackCrisis(text);
msgInput.value = '';
msgInput.style.height = 'auto';
@@ -1446,6 +1528,7 @@ Sovereignty and service always.`;
messages.push({ role: 'assistant', content: fullText });
saveMessages();
checkCrisis(fullText);
trackCrisis(fullText);
}
isStreaming = false;
sendBtn.disabled = msgInput.value.trim().length === 0;
@@ -1475,6 +1558,7 @@ Sovereignty and service always.`;
// ===== WELCOME MESSAGE =====
function init() {
resetSessionCrisis();
if (!loadMessages()) {
var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us.";
addMessage('assistant', welcomeText);

View File

@@ -1,102 +0,0 @@
"""
Tests for #38 — Safety plan accessible from chat (not just overlay).
Verifies:
1. Safety plan button exists in the input area
2. Button has proper ARIA attributes
3. Button is keyboard focusable
4. Button does not require crisis detection to be visible
"""
import re
import unittest
from pathlib import Path
INDEX_HTML = Path(__file__).parent.parent / "index.html"
class TestSafetyPlanInChat(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_chat_safety_plan_button_exists(self):
"""Button #chat-safety-plan-btn exists in the DOM."""
self.assertIn('id="chat-safety-plan-btn"', self.html)
def test_button_has_aria_label(self):
"""Button has aria-label for screen readers."""
match = re.search(
r'<button[^>]*id="chat-safety-plan-btn"[^>]*aria-label="([^"]*)"',
self.html
)
self.assertIsNotNone(match, "chat-safety-plan-btn missing aria-label")
self.assertIn("safety", match.group(1).lower())
def test_button_has_title(self):
"""Button has title attribute for tooltip."""
self.assertRegex(
self.html,
r'<button[^>]*id="chat-safety-plan-btn"[^>]*title="[^"]*"[^>]*>'
)
def test_button_is_in_input_area(self):
"""Button is inside #input-area, not in crisis overlay."""
input_area = re.search(
r'<div id="input-area">(.*?)</div>\s*</div>',
self.html, re.DOTALL
)
self.assertIsNotNone(input_area)
self.assertIn('chat-safety-plan-btn', input_area.group(1))
def test_button_not_in_crisis_overlay(self):
"""Button is NOT inside #crisis-overlay (always visible, no detection)."""
overlay = re.search(
r'<div id="crisis-overlay".*?</div>\s*</div>',
self.html, re.DOTALL
)
if overlay:
self.assertNotIn('chat-safety-plan-btn', overlay.group(0))
def test_button_has_shield_icon(self):
"""Button includes a shield SVG icon."""
btn_match = re.search(
r'<button[^>]*id="chat-safety-plan-btn"[^>]*>(.*?)</button>',
self.html, re.DOTALL
)
self.assertIsNotNone(btn_match)
self.assertIn('svg', btn_match.group(1).lower())
# Shield path
self.assertIn('M12 22s8-4 8-10V5l-8-3-8 3v7c0 6 8 10 8 10z', btn_match.group(1))
def test_css_exists_for_button(self):
"""CSS rules exist for #chat-safety-plan-btn."""
self.assertIn('#chat-safety-plan-btn', self.html)
# Check for hover/focus styles
self.assertIn('#chat-safety-plan-btn:hover', self.html)
self.assertIn('#chat-safety-plan-btn:focus', self.html)
def test_javascript_listener_exists(self):
"""JavaScript event listener exists for the button."""
self.assertIn('chatSafetyPlanBtn', self.html)
self.assertIn("chatSafetyPlanBtn.addEventListener('click'", self.html)
def test_javascript_calls_load_safety_plan(self):
"""Click handler calls loadSafetyPlan() and shows modal."""
listener = re.search(
r'chatSafetyPlanBtn\.addEventListener.*?\{(.*?)\}',
self.html, re.DOTALL
)
self.assertIsNotNone(listener)
body = listener.group(1)
self.assertIn('loadSafetyPlan()', body)
self.assertIn("safetyPlanModal.classList.add('active')", body)
def test_focus_trap_uses_button_as_return_target(self):
"""Focus trap returns focus to chatSafetyPlanBtn when modal closes."""
self.assertIn('_activateSafetyPlanFocusTrap(chatSafetyPlanBtn)', self.html)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,74 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestSafetyPlanInlineFeedback(unittest.TestCase):
"""Test that safety plan uses inline feedback instead of blocking alert()."""
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_no_alert_calls(self):
"""Safety plan save must not use browser alert()."""
alert_matches = re.findall(r'alert\(', self.html)
self.assertEqual(
len(alert_matches), 0,
f'Found {len(alert_matches)} alert() calls - must use inline feedback instead.',
)
def test_sp_status_element_exists(self):
"""Modal footer must contain #sp-status element for inline feedback."""
self.assertRegex(
self.html,
r'id=["\']sp-status["\']',
'Expected #sp-status element in the safety plan modal.',
)
def test_sp_status_has_aria_live(self):
"""#sp-status must have aria-live for accessible announcements."""
self.assertRegex(
self.html,
r'aria-live=["\']polite["\']',
'Expected #sp-status to have aria-live="polite".',
)
def test_success_feedback_exists(self):
"""Must show success message on save."""
self.assertIn(
'Safety plan saved locally.',
self.html,
'Expected success message for safety plan save.',
)
def test_error_feedback_exists(self):
"""Must show error message on save failure."""
self.assertIn(
'Error saving plan.',
self.html,
'Expected error message for safety plan save failure.',
)
def test_css_success_state(self):
"""Must have CSS for .sp-status.success state."""
self.assertIn(
'sp-status.success',
self.html,
'Expected CSS for .sp-status.success state.',
)
def test_css_error_state(self):
"""Must have CSS for .sp-status.error state."""
self.assertIn(
'sp-status.error',
self.html,
'Expected CSS for .sp-status.error state.',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,17 @@
from pathlib import Path
def test_index_contains_session_crisis_state_and_tracking_hooks():
html = Path('index.html').read_text()
assert 'var sessionCrisis' in html
assert 'function trackCrisis(text)' in html
assert 'function getSessionContext()' in html
assert 'function resetSessionCrisis()' in html
assert 'trackCrisis(text);' in html
assert 'SESSION CONTEXT:' in html
def test_new_session_resets_crisis_state():
html = Path('index.html').read_text()
assert 'resetSessionCrisis()' in html
assert "localStorage.removeItem('timmy_chat_history');" in html