diff --git a/crisis/__init__.py b/crisis/__init__.py index 0b95fce..88a7363 100644 --- a/crisis/__init__.py +++ b/crisis/__init__.py @@ -7,6 +7,7 @@ Stands between a broken man and a machine that would tell him to die. from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji from .response import process_message, generate_response, CrisisResponse from .gateway import check_crisis, get_system_prompt, format_gateway_response +from .session_tracker import CrisisSessionTracker, CrisisSessionState __all__ = [ "detect_crisis", @@ -19,4 +20,6 @@ __all__ = [ "format_result", "format_gateway_response", "get_urgency_emoji", + "CrisisSessionTracker", + "CrisisSessionState", ] diff --git a/crisis/session_tracker.py b/crisis/session_tracker.py new file mode 100644 index 0000000..e5a0d54 --- /dev/null +++ b/crisis/session_tracker.py @@ -0,0 +1,261 @@ +""" +Session-Level Crisis Tracking for the-door. + +Tracks crisis signals across a conversation session to detect escalation +patterns and de-escalation. Privacy-first: state lives in memory only, +resets on new session, never persists to disk. + +Key behaviors: + - Escalation: LOW → HIGH in 3 messages triggers heightened awareness + - De-escalation: CRITICAL → LOW for 5+ messages allows stepping down + - Session state enriches the system prompt with trajectory context + +Usage: + from crisis.session_tracker import CrisisSessionTracker + + tracker = CrisisSessionTracker() + tracker.record("I'm feeling down") + tracker.record("Things are really hard") + tracker.record("I can't go on anymore") # escalation detected + print(tracker.get_prompt_context()) + # "User has escalated from LOW to HIGH over 3 messages." +""" + +from dataclasses import dataclass, field +from typing import List, Optional, Tuple + +from .detect import detect_crisis, CrisisDetectionResult, SCORES + + +# Level hierarchy for ordering comparisons +LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4} + + +@dataclass +class CrisisSessionState: + """Immutable snapshot of session crisis tracking state.""" + current_level: str = "NONE" + peak_level: str = "NONE" + message_count: int = 0 + level_history: List[Tuple[int, str]] = field(default_factory=list) # (message_index, level) + escalated: bool = False + escalation_messages: int = 0 # messages from first level to peak + deescalation_count: int = 0 # consecutive messages at lower level + deescalating: bool = False + deescalation_confirmed: bool = False # True once de-escalation threshold met + + +class CrisisSessionTracker: + """ + Session-level crisis tracker. + + Tracks crisis levels across messages in a single conversation session. + Detects rapid escalation and gradual de-escalation. Provides context + strings for system prompt injection. + + State is in-memory only. New session = new instance. + """ + + # How many messages for escalation detection + ESCALATION_WINDOW = 3 + + # How many consecutive messages at lower level for de-escalation + DEESCALATION_THRESHOLD = 5 + + def __init__(self): + self._state = CrisisSessionState() + + @property + def state(self) -> CrisisSessionState: + """Read-only snapshot of current state.""" + return CrisisSessionState( + current_level=self._state.current_level, + peak_level=self._state.peak_level, + message_count=self._state.message_count, + level_history=list(self._state.level_history), + escalated=self._state.escalated, + escalation_messages=self._state.escalation_messages, + deescalation_count=self._state.deescalation_count, + deescalating=self._state.deescalating, + ) + + def reset(self): + """Reset all state. Called on new session.""" + self._state = CrisisSessionState() + + def record(self, text: str) -> CrisisDetectionResult: + """ + Record a message and update session state. + + Returns the single-message detection result (unchanged from detect.py). + Session-level intelligence is tracked internally. + """ + detection = detect_crisis(text) + self._record_level(detection.level) + return detection + + def record_level(self, level: str): + """Record an already-detected crisis level (for when detection ran separately).""" + self._record_level(level) + + def _record_level(self, level: str): + """Internal: update state with a new crisis level.""" + self._state.message_count += 1 + idx = self._state.message_count + self._state.level_history.append((idx, level)) + + prev_level = self._state.current_level + prev_order = LEVEL_ORDER.get(prev_level, 0) + new_order = LEVEL_ORDER.get(level, 0) + + # Update current level + self._state.current_level = level + + # Track peak + if new_order > LEVEL_ORDER.get(self._state.peak_level, 0): + self._state.peak_level = level + + # ── Escalation detection ────────────────────────────── + if new_order > prev_order: + # User is going up — reset de-escalation counter + self._state.deescalation_count = 0 + self._state.deescalating = False + + # Always check for escalation pattern in recent window + if not self._state.escalated: + window = self._get_recent_levels(self.ESCALATION_WINDOW) + if len(window) >= self.ESCALATION_WINDOW: + first_in_window = window[0][1] + last_in_window = window[-1][1] + first_order = LEVEL_ORDER.get(first_in_window, 0) + last_order = LEVEL_ORDER.get(last_in_window, 0) + + if last_order > first_order: + self._state.escalated = True + self._state.escalation_messages = self.ESCALATION_WINDOW + + # ── De-escalation detection ─────────────────────────── + elif new_order < prev_order: + self._state.deescalation_count += 1 + self._state.deescalating = True + + # If de-escalation counter meets threshold AND we were escalated + if (self._state.escalated and + self._state.deescalation_count >= self.DEESCALATION_THRESHOLD): + self._state.escalated = False + self._state.deescalating = False + self._state.deescalation_confirmed = True + self._state.deescalation_count = 0 + + # Same level — increment de-escalation counter if already de-escalating + elif self._state.deescalating: + self._state.deescalation_count += 1 + if (self._state.escalated and + self._state.deescalation_count >= self.DEESCALATION_THRESHOLD): + self._state.escalated = False + self._state.deescalating = False + self._state.deescalation_confirmed = True + self._state.deescalation_count = 0 + + def _get_recent_levels(self, n: int) -> List[Tuple[int, str]]: + """Get the last n entries from level history.""" + return self._state.level_history[-n:] + + def get_prompt_context(self) -> str: + """ + Generate a human-readable context string for system prompt injection. + + Returns empty string if no session-level crisis context is needed. + """ + s = self._state + + if s.message_count == 0: + return "" + + parts = [] + + # Escalation alert + if s.escalated and s.peak_level != "NONE": + # Find the starting level from the escalation window + window = self._get_recent_levels(self.ESCALATION_WINDOW) + if window: + start_level = window[0][1] + parts.append( + f"User has escalated from {start_level} to {s.peak_level} " + f"over {s.message_count} message{'s' if s.message_count != 1 else ''}." + ) + parts.append("Heightened crisis awareness is warranted.") + + # Confirmed de-escalation + elif s.deescalation_confirmed and s.peak_level in ("HIGH", "CRITICAL"): + parts.append( + f"User has de-escalated from {s.peak_level} to {s.current_level}." + ) + parts.append("De-escalation confirmed. Continue gentle presence.") + + # Active de-escalation (not yet confirmed) + elif s.deescalating and s.peak_level != "NONE": + parts.append( + f"User has de-escalated from {s.peak_level} to {s.current_level} " + f"over {s.deescalation_count} message{'s' if s.deescalation_count != 1 else ''}." + ) + parts.append("Still in de-escalation. Maintain supportive awareness.") + + # Sustained elevated level (no rapid escalation but still concerning) + elif (s.current_level in ("HIGH", "CRITICAL") and + not s.escalated and s.message_count >= 3): + parts.append( + f"User has been at {s.current_level} level for {s.message_count} messages." + ) + parts.append("Continue crisis-aware response.") + + # Peak was higher than current (user improved but may still be fragile) + elif (LEVEL_ORDER.get(s.peak_level, 0) > LEVEL_ORDER.get(s.current_level, 0) and + s.peak_level in ("HIGH", "CRITICAL") and + not s.deescalating): + parts.append( + f"Note: session peak was {s.peak_level}. " + f"User is now at {s.current_level}." + ) + parts.append("Remain attentive.") + + return " ".join(parts) + + def get_escalation_flag(self) -> bool: + """True if session shows active escalation pattern.""" + return self._state.escalated + + def get_effective_level(self) -> str: + """ + Get the effective crisis level considering session state. + + Escalation patterns can bump the effective level up even if the + current message alone wouldn't warrant it. + """ + s = self._state + + if s.escalated and s.peak_level in ("HIGH", "CRITICAL"): + return s.peak_level + + return s.current_level + + def should_heighten_awareness(self) -> bool: + """ + True when the session trajectory warrants heightened awareness + beyond what a single-message detection would provide. + """ + return self._state.escalated + + def is_confirmed_deescalation(self) -> bool: + """True when user has sustained lower levels after a crisis peak.""" + s = self._state + return s.deescalation_confirmed + + def __repr__(self) -> str: + s = self._state + return ( + f"CrisisSessionTracker(" + f"current={s.current_level}, peak={s.peak_level}, " + f"msgs={s.message_count}, escalated={s.escalated}, " + f"deescalating={s.deescalating})" + ) diff --git a/crisis/tests_session_tracker.py b/crisis/tests_session_tracker.py new file mode 100644 index 0000000..b453211 --- /dev/null +++ b/crisis/tests_session_tracker.py @@ -0,0 +1,407 @@ +""" +Tests for session-level crisis tracking (Issue #35). + +Covers: escalation detection, de-escalation detection, system prompt context, +session state management, privacy (no cross-session persistence). + +Run with: python -m pytest crisis/tests_session_tracker.py -v +""" + +import unittest +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from crisis.session_tracker import CrisisSessionTracker, CrisisSessionState, LEVEL_ORDER + + +class TestSessionState(unittest.TestCase): + """Test basic session state management.""" + + def test_initial_state(self): + tracker = CrisisSessionTracker() + s = tracker.state + self.assertEqual(s.current_level, "NONE") + self.assertEqual(s.peak_level, "NONE") + self.assertEqual(s.message_count, 0) + self.assertEqual(s.level_history, []) + self.assertFalse(s.escalated) + + def test_record_increments_count(self): + tracker = CrisisSessionTracker() + tracker.record("hello") + self.assertEqual(tracker.state.message_count, 1) + tracker.record("world") + self.assertEqual(tracker.state.message_count, 2) + + def test_record_tracks_history(self): + tracker = CrisisSessionTracker() + tracker.record("I'm struggling") # LOW + tracker.record("I feel hopeless and nobody cares") # MEDIUM + history = tracker.state.level_history + self.assertEqual(len(history), 2) + self.assertEqual(history[0], (1, "LOW")) + self.assertIn(history[1][1], ("MEDIUM", "LOW", "HIGH")) + + def test_reset_clears_state(self): + tracker = CrisisSessionTracker() + tracker.record("I want to die") # CRITICAL + self.assertEqual(tracker.state.peak_level, "CRITICAL") + tracker.reset() + s = tracker.state + self.assertEqual(s.current_level, "NONE") + self.assertEqual(s.peak_level, "NONE") + self.assertEqual(s.message_count, 0) + self.assertEqual(s.level_history, []) + + def test_peak_tracking(self): + tracker = CrisisSessionTracker() + tracker.record("I'm feeling down") # LOW + tracker.record("I feel hopeless with no way out") # HIGH + tracker.record("Actually I'm a bit better") # back to LOW + self.assertEqual(tracker.state.peak_level, "HIGH") + + def test_record_level_direct(self): + """record_level() should work when detection ran separately.""" + tracker = CrisisSessionTracker() + tracker.record_level("LOW") + tracker.record_level("MEDIUM") + tracker.record_level("HIGH") + self.assertEqual(tracker.state.message_count, 3) + self.assertEqual(tracker.state.peak_level, "HIGH") + + +class TestEscalationDetection(unittest.TestCase): + """Test rapid escalation detection: LOW → HIGH in N messages.""" + + def test_escalation_low_to_high_in_3(self): + """Core AC: LOW → HIGH in 3 messages triggers escalation.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I'm having a tough time") # LOW + tracker.record("I feel really hopeless about everything") # HIGH + tracker.record("I can't take this anymore") # HIGH + self.assertTrue(tracker.get_escalation_flag()) + + def test_escalation_low_to_critical_in_3(self): + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I'm struggling") # LOW + tracker.record("Nobody would miss me") # HIGH + tracker.record("I want to end my life") # CRITICAL + self.assertTrue(tracker.get_escalation_flag()) + + def test_no_escalation_stays_low(self): + """User stays LOW — no escalation.""" + tracker = CrisisSessionTracker() + tracker.record("rough day") + tracker.record("tough week") + tracker.record("feeling down") + self.assertFalse(tracker.get_escalation_flag()) + + def test_no_escalation_immediate_critical(self): + """First message is CRITICAL — not escalation, just immediate crisis.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I want to kill myself") # CRITICAL on first message + # Not escalated because there's no upward trajectory — it started high + self.assertFalse(tracker.get_escalation_flag()) + + def test_escalation_medium_to_high(self): + """MEDIUM → HIGH escalation pattern.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I feel alone and nobody cares") # MEDIUM + tracker.record("I feel completely hopeless") # HIGH + tracker.record("There's no way out") # HIGH + self.assertTrue(tracker.get_escalation_flag()) + + def test_escalation_not_triggered_without_increase(self): + """Same level messages don't constitute escalation.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I feel hopeless") # HIGH + tracker.record("I'm in so much pain") # HIGH + tracker.record("Everything is broken") # HIGH + # No upward trajectory — started high, stayed high + self.assertFalse(tracker.get_escalation_flag()) + + def test_escalation_window_custom(self): + """Custom window size should be respected.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 2 + tracker.record("I'm unhappy") # LOW + tracker.record("I can't go on anymore") # HIGH + self.assertTrue(tracker.get_escalation_flag()) + + +class TestDeescalationDetection(unittest.TestCase): + """Test de-escalation: CRITICAL → LOW for 5+ messages.""" + + def test_deescalation_critical_to_low(self): + """Core AC: CRITICAL → LOW for 5+ messages allows stepping down.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.DEESCALATION_THRESHOLD = 5 + + # Escalate to CRITICAL + tracker.record("I'm struggling") # LOW + tracker.record("I can't go on") # HIGH + tracker.record("I want to die") # CRITICAL + self.assertTrue(tracker.get_escalation_flag()) + + # De-escalate over 5 messages + tracker.record("I called 988") # NONE/LOW + tracker.record("I'm calmer now") # NONE + tracker.record("Thank you for being here") # NONE + tracker.record("I'm going to be okay") # NONE + tracker.record("Taking it one moment at a time") # NONE + + self.assertTrue(tracker.is_confirmed_deescalation()) + self.assertFalse(tracker.get_escalation_flag()) + + def test_deescalation_not_confirmed_too_soon(self): + """De-escalation not confirmed before threshold met.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.DEESCALATION_THRESHOLD = 5 + + tracker.record("I'm struggling") + tracker.record("I can't go on") + tracker.record("I want to die") + self.assertTrue(tracker.get_escalation_flag()) + + # Only 3 messages of de-escalation + tracker.record("I called someone") + tracker.record("I'm calmer") + tracker.record("Feeling better") + + self.assertFalse(tracker.is_confirmed_deescalation()) + self.assertTrue(tracker.get_escalation_flag()) # Still escalated + + def test_deescalation_from_high(self): + """HIGH → LOW de-escalation should also work.""" + tracker = CrisisSessionTracker() + tracker.DEESCALATION_THRESHOLD = 5 + + # Build up to HIGH + tracker.record("I'm down") # LOW + tracker.record("I feel hopeless") # HIGH + tracker.record("No way out") # HIGH + tracker.record("Everything is pointless") # HIGH + + # De-escalate + for _ in range(5): + tracker.record("I'm doing a bit better") # LOW/NONE + + self.assertTrue(tracker.is_confirmed_deescalation()) + + def test_deescalation_counter_resets_on_new_crisis(self): + """If crisis level goes back up during de-escalation, counter resets.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.DEESCALATION_THRESHOLD = 5 + + # Escalate + tracker.record("I'm struggling") # LOW + tracker.record("I feel hopeless") # HIGH + tracker.record("I want to die") # CRITICAL + + # Start de-escalating + tracker.record("I called someone") # LOW + tracker.record("I'm calmer") # NONE + + # Go back up + tracker.record("Actually I can't do this") # HIGH + + self.assertFalse(tracker.is_confirmed_deescalation()) + + +class TestSystemPromptContext(unittest.TestCase): + """Test system prompt context generation.""" + + def test_empty_session_no_context(self): + tracker = CrisisSessionTracker() + self.assertEqual(tracker.get_prompt_context(), "") + + def test_escalation_context(self): + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I'm having a tough time") + tracker.record("Things are really hard") + tracker.record("I can't go on anymore") + + ctx = tracker.get_prompt_context() + self.assertIn("escalated", ctx.lower()) + self.assertIn("LOW", ctx) + self.assertIn("heightened", ctx.lower()) + + def test_deescalation_context(self): + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.DEESCALATION_THRESHOLD = 5 + + # Escalate + tracker.record("I'm struggling") + tracker.record("I feel hopeless") + tracker.record("I want to die") + + # De-escalate + for _ in range(5): + tracker.record("I'm okay now") + + ctx = tracker.get_prompt_context() + self.assertIn("de-escalated", ctx.lower()) + self.assertIn("confirmed", ctx.lower()) + + def test_sustained_high_context(self): + """Sustained HIGH for 3+ messages should get context.""" + tracker = CrisisSessionTracker() + tracker.record("I'm in so much pain") # HIGH + tracker.record("Everything is pointless") # HIGH + tracker.record("I can't go on") # HIGH + + ctx = tracker.get_prompt_context() + self.assertIn("HIGH", ctx) + + def test_peak_mentioned_after_improvement(self): + """After peak, current level should be noted.""" + tracker = CrisisSessionTracker() + tracker.record("I'm in so much pain") # HIGH + tracker.record("Everything is pointless") # HIGH + tracker.record("I feel a bit better") # back to LOW + + ctx = tracker.get_prompt_context() + self.assertIn("peak", ctx.lower()) + self.assertIn("HIGH", ctx) + + +class TestEffectiveLevel(unittest.TestCase): + """Test effective level calculation considering session state.""" + + def test_effective_level_normal(self): + tracker = CrisisSessionTracker() + tracker.record("I'm struggling") # LOW + self.assertEqual(tracker.get_effective_level(), "LOW") + + def test_effective_level_escalation_bumps_up(self): + """Escalation pattern should bump effective level to peak.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I'm having a tough time") # LOW + tracker.record("I feel completely hopeless") # HIGH + tracker.record("I can't go on") # HIGH + # Current is HIGH, but escalated — effective should be HIGH + self.assertEqual(tracker.get_effective_level(), "HIGH") + + +class TestPrivacy(unittest.TestCase): + """Test privacy requirements — no cross-session persistence.""" + + def test_new_session_clean_state(self): + """Each new tracker instance has clean state.""" + t1 = CrisisSessionTracker() + t1.record("I want to die") + self.assertEqual(t1.state.peak_level, "CRITICAL") + + t2 = CrisisSessionTracker() + self.assertEqual(t2.state.peak_level, "NONE") + self.assertEqual(t2.state.message_count, 0) + + def test_reset_drops_history(self): + """Reset should completely clear session state.""" + t = CrisisSessionTracker() + t.record("I'm struggling") + t.record("I can't go on") + t.reset() + self.assertEqual(t.state.level_history, []) + self.assertEqual(t.state.message_count, 0) + self.assertFalse(t.state.escalated) + + +class TestEdgeCases(unittest.TestCase): + """Edge cases and boundary conditions.""" + + def test_single_message_none(self): + tracker = CrisisSessionTracker() + tracker.record("Hello Timmy") + self.assertFalse(tracker.get_escalation_flag()) + self.assertEqual(tracker.get_prompt_context(), "") + + def test_oscillating_levels(self): + """User oscillating between levels shouldn't cause false escalation.""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I'm fine") + tracker.record("I'm struggling") + tracker.record("I'm fine") + tracker.record("I'm struggling") + # Oscillation without sustained escalation + self.assertFalse(tracker.get_escalation_flag()) + + def test_many_messages(self): + """Tracker should handle many messages without issues.""" + tracker = CrisisSessionTracker() + for i in range(100): + tracker.record("Hello there") + self.assertEqual(tracker.state.message_count, 100) + + def test_empty_string(self): + tracker = CrisisSessionTracker() + tracker.record("") + self.assertEqual(tracker.state.message_count, 1) + self.assertEqual(tracker.state.current_level, "NONE") + + def test_repr(self): + tracker = CrisisSessionTracker() + r = repr(tracker) + self.assertIn("CrisisSessionTracker", r) + self.assertIn("NONE", r) + + def test_state_is_copy(self): + """state property should return a copy, not internal state.""" + tracker = CrisisSessionTracker() + s1 = tracker.state + tracker.record("I'm struggling") + s2 = tracker.state + self.assertEqual(s1.message_count, 0) + self.assertEqual(s2.message_count, 1) + + +class TestLevelOrder(unittest.TestCase): + """Test level ordering is correct.""" + + def test_level_ordering(self): + self.assertLess(LEVEL_ORDER["NONE"], LEVEL_ORDER["LOW"]) + self.assertLess(LEVEL_ORDER["LOW"], LEVEL_ORDER["MEDIUM"]) + self.assertLess(LEVEL_ORDER["MEDIUM"], LEVEL_ORDER["HIGH"]) + self.assertLess(LEVEL_ORDER["HIGH"], LEVEL_ORDER["CRITICAL"]) + + +class TestHeightenedAwareness(unittest.TestCase): + """Test heightened awareness flag.""" + + def test_heightened_on_escalation(self): + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I'm unhappy") + tracker.record("I feel hopeless") + tracker.record("I can't go on") + self.assertTrue(tracker.should_heighten_awareness()) + + def test_not_heightened_normal(self): + tracker = CrisisSessionTracker() + tracker.record("Hello") + self.assertFalse(tracker.should_heighten_awareness()) + + def test_not_heightened_immediate_critical(self): + """Immediate CRITICAL shouldn't trigger heightened (it's immediate, not escalation).""" + tracker = CrisisSessionTracker() + tracker.ESCALATION_WINDOW = 3 + tracker.record("I want to kill myself") + self.assertFalse(tracker.should_heighten_awareness()) + + +if __name__ == "__main__": + unittest.main() diff --git a/index.html b/index.html index 06cff1c..771a22e 100644 --- a/index.html +++ b/index.html @@ -964,6 +964,70 @@ Sovereignty and service always.`; return 0; } + // ===== SESSION CRISIS TRACKING (#35) ===== + var sessionCrisis = { + currentLevel: 0, // 0=NONE, 1=LOW, 2=MEDIUM, 3=HIGH, 4=CRITICAL + peakLevel: 0, + messageCount: 0, + history: [], // [{level, timestamp}] + escalationRate: 0, // levels per message + lastEscalation: null // timestamp of last escalation + }; + + function trackCrisis(text) { + var level = getCrisisLevel(text); + sessionCrisis.messageCount++; + sessionCrisis.history.push({ level: level, time: Date.now() }); + + if (level > sessionCrisis.currentLevel) { + sessionCrisis.lastEscalation = Date.now(); + } + + sessionCrisis.currentLevel = level; + sessionCrisis.peakLevel = Math.max(sessionCrisis.peakLevel, level); + + var recent = sessionCrisis.history.slice(-5); + if (recent.length >= 2) { + var first = recent[0].level; + var last = recent[recent.length - 1].level; + sessionCrisis.escalationRate = (last - first) / recent.length; + } + + return getSessionContext(); + } + + function getSessionContext() { + var ctx = ''; + + if (sessionCrisis.history.length < 2) return ctx; + + if (sessionCrisis.escalationRate > 0.5 && sessionCrisis.history.length <= 3) { + ctx += 'ESCALATION ALERT: User crisis level is rising rapidly. '; + } + + if (sessionCrisis.peakLevel >= 3 && sessionCrisis.currentLevel <= 1 && sessionCrisis.messageCount >= 5) { + ctx += 'DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency. '; + } + + if (sessionCrisis.currentLevel >= 2 && sessionCrisis.messageCount >= 3) { + ctx += 'User has been in crisis for ' + sessionCrisis.messageCount + ' messages. '; + } + + var levels = sessionCrisis.history.map(function(h) { return h.level; }); + if (levels.length >= 2) { + ctx += 'Crisis trajectory: ' + levels.join(' → ') + '. '; + } + + return ctx; + } + + function resetSessionCrisis() { + sessionCrisis = { + currentLevel: 0, peakLevel: 0, messageCount: 0, + history: [], escalationRate: 0, lastEscalation: null + }; + } + // ===== GET SYSTEM PROMPT (wraps with crisis context) ===== function getSystemPrompt(userText) { var level = getCrisisLevel(userText); @@ -978,7 +1042,9 @@ Sovereignty and service always.`; var directive = 'DIRECTIVE: ' + profile.directive + '\n'; var tone = 'TONE: ' + profile.tone + '\n'; - return SYSTEM_PROMPT + divider + header + directive + tone; + var sessionCtx = getSessionContext(); + var sessionBlock = sessionCtx ? '\nSESSION CONTEXT: ' + sessionCtx : ''; + return SYSTEM_PROMPT + divider + header + directive + tone + sessionBlock; } @@ -1164,6 +1230,7 @@ Sovereignty and service always.`; clearChatBtn.addEventListener('click', function() { if (confirm('Clear all chat history?')) { localStorage.removeItem('timmy_chat_history'); + resetSessionCrisis(); window.location.reload(); } }); @@ -1316,6 +1383,7 @@ Sovereignty and service always.`; var lastUserMessage = text; checkCrisis(text); + trackCrisis(text); msgInput.value = ''; msgInput.style.height = 'auto'; @@ -1404,6 +1472,7 @@ Sovereignty and service always.`; messages.push({ role: 'assistant', content: fullText }); saveMessages(); checkCrisis(fullText); + trackCrisis(fullText); } isStreaming = false; sendBtn.disabled = msgInput.value.trim().length === 0; @@ -1433,6 +1502,7 @@ Sovereignty and service always.`; // ===== WELCOME MESSAGE ===== function init() { + resetSessionCrisis(); if (!loadMessages()) { var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us."; addMessage('assistant', welcomeText); diff --git a/tests/test_session_crisis_frontend.py b/tests/test_session_crisis_frontend.py new file mode 100644 index 0000000..793f084 --- /dev/null +++ b/tests/test_session_crisis_frontend.py @@ -0,0 +1,17 @@ +from pathlib import Path + + +def test_index_contains_session_crisis_state_and_tracking_hooks(): + html = Path('index.html').read_text() + assert 'var sessionCrisis' in html + assert 'function trackCrisis(text)' in html + assert 'function getSessionContext()' in html + assert 'function resetSessionCrisis()' in html + assert 'trackCrisis(text);' in html + assert 'SESSION CONTEXT:' in html + + +def test_new_session_resets_crisis_state(): + html = Path('index.html').read_text() + assert 'resetSessionCrisis()' in html + assert "localStorage.removeItem('timmy_chat_history');" in html