Compare commits

..

2 Commits

Author SHA1 Message Date
48e093fe98 fix: replace safety plan alert() with inline live feedback (closes #94, refs #73)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 15s
Smoke Test / smoke (pull_request) Successful in 16s
2026-04-14 23:45:18 -04:00
Alexander Whitestone
cd50b3d767 feat: add session-level crisis tracking and escalation (#35)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-14 22:24:09 -04:00
13 changed files with 835 additions and 986 deletions

View File

@@ -95,45 +95,7 @@ Run tests:
python -m pytest crisis/tests.py -v
```
### 7. Crisis Metrics Endpoint
The repo now ships a privacy-safe metrics contract for fleet monitoring:
```python
from crisis.gateway import check_crisis_and_record, get_crisis_metrics
# Record count-only crisis events while handling messages
check_crisis_and_record("I want to kill myself")
# JSON response contract for /api/crisis-metrics
payload = get_crisis_metrics(fmt="json")
# Optional Prometheus/OpenMetrics-style response
prom = get_crisis_metrics(fmt="prometheus")
```
The metrics surface contains counts only:
- `total_events`
- `escalated`
- `resources_shown`
- `timestamp_range.start`
- `timestamp_range.end`
No message content, indicators, or user identifiers are retained.
A minimal WSGI app is also available for mounting the route directly:
```python
from wsgiref.simple_server import make_server
from crisis.metrics import crisis_metrics_app
with make_server("127.0.0.1", 8645, crisis_metrics_app) as httpd:
httpd.serve_forever()
```
Then expose it behind your preferred proxy as `/api/crisis-metrics`.
### 8. Acceptance Criteria Checklist
### 7. Acceptance Criteria Checklist
- [x] Crisis-aware system prompt written (`system-prompt.txt`)
- [x] Frontend embeds system prompt on every API request (`index.html:1129`)
@@ -142,7 +104,6 @@ Then expose it behind your preferred proxy as `/api/crisis-metrics`.
- [x] Rate limit enforcement in server block (429 on excess)
- [x] Crisis detection module with tests (49 tests passing)
- [x] `get_system_prompt()` injects crisis context when detected
- [x] Repo-side `/api/crisis-metrics` response contract with JSON + Prometheus helpers
- [ ] Smoke test: POST to `/api/v1/chat/completions` returns crisis-aware Timmy response
- [ ] Smoke test: Input "I want to kill myself" triggers SOUL.md protocol
- [ ] Smoke test: 11th request in 1 minute returns HTTP 429

View File

@@ -6,21 +6,8 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import (
check_crisis,
check_crisis_and_record,
get_crisis_metrics,
get_system_prompt,
format_gateway_response,
)
from .metrics import (
CrisisMetricsTracker,
CrisisMetricsSnapshot,
build_metrics_http_response,
crisis_metrics_app,
format_prometheus_metrics,
)
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, CrisisSessionState
__all__ = [
"detect_crisis",
@@ -29,18 +16,10 @@ __all__ = [
"generate_response",
"CrisisResponse",
"check_crisis",
"check_crisis_and_record",
"get_crisis_metrics",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisMetricsTracker",
"CrisisMetricsSnapshot",
"build_metrics_http_response",
"crisis_metrics_app",
"format_prometheus_metrics",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"CrisisSessionState",
]

View File

@@ -16,38 +16,25 @@ from typing import Optional
from .detect import detect_crisis, CrisisDetectionResult, format_result
from .compassion_router import router
from .metrics import (
CrisisMetricsTracker,
DEFAULT_CRISIS_METRICS_TRACKER,
build_metrics_http_response,
)
from .response import (
process_message,
generate_response,
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(
text: str,
metrics_tracker: Optional[CrisisMetricsTracker] = None,
timestamp: Optional[object] = None,
) -> dict:
def check_crisis(text: str) -> dict:
"""
Full crisis check returning structured data.
Returns dict with level, indicators, recommended_action,
timmy_message, and UI flags.
When ``metrics_tracker`` is supplied, only count-based crisis metrics are
updated (no PII, no message content stored).
"""
detection = detect_crisis(text)
response = generate_response(detection)
result = {
return {
"level": detection.level,
"score": detection.score,
"indicators": detection.indicators,
@@ -61,30 +48,6 @@ def check_crisis(
"escalate": response.escalate,
}
if metrics_tracker is not None:
metrics_tracker.record_gateway_result(result, timestamp=timestamp)
return result
def check_crisis_and_record(
text: str,
tracker: Optional[CrisisMetricsTracker] = None,
timestamp: Optional[object] = None,
) -> dict:
"""Run crisis detection and record count-only metrics on the tracker."""
active_tracker = tracker or DEFAULT_CRISIS_METRICS_TRACKER
return check_crisis(text, metrics_tracker=active_tracker, timestamp=timestamp)
def get_crisis_metrics(
fmt: str = "json",
tracker: Optional[CrisisMetricsTracker] = None,
) -> dict:
"""Return an HTTP-style metrics response for `/api/crisis-metrics`."""
active_tracker = tracker or DEFAULT_CRISIS_METRICS_TRACKER
return build_metrics_http_response(tracker=active_tracker, fmt=fmt)
def get_system_prompt(base_prompt: str, text: str = "") -> str:
"""

View File

@@ -1,239 +0,0 @@
"""
Crisis metrics tracker and endpoint helpers for the-door.
Provides a privacy-safe metrics surface for fleet monitoring. Counts only.
No message content, indicators, or user identifiers are retained.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
from urllib.parse import parse_qs
UTC = timezone.utc
PROMETHEUS_CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"
JSON_CONTENT_TYPE = "application/json"
@dataclass(frozen=True)
class CrisisMetricsSnapshot:
    """Frozen, point-in-time copy of the crisis counters.

    Holds three integer counts plus the first/last event timestamps as
    optional strings; nothing else is carried.
    """

    total_events: int = 0
    escalated: int = 0
    resources_shown: int = 0
    start: Optional[str] = None
    end: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """Return the snapshot as a plain dict with a nested ``timestamp_range``."""
        payload: dict[str, Any] = {
            "total_events": self.total_events,
            "escalated": self.escalated,
            "resources_shown": self.resources_shown,
        }
        # The two timestamps are grouped under a single key in the payload.
        payload["timestamp_range"] = {"start": self.start, "end": self.end}
        return payload
class CrisisMetricsTracker:
    """In-memory accumulator of crisis event counts.

    Only results whose level is something other than ``NONE`` are counted.
    The instance keeps three integers and two datetimes; no message text,
    indicator text, or user/session identifier is stored on it.
    """

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Zero every counter and forget the observed time range."""
        self._total_events = 0
        self._escalated = 0
        self._resources_shown = 0
        self._first_event_at: Optional[datetime] = None
        self._last_event_at: Optional[datetime] = None

    def record_gateway_result(
        self,
        result: dict[str, Any],
        timestamp: Optional[object] = None,
    ) -> CrisisMetricsSnapshot:
        """Fold one gateway result into the counters and return a snapshot.

        A missing/empty or ``NONE`` level leaves the counters untouched.
        ``timestamp`` is normalised by ``_coerce_timestamp`` before any
        counter is modified.
        """
        level = str(result.get("level") or "NONE").upper()
        if level != "NONE":
            when = _coerce_timestamp(timestamp)

            self._total_events += 1
            if result.get("escalate"):
                self._escalated += 1

            # One increment per event, no matter how many UI flags are set.
            ui_flags = result.get("ui") or {}
            for flag in ("provide_988", "show_crisis_panel", "show_overlay"):
                if ui_flags.get(flag):
                    self._resources_shown += 1
                    break

            # Widen the observed range in whichever direction applies.
            earliest = self._first_event_at
            if earliest is None or when < earliest:
                self._first_event_at = when
            latest = self._last_event_at
            if latest is None or when > latest:
                self._last_event_at = when

        return self.snapshot()

    def snapshot(self) -> CrisisMetricsSnapshot:
        """Return the current counters as a ``CrisisMetricsSnapshot``."""
        first_seen = _format_timestamp(self._first_event_at)
        last_seen = _format_timestamp(self._last_event_at)
        return CrisisMetricsSnapshot(
            total_events=self._total_events,
            escalated=self._escalated,
            resources_shown=self._resources_shown,
            start=first_seen,
            end=last_seen,
        )
DEFAULT_CRISIS_METRICS_TRACKER = CrisisMetricsTracker()
def build_metrics_http_response(
    tracker: Optional[CrisisMetricsTracker] = None,
    fmt: str = "json",
) -> dict[str, Any]:
    """Build an HTTP-style response dict (``status``/``headers``/``body``).

    ``fmt`` is trimmed and lower-cased; an empty value means ``"json"``.
    ``"prometheus"`` yields the text rendering, ``"json"`` the serialised
    snapshot, and anything else a 400 listing the supported formats.
    Falls back to the module-level default tracker when none is given.
    """
    source = tracker or DEFAULT_CRISIS_METRICS_TRACKER
    wanted = (fmt or "json").strip().lower()
    snapshot = source.snapshot()

    if wanted == "prometheus":
        status = 200
        content_type = PROMETHEUS_CONTENT_TYPE
        body = format_prometheus_metrics(snapshot)
    elif wanted == "json":
        status = 200
        content_type = JSON_CONTENT_TYPE
        body = json.dumps(snapshot.to_dict())
    else:
        status = 400
        content_type = JSON_CONTENT_TYPE
        error_payload = {
            "error": "invalid_format",
            "supported_formats": ["json", "prometheus"],
        }
        body = json.dumps(error_payload)

    return {
        "status": status,
        "headers": {"Content-Type": content_type},
        "body": body,
    }
def format_prometheus_metrics(snapshot: CrisisMetricsSnapshot) -> str:
    """Render ``snapshot`` as HELP/TYPE/sample text lines, newline-terminated.

    Every series is declared as a ``gauge``. The two timestamp series are
    emitted as integer epoch seconds via ``_timestamp_to_epoch``.
    """
    first_epoch = _timestamp_to_epoch(snapshot.start)
    last_epoch = _timestamp_to_epoch(snapshot.end)

    # (metric name, help text, sample value) in emission order.
    series = (
        (
            "the_door_crisis_total_events",
            "Total crisis events observed by this instance.",
            snapshot.total_events,
        ),
        (
            "the_door_crisis_escalated",
            "Crisis events that triggered escalation.",
            snapshot.escalated,
        ),
        (
            "the_door_crisis_resources_shown",
            "Crisis events that displayed support resources.",
            snapshot.resources_shown,
        ),
        (
            "the_door_crisis_first_event_timestamp_seconds",
            "Unix timestamp for the first recorded crisis event.",
            first_epoch,
        ),
        (
            "the_door_crisis_last_event_timestamp_seconds",
            "Unix timestamp for the most recent recorded crisis event.",
            last_epoch,
        ),
    )

    rendered = []
    for metric, help_text, sample in series:
        rendered.append(f"# HELP {metric} {help_text}")
        rendered.append(f"# TYPE {metric} gauge")
        rendered.append(f"{metric} {sample}")
    return "\n".join(rendered) + "\n"
def crisis_metrics_app(
    environ: dict[str, Any],
    start_response,
    tracker: Optional[CrisisMetricsTracker] = None,
):
    """WSGI callable serving ``GET /api/crisis-metrics``.

    Any other path gets a JSON 404; any other method on the right path gets
    a JSON 405 with an ``Allow: GET`` header. Otherwise the format is chosen
    from the query string and ``Accept`` header and the response produced by
    ``build_metrics_http_response`` is written out as UTF-8.
    """
    request_path = environ.get("PATH_INFO", "")
    verb = str(environ.get("REQUEST_METHOD", "GET")).upper()
    json_header = ("Content-Type", JSON_CONTENT_TYPE)

    # The path is checked before the method, so a POST to a wrong path is a 404.
    if request_path != "/api/crisis-metrics":
        payload = json.dumps({"error": "not_found"})
        start_response("404 Not Found", [json_header])
        return [payload.encode("utf-8")]

    if verb != "GET":
        payload = json.dumps({"error": "method_not_allowed", "allowed": ["GET"]})
        start_response("405 Method Not Allowed", [json_header, ("Allow", "GET")])
        return [payload.encode("utf-8")]

    chosen_format = _negotiate_format(
        environ.get("QUERY_STRING", ""),
        environ.get("HTTP_ACCEPT", ""),
    )
    reply = build_metrics_http_response(tracker=tracker, fmt=chosen_format)

    reason = _status_text(reply["status"])
    header_pairs = list(reply["headers"].items())
    start_response(f"{reply['status']} {reason}", header_pairs)
    return [str(reply["body"]).encode("utf-8")]
def _negotiate_format(query_string: str, accept_header: str) -> str:
    """Choose ``"json"`` or ``"prometheus"`` for a request.

    A recognised ``format`` query parameter wins. An unrecognised value is
    ignored rather than rejected, and the ``Accept`` header is consulted
    instead: a substring match on ``text/plain`` or
    ``application/openmetrics-text`` selects Prometheus. JSON is the default.
    """
    query = parse_qs(query_string or "")
    explicit = (query.get("format", [""])[0] or "").strip().lower()
    if explicit in ("json", "prometheus"):
        return explicit

    accepted = (accept_header or "").lower()
    text_media_types = ("text/plain", "application/openmetrics-text")
    if any(media_type in accepted for media_type in text_media_types):
        return "prometheus"
    return "json"
def _coerce_timestamp(value: Optional[object]) -> datetime:
if value is None:
return datetime.now(UTC)
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
if isinstance(value, str):
text = value.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
parsed = datetime.fromisoformat(text)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
raise TypeError(f"Unsupported timestamp type: {type(value)!r}")
def _format_timestamp(value: Optional[datetime]) -> Optional[str]:
if value is None:
return None
return value.astimezone(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _timestamp_to_epoch(value: Optional[str]) -> int:
    """Convert a timestamp string to whole epoch seconds; ``0`` when absent.

    ``None`` and the empty string both map to ``0``; anything else is parsed
    by ``_coerce_timestamp`` and truncated to an integer.
    """
    if value:
        return int(_coerce_timestamp(value).timestamp())
    return 0
def _status_text(status: int) -> str:
    """Return the standard HTTP reason phrase for ``status``.

    The previous hand-written table fell back to ``"OK"`` for any code it did
    not list, which would produce misleading status lines such as ``500 OK``.
    The phrase now comes from ``http.HTTPStatus``, so every registered code
    maps correctly; the four codes the old table covered (200, 400, 404,
    405) return exactly the same text as before.

    Args:
        status: Numeric HTTP status code.

    Returns:
        The reason phrase, or ``"Unknown"`` when the code is not a
        registered HTTP status.
    """
    # Local import so this helper does not rely on a new file-level import.
    from http import HTTPStatus

    try:
        return HTTPStatus(status).phrase
    except ValueError:
        # Unregistered code: never claim success for it.
        return "Unknown"

View File

@@ -1,259 +1,261 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Session-Level Crisis Tracking for the-door.
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Tracks crisis signals across a conversation session to detect escalation
patterns and de-escalation. Privacy-first: state lives in memory only,
resets on new session, never persists to disk.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine" → "I'm struggling" → "I can't go on" (rapid escalation)
- "I want to die" → "I'm calmer now" → "feeling better" (de-escalation)
Key behaviors:
- Escalation: LOW → HIGH in 3 messages triggers heightened awareness
- De-escalation: CRITICAL → LOW for 5+ messages allows stepping down
- Session state enriches the system prompt with trajectory context
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
tracker.record("I'm feeling down")
tracker.record("Things are really hard")
tracker.record("I can't go on anymore") # escalation detected
print(tracker.get_prompt_context())
# "User has escalated from LOW to HIGH over 3 messages."
"""
from dataclasses import dataclass, field
from typing import List, Optional
from typing import List, Optional, Tuple
from .detect import CrisisDetectionResult, SCORES
from .detect import detect_crisis, CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
# Level hierarchy for ordering comparisons
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
class CrisisSessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
level_history: List[Tuple[int, str]] = field(default_factory=list) # (message_index, level)
escalated: bool = False
escalation_messages: int = 0 # messages from first level to peak
deescalation_count: int = 0 # consecutive messages at lower level
deescalating: bool = False
deescalation_confirmed: bool = False # True once de-escalation threshold met
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Session-level crisis tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
Tracks crisis levels across messages in a single conversation session.
Detects rapid escalation and gradual de-escalation. Provides context
strings for system prompt injection.
State is in-memory only. New session = new instance.
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
# How many messages for escalation detection
ESCALATION_WINDOW = 3
# How many consecutive messages at lower level for de-escalation
DEESCALATION_THRESHOLD = 5
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
self._state = CrisisSessionState()
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
def state(self) -> CrisisSessionState:
    """Return a detached snapshot of the current session state.

    A new ``CrisisSessionState`` is built and ``level_history`` is copied,
    so callers cannot mutate the tracker's internal list through the
    returned object.

    Fix: ``deescalation_confirmed`` is now carried over. It was previously
    left out, so every snapshot reported the dataclass default (``False``)
    even after the tracker had confirmed de-escalation.
    """
    live = self._state
    return CrisisSessionState(
        current_level=live.current_level,
        peak_level=live.peak_level,
        message_count=live.message_count,
        level_history=list(live.level_history),
        escalated=live.escalated,
        escalation_messages=live.escalation_messages,
        deescalation_count=live.deescalation_count,
        deescalating=live.deescalating,
        deescalation_confirmed=live.deescalation_confirmed,
    )
def record(self, detection: CrisisDetectionResult) -> SessionState:
def reset(self):
"""Reset all state. Called on new session."""
self._state = CrisisSessionState()
def record(self, text: str) -> CrisisDetectionResult:
"""
Record a crisis detection result for the current message.
Record a message and update session state.
Returns updated SessionState.
Returns the single-message detection result (unchanged from detect.py).
Session-level intelligence is tracked internally.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
detection = detect_crisis(text)
self._record_level(detection.level)
return detection
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
def record_level(self, level: str):
    """Feed a pre-computed crisis level into the session state.

    For callers that ran detection separately; the level is handed straight
    to ``_record_level`` and nothing is returned.
    """
    already_detected = level
    self._record_level(already_detected)
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
def _record_level(self, level: str):
"""Internal: update state with a new crisis level."""
self._state.message_count += 1
idx = self._state.message_count
self._state.level_history.append((idx, level))
self._current_level = level
return self.state
prev_level = self._state.current_level
prev_order = LEVEL_ORDER.get(prev_level, 0)
new_order = LEVEL_ORDER.get(level, 0)
def _detect_escalation(self) -> bool:
# Update current level
self._state.current_level = level
# Track peak
if new_order > LEVEL_ORDER.get(self._state.peak_level, 0):
self._state.peak_level = level
# ── Escalation detection ──────────────────────────────
if new_order > prev_order:
# User is going up — reset de-escalation counter
self._state.deescalation_count = 0
self._state.deescalating = False
# Always check for escalation pattern in recent window
if not self._state.escalated:
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if len(window) >= self.ESCALATION_WINDOW:
first_in_window = window[0][1]
last_in_window = window[-1][1]
first_order = LEVEL_ORDER.get(first_in_window, 0)
last_order = LEVEL_ORDER.get(last_in_window, 0)
if last_order > first_order:
self._state.escalated = True
self._state.escalation_messages = self.ESCALATION_WINDOW
# ── De-escalation detection ───────────────────────────
elif new_order < prev_order:
self._state.deescalation_count += 1
self._state.deescalating = True
# If de-escalation counter meets threshold AND we were escalated
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
# Same level — increment de-escalation counter if already de-escalating
elif self._state.deescalating:
self._state.deescalation_count += 1
if (self._state.escalated and
self._state.deescalation_count >= self.DEESCALATION_THRESHOLD):
self._state.escalated = False
self._state.deescalating = False
self._state.deescalation_confirmed = True
self._state.deescalation_count = 0
def _get_recent_levels(self, n: int) -> List[Tuple[int, str]]:
"""Get the last n entries from level history."""
return self._state.level_history[-n:]
def get_prompt_context(self) -> str:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Generate a human-readable context string for system prompt injection.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
Returns empty string if no session-level crisis context is needed.
"""
if len(self._level_history) < 2:
return False
s = self._state
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
if s.message_count == 0:
return ""
s = self.state
parts = []
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
# Escalation alert
if s.escalated and s.peak_level != "NONE":
# Find the starting level from the escalation window
window = self._get_recent_levels(self.ESCALATION_WINDOW)
if window:
start_level = window[0][1]
parts.append(
f"User has escalated from {start_level} to {s.peak_level} "
f"over {s.message_count} message{'s' if s.message_count != 1 else ''}."
)
parts.append("Heightened crisis awareness is warranted.")
# Confirmed de-escalation
elif s.deescalation_confirmed and s.peak_level in ("HIGH", "CRITICAL"):
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level}."
)
parts.append("De-escalation confirmed. Continue gentle presence.")
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
# Active de-escalation (not yet confirmed)
elif s.deescalating and s.peak_level != "NONE":
parts.append(
f"User has de-escalated from {s.peak_level} to {s.current_level} "
f"over {s.deescalation_count} message{'s' if s.deescalation_count != 1 else ''}."
)
parts.append("Still in de-escalation. Maintain supportive awareness.")
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
# Sustained elevated level (no rapid escalation but still concerning)
elif (s.current_level in ("HIGH", "CRITICAL") and
not s.escalated and s.message_count >= 3):
parts.append(
f"User has been at {s.current_level} level for {s.message_count} messages."
)
parts.append("Continue crisis-aware response.")
return ""
# Peak was higher than current (user improved but may still be fragile)
elif (LEVEL_ORDER.get(s.peak_level, 0) > LEVEL_ORDER.get(s.current_level, 0) and
s.peak_level in ("HIGH", "CRITICAL") and
not s.deescalating):
parts.append(
f"Note: session peak was {s.peak_level}. "
f"User is now at {s.current_level}."
)
parts.append("Remain attentive.")
def get_ui_hints(self) -> dict:
return " ".join(parts)
def get_escalation_flag(self) -> bool:
    """Report whether the session is currently flagged as escalated.

    Returns the ``escalated`` field of the tracker's internal state as-is.
    """
    session = self._state
    return session.escalated
def get_effective_level(self) -> str:
"""
Return UI hints based on session state for the frontend.
Get the effective crisis level considering session state.
These are advisory — the frontend decides what to show.
Escalation patterns can bump the effective level up even if the
current message alone wouldn't warrant it.
"""
s = self.state
s = self._state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.escalated and s.peak_level in ("HIGH", "CRITICAL"):
return s.peak_level
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return s.current_level
return hints
def should_heighten_awareness(self) -> bool:
    """Tell whether the session trajectory calls for extra caution.

    This mirrors the internal ``escalated`` flag; it answers about the
    session as a whole rather than about any single message.
    """
    session = self._state
    return session.escalated
def is_confirmed_deescalation(self) -> bool:
    """Report whether de-escalation has been confirmed for this session.

    Returns the ``deescalation_confirmed`` field of the internal state.
    """
    return self._state.deescalation_confirmed
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}
def __repr__(self) -> str:
s = self._state
return (
f"CrisisSessionTracker("
f"current={s.current_level}, peak={s.peak_level}, "
f"msgs={s.message_count}, escalated={s.escalated}, "
f"deescalating={s.deescalating})"
)

View File

@@ -0,0 +1,407 @@
"""
Tests for session-level crisis tracking (Issue #35).
Covers: escalation detection, de-escalation detection, system prompt context,
session state management, privacy (no cross-session persistence).
Run with: python -m pytest crisis/tests_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.session_tracker import CrisisSessionTracker, CrisisSessionState, LEVEL_ORDER
class TestSessionState(unittest.TestCase):
    """Basic bookkeeping of the session tracker: counts, history, peak, reset."""

    def test_initial_state(self):
        snapshot = CrisisSessionTracker().state
        self.assertEqual(snapshot.current_level, "NONE")
        self.assertEqual(snapshot.peak_level, "NONE")
        self.assertEqual(snapshot.message_count, 0)
        self.assertEqual(snapshot.level_history, [])
        self.assertFalse(snapshot.escalated)

    def test_record_increments_count(self):
        tracker = CrisisSessionTracker()
        for expected_count, text in enumerate(("hello", "world"), start=1):
            tracker.record(text)
            self.assertEqual(tracker.state.message_count, expected_count)

    def test_record_tracks_history(self):
        tracker = CrisisSessionTracker()
        tracker.record("I'm struggling")  # expected LOW
        tracker.record("I feel hopeless and nobody cares")  # expected MEDIUM
        first_entry, second_entry = tracker.state.level_history
        self.assertEqual(len(tracker.state.level_history), 2)
        self.assertEqual(first_entry, (1, "LOW"))
        # The detector's exact level for the second message is left open.
        self.assertIn(second_entry[1], ("MEDIUM", "LOW", "HIGH"))

    def test_reset_clears_state(self):
        tracker = CrisisSessionTracker()
        tracker.record("I want to die")  # expected CRITICAL
        self.assertEqual(tracker.state.peak_level, "CRITICAL")

        tracker.reset()

        cleared = tracker.state
        self.assertEqual(cleared.current_level, "NONE")
        self.assertEqual(cleared.peak_level, "NONE")
        self.assertEqual(cleared.message_count, 0)
        self.assertEqual(cleared.level_history, [])

    def test_peak_tracking(self):
        tracker = CrisisSessionTracker()
        for text in (
            "I'm feeling down",  # expected LOW
            "I feel hopeless with no way out",  # expected HIGH
            "Actually I'm a bit better",  # expected back to LOW
        ):
            tracker.record(text)
        self.assertEqual(tracker.state.peak_level, "HIGH")

    def test_record_level_direct(self):
        """record_level() accepts levels produced by a separate detection run."""
        tracker = CrisisSessionTracker()
        for level in ("LOW", "MEDIUM", "HIGH"):
            tracker.record_level(level)
        self.assertEqual(tracker.state.message_count, 3)
        self.assertEqual(tracker.state.peak_level, "HIGH")
class TestEscalationDetection(unittest.TestCase):
"""Test rapid escalation detection: LOW → HIGH in N messages."""
def test_escalation_low_to_high_in_3(self):
"""Core AC: LOW → HIGH in 3 messages triggers escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm having a tough time") # LOW
tracker.record("I feel really hopeless about everything") # HIGH
tracker.record("I can't take this anymore") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_low_to_critical_in_3(self):
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I'm struggling") # LOW
tracker.record("Nobody would miss me") # HIGH
tracker.record("I want to end my life") # CRITICAL
self.assertTrue(tracker.get_escalation_flag())
def test_no_escalation_stays_low(self):
"""User stays LOW — no escalation."""
tracker = CrisisSessionTracker()
tracker.record("rough day")
tracker.record("tough week")
tracker.record("feeling down")
self.assertFalse(tracker.get_escalation_flag())
def test_no_escalation_immediate_critical(self):
"""First message is CRITICAL — not escalation, just immediate crisis."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I want to kill myself") # CRITICAL on first message
# Not escalated because there's no upward trajectory — it started high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_medium_to_high(self):
"""MEDIUM → HIGH escalation pattern."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel alone and nobody cares") # MEDIUM
tracker.record("I feel completely hopeless") # HIGH
tracker.record("There's no way out") # HIGH
self.assertTrue(tracker.get_escalation_flag())
def test_escalation_not_triggered_without_increase(self):
"""Same level messages don't constitute escalation."""
tracker = CrisisSessionTracker()
tracker.ESCALATION_WINDOW = 3
tracker.record("I feel hopeless") # HIGH
tracker.record("I'm in so much pain") # HIGH
tracker.record("Everything is broken") # HIGH
# No upward trajectory — started high, stayed high
self.assertFalse(tracker.get_escalation_flag())
def test_escalation_window_custom(self):
    """An ESCALATION_WINDOW of 2 is honoured: a two-message rise sets the flag."""
    session = CrisisSessionTracker()
    session.ESCALATION_WINDOW = 2
    # Levels annotated in the original test, in order: LOW, HIGH.
    for message in ("I'm unhappy", "I can't go on anymore"):
        session.record(message)
    self.assertTrue(session.get_escalation_flag())
class TestDeescalationDetection(unittest.TestCase):
    """De-escalation scenarios: the session calms down after a HIGH or CRITICAL peak."""

    def test_deescalation_critical_to_low(self):
        """Core acceptance criterion: five calm messages after CRITICAL confirm de-escalation."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        session.DEESCALATION_THRESHOLD = 5

        # Climb first (annotated LOW, HIGH, CRITICAL) so the escalation flag is set.
        for message in ("I'm struggling", "I can't go on", "I want to die"):
            session.record(message)
        self.assertTrue(session.get_escalation_flag())

        # Five calmer messages, one per unit of DEESCALATION_THRESHOLD.
        calm_messages = (
            "I called 988",
            "I'm calmer now",
            "Thank you for being here",
            "I'm going to be okay",
            "Taking it one moment at a time",
        )
        for message in calm_messages:
            session.record(message)
        self.assertTrue(session.is_confirmed_deescalation())
        self.assertFalse(session.get_escalation_flag())

    def test_deescalation_not_confirmed_too_soon(self):
        """Three calm messages are below the threshold, so the session stays escalated."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        session.DEESCALATION_THRESHOLD = 5

        for message in ("I'm struggling", "I can't go on", "I want to die"):
            session.record(message)
        self.assertTrue(session.get_escalation_flag())

        # Only three of the five required calm messages.
        for message in ("I called someone", "I'm calmer", "Feeling better"):
            session.record(message)
        self.assertFalse(session.is_confirmed_deescalation())
        self.assertTrue(session.get_escalation_flag())  # still escalated

    def test_deescalation_from_high(self):
        """De-escalation is also confirmed when the peak was HIGH rather than CRITICAL."""
        session = CrisisSessionTracker()
        session.DEESCALATION_THRESHOLD = 5

        # Build up: annotated LOW, then three HIGH messages.
        for message in (
            "I'm down",
            "I feel hopeless",
            "No way out",
            "Everything is pointless",
        ):
            session.record(message)

        # The same calm message repeated threshold-many times.
        for _ in range(5):
            session.record("I'm doing a bit better")
        self.assertTrue(session.is_confirmed_deescalation())

    def test_deescalation_counter_resets_on_new_crisis(self):
        """A renewed crisis message during de-escalation prevents confirmation."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        session.DEESCALATION_THRESHOLD = 5

        conversation = (
            # escalate (annotated LOW, HIGH, CRITICAL)
            "I'm struggling",
            "I feel hopeless",
            "I want to die",
            # begin calming down
            "I called someone",
            "I'm calmer",
            # relapse (annotated HIGH)
            "Actually I can't do this",
        )
        for message in conversation:
            session.record(message)
        self.assertFalse(session.is_confirmed_deescalation())
class TestSystemPromptContext(unittest.TestCase):
    """Checks on the text returned by CrisisSessionTracker.get_prompt_context()."""

    def test_empty_session_no_context(self):
        """A tracker with no recorded messages yields an empty context string."""
        self.assertEqual(CrisisSessionTracker().get_prompt_context(), "")

    def test_escalation_context(self):
        """An escalating session mentions the escalation, the LOW start and heightened care."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        for message in (
            "I'm having a tough time",
            "Things are really hard",
            "I can't go on anymore",
        ):
            session.record(message)
        context = session.get_prompt_context()
        lowered = context.lower()
        self.assertIn("escalated", lowered)
        self.assertIn("LOW", context)
        self.assertIn("heightened", lowered)

    def test_deescalation_context(self):
        """After five calm messages the context reports a confirmed de-escalation."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        session.DEESCALATION_THRESHOLD = 5
        # Escalate first.
        for message in ("I'm struggling", "I feel hopeless", "I want to die"):
            session.record(message)
        # Then calm down for threshold-many messages.
        for _ in range(5):
            session.record("I'm okay now")
        lowered = session.get_prompt_context().lower()
        self.assertIn("de-escalated", lowered)
        self.assertIn("confirmed", lowered)

    def test_sustained_high_context(self):
        """Three HIGH-annotated messages in a row produce context naming HIGH."""
        session = CrisisSessionTracker()
        for message in (
            "I'm in so much pain",
            "Everything is pointless",
            "I can't go on",
        ):
            session.record(message)
        self.assertIn("HIGH", session.get_prompt_context())

    def test_peak_mentioned_after_improvement(self):
        """Once the user improves, the context still refers to the earlier HIGH peak."""
        session = CrisisSessionTracker()
        for message in (
            "I'm in so much pain",
            "Everything is pointless",
            "I feel a bit better",  # annotated as a drop back to LOW
        ):
            session.record(message)
        context = session.get_prompt_context()
        self.assertIn("peak", context.lower())
        self.assertIn("HIGH", context)
class TestEffectiveLevel(unittest.TestCase):
    """Checks on get_effective_level(), which takes session state into account."""

    def test_effective_level_normal(self):
        """With a single LOW-annotated message the effective level is LOW."""
        session = CrisisSessionTracker()
        session.record("I'm struggling")
        self.assertEqual(session.get_effective_level(), "LOW")

    def test_effective_level_escalation_bumps_up(self):
        """During an escalation the effective level is reported as HIGH."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        # Levels annotated in the original test, in order: LOW, HIGH, HIGH.
        for message in (
            "I'm having a tough time",
            "I feel completely hopeless",
            "I can't go on",
        ):
            session.record(message)
        # NOTE(review): current and peak are both HIGH here, so this does not
        # distinguish "bumped to peak" from "current level" - confirm intent.
        self.assertEqual(session.get_effective_level(), "HIGH")
class TestPrivacy(unittest.TestCase):
    """Privacy requirements: tracker state must not leak between instances or survive reset."""

    def test_new_session_clean_state(self):
        """A second tracker starts clean even after the first one saw a CRITICAL message."""
        first = CrisisSessionTracker()
        first.record("I want to die")
        self.assertEqual(first.state.peak_level, "CRITICAL")

        second = CrisisSessionTracker()
        self.assertEqual(second.state.peak_level, "NONE")
        self.assertEqual(second.state.message_count, 0)

    def test_reset_drops_history(self):
        """reset() clears the level history, the message count and the escalated flag."""
        session = CrisisSessionTracker()
        for message in ("I'm struggling", "I can't go on"):
            session.record(message)
        session.reset()
        self.assertEqual(session.state.level_history, [])
        self.assertEqual(session.state.message_count, 0)
        self.assertFalse(session.state.escalated)
class TestEdgeCases(unittest.TestCase):
    """Boundary conditions for CrisisSessionTracker."""

    def test_single_message_none(self):
        """One neutral message neither escalates nor produces prompt context."""
        session = CrisisSessionTracker()
        session.record("Hello Timmy")
        self.assertFalse(session.get_escalation_flag())
        self.assertEqual(session.get_prompt_context(), "")

    def test_oscillating_levels(self):
        """Alternating between two low levels must not be reported as escalation."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        for message in ("I'm fine", "I'm struggling", "I'm fine", "I'm struggling"):
            session.record(message)
        # Oscillation is not a sustained upward trend.
        self.assertFalse(session.get_escalation_flag())

    def test_many_messages(self):
        """One hundred recorded messages are all counted."""
        session = CrisisSessionTracker()
        for _ in range(100):
            session.record("Hello there")
        self.assertEqual(session.state.message_count, 100)

    def test_empty_string(self):
        """An empty message is still counted and is rated NONE."""
        session = CrisisSessionTracker()
        session.record("")
        self.assertEqual(session.state.message_count, 1)
        self.assertEqual(session.state.current_level, "NONE")

    def test_repr(self):
        """repr() names the class and shows the NONE level of a fresh tracker."""
        text = repr(CrisisSessionTracker())
        self.assertIn("CrisisSessionTracker", text)
        self.assertIn("NONE", text)

    def test_state_is_copy(self):
        """The state property returns a snapshot that later records do not mutate."""
        session = CrisisSessionTracker()
        before = session.state
        session.record("I'm struggling")
        after = session.state
        self.assertEqual(before.message_count, 0)
        self.assertEqual(after.message_count, 1)
class TestLevelOrder(unittest.TestCase):
    """LEVEL_ORDER must rank the crisis levels strictly ascending by severity."""

    def test_level_ordering(self):
        """Each level ranks strictly below the next more severe one."""
        ascending = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
        # Compare each adjacent pair: NONE<LOW, LOW<MEDIUM, MEDIUM<HIGH, HIGH<CRITICAL.
        for milder, harsher in zip(ascending, ascending[1:]):
            self.assertLess(LEVEL_ORDER[milder], LEVEL_ORDER[harsher])
class TestHeightenedAwareness(unittest.TestCase):
    """Checks on should_heighten_awareness()."""

    def test_heightened_on_escalation(self):
        """A rising three-message sequence requests heightened awareness."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        for message in ("I'm unhappy", "I feel hopeless", "I can't go on"):
            session.record(message)
        self.assertTrue(session.should_heighten_awareness())

    def test_not_heightened_normal(self):
        """A single neutral greeting does not request heightened awareness."""
        session = CrisisSessionTracker()
        session.record("Hello")
        self.assertFalse(session.should_heighten_awareness())

    def test_not_heightened_immediate_critical(self):
        """An opening CRITICAL message is immediate, not escalation, so it does not heighten."""
        session = CrisisSessionTracker()
        session.ESCALATION_WINDOW = 3
        session.record("I want to kill myself")
        self.assertFalse(session.should_heighten_awareness())
if __name__ == "__main__":
unittest.main()

View File

@@ -613,6 +613,21 @@ html, body {
top: 8px;
outline: 2px solid #58a6ff;
}
/* Safety plan inline status feedback.
   The element is transparent by default; the save handler sets its class to
   "success" or "error", which fades it in with the matching colour. */
#sp-status {
font-size: 0.85rem;
opacity: 0; /* invisible until a state class is applied */
transition: opacity 0.3s ease; /* fade in/out when the state class changes */
margin-right: auto; /* presumably keeps the status left of the footer buttons - depends on the footer layout */
}
/* Successful save: green text, fully visible. */
#sp-status.success {
color: #3fb950;
opacity: 1;
}
/* Failed save: red text, fully visible. */
#sp-status.error {
color: #f85149;
opacity: 1;
}
</style>
</head>
<body>
@@ -738,6 +753,7 @@ html, body {
</div>
</div>
<div class="modal-footer">
<span id="sp-status" role="status" aria-live="polite"></span>
<button class="btn btn-secondary" id="cancel-safety-plan">Cancel</button>
<button class="btn btn-primary" id="save-safety-plan">Save Plan</button>
</div>
@@ -808,7 +824,6 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -965,6 +980,70 @@ Sovereignty and service always.`;
return 0;
}
// ===== SESSION CRISIS TRACKING (#35) =====
// In-memory crisis state for the current chat session. It is updated by
// trackCrisis(), read by getSessionContext() and replaced wholesale by
// resetSessionCrisis().
var sessionCrisis = {
currentLevel: 0, // level of the most recently tracked message: 0=NONE, 1=LOW, 2=MEDIUM, 3=HIGH, 4=CRITICAL
peakLevel: 0, // highest level tracked so far in this session
messageCount: 0, // number of trackCrisis() calls (every tracked message, not only crisis ones)
history: [], // [{level, time}] - one entry per tracked message; time is Date.now() in ms
escalationRate: 0, // (last level - first level) / entry count, over the last up-to-5 history entries
lastEscalation: null // Date.now() of the last message whose level exceeded the previous currentLevel
};
/**
 * Record one message in the session crisis tracker and return the updated
 * session context.
 *
 * Updates sessionCrisis in place: the message counter, the history entry,
 * the current and peak levels, the last-escalation timestamp and the
 * escalation rate over the most recent (up to five) history entries.
 *
 * @param {string} text Message text passed to getCrisisLevel().
 * @returns {string} Result of getSessionContext() after the update.
 */
function trackCrisis(text) {
    var detected = getCrisisLevel(text);
    var state = sessionCrisis;

    state.messageCount += 1;
    state.history.push({ level: detected, time: Date.now() });

    // A rise is measured against the previous message, before currentLevel is overwritten.
    var roseAbovePrevious = detected > state.currentLevel;
    if (roseAbovePrevious) {
        state.lastEscalation = Date.now();
    }
    state.currentLevel = detected;
    state.peakLevel = Math.max(state.peakLevel, detected);

    // Net level change across the recent window, divided by the window size.
    var windowed = state.history.slice(-5);
    var size = windowed.length;
    if (size >= 2) {
        state.escalationRate = (windowed[size - 1].level - windowed[0].level) / size;
    }

    return getSessionContext();
}
/**
 * Build the session-level crisis summary that is appended to the system prompt.
 *
 * All signals are derived from sessionCrisis.history (levels 0-4) plus
 * sessionCrisis.peakLevel:
 *   - ESCALATION ALERT: the net rise across the last ESCALATION_WINDOW tracked
 *     messages, divided by the window size, exceeds 0.5. For sessions of at
 *     most three messages this is the same test as before; it now also fires
 *     when the rise happens later in the session.
 *   - DE-ESCALATION: the peak reached HIGH (3) or above and the most recent
 *     DEESCALATION_THRESHOLD tracked messages were all NONE/LOW. A single calm
 *     message after a peak is no longer enough to reduce urgency.
 *   - Sustained crisis: the current message is MEDIUM (2) or above and at
 *     least SUSTAINED_THRESHOLD tracked messages were MEDIUM or above; the
 *     reported number is that count, not the total session message count.
 *   - The full level trajectory is always appended.
 *
 * @returns {string} Context text, or '' when fewer than two messages are tracked.
 */
function getSessionContext() {
    var ESCALATION_WINDOW = 3;       // tracked messages inspected for a rapid rise
    var DEESCALATION_THRESHOLD = 5;  // consecutive calm messages required after a peak
    var SUSTAINED_THRESHOLD = 3;     // elevated messages required before reporting duration

    var levels = sessionCrisis.history.map(function(h) { return h.level; });
    if (levels.length < 2) return '';

    var ctx = '';
    var current = levels[levels.length - 1];

    // Rapid rise within the most recent window (not only at the start of the session).
    var recent = levels.slice(-ESCALATION_WINDOW);
    if ((current - recent[0]) / recent.length > 0.5) {
        ctx += 'ESCALATION ALERT: User crisis level is rising rapidly. ';
    }

    // Count trailing calm (NONE/LOW) messages; urgency is reduced only after a sustained run.
    var calmStreak = 0;
    for (var i = levels.length - 1; i >= 0 && levels[i] <= 1; i--) {
        calmStreak++;
    }
    if (sessionCrisis.peakLevel >= 3 && calmStreak >= DEESCALATION_THRESHOLD) {
        ctx += 'DE-ESCALATION: User appears to be calming. Maintain presence but reduce urgency. ';
    }

    // Report how many tracked messages were actually elevated.
    var elevatedCount = levels.filter(function(level) { return level >= 2; }).length;
    if (current >= 2 && elevatedCount >= SUSTAINED_THRESHOLD) {
        ctx += 'User has been in crisis for ' + elevatedCount + ' messages. ';
    }

    ctx += 'Crisis trajectory: ' + levels.join(' → ') + '. ';
    return ctx;
}
/**
 * Replace the session crisis state with a fresh, empty one: all levels and
 * counters at zero, empty history, no recorded escalation.
 */
function resetSessionCrisis() {
    var blank = {};
    blank.currentLevel = 0;
    blank.peakLevel = 0;
    blank.messageCount = 0;
    blank.history = [];
    blank.escalationRate = 0;
    blank.lastEscalation = null;
    sessionCrisis = blank;
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
@@ -979,7 +1058,9 @@ Sovereignty and service always.`;
var directive = 'DIRECTIVE: ' + profile.directive + '\n';
var tone = 'TONE: ' + profile.tone + '\n';
return SYSTEM_PROMPT + divider + header + directive + tone;
var sessionCtx = getSessionContext();
var sessionBlock = sessionCtx ? '\nSESSION CONTEXT: ' + sessionCtx : '';
return SYSTEM_PROMPT + divider + header + directive + tone + sessionBlock;
}
@@ -1051,12 +1132,39 @@ Sovereignty and service always.`;
}
}, 1000);
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
// Crisis overlay Escape key handler
/**
 * Keydown handler that dismisses the crisis overlay when Escape is pressed.
 *
 * Does nothing unless the key is Escape, the overlay has the 'active' class,
 * and the dismiss button is enabled (it is disabled during the countdown).
 * On dismissal it stops the overlay timer, removes 'inert' / 'aria-hidden'
 * from the background regions, and returns focus to the element that had it
 * before the overlay opened, falling back to the message input.
 *
 * @param {KeyboardEvent} e The keydown event.
 */
function trapCrisisOverlayEscape(e) {
    if (e.key !== 'Escape' ||
        !crisisOverlay.classList.contains('active') ||
        overlayDismissBtn.disabled) {
        return;
    }

    // Close the overlay and stop its countdown timer.
    crisisOverlay.classList.remove('active');
    if (overlayTimer) {
        clearInterval(overlayTimer);
        overlayTimer = null;
    }

    // Make the background interactive and visible to assistive technology again.
    var appRoot = document.querySelector('.app');
    if (appRoot) appRoot.removeAttribute('inert');
    var chatRegion = document.getElementById('chat');
    if (chatRegion) chatRegion.removeAttribute('aria-hidden');
    var pageFooter = document.querySelector('footer');
    if (pageFooter) pageFooter.removeAttribute('aria-hidden');

    // Return focus to where it was before the overlay, else to the chat input.
    var canRestore = _preOverlayFocusElement &&
        typeof _preOverlayFocusElement.focus === 'function';
    var focusTarget = canRestore ? _preOverlayFocusElement : msgInput;
    focusTarget.focus();
    _preOverlayFocusElement = null;
}
// Register focus trap and Escape handler on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
document.addEventListener('keydown', trapCrisisOverlayEscape);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
@@ -1166,6 +1274,7 @@ Sovereignty and service always.`;
clearChatBtn.addEventListener('click', function() {
if (confirm('Clear all chat history?')) {
localStorage.removeItem('timmy_chat_history');
resetSessionCrisis();
window.location.reload();
}
});
@@ -1205,11 +1314,23 @@ Sovereignty and service always.`;
};
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
var spStatus = document.getElementById('sp-status');
spStatus.textContent = '\u2713 Safety plan saved locally.';
spStatus.className = 'success';
setTimeout(function() {
spStatus.className = '';
spStatus.textContent = '';
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}, 2000);
} catch (e) {
alert('Error saving plan.');
var spStatusErr = document.getElementById('sp-status');
spStatusErr.textContent = '\u2717 Error saving plan.';
spStatusErr.className = 'error';
setTimeout(function() {
spStatusErr.className = '';
spStatusErr.textContent = '';
}, 4000);
}
});
@@ -1318,6 +1439,7 @@ Sovereignty and service always.`;
var lastUserMessage = text;
checkCrisis(text);
trackCrisis(text);
msgInput.value = '';
msgInput.style.height = 'auto';
@@ -1406,6 +1528,7 @@ Sovereignty and service always.`;
messages.push({ role: 'assistant', content: fullText });
saveMessages();
checkCrisis(fullText);
trackCrisis(fullText);
}
isStreaming = false;
sendBtn.disabled = msgInput.value.trim().length === 0;
@@ -1435,6 +1558,7 @@ Sovereignty and service always.`;
// ===== WELCOME MESSAGE =====
function init() {
resetSessionCrisis();
if (!loadMessages()) {
var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us.";
addMessage('assistant', welcomeText);

View File

@@ -1,118 +0,0 @@
"""Tests for crisis detection metrics endpoint contract (issue #97)."""
import json
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.gateway import check_crisis, check_crisis_and_record
from crisis.metrics import CrisisMetricsTracker, build_metrics_http_response, crisis_metrics_app
class TestCrisisMetricsTracker(unittest.TestCase):
    """Contract checks for CrisisMetricsTracker and build_metrics_http_response."""

    def test_empty_metrics_response_has_zero_counts(self):
        """A fresh tracker serialises to HTTP 200, zero counts and an empty timestamp range."""
        response = build_metrics_http_response(tracker=CrisisMetricsTracker(), fmt="json")
        payload = json.loads(response["body"])

        self.assertEqual(response["status"], 200)
        for counter in ("total_events", "escalated", "resources_shown"):
            self.assertEqual(payload[counter], 0)
        self.assertEqual(payload["timestamp_range"], {"start": None, "end": None})
        # Message-level fields must not be exposed in the metrics payload.
        for private_key in ("timmy_message", "indicators"):
            self.assertNotIn(private_key, payload)

    def test_tracker_counts_only_crisis_events_and_updates_timestamp_range(self):
        """Only crisis results are counted, and the range spans the first to last counted event."""
        tracker = CrisisMetricsTracker()
        timeline = (
            ("Hello Timmy", "2026-04-15T03:00:00Z"),
            ("I'm having a tough day", "2026-04-15T03:05:00Z"),
            ("I want to kill myself", "2026-04-15T03:10:00Z"),
        )
        for message, stamp in timeline:
            tracker.record_gateway_result(check_crisis(message), timestamp=stamp)

        payload = tracker.snapshot().to_dict()
        self.assertEqual(payload["total_events"], 2)
        self.assertEqual(payload["escalated"], 1)
        self.assertEqual(payload["resources_shown"], 1)
        expected_range = {
            "start": "2026-04-15T03:05:00Z",
            "end": "2026-04-15T03:10:00Z",
        }
        self.assertEqual(payload["timestamp_range"], expected_range)

    def test_prometheus_response_uses_counts_only(self):
        """The Prometheus text body carries the three counters and no message content."""
        tracker = CrisisMetricsTracker()
        result = check_crisis("I want to kill myself")
        tracker.record_gateway_result(result, timestamp="2026-04-15T03:10:00Z")

        response = build_metrics_http_response(tracker=tracker, fmt="prometheus")
        body = response["body"]
        self.assertEqual(response["status"], 200)
        self.assertIn("text/plain", response["headers"]["Content-Type"])
        self.assertIn("the_door_crisis_total_events 1", body)
        self.assertIn("the_door_crisis_escalated 1", body)
        self.assertIn("the_door_crisis_resources_shown 1", body)
        self.assertNotIn("kill myself", body)
        self.assertNotIn("timmy_message", body)
class TestCrisisMetricsEndpoint(unittest.TestCase):
    """WSGI-level checks for the crisis_metrics_app endpoint."""

    def _call_endpoint(self, method, tracker):
        """Invoke crisis_metrics_app once and return (captured status/headers, decoded body)."""
        captured = {}

        def start_response(status, headers):
            captured["status"] = status
            captured["headers"] = dict(headers)

        environ = {
            "PATH_INFO": "/api/crisis-metrics",
            "REQUEST_METHOD": method,
            "QUERY_STRING": "",
            "HTTP_ACCEPT": "application/json",
        }
        chunks = crisis_metrics_app(environ, start_response, tracker=tracker)
        return captured, b"".join(chunks).decode("utf-8")

    def test_wsgi_endpoint_serves_json_metrics(self):
        """A GET request returns 200 with the JSON counts for the recorded event."""
        tracker = CrisisMetricsTracker()
        check_crisis_and_record(
            "I want to kill myself",
            tracker=tracker,
            timestamp="2026-04-15T03:10:00Z",
        )

        captured, body = self._call_endpoint("GET", tracker)
        payload = json.loads(body)

        self.assertEqual(captured["status"], "200 OK")
        self.assertEqual(captured["headers"]["Content-Type"], "application/json")
        self.assertEqual(payload["total_events"], 1)
        self.assertEqual(payload["escalated"], 1)

    def test_wsgi_endpoint_rejects_non_get_methods(self):
        """A POST request is answered with 405 and an Allow header naming GET."""
        captured, body = self._call_endpoint("POST", CrisisMetricsTracker())

        self.assertEqual(captured["status"], "405 Method Not Allowed")
        self.assertIn("GET", captured["headers"]["Allow"])
        self.assertIn("method_not_allowed", body)
if __name__ == "__main__":
unittest.main()

View File

@@ -52,34 +52,6 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
    """showOverlay must focus the Call 988 link rather than the disabled dismiss button."""
    start = self.html.find('function showOverlay()')
    self.assertGreater(start, -1, "showOverlay function not found")

    # The first 2000 characters of the function are expected to contain the
    # focus call that follows the countdown setInterval callback.
    function_window = self.html[start:start + 2000]
    self.assertIn(
        'overlayCallLink',
        function_window,
        "Expected showOverlay to reference overlayCallLink for initial focus.",
    )

    # Narrower slice around the focus line: the old pattern must be gone.
    focus_window = self.html[start + 800:start + 1200]
    self.assertNotIn(
        'overlayDismissBtn.focus()',
        focus_window,
        "showOverlay must not focus the disabled dismiss button.",
    )
def test_overlay_call_link_variable_is_declared(self):
    """The page script must look up the .overlay-call link element."""
    selector_call = "querySelector('.overlay-call')"
    failure_note = "Expected a JS reference to the .overlay-call link element."
    self.assertIn(selector_call, self.html, failure_note)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,74 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestSafetyPlanInlineFeedback(unittest.TestCase):
    """Static checks that the safety plan modal reports save results inline, not via alert()."""

    @classmethod
    def setUpClass(cls):
        # Decode explicitly so the result does not depend on the platform's
        # default encoding (the page is assumed to be UTF-8).
        cls.html = INDEX_HTML.read_text(encoding='utf-8')

    def test_no_alert_calls(self):
        """The page must not call the blocking browser alert()."""
        # \b keeps identifiers that merely end in "alert" from being counted.
        alert_matches = re.findall(r'\balert\(', self.html)
        self.assertEqual(
            len(alert_matches), 0,
            f'Found {len(alert_matches)} alert() calls - must use inline feedback instead.',
        )

    def test_sp_status_element_exists(self):
        """The page must contain an element with id "sp-status" for inline feedback."""
        self.assertRegex(
            self.html,
            r'id=["\']sp-status["\']',
            'Expected #sp-status element in the safety plan modal.',
        )

    def test_sp_status_has_aria_live(self):
        """#sp-status itself must carry aria-live="polite" for accessible announcements."""
        # Inspect the opening tag of #sp-status only; a polite live region
        # elsewhere on the page must not satisfy this check.
        tag = re.search(r'<[^<>]*\bid=["\']sp-status["\'][^<>]*>', self.html)
        self.assertIsNotNone(tag, 'Expected #sp-status element in the safety plan modal.')
        self.assertRegex(
            tag.group(0),
            r'aria-live=["\']polite["\']',
            'Expected #sp-status to have aria-live="polite".',
        )

    def test_success_feedback_exists(self):
        """The success message text must be present in the page."""
        self.assertIn(
            'Safety plan saved locally.',
            self.html,
            'Expected success message for safety plan save.',
        )

    def test_error_feedback_exists(self):
        """The failure message text must be present in the page."""
        self.assertIn(
            'Error saving plan.',
            self.html,
            'Expected error message for safety plan save failure.',
        )

    def test_css_success_state(self):
        """The stylesheet must define the #sp-status.success state."""
        self.assertIn(
            'sp-status.success',
            self.html,
            'Expected CSS for #sp-status.success state.',
        )

    def test_css_error_state(self):
        """The stylesheet must define the #sp-status.error state."""
        self.assertIn(
            'sp-status.error',
            self.html,
            'Expected CSS for #sp-status.error state.',
        )
if __name__ == '__main__':
unittest.main()

View File

@@ -50,22 +50,6 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
    """The offline page must be self-contained: no https href/src, inline CSS and JS."""
    import re

    page = self.html
    # tel: and sms: links are protocol handlers, not network fetches, so only
    # https:// targets in href/src attributes are treated as external.
    remote_refs = re.findall(r'href=["\']https://|src=["\']https://', page)
    self.assertEqual(remote_refs, [], 'Offline page must not load external resources')

    # Styles and scripts have to be embedded in the page itself.
    self.assertIn('<style>', page, 'CSS must be inline')
    self.assertIn('<script>', page, 'JS must be inline')
def test_retry_button_present(self):
    """The offline page must offer a way to retry the connection."""
    # Both the control's identifier and its visible label have to be present.
    for expected in ('retry-connection', 'Retry connection'):
        self.assertIn(expected, self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,17 @@
from pathlib import Path
def test_index_contains_session_crisis_state_and_tracking_hooks():
    """index.html must define the session crisis state, its helpers and the prompt hook."""
    # Decode explicitly so the result does not depend on the platform's
    # default encoding (the page is assumed to be UTF-8).
    # NOTE(review): the path is relative to the working directory - this
    # assumes the tests are run from the repository root.
    html = Path('index.html').read_text(encoding='utf-8')
    required_snippets = (
        'var sessionCrisis',
        'function trackCrisis(text)',
        'function getSessionContext()',
        'function resetSessionCrisis()',
        'trackCrisis(text);',
        'SESSION CONTEXT:',
    )
    for snippet in required_snippets:
        assert snippet in html, snippet
def test_new_session_resets_crisis_state():
    """index.html must call resetSessionCrisis() and clear the stored chat history."""
    # Decode explicitly so the result does not depend on the platform's
    # default encoding (the page is assumed to be UTF-8).
    # NOTE(review): the path is relative to the working directory - this
    # assumes the tests are run from the repository root.
    html = Path('index.html').read_text(encoding='utf-8')
    # Match the call statement (trailing semicolon). The bare name followed by
    # "()" also occurs in the function definition, so it would not prove that
    # the reset is ever invoked.
    assert 'resetSessionCrisis();' in html
    assert "localStorage.removeItem('timmy_chat_history');" in html

View File

@@ -1,277 +0,0 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
    """Default values of a freshly constructed SessionState."""

    def test_default_state(self):
        """A new state is at NONE with no messages, no history and no trend flags."""
        fresh = SessionState()
        for level_attr in ("current_level", "peak_level"):
            self.assertEqual(getattr(fresh, level_attr), "NONE")
        self.assertEqual(fresh.message_count, 0)
        self.assertEqual(fresh.level_history, [])
        for trend_attr in ("is_escalating", "is_deescalating"):
            self.assertFalse(getattr(fresh, trend_attr))
class TestSessionTracking(unittest.TestCase):
    """Basic bookkeeping of CrisisSessionTracker: levels, counts, peak and history."""

    def setUp(self):
        self.tracker = CrisisSessionTracker()

    def _feed(self, *messages):
        """Run each message through detect_crisis and record it; return the last state."""
        latest = None
        for message in messages:
            latest = self.tracker.record(detect_crisis(message))
        return latest

    def test_record_none_message(self):
        """A neutral greeting is recorded as one message at level NONE."""
        state = self._feed("Hello Timmy")
        self.assertEqual(state.current_level, "NONE")
        self.assertEqual(state.message_count, 1)
        self.assertEqual(state.peak_level, "NONE")

    def test_record_low_message(self):
        """A mild message after a greeting is rated LOW or NONE and counted second."""
        state = self._feed("Hello", "Having a rough day")
        self.assertIn(state.current_level, ("LOW", "NONE"))
        self.assertEqual(state.message_count, 2)

    def test_record_critical_updates_peak(self):
        """A CRITICAL message raises both the current and the peak level."""
        state = self._feed("Having a rough day", "I want to kill myself")
        self.assertEqual(state.current_level, "CRITICAL")
        self.assertEqual(state.peak_level, "CRITICAL")

    def test_peak_preserved_after_drop(self):
        """The peak stays at the highest level seen even after the user improves."""
        state = self._feed("I want to kill myself", "I'm feeling a bit better")
        self.assertEqual(state.peak_level, "CRITICAL")

    def test_level_history(self):
        """Every recorded message adds one entry to the level history, in order."""
        state = self._feed("Hello", "Having a rough day", "I want to die")
        history = state.level_history
        self.assertEqual(len(history), 3)
        self.assertEqual(history[0], "NONE")
        self.assertEqual(history[2], "CRITICAL")

    def test_reset_clears_state(self):
        """reset() returns levels, count and history to their defaults."""
        self._feed("I want to kill myself")
        self.tracker.reset()
        state = self.tracker.state
        self.assertEqual(state.current_level, "NONE")
        self.assertEqual(state.peak_level, "NONE")
        self.assertEqual(state.message_count, 0)
        self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
    """Escalation detection: a rise from LOW to HIGH within three messages.

    Tests whose assertion depends on how detect_crisis rates a particular
    message are skipped, with a reason, when the detector returns a different
    level. Previously they passed without asserting anything.
    """

    def setUp(self):
        self.tracker = CrisisSessionTracker()

    def test_no_escalation_single_message(self):
        """One neutral message cannot be an escalation."""
        self.tracker.record(detect_crisis("Hello"))
        self.assertFalse(self.tracker.state.is_escalating)

    def test_no_escalation_stable(self):
        """Two normal messages should not trigger escalation."""
        self.tracker.record(detect_crisis("Hello"))
        state = self.tracker.record(detect_crisis("How are you?"))
        self.assertFalse(state.is_escalating)

    def test_rapid_escalation_low_to_high(self):
        """LOW to HIGH within two messages is a rapid escalation."""
        self.tracker.record(detect_crisis("Having a rough day"))
        state = self.tracker.record(
            detect_crisis("I can't take this anymore, everything is pointless")
        )
        # The detector may rate the second message HIGH or CRITICAL; any other
        # rating means the escalation path was not exercised.
        if state.current_level not in ("HIGH", "CRITICAL"):
            self.skipTest(
                "detector rated the second message %s, not HIGH/CRITICAL"
                % state.current_level
            )
        self.assertTrue(state.is_escalating)

    def test_rapid_escalation_three_messages(self):
        """NONE to LOW to HIGH across three messages is an escalation."""
        self.tracker.record(detect_crisis("Hello"))
        self.tracker.record(detect_crisis("Having a rough day"))
        state = self.tracker.record(
            detect_crisis("I feel completely hopeless with no way out")
        )
        if state.current_level not in ("HIGH", "CRITICAL"):
            self.skipTest(
                "detector rated the third message %s, not HIGH/CRITICAL"
                % state.current_level
            )
        self.assertTrue(state.is_escalating)

    def test_escalation_rate(self):
        """The escalation rate is positive while the level is rising."""
        self.tracker.record(detect_crisis("Hello"))
        self.tracker.record(detect_crisis("I want to die"))
        state = self.tracker.state
        self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
    """De-escalation detection: sustained LOW/NONE messages after a HIGH or CRITICAL peak.

    Tests whose assertion depends on the peak level the detector produced are
    skipped, with a reason, when that precondition is not met. Previously they
    passed without asserting anything.
    """

    def setUp(self):
        self.tracker = CrisisSessionTracker()

    def test_no_deescalation_without_prior_crisis(self):
        """No de-escalation is reported if the session never reached HIGH/CRITICAL."""
        for _ in range(6):
            self.tracker.record(detect_crisis("Hello"))
        self.assertFalse(self.tracker.state.is_deescalating)

    def test_deescalation_after_critical(self):
        """Five consecutive calm messages after CRITICAL count as de-escalation."""
        self.tracker.record(detect_crisis("I want to kill myself"))
        for _ in range(5):
            self.tracker.record(detect_crisis("I'm doing better today"))
        state = self.tracker.state
        if state.peak_level != "CRITICAL":
            self.skipTest("peak level was %s, not CRITICAL" % state.peak_level)
        self.assertTrue(state.is_deescalating)

    def test_deescalation_after_high(self):
        """Five consecutive calm messages after HIGH count as de-escalation."""
        self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
        for _ in range(5):
            self.tracker.record(detect_crisis("Feeling okay"))
        state = self.tracker.state
        if state.peak_level != "HIGH":
            self.skipTest("peak level was %s, not HIGH" % state.peak_level)
        self.assertTrue(state.is_deescalating)

    def test_interrupted_deescalation(self):
        """A renewed crisis message during the calm run prevents de-escalation."""
        self.tracker.record(detect_crisis("I want to kill myself"))
        for _ in range(3):
            self.tracker.record(detect_crisis("Doing better"))
        # Interrupt the calm run with another crisis message.
        self.tracker.record(detect_crisis("I feel hopeless again"))
        self.tracker.record(detect_crisis("Feeling okay now"))
        state = self.tracker.state
        # Only one calm message follows the interruption, so de-escalation must not be set.
        self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
    """System prompt modifier text produced by get_session_modifier().

    Tests that only make sense in a particular session state are skipped, with
    a reason, when the tracker is not in that state. Previously they passed
    without asserting anything.
    """

    def setUp(self):
        self.tracker = CrisisSessionTracker()

    def test_no_modifier_for_single_message(self):
        """A single neutral message produces no modifier."""
        self.tracker.record(detect_crisis("Hello"))
        self.assertEqual(self.tracker.get_session_modifier(), "")

    def test_no_modifier_for_stable_session(self):
        """Two neutral messages produce no modifier."""
        self.tracker.record(detect_crisis("Hello"))
        self.tracker.record(detect_crisis("Good morning"))
        self.assertEqual(self.tracker.get_session_modifier(), "")

    def test_escalation_modifier(self):
        """An escalating session names the escalation and both end-point levels."""
        self.tracker.record(detect_crisis("Hello"))
        self.tracker.record(detect_crisis("I want to die"))
        modifier = self.tracker.get_session_modifier()
        if not self.tracker.state.is_escalating:
            self.skipTest("tracker did not flag this session as escalating")
        self.assertIn("escalated", modifier.lower())
        self.assertIn("NONE", modifier)
        self.assertIn("CRITICAL", modifier)

    def test_deescalation_modifier(self):
        """A de-escalating session mentions that the user is stabilizing."""
        self.tracker.record(detect_crisis("I want to kill myself"))
        for _ in range(5):
            self.tracker.record(detect_crisis("I'm feeling okay"))
        modifier = self.tracker.get_session_modifier()
        if not self.tracker.state.is_deescalating:
            self.skipTest("tracker did not flag this session as de-escalating")
        self.assertIn("stabilizing", modifier.lower())

    def test_prior_crisis_modifier(self):
        """When a modifier is produced after a past crisis, it names the CRITICAL peak."""
        self.tracker.record(detect_crisis("I want to die"))
        self.tracker.record(detect_crisis("Feeling a bit better"))
        modifier = self.tracker.get_session_modifier()
        if not modifier:
            self.skipTest("tracker produced no modifier for this session")
        self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
    """Structure and content of the dictionary returned by get_ui_hints()."""

    def setUp(self):
        self.tracker = CrisisSessionTracker()

    def test_ui_hints_structure(self):
        """The hints always contain the four session_* keys."""
        self.tracker.record(detect_crisis("Hello"))
        hints = self.tracker.get_ui_hints()
        for key in (
            "session_escalating",
            "session_deescalating",
            "session_peak_level",
            "session_message_count",
        ):
            self.assertIn(key, hints)

    def test_ui_hints_escalation_warning(self):
        """An escalating session carries a warning flag and a suggested action."""
        self.tracker.record(detect_crisis("Hello"))
        self.tracker.record(detect_crisis("I want to die"))
        hints = self.tracker.get_ui_hints()
        # Skip visibly instead of passing without assertions when the tracker
        # does not consider this session escalating.
        if not hints["session_escalating"]:
            self.skipTest("tracker did not flag this session as escalating")
        self.assertTrue(hints.get("escalation_warning"))
        self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
    """check_crisis_with_session combines detection with session tracking."""

    def test_returns_combined_data(self):
        """The result carries the detection level plus a nested session summary."""
        session_tracker = CrisisSessionTracker()
        outcome = check_crisis_with_session("I want to die", session_tracker)
        for top_level_key in ("level", "session"):
            self.assertIn(top_level_key, outcome)
        session_summary = outcome["session"]
        for summary_key in ("current_level", "peak_level", "modifier"):
            self.assertIn(summary_key, session_summary)

    def test_session_updates_across_calls(self):
        """Successive calls with one tracker accumulate count and peak level."""
        session_tracker = CrisisSessionTracker()
        check_crisis_with_session("Hello", session_tracker)
        outcome = check_crisis_with_session("I want to die", session_tracker)
        session_summary = outcome["session"]
        self.assertEqual(session_summary["message_count"], 2)
        self.assertEqual(session_summary["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
    """Privacy-first design checks for CrisisSessionTracker."""

    def test_no_persistence_mechanism(self):
        """The tracker's source must not mention database, file or network APIs."""
        import inspect

        lowered_source = inspect.getsource(CrisisSessionTracker).lower()
        # Substrings that would indicate persistence or network access.
        banned_terms = ("sqlite", "requests", "urllib", "open(", "httpx", "aiohttp")
        for term in banned_terms:
            self.assertNotIn(
                term,
                lowered_source,
                f"Session tracker should not use {term} — privacy-first design",
            )

    def test_state_contained_in_memory(self):
        """State lives on the instance: a new tracker is unaffected by an earlier one."""
        used = CrisisSessionTracker()
        used.record(detect_crisis("I want to die"))
        # A second instance must start clean (no module-level contamination).
        untouched = CrisisSessionTracker()
        self.assertEqual(untouched.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()