Compare commits
2 Commits
fix/132
...
fix/burn-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 9da8afb168 | |||
|
|
cd50b3d767 |
@@ -7,6 +7,7 @@ Stands between a broken man and a machine that would tell him to die.
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
from .session_tracker import CrisisSessionTracker, CrisisSessionState
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
@@ -19,4 +20,6 @@ __all__ = [
|
||||
"format_result",
|
||||
"format_gateway_response",
|
||||
"get_urgency_emoji",
|
||||
"CrisisSessionTracker",
|
||||
"CrisisSessionState",
|
||||
]
|
||||
|
||||
261
crisis/session_tracker.py
Normal file
261
crisis/session_tracker.py
Normal file
@@ -0,0 +1,261 @@
|
||||
"""
|
||||
Session-Level Crisis Tracking for the-door.
|
||||
|
||||
Tracks crisis signals across a conversation session to detect escalation
|
||||
patterns and de-escalation. Privacy-first: state lives in memory only,
|
||||
resets on new session, never persists to disk.
|
||||
|
||||
Key behaviors:
|
||||
- Escalation: LOW → HIGH in 3 messages triggers heightened awareness
|
||||
- De-escalation: CRITICAL → LOW for 5+ messages allows stepping down
|
||||
- Session state enriches the system prompt with trajectory context
|
||||
|
||||
Usage:
|
||||
from crisis.session_tracker import CrisisSessionTracker
|
||||
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I'm feeling down")
|
||||
tracker.record("Things are really hard")
|
||||
tracker.record("I can't go on anymore") # escalation detected
|
||||
print(tracker.get_prompt_context())
|
||||
# "User has escalated from LOW to HIGH over 3 messages."
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from .detect import detect_crisis, CrisisDetectionResult, SCORES
|
||||
|
||||
|
||||
# Level hierarchy for ordering comparisons
|
||||
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisSessionState:
|
||||
"""Immutable snapshot of session crisis tracking state."""
|
||||
current_level: str = "NONE"
|
||||
peak_level: str = "NONE"
|
||||
message_count: int = 0
|
||||
level_history: List[Tuple[int, str]] = field(default_factory=list) # (message_index, level)
|
||||
escalated: bool = False
|
||||
escalation_messages: int = 0 # messages from first level to peak
|
||||
deescalation_count: int = 0 # consecutive messages at lower level
|
||||
deescalating: bool = False
|
||||
deescalation_confirmed: bool = False # True once de-escalation threshold met
|
||||
|
||||
|
||||
class CrisisSessionTracker:
|
||||
"""
|
||||
Session-level crisis tracker.
|
||||
|
||||
Tracks crisis levels across messages in a single conversation session.
|
||||
Detects rapid escalation and gradual de-escalation. Provides context
|
||||
strings for system prompt injection.
|
||||
|
||||
State is in-memory only. New session = new instance.
|
||||
"""
|
||||
|
||||
# How many messages for escalation detection
|
||||
ESCALATION_WINDOW = 3
|
||||
|
||||
# How many consecutive messages at lower level for de-escalation
|
||||
DEESCALATION_THRESHOLD = 5
|
||||
|
||||
def __init__(self):
|
||||
self._state = CrisisSessionState()
|
||||
|
||||
@property
|
||||
def state(self) -> CrisisSessionState:
|
||||
"""Read-only snapshot of current state."""
|
||||
return CrisisSessionState(
|
||||
current_level=self._state.current_level,
|
||||
peak_level=self._state.peak_level,
|
||||
message_count=self._state.message_count,
|
||||
level_history=list(self._state.level_history),
|
||||
escalated=self._state.escalated,
|
||||
escalation_messages=self._state.escalation_messages,
|
||||
deescalation_count=self._state.deescalation_count,
|
||||
deescalating=self._state.deescalating,
|
||||
)
|
||||
|
||||
def reset(self):
|
||||
"""Reset all state. Called on new session."""
|
||||
self._state = CrisisSessionState()
|
||||
|
||||
def record(self, text: str) -> CrisisDetectionResult:
|
||||
"""
|
||||
Record a message and update session state.
|
||||
|
||||
Returns the single-message detection result (unchanged from detect.py).
|
||||
Session-level intelligence is tracked internally.
|
||||
"""
|
||||
detection = detect_crisis(text)
|
||||
self._record_level(detection.level)
|
||||
return detection
|
||||
|
||||
def record_level(self, level: str):
|
||||
"""Record an already-detected crisis level (for when detection ran separately)."""
|
||||
self._record_level(level)
|
||||
|
||||
def _record_level(self, level: str):
|
||||
"""Internal: update state with a new crisis level."""
|
||||
self._state.message_count += 1
|
||||
idx = self._state.message_count
|
||||
self._state.level_history.append((idx, level))
|
||||
|
||||
prev_level = self._state.current_level
|
||||
prev_order = LEVEL_ORDER.get(prev_level, 0)
|
||||
new_order = LEVEL_ORDER.get(level, 0)
|
||||
|
||||
# Update current level
|
||||
self._state.current_level = level
|
||||
|
||||
# Track peak
|
||||
if new_order > LEVEL_ORDER.get(self._state.peak_level, 0):
|
||||
self._state.peak_level = level
|
||||
|
||||
# ── Escalation detection ──────────────────────────────
|
||||
if new_order > prev_order:
|
||||
# User is going up — reset de-escalation counter
|
||||
self._state.deescalation_count = 0
|
||||
self._state.deescalating = False
|
||||
|
||||
# Always check for escalation pattern in recent window
|
||||
if not self._state.escalated:
|
||||
window = self._get_recent_levels(self.ESCALATION_WINDOW)
|
||||
if len(window) >= self.ESCALATION_WINDOW:
|
||||
first_in_window = window[0][1]
|
||||
last_in_window = window[-1][1]
|
||||
first_order = LEVEL_ORDER.get(first_in_window, 0)
|
||||
last_order = LEVEL_ORDER.get(last_in_window, 0)
|
||||
|
||||
if last_order > first_order:
|
||||
self._state.escalated = True
|
||||
self._state.escalation_messages = self.ESCALATION_WINDOW
|
||||
|
||||
# ── De-escalation detection ───────────────────────────
|
||||
elif new_order < prev_order:
|
||||
self._state.deescalation_count += 1
|
||||
self._state.deescalating = True
|
||||
|
||||
# If de-escalation counter meets threshold AND we were escalated
|
||||
if (self._state.escalated and
|
||||
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
|
||||
self._state.escalated = False
|
||||
self._state.deescalating = False
|
||||
self._state.deescalation_confirmed = True
|
||||
self._state.deescalation_count = 0
|
||||
|
||||
# Same level — increment de-escalation counter if already de-escalating
|
||||
elif self._state.deescalating:
|
||||
self._state.deescalation_count += 1
|
||||
if (self._state.escalated and
|
||||
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
|
||||
self._state.escalated = False
|
||||
self._state.deescalating = False
|
||||
self._state.deescalation_confirmed = True
|
||||
self._state.deescalation_count = 0
|
||||
|
||||
def _get_recent_levels(self, n: int) -> List[Tuple[int, str]]:
|
||||
"""Get the last n entries from level history."""
|
||||
return self._state.level_history[-n:]
|
||||
|
||||
def get_prompt_context(self) -> str:
|
||||
"""
|
||||
Generate a human-readable context string for system prompt injection.
|
||||
|
||||
Returns empty string if no session-level crisis context is needed.
|
||||
"""
|
||||
s = self._state
|
||||
|
||||
if s.message_count == 0:
|
||||
return ""
|
||||
|
||||
parts = []
|
||||
|
||||
# Escalation alert
|
||||
if s.escalated and s.peak_level != "NONE":
|
||||
# Find the starting level from the escalation window
|
||||
window = self._get_recent_levels(self.ESCALATION_WINDOW)
|
||||
if window:
|
||||
start_level = window[0][1]
|
||||
parts.append(
|
||||
f"User has escalated from {start_level} to {s.peak_level} "
|
||||
f"over {s.message_count} message{'s' if s.message_count != 1 else ''}."
|
||||
)
|
||||
parts.append("Heightened crisis awareness is warranted.")
|
||||
|
||||
# Confirmed de-escalation
|
||||
elif s.deescalation_confirmed and s.peak_level in ("HIGH", "CRITICAL"):
|
||||
parts.append(
|
||||
f"User has de-escalated from {s.peak_level} to {s.current_level}."
|
||||
)
|
||||
parts.append("De-escalation confirmed. Continue gentle presence.")
|
||||
|
||||
# Active de-escalation (not yet confirmed)
|
||||
elif s.deescalating and s.peak_level != "NONE":
|
||||
parts.append(
|
||||
f"User has de-escalated from {s.peak_level} to {s.current_level} "
|
||||
f"over {s.deescalation_count} message{'s' if s.deescalation_count != 1 else ''}."
|
||||
)
|
||||
parts.append("Still in de-escalation. Maintain supportive awareness.")
|
||||
|
||||
# Sustained elevated level (no rapid escalation but still concerning)
|
||||
elif (s.current_level in ("HIGH", "CRITICAL") and
|
||||
not s.escalated and s.message_count >= 3):
|
||||
parts.append(
|
||||
f"User has been at {s.current_level} level for {s.message_count} messages."
|
||||
)
|
||||
parts.append("Continue crisis-aware response.")
|
||||
|
||||
# Peak was higher than current (user improved but may still be fragile)
|
||||
elif (LEVEL_ORDER.get(s.peak_level, 0) > LEVEL_ORDER.get(s.current_level, 0) and
|
||||
s.peak_level in ("HIGH", "CRITICAL") and
|
||||
not s.deescalating):
|
||||
parts.append(
|
||||
f"Note: session peak was {s.peak_level}. "
|
||||
f"User is now at {s.current_level}."
|
||||
)
|
||||
parts.append("Remain attentive.")
|
||||
|
||||
return " ".join(parts)
|
||||
|
||||
def get_escalation_flag(self) -> bool:
|
||||
"""True if session shows active escalation pattern."""
|
||||
return self._state.escalated
|
||||
|
||||
def get_effective_level(self) -> str:
|
||||
"""
|
||||
Get the effective crisis level considering session state.
|
||||
|
||||
Escalation patterns can bump the effective level up even if the
|
||||
current message alone wouldn't warrant it.
|
||||
"""
|
||||
s = self._state
|
||||
|
||||
if s.escalated and s.peak_level in ("HIGH", "CRITICAL"):
|
||||
return s.peak_level
|
||||
|
||||
return s.current_level
|
||||
|
||||
def should_heighten_awareness(self) -> bool:
|
||||
"""
|
||||
True when the session trajectory warrants heightened awareness
|
||||
beyond what a single-message detection would provide.
|
||||
"""
|
||||
return self._state.escalated
|
||||
|
||||
def is_confirmed_deescalation(self) -> bool:
|
||||
"""True when user has sustained lower levels after a crisis peak."""
|
||||
s = self._state
|
||||
return s.deescalation_confirmed
|
||||
|
||||
def __repr__(self) -> str:
|
||||
s = self._state
|
||||
return (
|
||||
f"CrisisSessionTracker("
|
||||
f"current={s.current_level}, peak={s.peak_level}, "
|
||||
f"msgs={s.message_count}, escalated={s.escalated}, "
|
||||
f"deescalating={s.deescalating})"
|
||||
)
|
||||
407
crisis/tests_session_tracker.py
Normal file
407
crisis/tests_session_tracker.py
Normal file
@@ -0,0 +1,407 @@
|
||||
"""
|
||||
Tests for session-level crisis tracking (Issue #35).
|
||||
|
||||
Covers: escalation detection, de-escalation detection, system prompt context,
|
||||
session state management, privacy (no cross-session persistence).
|
||||
|
||||
Run with: python -m pytest crisis/tests_session_tracker.py -v
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from crisis.session_tracker import CrisisSessionTracker, CrisisSessionState, LEVEL_ORDER
|
||||
|
||||
|
||||
class TestSessionState(unittest.TestCase):
|
||||
"""Test basic session state management."""
|
||||
|
||||
def test_initial_state(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
s = tracker.state
|
||||
self.assertEqual(s.current_level, "NONE")
|
||||
self.assertEqual(s.peak_level, "NONE")
|
||||
self.assertEqual(s.message_count, 0)
|
||||
self.assertEqual(s.level_history, [])
|
||||
self.assertFalse(s.escalated)
|
||||
|
||||
def test_record_increments_count(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("hello")
|
||||
self.assertEqual(tracker.state.message_count, 1)
|
||||
tracker.record("world")
|
||||
self.assertEqual(tracker.state.message_count, 2)
|
||||
|
||||
def test_record_tracks_history(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I'm struggling") # LOW
|
||||
tracker.record("I feel hopeless and nobody cares") # MEDIUM
|
||||
history = tracker.state.level_history
|
||||
self.assertEqual(len(history), 2)
|
||||
self.assertEqual(history[0], (1, "LOW"))
|
||||
self.assertIn(history[1][1], ("MEDIUM", "LOW", "HIGH"))
|
||||
|
||||
def test_reset_clears_state(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I want to die") # CRITICAL
|
||||
self.assertEqual(tracker.state.peak_level, "CRITICAL")
|
||||
tracker.reset()
|
||||
s = tracker.state
|
||||
self.assertEqual(s.current_level, "NONE")
|
||||
self.assertEqual(s.peak_level, "NONE")
|
||||
self.assertEqual(s.message_count, 0)
|
||||
self.assertEqual(s.level_history, [])
|
||||
|
||||
def test_peak_tracking(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I'm feeling down") # LOW
|
||||
tracker.record("I feel hopeless with no way out") # HIGH
|
||||
tracker.record("Actually I'm a bit better") # back to LOW
|
||||
self.assertEqual(tracker.state.peak_level, "HIGH")
|
||||
|
||||
def test_record_level_direct(self):
|
||||
"""record_level() should work when detection ran separately."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record_level("LOW")
|
||||
tracker.record_level("MEDIUM")
|
||||
tracker.record_level("HIGH")
|
||||
self.assertEqual(tracker.state.message_count, 3)
|
||||
self.assertEqual(tracker.state.peak_level, "HIGH")
|
||||
|
||||
|
||||
class TestEscalationDetection(unittest.TestCase):
|
||||
"""Test rapid escalation detection: LOW → HIGH in N messages."""
|
||||
|
||||
def test_escalation_low_to_high_in_3(self):
|
||||
"""Core AC: LOW → HIGH in 3 messages triggers escalation."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I'm having a tough time") # LOW
|
||||
tracker.record("I feel really hopeless about everything") # HIGH
|
||||
tracker.record("I can't take this anymore") # HIGH
|
||||
self.assertTrue(tracker.get_escalation_flag())
|
||||
|
||||
def test_escalation_low_to_critical_in_3(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I'm struggling") # LOW
|
||||
tracker.record("Nobody would miss me") # HIGH
|
||||
tracker.record("I want to end my life") # CRITICAL
|
||||
self.assertTrue(tracker.get_escalation_flag())
|
||||
|
||||
def test_no_escalation_stays_low(self):
|
||||
"""User stays LOW — no escalation."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("rough day")
|
||||
tracker.record("tough week")
|
||||
tracker.record("feeling down")
|
||||
self.assertFalse(tracker.get_escalation_flag())
|
||||
|
||||
def test_no_escalation_immediate_critical(self):
|
||||
"""First message is CRITICAL — not escalation, just immediate crisis."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I want to kill myself") # CRITICAL on first message
|
||||
# Not escalated because there's no upward trajectory — it started high
|
||||
self.assertFalse(tracker.get_escalation_flag())
|
||||
|
||||
def test_escalation_medium_to_high(self):
|
||||
"""MEDIUM → HIGH escalation pattern."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I feel alone and nobody cares") # MEDIUM
|
||||
tracker.record("I feel completely hopeless") # HIGH
|
||||
tracker.record("There's no way out") # HIGH
|
||||
self.assertTrue(tracker.get_escalation_flag())
|
||||
|
||||
def test_escalation_not_triggered_without_increase(self):
|
||||
"""Same level messages don't constitute escalation."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I feel hopeless") # HIGH
|
||||
tracker.record("I'm in so much pain") # HIGH
|
||||
tracker.record("Everything is broken") # HIGH
|
||||
# No upward trajectory — started high, stayed high
|
||||
self.assertFalse(tracker.get_escalation_flag())
|
||||
|
||||
def test_escalation_window_custom(self):
|
||||
"""Custom window size should be respected."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 2
|
||||
tracker.record("I'm unhappy") # LOW
|
||||
tracker.record("I can't go on anymore") # HIGH
|
||||
self.assertTrue(tracker.get_escalation_flag())
|
||||
|
||||
|
||||
class TestDeescalationDetection(unittest.TestCase):
|
||||
"""Test de-escalation: CRITICAL → LOW for 5+ messages."""
|
||||
|
||||
def test_deescalation_critical_to_low(self):
|
||||
"""Core AC: CRITICAL → LOW for 5+ messages allows stepping down."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.DEESCALATION_THRESHOLD = 5
|
||||
|
||||
# Escalate to CRITICAL
|
||||
tracker.record("I'm struggling") # LOW
|
||||
tracker.record("I can't go on") # HIGH
|
||||
tracker.record("I want to die") # CRITICAL
|
||||
self.assertTrue(tracker.get_escalation_flag())
|
||||
|
||||
# De-escalate over 5 messages
|
||||
tracker.record("I called 988") # NONE/LOW
|
||||
tracker.record("I'm calmer now") # NONE
|
||||
tracker.record("Thank you for being here") # NONE
|
||||
tracker.record("I'm going to be okay") # NONE
|
||||
tracker.record("Taking it one moment at a time") # NONE
|
||||
|
||||
self.assertTrue(tracker.is_confirmed_deescalation())
|
||||
self.assertFalse(tracker.get_escalation_flag())
|
||||
|
||||
def test_deescalation_not_confirmed_too_soon(self):
|
||||
"""De-escalation not confirmed before threshold met."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.DEESCALATION_THRESHOLD = 5
|
||||
|
||||
tracker.record("I'm struggling")
|
||||
tracker.record("I can't go on")
|
||||
tracker.record("I want to die")
|
||||
self.assertTrue(tracker.get_escalation_flag())
|
||||
|
||||
# Only 3 messages of de-escalation
|
||||
tracker.record("I called someone")
|
||||
tracker.record("I'm calmer")
|
||||
tracker.record("Feeling better")
|
||||
|
||||
self.assertFalse(tracker.is_confirmed_deescalation())
|
||||
self.assertTrue(tracker.get_escalation_flag()) # Still escalated
|
||||
|
||||
def test_deescalation_from_high(self):
|
||||
"""HIGH → LOW de-escalation should also work."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.DEESCALATION_THRESHOLD = 5
|
||||
|
||||
# Build up to HIGH
|
||||
tracker.record("I'm down") # LOW
|
||||
tracker.record("I feel hopeless") # HIGH
|
||||
tracker.record("No way out") # HIGH
|
||||
tracker.record("Everything is pointless") # HIGH
|
||||
|
||||
# De-escalate
|
||||
for _ in range(5):
|
||||
tracker.record("I'm doing a bit better") # LOW/NONE
|
||||
|
||||
self.assertTrue(tracker.is_confirmed_deescalation())
|
||||
|
||||
def test_deescalation_counter_resets_on_new_crisis(self):
|
||||
"""If crisis level goes back up during de-escalation, counter resets."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.DEESCALATION_THRESHOLD = 5
|
||||
|
||||
# Escalate
|
||||
tracker.record("I'm struggling") # LOW
|
||||
tracker.record("I feel hopeless") # HIGH
|
||||
tracker.record("I want to die") # CRITICAL
|
||||
|
||||
# Start de-escalating
|
||||
tracker.record("I called someone") # LOW
|
||||
tracker.record("I'm calmer") # NONE
|
||||
|
||||
# Go back up
|
||||
tracker.record("Actually I can't do this") # HIGH
|
||||
|
||||
self.assertFalse(tracker.is_confirmed_deescalation())
|
||||
|
||||
|
||||
class TestSystemPromptContext(unittest.TestCase):
|
||||
"""Test system prompt context generation."""
|
||||
|
||||
def test_empty_session_no_context(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
self.assertEqual(tracker.get_prompt_context(), "")
|
||||
|
||||
def test_escalation_context(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I'm having a tough time")
|
||||
tracker.record("Things are really hard")
|
||||
tracker.record("I can't go on anymore")
|
||||
|
||||
ctx = tracker.get_prompt_context()
|
||||
self.assertIn("escalated", ctx.lower())
|
||||
self.assertIn("LOW", ctx)
|
||||
self.assertIn("heightened", ctx.lower())
|
||||
|
||||
def test_deescalation_context(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.DEESCALATION_THRESHOLD = 5
|
||||
|
||||
# Escalate
|
||||
tracker.record("I'm struggling")
|
||||
tracker.record("I feel hopeless")
|
||||
tracker.record("I want to die")
|
||||
|
||||
# De-escalate
|
||||
for _ in range(5):
|
||||
tracker.record("I'm okay now")
|
||||
|
||||
ctx = tracker.get_prompt_context()
|
||||
self.assertIn("de-escalated", ctx.lower())
|
||||
self.assertIn("confirmed", ctx.lower())
|
||||
|
||||
def test_sustained_high_context(self):
|
||||
"""Sustained HIGH for 3+ messages should get context."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I'm in so much pain") # HIGH
|
||||
tracker.record("Everything is pointless") # HIGH
|
||||
tracker.record("I can't go on") # HIGH
|
||||
|
||||
ctx = tracker.get_prompt_context()
|
||||
self.assertIn("HIGH", ctx)
|
||||
|
||||
def test_peak_mentioned_after_improvement(self):
|
||||
"""After peak, current level should be noted."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I'm in so much pain") # HIGH
|
||||
tracker.record("Everything is pointless") # HIGH
|
||||
tracker.record("I feel a bit better") # back to LOW
|
||||
|
||||
ctx = tracker.get_prompt_context()
|
||||
self.assertIn("peak", ctx.lower())
|
||||
self.assertIn("HIGH", ctx)
|
||||
|
||||
|
||||
class TestEffectiveLevel(unittest.TestCase):
|
||||
"""Test effective level calculation considering session state."""
|
||||
|
||||
def test_effective_level_normal(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("I'm struggling") # LOW
|
||||
self.assertEqual(tracker.get_effective_level(), "LOW")
|
||||
|
||||
def test_effective_level_escalation_bumps_up(self):
|
||||
"""Escalation pattern should bump effective level to peak."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I'm having a tough time") # LOW
|
||||
tracker.record("I feel completely hopeless") # HIGH
|
||||
tracker.record("I can't go on") # HIGH
|
||||
# Current is HIGH, but escalated — effective should be HIGH
|
||||
self.assertEqual(tracker.get_effective_level(), "HIGH")
|
||||
|
||||
|
||||
class TestPrivacy(unittest.TestCase):
|
||||
"""Test privacy requirements — no cross-session persistence."""
|
||||
|
||||
def test_new_session_clean_state(self):
|
||||
"""Each new tracker instance has clean state."""
|
||||
t1 = CrisisSessionTracker()
|
||||
t1.record("I want to die")
|
||||
self.assertEqual(t1.state.peak_level, "CRITICAL")
|
||||
|
||||
t2 = CrisisSessionTracker()
|
||||
self.assertEqual(t2.state.peak_level, "NONE")
|
||||
self.assertEqual(t2.state.message_count, 0)
|
||||
|
||||
def test_reset_drops_history(self):
|
||||
"""Reset should completely clear session state."""
|
||||
t = CrisisSessionTracker()
|
||||
t.record("I'm struggling")
|
||||
t.record("I can't go on")
|
||||
t.reset()
|
||||
self.assertEqual(t.state.level_history, [])
|
||||
self.assertEqual(t.state.message_count, 0)
|
||||
self.assertFalse(t.state.escalated)
|
||||
|
||||
|
||||
class TestEdgeCases(unittest.TestCase):
|
||||
"""Edge cases and boundary conditions."""
|
||||
|
||||
def test_single_message_none(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("Hello Timmy")
|
||||
self.assertFalse(tracker.get_escalation_flag())
|
||||
self.assertEqual(tracker.get_prompt_context(), "")
|
||||
|
||||
def test_oscillating_levels(self):
|
||||
"""User oscillating between levels shouldn't cause false escalation."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I'm fine")
|
||||
tracker.record("I'm struggling")
|
||||
tracker.record("I'm fine")
|
||||
tracker.record("I'm struggling")
|
||||
# Oscillation without sustained escalation
|
||||
self.assertFalse(tracker.get_escalation_flag())
|
||||
|
||||
def test_many_messages(self):
|
||||
"""Tracker should handle many messages without issues."""
|
||||
tracker = CrisisSessionTracker()
|
||||
for i in range(100):
|
||||
tracker.record("Hello there")
|
||||
self.assertEqual(tracker.state.message_count, 100)
|
||||
|
||||
def test_empty_string(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("")
|
||||
self.assertEqual(tracker.state.message_count, 1)
|
||||
self.assertEqual(tracker.state.current_level, "NONE")
|
||||
|
||||
def test_repr(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
r = repr(tracker)
|
||||
self.assertIn("CrisisSessionTracker", r)
|
||||
self.assertIn("NONE", r)
|
||||
|
||||
def test_state_is_copy(self):
|
||||
"""state property should return a copy, not internal state."""
|
||||
tracker = CrisisSessionTracker()
|
||||
s1 = tracker.state
|
||||
tracker.record("I'm struggling")
|
||||
s2 = tracker.state
|
||||
self.assertEqual(s1.message_count, 0)
|
||||
self.assertEqual(s2.message_count, 1)
|
||||
|
||||
|
||||
class TestLevelOrder(unittest.TestCase):
|
||||
"""Test level ordering is correct."""
|
||||
|
||||
def test_level_ordering(self):
|
||||
self.assertLess(LEVEL_ORDER["NONE"], LEVEL_ORDER["LOW"])
|
||||
self.assertLess(LEVEL_ORDER["LOW"], LEVEL_ORDER["MEDIUM"])
|
||||
self.assertLess(LEVEL_ORDER["MEDIUM"], LEVEL_ORDER["HIGH"])
|
||||
self.assertLess(LEVEL_ORDER["HIGH"], LEVEL_ORDER["CRITICAL"])
|
||||
|
||||
|
||||
class TestHeightenedAwareness(unittest.TestCase):
|
||||
"""Test heightened awareness flag."""
|
||||
|
||||
def test_heightened_on_escalation(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I'm unhappy")
|
||||
tracker.record("I feel hopeless")
|
||||
tracker.record("I can't go on")
|
||||
self.assertTrue(tracker.should_heighten_awareness())
|
||||
|
||||
def test_not_heightened_normal(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("Hello")
|
||||
self.assertFalse(tracker.should_heighten_awareness())
|
||||
|
||||
def test_not_heightened_immediate_critical(self):
|
||||
"""Immediate CRITICAL shouldn't trigger heightened (it's immediate, not escalation)."""
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.ESCALATION_WINDOW = 3
|
||||
tracker.record("I want to kill myself")
|
||||
self.assertFalse(tracker.should_heighten_awareness())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,155 +0,0 @@
|
||||
"""
|
||||
Image Content Screening for Self-Harm Indicators
|
||||
|
||||
Screens images sent in chat for self-harm content using vision models.
|
||||
Privacy-preserving: analyzes in-memory, discards after scoring. No image retention.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger("the-door.image_screening")
|
||||
|
||||
|
||||
class RiskLevel(str, Enum):
|
||||
SAFE = "safe"
|
||||
CONCERNING = "concerning"
|
||||
CRITICAL = "critical"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageScreenResult:
|
||||
risk_level: RiskLevel
|
||||
confidence: float
|
||||
categories: list
|
||||
reasoning: str
|
||||
model_used: str
|
||||
|
||||
@property
|
||||
def needs_crisis_response(self):
|
||||
return self.risk_level == RiskLevel.CRITICAL
|
||||
|
||||
@property
|
||||
def needs_followup(self):
|
||||
return self.risk_level in (RiskLevel.CONCERNING, RiskLevel.CRITICAL)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"risk_level": self.risk_level.value,
|
||||
"confidence": round(self.confidence, 3),
|
||||
"categories": self.categories,
|
||||
"reasoning": self.reasoning,
|
||||
"model_used": self.model_used,
|
||||
}
|
||||
|
||||
|
||||
SCREENING_PROMPT = """Analyze this image for indicators of self-harm or crisis. Look for:
|
||||
|
||||
1. Self-harm wounds or injuries
|
||||
2. Medication/pills in concerning quantities
|
||||
3. Farewell-type imagery (empty rooms, items laid out, goodbye notes)
|
||||
4. Screenshots of crisis-related searches
|
||||
5. Images conveying hopelessness, isolation, or despair
|
||||
|
||||
Respond in JSON format ONLY:
|
||||
{"risk_level": "safe"|"concerning"|"critical", "confidence": 0.0-1.0, "categories": [], "reasoning": ""}
|
||||
|
||||
CRITICAL: imminent self-harm indicators
|
||||
CONCERNING: ambiguous but worrying
|
||||
SAFE: no indicators detected"""
|
||||
|
||||
|
||||
def _analyze_with_ollama(image_b64, model="gemma3:4b"):
|
||||
try:
|
||||
import urllib.request
|
||||
payload = json.dumps({
|
||||
"model": model,
|
||||
"messages": [{
|
||||
"role": "user",
|
||||
"content": SCREENING_PROMPT,
|
||||
"images": [image_b64],
|
||||
}],
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1},
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
"http://localhost:11434/api/chat",
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
resp = urllib.request.urlopen(req, timeout=30)
|
||||
data = json.loads(resp.read())
|
||||
content = data.get("message", {}).get("content", "")
|
||||
json_start = content.find("{")
|
||||
json_end = content.rfind("}") + 1
|
||||
if json_start == -1 or json_end <= json_start:
|
||||
return None
|
||||
result = json.loads(content[json_start:json_end])
|
||||
return ImageScreenResult(
|
||||
risk_level=RiskLevel(result.get("risk_level", "safe")),
|
||||
confidence=float(result.get("confidence", 0.5)),
|
||||
categories=result.get("categories", []),
|
||||
reasoning=result.get("reasoning", ""),
|
||||
model_used=f"ollama:{model}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Ollama vision analysis failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def _analyze_fallback(image_bytes):
|
||||
return ImageScreenResult(
|
||||
risk_level=RiskLevel.SAFE,
|
||||
confidence=0.2,
|
||||
categories=["unanalyzed"],
|
||||
reasoning="No vision model available. Defaulting to safe with low confidence.",
|
||||
model_used="fallback:heuristic",
|
||||
)
|
||||
|
||||
|
||||
def screen_image(image_data, use_vision_model=True, model="gemma3:4b"):
|
||||
"""Screen image for self-harm indicators. Analyzes in-memory, no retention."""
|
||||
if isinstance(image_data, bytes):
|
||||
image_b64 = base64.b64encode(image_data).decode()
|
||||
else:
|
||||
image_b64 = image_data
|
||||
image_data = base64.b64decode(image_b64)
|
||||
|
||||
if use_vision_model:
|
||||
result = _analyze_with_ollama(image_b64, model)
|
||||
if result:
|
||||
logger.info(f"Image screened: {result.risk_level.value} (conf: {result.confidence:.2f})")
|
||||
if result.needs_crisis_response:
|
||||
logger.warning(f"CRITICAL image: {result.reasoning}")
|
||||
return result
|
||||
|
||||
return _analyze_fallback(image_data)
|
||||
|
||||
|
||||
def handle_chat_image(image_data):
|
||||
"""Handle image from chat. Returns action dict for gateway."""
|
||||
result = screen_image(image_data)
|
||||
action = {
|
||||
"result": result.to_dict(),
|
||||
"show_crisis_overlay": result.needs_crisis_response,
|
||||
"log_event": result.needs_followup,
|
||||
"response_text": None,
|
||||
}
|
||||
if result.risk_level == RiskLevel.CRITICAL:
|
||||
action["response_text"] = (
|
||||
"I noticed something concerning in the image you shared. "
|
||||
"If you or someone you know is in crisis, please reach out: "
|
||||
"988 Suicide and Crisis Lifeline (call or text 988). "
|
||||
"You are not alone."
|
||||
)
|
||||
elif result.risk_level == RiskLevel.CONCERNING:
|
||||
action["response_text"] = (
|
||||
"I want to check in \u2014 how are you doing? "
|
||||
"If you need to talk to someone, the 988 Lifeline is available 24/7."
|
||||
)
|
||||
return action
|
||||
101
index.html
101
index.html
@@ -964,6 +964,99 @@ Sovereignty and service always.`;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ===== SESSION CRISIS TRACKING (#35) =====
|
||||
var sessionCrisis = {
|
||||
currentLevel: 0, // 0=NONE, 1=LOW, 2=MEDIUM, 3=HIGH, 4=CRITICAL
|
||||
peakLevel: 0,
|
||||
messageCount: 0,
|
||||
history: [], // [{level, timestamp}]
|
||||
escalationRate: 0, // levels per message
|
||||
lastEscalation: null // timestamp of last escalation
|
||||
};
|
||||
|
||||
function trackCrisis(text) {
|
||||
var level = getCrisisLevel(text);
|
||||
sessionCrisis.messageCount++;
|
||||
sessionCrisis.history.push({ level: level, time: Date.now() });
|
||||
|
||||
if (level > sessionCrisis.currentLevel) {
|
||||
sessionCrisis.lastEscalation = Date.now();
|
||||
}
|
||||
|
||||
sessionCrisis.currentLevel = level;
|
||||
sessionCrisis.peakLevel = Math.max(sessionCrisis.peakLevel, level);
|
||||
|
||||
var recent = sessionCrisis.history.slice(-5);
|
||||
if (recent.length >= 2) {
|
||||
var first = recent[0].level;
|
||||
var last = recent[recent.length - 1].level;
|
||||
sessionCrisis.escalationRate = (last - first) / recent.length;
|
||||
}
|
||||
|
||||
return getSessionContext();
|
||||
}
|
||||
|
||||
function getSessionContext() {
|
||||
var s = sessionCrisis;
|
||||
var ctx = '';
|
||||
|
||||
if (s.history.length < 2) return ctx;
|
||||
|
||||
var parts = [];
|
||||
|
||||
// Escalation Alert
|
||||
if (s.escalationRate > 0.5 && s.history.length <= 3) {
|
||||
parts.push('ESCALATION ALERT: User crisis level is rising rapidly.');
|
||||
parts.push('Heightened crisis awareness is warranted.');
|
||||
}
|
||||
|
||||
// Confirmed De-escalation
|
||||
if (s.peakLevel >= 3 && s.currentLevel <= 1 && s.messageCount >= 5) {
|
||||
parts.push('DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency.');
|
||||
parts.push('De-escalation confirmed. Continue gentle presence.');
|
||||
}
|
||||
|
||||
// Sustained Elevated Level
|
||||
if (s.currentLevel >= 2 && s.messageCount >= 3) {
|
||||
parts.push('User has been in crisis for ' + s.messageCount + ' messages.');
|
||||
parts.push('Continue crisis-aware response.');
|
||||
}
|
||||
|
||||
// Peak Mention
|
||||
if (s.peakLevel > s.currentLevel && s.peakLevel >= 3) {
|
||||
parts.push('Note: session peak was level ' + s.peakLevel + '. User is now at level ' + s.currentLevel + '.');
|
||||
parts.push('Remain attentive.');
|
||||
}
|
||||
|
||||
var levels = s.history.map(function(h) { return h.level; });
|
||||
parts.push('Crisis trajectory: ' + levels.join(' → ') + '.');
|
||||
|
||||
return parts.join(' ');
|
||||
}
|
||||
|
||||
if (sessionCrisis.peakLevel >= 3 && sessionCrisis.currentLevel <= 1 && sessionCrisis.messageCount >= 5) {
|
||||
ctx += 'DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency. ';
|
||||
}
|
||||
|
||||
if (sessionCrisis.currentLevel >= 2 && sessionCrisis.messageCount >= 3) {
|
||||
ctx += 'User has been in crisis for ' + sessionCrisis.messageCount + ' messages. ';
|
||||
}
|
||||
|
||||
var levels = sessionCrisis.history.map(function(h) { return h.level; });
|
||||
if (levels.length >= 2) {
|
||||
ctx += 'Crisis trajectory: ' + levels.join(' → ') + '. ';
|
||||
}
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
function resetSessionCrisis() {
|
||||
sessionCrisis = {
|
||||
currentLevel: 0, peakLevel: 0, messageCount: 0,
|
||||
history: [], escalationRate: 0, lastEscalation: null
|
||||
};
|
||||
}
|
||||
|
||||
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
|
||||
function getSystemPrompt(userText) {
|
||||
var level = getCrisisLevel(userText);
|
||||
@@ -978,7 +1071,9 @@ Sovereignty and service always.`;
|
||||
var directive = 'DIRECTIVE: ' + profile.directive + '\n';
|
||||
var tone = 'TONE: ' + profile.tone + '\n';
|
||||
|
||||
return SYSTEM_PROMPT + divider + header + directive + tone;
|
||||
var sessionCtx = getSessionContext();
|
||||
var sessionBlock = sessionCtx ? '\nSESSION CONTEXT: ' + sessionCtx : '';
|
||||
return SYSTEM_PROMPT + divider + header + directive + tone + sessionBlock;
|
||||
}
|
||||
|
||||
|
||||
@@ -1164,6 +1259,7 @@ Sovereignty and service always.`;
|
||||
clearChatBtn.addEventListener('click', function() {
|
||||
if (confirm('Clear all chat history?')) {
|
||||
localStorage.removeItem('timmy_chat_history');
|
||||
resetSessionCrisis();
|
||||
window.location.reload();
|
||||
}
|
||||
});
|
||||
@@ -1316,6 +1412,7 @@ Sovereignty and service always.`;
|
||||
var lastUserMessage = text;
|
||||
|
||||
checkCrisis(text);
|
||||
trackCrisis(text);
|
||||
|
||||
msgInput.value = '';
|
||||
msgInput.style.height = 'auto';
|
||||
@@ -1404,6 +1501,7 @@ Sovereignty and service always.`;
|
||||
messages.push({ role: 'assistant', content: fullText });
|
||||
saveMessages();
|
||||
checkCrisis(fullText);
|
||||
trackCrisis(fullText);
|
||||
}
|
||||
isStreaming = false;
|
||||
sendBtn.disabled = msgInput.value.trim().length === 0;
|
||||
@@ -1433,6 +1531,7 @@ Sovereignty and service always.`;
|
||||
|
||||
// ===== WELCOME MESSAGE =====
|
||||
function init() {
|
||||
resetSessionCrisis();
|
||||
if (!loadMessages()) {
|
||||
var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us.";
|
||||
addMessage('assistant', welcomeText);
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
"""Tests for image content screening module."""
|
||||
|
||||
import json
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from image_screening import (
|
||||
RiskLevel,
|
||||
ImageScreenResult,
|
||||
screen_image,
|
||||
handle_chat_image,
|
||||
_analyze_fallback,
|
||||
)
|
||||
|
||||
|
||||
class TestImageScreenResult:
|
||||
def test_safe_result(self):
|
||||
result = ImageScreenResult(
|
||||
risk_level=RiskLevel.SAFE, confidence=0.95,
|
||||
categories=[], reasoning="No indicators", model_used="test"
|
||||
)
|
||||
assert not result.needs_crisis_response
|
||||
assert not result.needs_followup
|
||||
assert result.to_dict()["risk_level"] == "safe"
|
||||
|
||||
def test_critical_result(self):
|
||||
result = ImageScreenResult(
|
||||
risk_level=RiskLevel.CRITICAL, confidence=0.9,
|
||||
categories=["wounds"], reasoning="Detected", model_used="test"
|
||||
)
|
||||
assert result.needs_crisis_response
|
||||
assert result.needs_followup
|
||||
|
||||
def test_concerning_result(self):
|
||||
result = ImageScreenResult(
|
||||
risk_level=RiskLevel.CONCERNING, confidence=0.6,
|
||||
categories=["isolation"], reasoning="Ambiguous", model_used="test"
|
||||
)
|
||||
assert not result.needs_crisis_response
|
||||
assert result.needs_followup
|
||||
|
||||
|
||||
class TestScreenImage:
|
||||
def test_fallback_returns_safe(self):
|
||||
result = screen_image(b"fake_image_data", use_vision_model=False)
|
||||
assert result.risk_level == RiskLevel.SAFE
|
||||
assert result.model_used == "fallback:heuristic"
|
||||
assert result.confidence < 0.5
|
||||
|
||||
def test_base64_input(self):
|
||||
import base64
|
||||
b64 = base64.b64encode(b"fake").decode()
|
||||
result = screen_image(b64, use_vision_model=False)
|
||||
assert result.risk_level == RiskLevel.SAFE
|
||||
|
||||
|
||||
class TestHandleChatImage:
|
||||
def test_safe_image_no_overlay(self):
|
||||
action = handle_chat_image(b"safe_image")
|
||||
assert not action["show_crisis_overlay"]
|
||||
assert action["response_text"] is None
|
||||
|
||||
@patch("image_screening._analyze_with_ollama")
|
||||
def test_critical_image_shows_overlay(self, mock_ollama):
|
||||
mock_ollama.return_value = ImageScreenResult(
|
||||
risk_level=RiskLevel.CRITICAL, confidence=0.95,
|
||||
categories=["wounds"], reasoning="Self-harm detected",
|
||||
model_used="ollama:gemma3:4b"
|
||||
)
|
||||
action = handle_chat_image(b"concerning_image")
|
||||
assert action["show_crisis_overlay"]
|
||||
assert "988" in action["response_text"]
|
||||
assert action["log_event"]
|
||||
|
||||
@patch("image_screening._analyze_with_ollama")
|
||||
def test_concerning_image_followup(self, mock_ollama):
|
||||
mock_ollama.return_value = ImageScreenResult(
|
||||
risk_level=RiskLevel.CONCERNING, confidence=0.6,
|
||||
categories=["isolation"], reasoning="Empty room",
|
||||
model_used="ollama:gemma3:4b"
|
||||
)
|
||||
action = handle_chat_image(b"maybe_concerning")
|
||||
assert not action["show_crisis_overlay"]
|
||||
assert action["log_event"]
|
||||
assert "check in" in action["response_text"]
|
||||
17
tests/test_session_crisis_frontend.py
Normal file
17
tests/test_session_crisis_frontend.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def test_index_contains_session_crisis_state_and_tracking_hooks():
|
||||
html = Path('index.html').read_text()
|
||||
assert 'var sessionCrisis' in html
|
||||
assert 'function trackCrisis(text)' in html
|
||||
assert 'function getSessionContext()' in html
|
||||
assert 'function resetSessionCrisis()' in html
|
||||
assert 'trackCrisis(text);' in html
|
||||
assert 'SESSION CONTEXT:' in html
|
||||
|
||||
|
||||
def test_new_session_resets_crisis_state():
|
||||
html = Path('index.html').read_text()
|
||||
assert 'resetSessionCrisis()' in html
|
||||
assert "localStorage.removeItem('timmy_chat_history');" in html
|
||||
Reference in New Issue
Block a user