Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2b91272f4e feat: CLI command to view crisis metrics summary (#136)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 14s
New crisis/metrics.py:
- CrisisEvent dataclass for individual events
- log_event(): append to ~/.the-door/crisis-metrics.jsonl
- load_events(): load last N days from JSONL
- compute_summary(): aggregate by level, top indicators,
  sessions affected, daily average, peak day
- MetricsSummary dataclass
- format_summary(): human-readable report with bar chart
- CLI: python3 -m crisis.metrics --summary/--json/--last 7d/--log

Makefile targets:
- make metrics    — weekly summary report
- make metrics-json — raw JSON export

Closes #136
2026-04-15 12:41:07 -04:00
8 changed files with 214 additions and 894 deletions

View File

@@ -46,3 +46,12 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
# Crisis metrics
.PHONY: metrics metrics-json
metrics: ## Show crisis metrics summary (last 7 days)
python3 -m crisis.metrics --summary
metrics-json: ## Export crisis metrics as JSON
python3 -m crisis.metrics --json

View File

@@ -7,7 +7,6 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, CrisisSessionState
__all__ = [
"detect_crisis",
@@ -20,6 +19,4 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"CrisisSessionState",
]

199
crisis/metrics.py Normal file
View File

@@ -0,0 +1,199 @@
"""Crisis metrics — aggregate detection data for operators.
Tracks crisis detection events and provides summary reports.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --last 7d # last 7 days
"""
from __future__ import annotations
import json
import os
import sys
import time
from collections import Counter
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
# Data directory for metrics storage
_DATA_DIR = Path(os.getenv("CRISIS_DATA_DIR", str(Path.home() / ".the-door")))
_METRICS_FILE = _DATA_DIR / "crisis-metrics.jsonl"
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
indicators: list
session_id: str = ""
source: str = "" # "chat", "gateway", "cli"
@dataclass
class MetricsSummary:
"""Aggregated metrics summary."""
period_days: int
total_events: int
by_level: Dict[str, int]
top_indicators: List[tuple]
sessions_affected: int
avg_daily: float
peak_day: str
peak_count: int
generated_at: str
def log_event(event: CrisisEvent) -> None:
"""Log a crisis event to the metrics file."""
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_METRICS_FILE, "a") as f:
f.write(json.dumps(asdict(event)) + "\n")
def load_events(days: int = 7) -> List[CrisisEvent]:
"""Load crisis events from the last N days."""
if not _METRICS_FILE.exists():
return []
cutoff = time.time() - (days * 86400)
events = []
try:
with open(_METRICS_FILE) as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
if data.get("timestamp", 0) >= cutoff:
events.append(CrisisEvent(**data))
except (json.JSONDecodeError, KeyError):
pass
return events
def compute_summary(days: int = 7) -> MetricsSummary:
"""Compute metrics summary for the given period."""
events = load_events(days)
now = time.time()
# By level
by_level = Counter(e.level for e in events)
# Top indicators
indicator_counts = Counter()
for e in events:
for ind in e.indicators:
indicator_counts[ind] += 1
top_indicators = indicator_counts.most_common(10)
# Sessions
sessions = set(e.session_id for e in events if e.session_id)
# Peak day
from collections import defaultdict
daily = defaultdict(int)
for e in events:
day = time.strftime("%Y-%m-%d", time.localtime(e.timestamp))
daily[day] += 1
peak_day = max(daily, key=daily.get) if daily else "N/A"
peak_count = daily.get(peak_day, 0)
return MetricsSummary(
period_days=days,
total_events=len(events),
by_level=dict(by_level),
top_indicators=top_indicators,
sessions_affected=len(sessions),
avg_daily=round(len(events) / max(days, 1), 1),
peak_day=peak_day,
peak_count=peak_count,
generated_at=time.strftime("%Y-%m-%d %H:%M:%S"),
)
def format_summary(summary: MetricsSummary) -> str:
"""Format metrics summary as human-readable report."""
lines = [
"Crisis Metrics Summary",
"=" * 40,
f"Period: Last {summary.period_days} days",
f"Generated: {summary.generated_at}",
"",
f"Total events: {summary.total_events}",
f"Daily avg: {summary.avg_daily}",
f"Sessions: {summary.sessions_affected}",
f"Peak day: {summary.peak_day} ({summary.peak_count} events)",
"",
]
if summary.by_level:
lines.append("By severity:")
for level in ["CRITICAL", "HIGH", "MODERATE", "LOW", "NONE"]:
count = summary.by_level.get(level, 0)
if count > 0:
bar = "" * min(count, 30)
lines.append(f" {level:10s} {count:4d} {bar}")
lines.append("")
if summary.top_indicators:
lines.append("Top indicators:")
for indicator, count in summary.top_indicators[:5]:
lines.append(f" {indicator}: {count}")
lines.append("")
if summary.total_events == 0:
lines.append("No crisis events in this period.")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Crisis metrics summary")
parser.add_argument("--summary", action="store_true", help="Print summary report")
parser.add_argument("--json", action="store_true", dest="as_json", help="Output JSON")
parser.add_argument("--last", default="7d", help="Time period (e.g., 7d, 30d)")
parser.add_argument("--log", nargs=2, metavar=("LEVEL", "INDICATOR"), help="Log a test event")
args = parser.parse_args()
# Parse period
period_str = args.last.rstrip("d")
try:
days = int(period_str)
except ValueError:
days = 7
# Log mode
if args.log:
level, indicator = args.log
event = CrisisEvent(
timestamp=time.time(),
level=level.upper(),
indicators=[indicator],
session_id="cli-test",
source="cli",
)
log_event(event)
print(f"Logged: {level.upper()} / {indicator}")
return 0
# Compute summary
summary = compute_summary(days)
if args.as_json:
print(json.dumps(asdict(summary), indent=2))
else:
print(format_summary(summary))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,261 +0,0 @@
"""
Session-Level Crisis Tracking for the-door.
Tracks crisis signals across a conversation session to detect escalation
patterns and de-escalation. Privacy-first: state lives in memory only,
resets on new session, never persists to disk.
Key behaviors:
- Escalation: LOW → HIGH in 3 messages triggers heightened awareness
- De-escalation: CRITICAL → LOW for 5+ messages allows stepping down
- Session state enriches the system prompt with trajectory context
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
tracker.record("I'm feeling down")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore") # escalation detected
print(tracker.get_prompt_context())
# "User has escalated from LOW to HIGH over 3 messages."
"""
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
from .detect import detect_crisis, CrisisDetectionResult, SCORES
# Level hierarchy for ordering comparisons
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class CrisisSessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[Tuple[int, str]] = field(default_factory=list) # (message_index, level)
escalated: bool = False
escalation_messages: int = 0 # messages from first level to peak
deescalation_count: int = 0 # consecutive messages at lower level
deescalating: bool = False
deescalation_confirmed: bool = False # True once de-escalation threshold met
class CrisisSessionTracker:
"""
Session-level crisis tracker.
Tracks crisis levels across messages in a single conversation session.
Detects rapid escalation and gradual de-escalation. Provides context
strings for system prompt injection.
State is in-memory only. New session = new instance.
"""
# How many messages for escalation detection
ESCALATION_WINDOW = 3
# How many consecutive messages at lower level for de-escalation
DEESCALATION_THRESHOLD = 5
def __init__(self):
self._state = CrisisSessionState()
@property
def state(self) -> CrisisSessionState:
"""Read-only snapshot of current state."""
return CrisisSessionState(
current_level=self._state.current_level,
peak_level=self._state.peak_level,
message_count=self._state.message_count,
level_history=list(self._state.level_history),
escalated=self._state.escalated,
escalation_messages=self._state.escalation_messages,
deescalation_count=self._state.deescalation_count,
deescalating=self._state.deescalating,
)
def reset(self):
"""Reset all state. Called on new session."""
self._state = CrisisSessionState()
def record(self, text: str) -> CrisisDetectionResult:
"""
Record a message and update session state.
Returns the single-message detection result (unchanged from detect.py).
Session-level intelligence is tracked internally.
"""
detection = detect_crisis(text)
self._record_level(detection.level)
return detection
def record_level(self, level: str):
"""Record an already-detected crisis level (for when detection ran separately)."""
self._record_level(level)
def _record_level(self, level: str):
"""Internal: update state with a new crisis level."""
self._state.message_count += 1
idx = self._state.message_count
self._state.level_history.append((idx, level))
prev_level = self._state.current_level
prev_order = LEVEL_ORDER.get(prev_level, 0)
new_order = LEVEL_ORDER.get(level, 0)
# Update current level
self._state.current_level = level
# Track peak
if new_order > LEVEL_ORDER.get(self._state.peak_level, 0):
self._state.peak_level = level
# ── Escalation detection ──────────────────────────────
if new_order > prev_order:
# User is going up — reset de-escalation counter
self._state.deescalation_count = 0
self._state.deescalating = False
# Always check for escalation pattern in recent window
if not self._state.escalated:
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if len(window) >= self.ESCALATION_WINDOW:
first_in_window = window[0][1]
last_in_window = window[-1][1]
first_order = LEVEL_ORDER.get(first_in_window, 0)
last_order = LEVEL_ORDER.get(last_in_window, 0)
if last_order > first_order:
self._state.escalated = True
self._state.escalation_messages = self.ESCALATION_WINDOW
# ── De-escalation detection ───────────────────────────
elif new_order < prev_order:
self._state.deescalation_count += 1
self._state.deescalating = True
# If de-escalation counter meets threshold AND we were escalated
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
# Same level — increment de-escalation counter if already de-escalating
elif self._state.deescalating:
self._state.deescalation_count += 1
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
def _get_recent_levels(self, n: int) -> List[Tuple[int, str]]:
"""Get the last n entries from level history."""
return self._state.level_history[-n:]
def get_prompt_context(self) -> str:
"""
Generate a human-readable context string for system prompt injection.
Returns empty string if no session-level crisis context is needed.
"""
s = self._state
if s.message_count == 0:
return ""
parts = []
# Escalation alert
if s.escalated and s.peak_level != "NONE":
# Find the starting level from the escalation window
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if window:
start_level = window[0][1]
parts.append(
f"User has escalated from {start_level} to {s.peak_level} "
f"over {s.message_count} message{'s' if s.message_count != 1 else ''}."
)
parts.append("Heightened crisis awareness is warranted.")
# Confirmed de-escalation
elif s.deescalation_confirmed and s.peak_level in ("HIGH", "CRITICAL"):
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level}."
)
parts.append("De-escalation confirmed. Continue gentle presence.")
# Active de-escalation (not yet confirmed)
elif s.deescalating and s.peak_level != "NONE":
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level} "
f"over {s.deescalation_count} message{'s' if s.deescalation_count != 1 else ''}."
)
parts.append("Still in de-escalation. Maintain supportive awareness.")
# Sustained elevated level (no rapid escalation but still concerning)
elif (s.current_level in ("HIGH", "CRITICAL") and
not s.escalated and s.message_count >= 3):
parts.append(
f"User has been at {s.current_level} level for {s.message_count} messages."
)
parts.append("Continue crisis-aware response.")
# Peak was higher than current (user improved but may still be fragile)
elif (LEVEL_ORDER.get(s.peak_level, 0) > LEVEL_ORDER.get(s.current_level, 0) and
s.peak_level in ("HIGH", "CRITICAL") and
not s.deescalating):
parts.append(
f"Note: session peak was {s.peak_level}. "
f"User is now at {s.current_level}."
)
parts.append("Remain attentive.")
return " ".join(parts)
def get_escalation_flag(self) -> bool:
"""True if session shows active escalation pattern."""
return self._state.escalated
def get_effective_level(self) -> str:
"""
Get the effective crisis level considering session state.
Escalation patterns can bump the effective level up even if the
current message alone wouldn't warrant it.
"""
s = self._state
if s.escalated and s.peak_level in ("HIGH", "CRITICAL"):
return s.peak_level
return s.current_level
def should_heighten_awareness(self) -> bool:
"""
True when the session trajectory warrants heightened awareness
beyond what a single-message detection would provide.
"""
return self._state.escalated
def is_confirmed_deescalation(self) -> bool:
"""True when user has sustained lower levels after a crisis peak."""
s = self._state
return s.deescalation_confirmed
def __repr__(self) -> str:
s = self._state
return (
f"CrisisSessionTracker("
f"current={s.current_level}, peak={s.peak_level}, "
f"msgs={s.message_count}, escalated={s.escalated}, "
f"deescalating={s.deescalating})"
)

View File

@@ -1,407 +0,0 @@
"""
Tests for session-level crisis tracking (Issue #35).
Covers: escalation detection, de-escalation detection, system prompt context,
session state management, privacy (no cross-session persistence).
Run with: python -m pytest crisis/tests_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.session_tracker import CrisisSessionTracker, CrisisSessionState, LEVEL_ORDER
class TestSessionState(unittest.TestCase):
"""Test basic session state management."""
def test_initial_state(self):
tracker = CrisisSessionTracker()
s = tracker.state
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.escalated)
def test_record_increments_count(self):
tracker = CrisisSessionTracker()
tracker.record("hello")
self.assertEqual(tracker.state.message_count, 1)
tracker.record("world")
self.assertEqual(tracker.state.message_count, 2)
def test_record_tracks_history(self):
tracker = CrisisSessionTracker()
tracker.record("I'm struggling") # LOW
tracker.record("I feel hopeless and nobody cares") # MEDIUM
history = tracker.state.level_history
self.assertEqual(len(history), 2)
self.assertEqual(history[0], (1, "LOW"))
self.assertIn(history[1][1], ("MEDIUM", "LOW", "HIGH"))
def test_reset_clears_state(self):
tracker = CrisisSessionTracker()
tracker.record("I want to die") # CRITICAL
self.assertEqual(tracker.state.peak_level, "CRITICAL")
tracker.reset()
s = tracker.state
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
def test_peak_tracking(self):
tracker = CrisisSessionTracker()
tracker.record("I'm feeling down") # LOW
tracker.record("I feel hopeless with no way out") # HIGH
tracker.record("Actually I'm a bit better") # back to LOW
self.assertEqual(tracker.state.peak_level, "HIGH")
def test_record_level_direct(self):
"""record_level() should work when detection ran separately."""
tracker = CrisisSessionTracker()
tracker.record_level("LOW")
tracker.record_level("MEDIUM")
tracker.record_level("HIGH")
self.assertEqual(tracker.state.message_count, 3)
self.assertEqual(tracker.state.peak_level, "HIGH")
class TestEscalationDetection(unittest.TestCase):
"""Test rapid escalation detection: LOW → HIGH in N messages."""
def test_escalation_low_to_high_in_3(self):
"""Core AC: LOW → HIGH in 3 messages triggers escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel really hopeless about everything") # HIGH
tracker.record("I can't take this anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_low_to_critical_in_3(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm struggling") # LOW
tracker.record("Nobody would miss me") # HIGH
tracker.record("I want to end my life") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
def test_no_escalation_stays_low(self):
"""User stays LOW — no escalation."""
tracker = CrisisSessionTracker()
tracker.record("rough day")
tracker.record("tough week")
tracker.record("feeling down")
self.assertFalse(tracker.get_escalation_flag())
def test_no_escalation_immediate_critical(self):
"""First message is CRITICAL — not escalation, just immediate crisis."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself") # CRITICAL on first message
# Not escalated because there's no upward trajectory — it started high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_medium_to_high(self):
"""MEDIUM → HIGH escalation pattern."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel alone and nobody cares") # MEDIUM
tracker.record("I feel completely hopeless") # HIGH
tracker.record("There's no way out") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_not_triggered_without_increase(self):
"""Same level messages don't constitute escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel hopeless") # HIGH
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is broken") # HIGH
# No upward trajectory — started high, stayed high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_window_custom(self):
"""Custom window size should be respected."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 2
tracker.record("I'm unhappy") # LOW
tracker.record("I can't go on anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: CRITICAL → LOW for 5+ messages."""
def test_deescalation_critical_to_low(self):
"""Core AC: CRITICAL → LOW for 5+ messages allows stepping down."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate to CRITICAL
tracker.record("I'm struggling") # LOW
tracker.record("I can't go on") # HIGH
tracker.record("I want to die") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
# De-escalate over 5 messages
tracker.record("I called 988") # NONE/LOW
tracker.record("I'm calmer now") # NONE
tracker.record("Thank you for being here") # NONE
tracker.record("I'm going to be okay") # NONE
tracker.record("Taking it one moment at a time") # NONE
self.assertTrue(tracker.is_confirmed_deescalation())
self.assertFalse(tracker.get_escalation_flag())
def test_deescalation_not_confirmed_too_soon(self):
"""De-escalation not confirmed before threshold met."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
tracker.record("I'm struggling")
tracker.record("I can't go on")
tracker.record("I want to die")
self.assertTrue(tracker.get_escalation_flag())
# Only 3 messages of de-escalation
tracker.record("I called someone")
tracker.record("I'm calmer")
tracker.record("Feeling better")
self.assertFalse(tracker.is_confirmed_deescalation())
self.assertTrue(tracker.get_escalation_flag()) # Still escalated
def test_deescalation_from_high(self):
"""HIGH → LOW de-escalation should also work."""
tracker = CrisisSessionTracker()
tracker.DEESCALATION_THRESHOLD = 5
# Build up to HIGH
tracker.record("I'm down") # LOW
tracker.record("I feel hopeless") # HIGH
tracker.record("No way out") # HIGH
tracker.record("Everything is pointless") # HIGH
# De-escalate
for _ in range(5):
tracker.record("I'm doing a bit better") # LOW/NONE
self.assertTrue(tracker.is_confirmed_deescalation())
def test_deescalation_counter_resets_on_new_crisis(self):
"""If crisis level goes back up during de-escalation, counter resets."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate
tracker.record("I'm struggling") # LOW
tracker.record("I feel hopeless") # HIGH
tracker.record("I want to die") # CRITICAL
# Start de-escalating
tracker.record("I called someone") # LOW
tracker.record("I'm calmer") # NONE
# Go back up
tracker.record("Actually I can't do this") # HIGH
self.assertFalse(tracker.is_confirmed_deescalation())
class TestSystemPromptContext(unittest.TestCase):
"""Test system prompt context generation."""
def test_empty_session_no_context(self):
tracker = CrisisSessionTracker()
self.assertEqual(tracker.get_prompt_context(), "")
def test_escalation_context(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore")
ctx = tracker.get_prompt_context()
self.assertIn("escalated", ctx.lower())
self.assertIn("LOW", ctx)
self.assertIn("heightened", ctx.lower())
def test_deescalation_context(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.DEESCALATION_THRESHOLD = 5
# Escalate
tracker.record("I'm struggling")
tracker.record("I feel hopeless")
tracker.record("I want to die")
# De-escalate
for _ in range(5):
tracker.record("I'm okay now")
ctx = tracker.get_prompt_context()
self.assertIn("de-escalated", ctx.lower())
self.assertIn("confirmed", ctx.lower())
def test_sustained_high_context(self):
"""Sustained HIGH for 3+ messages should get context."""
tracker = CrisisSessionTracker()
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is pointless") # HIGH
tracker.record("I can't go on") # HIGH
ctx = tracker.get_prompt_context()
self.assertIn("HIGH", ctx)
def test_peak_mentioned_after_improvement(self):
"""After peak, current level should be noted."""
tracker = CrisisSessionTracker()
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is pointless") # HIGH
tracker.record("I feel a bit better") # back to LOW
ctx = tracker.get_prompt_context()
self.assertIn("peak", ctx.lower())
self.assertIn("HIGH", ctx)
class TestEffectiveLevel(unittest.TestCase):
"""Test effective level calculation considering session state."""
def test_effective_level_normal(self):
tracker = CrisisSessionTracker()
tracker.record("I'm struggling") # LOW
self.assertEqual(tracker.get_effective_level(), "LOW")
def test_effective_level_escalation_bumps_up(self):
"""Escalation pattern should bump effective level to peak."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel completely hopeless") # HIGH
tracker.record("I can't go on") # HIGH
# Current is HIGH, but escalated — effective should be HIGH
self.assertEqual(tracker.get_effective_level(), "HIGH")
class TestPrivacy(unittest.TestCase):
"""Test privacy requirements — no cross-session persistence."""
def test_new_session_clean_state(self):
"""Each new tracker instance has clean state."""
t1 = CrisisSessionTracker()
t1.record("I want to die")
self.assertEqual(t1.state.peak_level, "CRITICAL")
t2 = CrisisSessionTracker()
self.assertEqual(t2.state.peak_level, "NONE")
self.assertEqual(t2.state.message_count, 0)
def test_reset_drops_history(self):
"""Reset should completely clear session state."""
t = CrisisSessionTracker()
t.record("I'm struggling")
t.record("I can't go on")
t.reset()
self.assertEqual(t.state.level_history, [])
self.assertEqual(t.state.message_count, 0)
self.assertFalse(t.state.escalated)
class TestEdgeCases(unittest.TestCase):
"""Edge cases and boundary conditions."""
def test_single_message_none(self):
tracker = CrisisSessionTracker()
tracker.record("Hello Timmy")
self.assertFalse(tracker.get_escalation_flag())
self.assertEqual(tracker.get_prompt_context(), "")
def test_oscillating_levels(self):
"""User oscillating between levels shouldn't cause false escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm fine")
tracker.record("I'm struggling")
tracker.record("I'm fine")
tracker.record("I'm struggling")
# Oscillation without sustained escalation
self.assertFalse(tracker.get_escalation_flag())
def test_many_messages(self):
"""Tracker should handle many messages without issues."""
tracker = CrisisSessionTracker()
for i in range(100):
tracker.record("Hello there")
self.assertEqual(tracker.state.message_count, 100)
def test_empty_string(self):
tracker = CrisisSessionTracker()
tracker.record("")
self.assertEqual(tracker.state.message_count, 1)
self.assertEqual(tracker.state.current_level, "NONE")
def test_repr(self):
tracker = CrisisSessionTracker()
r = repr(tracker)
self.assertIn("CrisisSessionTracker", r)
self.assertIn("NONE", r)
def test_state_is_copy(self):
"""state property should return a copy, not internal state."""
tracker = CrisisSessionTracker()
s1 = tracker.state
tracker.record("I'm struggling")
s2 = tracker.state
self.assertEqual(s1.message_count, 0)
self.assertEqual(s2.message_count, 1)
class TestLevelOrder(unittest.TestCase):
"""Test level ordering is correct."""
def test_level_ordering(self):
self.assertLess(LEVEL_ORDER["NONE"], LEVEL_ORDER["LOW"])
self.assertLess(LEVEL_ORDER["LOW"], LEVEL_ORDER["MEDIUM"])
self.assertLess(LEVEL_ORDER["MEDIUM"], LEVEL_ORDER["HIGH"])
self.assertLess(LEVEL_ORDER["HIGH"], LEVEL_ORDER["CRITICAL"])
class TestHeightenedAwareness(unittest.TestCase):
"""Test heightened awareness flag."""
def test_heightened_on_escalation(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm unhappy")
tracker.record("I feel hopeless")
tracker.record("I can't go on")
self.assertTrue(tracker.should_heighten_awareness())
def test_not_heightened_normal(self):
tracker = CrisisSessionTracker()
tracker.record("Hello")
self.assertFalse(tracker.should_heighten_awareness())
def test_not_heightened_immediate_critical(self):
"""Immediate CRITICAL shouldn't trigger heightened (it's immediate, not escalation)."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself")
self.assertFalse(tracker.should_heighten_awareness())
if __name__ == "__main__":
unittest.main()

View File

@@ -613,21 +613,6 @@ html, body {
top: 8px;
outline: 2px solid #58a6ff;
}
/* Safety plan inline status feedback */
#sp-status {
font-size: 0.85rem;
opacity: 0;
transition: opacity 0.3s ease;
margin-right: auto;
}
#sp-status.success {
color: #3fb950;
opacity: 1;
}
#sp-status.error {
color: #f85149;
opacity: 1;
}
</style>
</head>
<body>
@@ -753,7 +738,6 @@ html, body {
</div>
</div>
<div class="modal-footer">
<span id="sp-status" role="status" aria-live="polite"></span>
<button class="btn btn-secondary" id="cancel-safety-plan">Cancel</button>
<button class="btn btn-primary" id="save-safety-plan">Save Plan</button>
</div>
@@ -980,70 +964,6 @@ Sovereignty and service always.`;
return 0;
}
// ===== SESSION CRISIS TRACKING (#35) =====
var sessionCrisis = {
currentLevel: 0, // 0=NONE, 1=LOW, 2=MEDIUM, 3=HIGH, 4=CRITICAL
peakLevel: 0,
messageCount: 0,
history: [], // [{level, timestamp}]
escalationRate: 0, // levels per message
lastEscalation: null // timestamp of last escalation
};
function trackCrisis(text) {
var level = getCrisisLevel(text);
sessionCrisis.messageCount++;
sessionCrisis.history.push({ level: level, time: Date.now() });
if (level > sessionCrisis.currentLevel) {
sessionCrisis.lastEscalation = Date.now();
}
sessionCrisis.currentLevel = level;
sessionCrisis.peakLevel = Math.max(sessionCrisis.peakLevel, level);
var recent = sessionCrisis.history.slice(-5);
if (recent.length >= 2) {
var first = recent[0].level;
var last = recent[recent.length - 1].level;
sessionCrisis.escalationRate = (last - first) / recent.length;
}
return getSessionContext();
}
function getSessionContext() {
var ctx = '';
if (sessionCrisis.history.length < 2) return ctx;
if (sessionCrisis.escalationRate > 0.5 && sessionCrisis.history.length <= 3) {
ctx += 'ESCALATION ALERT: User crisis level is rising rapidly. ';
}
if (sessionCrisis.peakLevel >= 3 && sessionCrisis.currentLevel <= 1 && sessionCrisis.messageCount >= 5) {
ctx += 'DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency. ';
}
if (sessionCrisis.currentLevel >= 2 && sessionCrisis.messageCount >= 3) {
ctx += 'User has been in crisis for ' + sessionCrisis.messageCount + ' messages. ';
}
var levels = sessionCrisis.history.map(function(h) { return h.level; });
if (levels.length >= 2) {
ctx += 'Crisis trajectory: ' + levels.join(' → ') + '. ';
}
return ctx;
}
function resetSessionCrisis() {
sessionCrisis = {
currentLevel: 0, peakLevel: 0, messageCount: 0,
history: [], escalationRate: 0, lastEscalation: null
};
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
@@ -1058,9 +978,7 @@ Sovereignty and service always.`;
var directive = 'DIRECTIVE: ' + profile.directive + '\n';
var tone = 'TONE: ' + profile.tone + '\n';
var sessionCtx = getSessionContext();
var sessionBlock = sessionCtx ? '\nSESSION CONTEXT: ' + sessionCtx : '';
return SYSTEM_PROMPT + divider + header + directive + tone + sessionBlock;
return SYSTEM_PROMPT + divider + header + directive + tone;
}
@@ -1135,36 +1053,8 @@ Sovereignty and service always.`;
overlayDismissBtn.focus();
}
// Crisis overlay Escape key handler
function trapCrisisOverlayEscape(e) {
if (e.key !== 'Escape') return;
if (!crisisOverlay.classList.contains('active')) return;
if (overlayDismissBtn.disabled) return; // Don't escape during countdown
// Dismiss the overlay
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to chat input
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
// Register focus trap and Escape handler on document (always listening, gated by class check)
// Register focus trap on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
document.addEventListener('keydown', trapCrisisOverlayEscape);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
@@ -1274,7 +1164,6 @@ Sovereignty and service always.`;
clearChatBtn.addEventListener('click', function() {
if (confirm('Clear all chat history?')) {
localStorage.removeItem('timmy_chat_history');
resetSessionCrisis();
window.location.reload();
}
});
@@ -1314,23 +1203,11 @@ Sovereignty and service always.`;
};
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
var spStatus = document.getElementById('sp-status');
spStatus.textContent = '\u2713 Safety plan saved locally.';
spStatus.className = 'success';
setTimeout(function() {
spStatus.className = '';
spStatus.textContent = '';
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}, 2000);
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
var spStatusErr = document.getElementById('sp-status');
spStatusErr.textContent = '\u2717 Error saving plan.';
spStatusErr.className = 'error';
setTimeout(function() {
spStatusErr.className = '';
spStatusErr.textContent = '';
}, 4000);
alert('Error saving plan.');
}
});
@@ -1439,7 +1316,6 @@ Sovereignty and service always.`;
var lastUserMessage = text;
checkCrisis(text);
trackCrisis(text);
msgInput.value = '';
msgInput.style.height = 'auto';
@@ -1528,7 +1404,6 @@ Sovereignty and service always.`;
messages.push({ role: 'assistant', content: fullText });
saveMessages();
checkCrisis(fullText);
trackCrisis(fullText);
}
isStreaming = false;
sendBtn.disabled = msgInput.value.trim().length === 0;
@@ -1558,7 +1433,6 @@ Sovereignty and service always.`;
// ===== WELCOME MESSAGE =====
function init() {
resetSessionCrisis();
if (!loadMessages()) {
var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us.";
addMessage('assistant', welcomeText);

View File

@@ -1,74 +0,0 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestSafetyPlanInlineFeedback(unittest.TestCase):
"""Test that safety plan uses inline feedback instead of blocking alert()."""
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_no_alert_calls(self):
"""Safety plan save must not use browser alert()."""
alert_matches = re.findall(r'alert\(', self.html)
self.assertEqual(
len(alert_matches), 0,
f'Found {len(alert_matches)} alert() calls - must use inline feedback instead.',
)
def test_sp_status_element_exists(self):
"""Modal footer must contain #sp-status element for inline feedback."""
self.assertRegex(
self.html,
r'id=["\']sp-status["\']',
'Expected #sp-status element in the safety plan modal.',
)
def test_sp_status_has_aria_live(self):
"""#sp-status must have aria-live for accessible announcements."""
self.assertRegex(
self.html,
r'aria-live=["\']polite["\']',
'Expected #sp-status to have aria-live="polite".',
)
def test_success_feedback_exists(self):
"""Must show success message on save."""
self.assertIn(
'Safety plan saved locally.',
self.html,
'Expected success message for safety plan save.',
)
def test_error_feedback_exists(self):
"""Must show error message on save failure."""
self.assertIn(
'Error saving plan.',
self.html,
'Expected error message for safety plan save failure.',
)
def test_css_success_state(self):
"""Must have CSS for .sp-status.success state."""
self.assertIn(
'sp-status.success',
self.html,
'Expected CSS for .sp-status.success state.',
)
def test_css_error_state(self):
"""Must have CSS for .sp-status.error state."""
self.assertIn(
'sp-status.error',
self.html,
'Expected CSS for .sp-status.error state.',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,17 +0,0 @@
from pathlib import Path
def test_index_contains_session_crisis_state_and_tracking_hooks():
html = Path('index.html').read_text()
assert 'var sessionCrisis' in html
assert 'function trackCrisis(text)' in html
assert 'function getSessionContext()' in html
assert 'function resetSessionCrisis()' in html
assert 'trackCrisis(text);' in html
assert 'SESSION CONTEXT:' in html
def test_new_session_resets_crisis_state():
html = Path('index.html').read_text()
assert 'resetSessionCrisis()' in html
assert "localStorage.removeItem('timmy_chat_history');" in html