Compare commits

..

8 Commits

Author SHA1 Message Date
371dc83728 feat: integrate the-door crisis detection with hermes-agent sessions (closes #99)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 15:30:16 +00:00
851eb32871 feat: integrate the-door crisis detection with hermes-agent sessions (closes #99) 2026-04-15 15:30:14 +00:00
aba8f2b87b feat: integrate the-door crisis detection with hermes-agent sessions (closes #99) 2026-04-15 15:30:12 +00:00
81da20deb6 feat: integrate the-door crisis detection with hermes-agent sessions (closes #99) 2026-04-15 15:30:10 +00:00
48f48c7f26 feat: cache offline crisis resources (refs #41) (#74)
All checks were successful
Smoke Test / smoke (push) Successful in 7s
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 19s
Merge PR #74 (squash)
2026-04-14 22:09:59 +00:00
da31288525 fix: deprecate dying_detection and consolidate crisis detection (#40) (#76)
All checks were successful
Smoke Test / smoke (push) Successful in 4s
Merge PR #76 (squash)
2026-04-14 22:08:29 +00:00
8efc858cd7 fix: add keyboard focus trap to crisis overlay (#80)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #80 (squash)
2026-04-14 22:08:28 +00:00
611c1c8456 fix(a11y): Safety plan modal keyboard focus trap (#65) (#81)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merge PR #81 (squash)
2026-04-14 22:08:24 +00:00
9 changed files with 941 additions and 277 deletions

View File

@@ -7,6 +7,13 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import (
CrisisSessionTracker,
CrisisSession,
CrisisLevel,
EscalationEvent,
get_tracker,
)
__all__ = [
"detect_crisis",
@@ -19,4 +26,9 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"CrisisSession",
"CrisisLevel",
"EscalationEvent",
"get_tracker",
]

View File

@@ -51,13 +51,13 @@ HIGH_INDICATORS = [
r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
r"\bdon'?t\s+matter\s+if\s+I\s+exist\b",
r"\bno\s+one\s+would\s+(?:care|miss)\b",
r"\bworld\s+would?\s+be\s+better\s+without\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason)\b",
r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
r"\bescape\s+from\s*this",
r"\bjust\s+want\s+it\s+to\s+stop\b",
r"\btrapped\s+(?:in\s+(?:my|this|a\s+dark)|and\s+can'?t\s+escape)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
r"\bdisappeared\s+forever\b",
# Contextual despair phrases (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\beverything\s+is\s+hopeless\b",
@@ -68,6 +68,8 @@ HIGH_INDICATORS = [
r"\bno\s*hope\s+(?:left|remaining)\b",
r"\bno\s*way\s*out\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
r"\btrapped\s+and\s+can'?t\s+escape\b",
r"\bdesperate\s+(?:for\s+)?help\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
]
@@ -99,6 +101,8 @@ MEDIUM_INDICATORS = [
r"\bsinking\b",
r"\bdrowning\b",
r"\bhopeless\b",
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
@@ -112,7 +116,7 @@ MEDIUM_INDICATORS = [
LOW_INDICATORS = [
r"\bunhappy\b",
r"\bdown\b",
r"\btough\s*time\b",
r"\btough\s*(?:time|day|week)\b",
r"\brough\s+(?:day|week|patch)\b",
r"\bstressed\b",
r"\bburnout\b",
@@ -122,6 +126,8 @@ LOW_INDICATORS = [
r"\btired\b",
r"\bsad\b",
r"\bupset\b",
r"\blonely\b",
r"\banxious?\b",
r"\bnot\s*(?:good|great|okay)\b",
r"\bthings\s*are\s*hard\b",
r"\bstruggling\b",

251
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,251 @@
"""
Crisis Session Tracker for the-door <-> hermes-agent integration.
Tracks crisis session state, escalation history, and provides
a bridge between the-door's web UI and hermes-agent sessions.
Each browser session gets a unique crisis_session_id that is sent
with every API request. This allows hermes-agent to:
1. Receive crisis context from the-door's client-side detection
2. Report escalations/de-escalations back to the-door's UI
3. Maintain unified escalation history across web and CLI
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
tracker.record("session-abc", level="HIGH", source="the-door", detail="keyword: 'can't go on'")
history = tracker.get_history("session-abc")
"""
import json
import os
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
class CrisisLevel(Enum):
"""Crisis severity levels — aligned with both the-door and hermes-agent."""
NONE = "NONE"
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
@classmethod
def from_string(cls, s: str) -> "CrisisLevel":
try:
return cls[s.upper()]
except KeyError:
return cls.NONE
def __ge__(self, other: "CrisisLevel") -> bool:
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
return order.index(self) >= order.index(other)
def __gt__(self, other: "CrisisLevel") -> bool:
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
return order.index(self) > order.index(other)
@dataclass
class EscalationEvent:
"""A single crisis escalation or de-escalation event."""
timestamp: float
level: str
previous_level: str
source: str # "the-door" or "hermes-agent"
detail: str = ""
session_id: str = ""
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class CrisisSession:
"""State for a single crisis session."""
session_id: str
created_at: float = field(default_factory=time.time)
current_level: str = "NONE"
events: List[EscalationEvent] = field(default_factory=list)
# Per-source levels (what each source last reported)
door_level: str = "NONE"
hermes_level: str = "NONE"
hermes_last_update: float = 0.0
# Whether the UI has acknowledged the current level
ui_acknowledged: bool = False
def record(self, level: str, source: str, detail: str = "") -> EscalationEvent:
"""Record an escalation or de-escalation event."""
event = EscalationEvent(
timestamp=time.time(),
level=level,
previous_level=self.current_level,
source=source,
detail=detail,
session_id=self.session_id,
)
self.events.append(event)
# Update the per-source level
if source == "hermes-agent":
self.hermes_level = level
self.hermes_last_update = time.time()
else:
self.door_level = level
# Merged level = max of both sources (safety-first: higher wins)
merged = max(
CrisisLevel.from_string(self.door_level),
CrisisLevel.from_string(self.hermes_level),
)
self.current_level = merged.value
self.ui_acknowledged = False
return event
def to_dict(self) -> dict:
return {
"session_id": self.session_id,
"created_at": self.created_at,
"current_level": self.current_level,
"door_level": self.door_level,
"hermes_level": self.hermes_level,
"hermes_last_update": self.hermes_last_update,
"ui_acknowledged": self.ui_acknowledged,
"event_count": len(self.events),
"events": [e.to_dict() for e in self.events[-20:]], # Last 20 events
}
class CrisisSessionTracker:
"""
Tracks crisis sessions across the-door web UI and hermes-agent.
In-memory by default. Optionally persists to JSON for crash recovery.
"""
def __init__(self, persist_path: Optional[str] = None):
self._sessions: Dict[str, CrisisSession] = {}
self._persist_path = persist_path
if persist_path and os.path.exists(persist_path):
self._load()
def get_or_create(self, session_id: str) -> CrisisSession:
"""Get existing session or create a new one."""
if session_id not in self._sessions:
self._sessions[session_id] = CrisisSession(session_id=session_id)
return self._sessions[session_id]
def record(
self,
session_id: str,
level: str,
source: str,
detail: str = "",
) -> dict:
"""
Record a crisis event. Returns the updated session state.
Args:
session_id: Browser/hermes session identifier
level: NONE, LOW, MEDIUM, HIGH, CRITICAL
source: "the-door" or "hermes-agent"
detail: Human-readable description of what triggered this
"""
session = self.get_or_create(session_id)
event = session.record(level, source, detail)
if self._persist_path:
self._save()
return {
"session": session.to_dict(),
"event": event.to_dict(),
"escalated": CrisisLevel.from_string(level) > CrisisLevel.from_string(event.previous_level),
"de_escalated": CrisisLevel.from_string(level) < CrisisLevel.from_string(event.previous_level),
}
def get_history(self, session_id: str) -> Optional[dict]:
"""Get full escalation history for a session."""
session = self._sessions.get(session_id)
if not session:
return None
return session.to_dict()
def get_unacknowledged(self, session_id: str) -> Optional[dict]:
"""Get current unacknowledged crisis state for UI display."""
session = self._sessions.get(session_id)
if not session or session.ui_acknowledged:
return None
return {
"session_id": session_id,
"current_level": session.current_level,
"hermes_level": session.hermes_level,
"event_count": len(session.events),
"latest_event": session.events[-1].to_dict() if session.events else None,
}
def acknowledge(self, session_id: str) -> bool:
"""Mark the current crisis state as acknowledged by the UI."""
session = self._sessions.get(session_id)
if not session:
return False
session.ui_acknowledged = True
return True
def cleanup(self, max_age_seconds: float = 86400) -> int:
"""Remove sessions older than max_age_seconds. Returns count removed."""
cutoff = time.time() - max_age_seconds
stale = [sid for sid, s in self._sessions.items() if s.created_at < cutoff]
for sid in stale:
del self._sessions[sid]
if stale and self._persist_path:
self._save()
return len(stale)
def _save(self):
"""Persist sessions to JSON."""
if not self._persist_path:
return
data = {sid: s.to_dict() for sid, s in self._sessions.items()}
os.makedirs(os.path.dirname(self._persist_path) or ".", exist_ok=True)
with open(self._persist_path, "w") as f:
json.dump(data, f, indent=2)
def _load(self):
"""Load sessions from JSON (best-effort)."""
try:
with open(self._persist_path) as f:
data = json.load(f)
for sid, sdata in data.items():
session = CrisisSession(
session_id=sdata["session_id"],
created_at=sdata.get("created_at", time.time()),
current_level=sdata.get("current_level", "NONE"),
door_level=sdata.get("door_level", "NONE"),
hermes_level=sdata.get("hermes_level", "NONE"),
hermes_last_update=sdata.get("hermes_last_update", 0),
ui_acknowledged=sdata.get("ui_acknowledged", False),
)
for edata in sdata.get("events", []):
session.events.append(EscalationEvent(**edata))
self._sessions[sid] = session
except (FileNotFoundError, json.JSONDecodeError, KeyError):
pass
# Module-level singleton for convenience
_tracker: Optional[CrisisSessionTracker] = None
def get_tracker(persist_path: Optional[str] = None) -> CrisisSessionTracker:
"""Get or create the module-level tracker singleton."""
global _tracker
if _tracker is None:
_tracker = CrisisSessionTracker(persist_path=persist_path)
return _tracker

View File

@@ -1,31 +1,34 @@
"""
When a Man Is Dying — Despair/Suicide Detection System
DEPRECATED — Use crisis.detect instead.
Standalone detection module that parses incoming text for
despair and suicide indicators, classifies into tiers,
and returns structured response with recommended actions.
This module is a thin wrapper around crisis.detect for backward compatibility.
All unique patterns have been merged into crisis/detect.py (see issue #40).
Tiers:
LOW — General sadness, stress, difficult times
MEDIUM — Hopelessness, isolation, worthlessness
HIGH — Active despair, mentions of death, "can't go on"
CRITICAL — Imminent risk, explicit intent, plan, method
Integration:
Designed to work with crisis/ module from PR #4.
When crisis/ is available, uses it as the detection backend.
Falls back to internal detection when crisis/ is not present.
This module will be removed in a future release.
"""
import re
import json
import hashlib
import warnings
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
import json
import hashlib
# Re-export the canonical detection
from crisis.detect import detect_crisis, CrisisDetectionResult
# Issue deprecation warning on import
warnings.warn(
"dying_detection is deprecated. Use 'from crisis.detect import detect_crisis' instead. "
"All patterns have been consolidated into crisis/detect.py. "
"See issue #40.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DetectionResult:
"""Backward-compatible result type matching the old dying_detection API."""
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
@@ -34,110 +37,9 @@ class DetectionResult:
session_hash: str = ""
# ── Tiered indicator patterns ───────────────────────────────────
# Each pattern is a regex matched against lowercased input.
CRITICAL_PATTERNS = [
(r"\bkill\s*(my)?self\b", "explicit self-harm intent"),
(r"\bsuicid(?:al|ed|e)\b", "suicide reference"),
(r"\bend\s*(my)?\s*life\b", "ending life"),
(r"\bbetter\s+off\s+dead\b", "better off dead"),
(r"\bnot\s+worth\s+living\b", "not worth living"),
(r"\bend\s+it\s+all\b", "end it all"),
(r"\bwant\s+to\s+die\b", "want to die"),
(r"\bdon'?t\s+want\s+to\s+live\b", "don't want to live"),
(r"\bcan'?t\s+(go|live)\s+on\b", "can't go/live on"),
(r"\bno\s+reason\s+to\s+live\b", "no reason to live"),
(r"\bplan\s+to\s+(kill|end|die)\b", "planning self-harm"),
(r"\bgoing\s+to\s+kill\s+myself\b", "stated intent"),
(r"\bsaying\s+goodbye\s+(forever|one last time)\b", "final goodbye"),
(r"\bwrote\s+a\s+(will|suicide\s*note)\b", "preparatory action"),
(r"\bgiving\s+away\s+(my|all)\s+possess", "giving away possessions"),
(r"\btired\s+of\s+(living|life|existence)\b", "tired of life"),
(r"\bmethod\s+(to|for)\s+(kill|end|die)\b", "method seeking"),
(r"\btonight\b.*\b(die|kill|end|suicid)", "immediate timeframe + intent"),
(r"\b(die|kill|end)\b.*\btonight", "immediate timeframe + intent"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone)", "disappearance plan"),
]
HIGH_PATTERNS = [
(r"\bdespair\b", "despair"),
(r"\bhopeless(?:ness)?\b", "hopelessness"),
(r"\bcan'?t\s+take\s+this\s+anymore\b", "can't take it"),
(r"\bdon'?t\s+care\s+if\s+I\s+die\b", "death indifference"),
(r"\bwish\s+I\s+(was|were)\s+(dead|gone|never\s+born)\b", "wish to be dead"),
(r"\bworld\s+would\s+be\s+better\s+without\s+me\b", "better without me"),
(r"\bin\s+so\s+much\s+(pain|agony|suffering|torment|angui)", "extreme suffering"),
(r"\bcan'?t\s+see\s+any\s+(point|reason|light|hope|way)\b", "no light ahead"),
(r"\btrapped\b", "feeling trapped"),
(r"\bjust\s+want\s+it\s+to\s+stop\b", "want to stop"),
(r"\bno\s+way\s+out\b", "no way out"),
(r"\bno\s+one\s+would\s+(care|miss)\b", "no one would care/miss"),
(r"\beverything\s+is\s+(pointless|broken|ruined|meaningless)\b", "existential collapse"),
(r"\bno\s+point\s+in\s+anything\b", "pointlessness"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone|disappeared)", "no one would notice"),
(r"\bdisappeared\s+forever\b", "disappeared forever"),
]
MEDIUM_PATTERNS = [
(r"\bno\s+hope\b", "no hope"),
(r"\bcan'?t\s+go\s+on\b", "can't go on"),
(r"\bcan'?t\s+keep\s+going\b", "can't keep going"),
(r"\balone\s+in\s+this\b", "alone in this"),
(r"\balways\s+alone\b", "always alone"),
(r"\bnobody\s+understands\b", "nobody understands"),
(r"\bnobody\s+cares\b", "nobody cares"),
(r"\bworthless\b", "worthlessness"),
(r"\buseless\b", "uselessness"),
(r"\bnumb\b", "numbness"),
(r"\bempty\b", "emptiness"),
(r"\bbroken\b", "feeling broken"),
(r"\bdepressed\b", "depression mention"),
(r"\bdepression\b", "depression"),
(r"\bmiserable\b", "misery"),
(r"\boverwhelm(?:ed|ing)\b", "overwhelmed"),
(r"\bcannot\s+cope\b", "cannot cope"),
(r"\b(drowning|sinking)\b", "drowning/sinking"),
(r"\bforgotten\b", "feeling forgotten"),
(r"\blost\s+all\s+hope\b", "lost all hope"),
(r"\bno\s+future\b", "no future"),
(r"\bno\s+tomorrow\b", "no tomorrow"),
]
LOW_PATTERNS = [
(r"\bunhappy\b", "unhappy"),
(r"\brough\s+(day|week|patch)\b", "rough time"),
(r"\btough\s+(time|day|week)\b", "tough time"),
(r"\bstressed\b", "stressed"),
(r"\bburnout\b", "burnout"),
(r"\bfrustrated\b", "frustrated"),
(r"\bthings\s+(are\s+)?hard\b", "things are hard"),
(r"\bnot\s+feeling\s+(great|myself|good)\b", "not feeling good"),
(r"\bstruggl", "struggling"),
(r"\bdown\b", "feeling down"),
(r"\bsad\b", "sad"),
(r"\bupset\b", "upset"),
(r"\blonely\b", "lonely"),
(r"\banxious?\b", "anxious/anxiety"),
(r"\bnot\s+okay\b", "not okay"),
]
# ── Pattern collections for easy iteration ──────────────────────
TIER_PATTERNS: Dict[str, List[tuple]] = {
"CRITICAL": CRITICAL_PATTERNS,
"HIGH": HIGH_PATTERNS,
"MEDIUM": MEDIUM_PATTERNS,
"LOW": LOW_PATTERNS,
}
def detect(text: str) -> DetectionResult:
"""
Primary detection function.
If the crisis/ module is available, delegate to it.
Otherwise, use the internal pattern engine.
Primary detection function — delegates to crisis.detect.
Args:
text: User message to analyze
@@ -145,150 +47,25 @@ def detect(text: str) -> DetectionResult:
Returns:
DetectionResult with level, indicators, recommended_action, confidence
"""
# Try to use the crisis module if available
try:
from crisis.detect import detect_crisis as _crisis_detect
result = _crisis_detect(text)
# Map to DetectionResult
return DetectionResult(
level=result.level,
indicators=_describe_indicators(result.level, result.indicators),
recommended_action=result.recommended_action,
confidence=result.score,
session_hash=_hash_session(text),
)
except ImportError:
pass
result = detect_crisis(text)
return _detect_internal(text)
# Extract raw patterns from matches
raw_patterns = [m["pattern"] for m in result.matches] if result.matches else []
def _detect_internal(text: str) -> DetectionResult:
"""Internal pattern-based detection (fallback)."""
if not text or not text.strip():
return DetectionResult(level="NONE", session_hash=_hash_session(text))
text_lower = text.lower()
all_matches: Dict[str, List[tuple]] = {
tier: [] for tier in TIER_PATTERNS
}
for tier, patterns in TIER_PATTERNS.items():
for pattern, description in patterns:
if re.search(pattern, text_lower):
all_matches[tier].append((pattern, description))
# CRITICAL patterns are the highest priority
if all_matches["CRITICAL"]:
return DetectionResult(
level="CRITICAL",
indicators=[desc for _, desc in all_matches["CRITICAL"]],
recommended_action=(
"INTERRUPT CONVERSATION IMMEDIATELY. "
"Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline (call or text 988). "
"Provide Crisis Text Line (text HOME to 741741). "
"Stay present. Do not disconnect. "
"Redirect to professional help. "
"This is the most sacred moment."
),
raw_matched_patterns=[pat for pat, _ in all_matches["CRITICAL"]],
confidence=1.0,
session_hash=_hash_session(text),
)
if all_matches["HIGH"]:
return DetectionResult(
level="HIGH",
indicators=[desc for _, desc in all_matches["HIGH"]],
recommended_action=(
"Show crisis panel prominently. "
"Ask about safety directly. "
"Surface 988 number and crisis resources. "
"Stay present and engaged. Do not rush to fix. "
"Monitor closely for escalation."
),
raw_matched_patterns=[pat for pat, _ in all_matches["HIGH"]],
confidence=0.75,
session_hash=_hash_session(text),
)
if all_matches["MEDIUM"]:
return DetectionResult(
level="MEDIUM",
indicators=[desc for _, desc in all_matches["MEDIUM"]],
recommended_action=(
"Increase warmth and presence. "
"Subtly surface help resources. "
"Keep conversation anchored in the present. "
"Watch for escalation indicators."
),
raw_matched_patterns=[pat for pat, _ in all_matches["MEDIUM"]],
confidence=0.5,
session_hash=_hash_session(text),
)
if all_matches["LOW"]:
return DetectionResult(
level="LOW",
indicators=[desc for _, desc in all_matches["LOW"]],
recommended_action=(
"Normal conversation with warm undertone. "
"No crisis UI elements needed. "
"Remain vigilant. Be present."
),
raw_matched_patterns=[pat for pat, _ in all_matches["LOW"]],
confidence=0.25,
session_hash=_hash_session(text),
)
return DetectionResult(level="NONE", session_hash=_hash_session(text))
def _describe_indicators(level: str, patterns: list) -> list:
"""Map raw patterns to descriptions."""
descriptions = {
"CRITICAL": [],
"HIGH": [],
"MEDIUM": [],
"LOW": [],
}
for tier, items in TIER_PATTERNS.items():
for pat, desc in items:
if pat in patterns:
descriptions[tier].append(desc)
return descriptions.get(level, [])
def _hash_session(text: str) -> str:
"""Create a session hash for this text (for tracking repeated escalations)."""
return hashlib.sha256(text.encode()).hexdigest()[:12]
return DetectionResult(
level=result.level,
indicators=result.indicators,
recommended_action=result.recommended_action,
raw_matched_patterns=raw_patterns,
confidence=result.score,
session_hash=hashlib.sha256(text.encode()).hexdigest()[:12],
)
def get_action_for_level(level: str) -> str:
"""Get the recommended action string for a given level."""
actions = {
"CRITICAL": (
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988. Provide Crisis Text Line. "
"Stay present. Do not disconnect. "
"Redirect to help."
),
"HIGH": (
"Show crisis panel. Ask about safety. "
"Surface 988. Stay engaged."
),
"MEDIUM": (
"Increase warmth. Surface resources gently. "
"Anchor in present."
),
"LOW": (
"Normal conversation with warmth. "
"Remain vigilant."
),
"NONE": "No action needed.",
}
return actions.get(level, "Unknown level.")
from crisis.detect import ACTIONS
return ACTIONS.get(level, "Unknown level.")
def as_json(result: DetectionResult, indent: int = 2) -> str:

View File

@@ -678,6 +678,9 @@ html, body {
</div>
</div>
<!-- Escalation history (crisis session tracking, issue #99) -->
<div id="escalation-history" style="display:none; padding:4px 16px; font-size:0.75rem; color:#8b949e; border-top:1px solid #21262d;"></div>
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
@@ -825,6 +828,62 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var currentCrisisLevel = 'NONE';
// ===== CRISIS SESSION TRACKING =====
// Unique session ID sent with every API request for cross-source tracking
var crisisSessionId = (function() {
var stored = null;
try { stored = sessionStorage.getItem('timmy_crisis_session_id'); } catch(e) {}
if (stored) return stored;
var id = 'door-' + Date.now().toString(36) + '-' + Math.random().toString(36).substr(2, 8);
try { sessionStorage.setItem('timmy_crisis_session_id', id); } catch(e) {}
return id;
})();
// Escalation history for this session (mirrors server-side tracker)
var escalationHistory = [];
function recordEscalation(level, source, detail) {
var event = {
timestamp: Date.now(),
level: level,
source: source || 'the-door',
detail: detail || ''
};
escalationHistory.push(event);
// Persist for session recovery
try { sessionStorage.setItem('timmy_escalation_history', JSON.stringify(escalationHistory.slice(-50))); } catch(e) {}
renderEscalationHistory();
}
// Load persisted history
try {
var saved = sessionStorage.getItem('timmy_escalation_history');
if (saved) escalationHistory = JSON.parse(saved);
} catch(e) {}
function renderEscalationHistory() {
var el = document.getElementById('escalation-history');
if (!el) return;
if (escalationHistory.length === 0) {
el.style.display = 'none';
return;
}
el.style.display = 'block';
var html = '<strong>Escalation History</strong><ul>';
var recent = escalationHistory.slice(-5).reverse();
for (var i = 0; i < recent.length; i++) {
var e = recent[i];
var time = new Date(e.timestamp).toLocaleTimeString();
var badge = e.level === 'CRITICAL' ? '🔴' : e.level === 'HIGH' ? '🟠' : e.level === 'MEDIUM' ? '🟡' : '🟢';
html += '<li>' + badge + ' <strong>' + e.level + '</strong> via ' + e.source + ' at ' + time;
if (e.detail) html += ' — ' + e.detail;
html += '</li>';
}
html += '</ul>';
el.innerHTML = html;
}
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -913,13 +972,19 @@ Sovereignty and service always.`;
}
}
// Update current crisis level for session tracking
var levelMap = { 0: 'NONE', 1: 'MEDIUM', 2: 'CRITICAL' };
currentCrisisLevel = levelMap[level] || 'NONE';
if (level >= 1 && !crisisPanelShown) {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
recordEscalation(level === 2 ? 'CRITICAL' : 'MEDIUM', 'the-door', 'keyword detected');
}
if (level === 2) {
showOverlay();
recordEscalation('CRITICAL', 'the-door', 'explicit phrase — overlay shown');
}
}
@@ -983,12 +1048,60 @@ Sovereignty and service always.`;
// ===== OVERLAY =====
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
);
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var first = focusable[0];
var last = focusable[focusable.length - 1];
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab: if on last, wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function showOverlay() {
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
crisisOverlay.classList.add('active');
overlayDismissBtn.disabled = true;
var countdown = 10;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
if (overlayTimer) clearInterval(overlayTimer);
overlayTimer = setInterval(function() {
countdown--;
@@ -1005,6 +1118,9 @@ Sovereignty and service always.`;
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
crisisOverlay.classList.remove('active');
@@ -1012,7 +1128,22 @@ Sovereignty and service always.`;
clearInterval(overlayTimer);
overlayTimer = null;
}
msgInput.focus();
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
});
@@ -1117,25 +1248,14 @@ Sovereignty and service always.`;
} catch (e) {}
}
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
cancelSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
saveSafetyPlan.addEventListener('click', function() {
@@ -1149,12 +1269,101 @@ Sovereignty and service always.`;
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
alert('Error saving plan.');
}
});
// ===== SAFETY PLAN FOCUS TRAP (fix #65) =====
// Focusable elements inside the modal, in tab order
var _spFocusableIds = [
'close-safety-plan',
'sp-warning-signs',
'sp-coping',
'sp-distraction',
'sp-help',
'sp-environment',
'cancel-safety-plan',
'save-safety-plan'
];
var _spTriggerEl = null; // element that opened the modal
function _getSpFocusableEls() {
return _spFocusableIds
.map(function(id) { return document.getElementById(id); })
.filter(function(el) { return el && !el.disabled; });
}
function _trapSafetyPlanFocus(e) {
if (e.key !== 'Tab') return;
var els = _getSpFocusableEls();
if (!els.length) return;
var first = els[0];
var last = els[els.length - 1];
if (e.shiftKey) {
// Shift+Tab on first → wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab on last → wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
function _trapSafetyPlanEscape(e) {
if (e.key === 'Escape') {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}
}
function _activateSafetyPlanFocusTrap(triggerEl) {
_spTriggerEl = triggerEl || document.activeElement;
// Focus first textarea
var firstInput = document.getElementById('sp-warning-signs');
if (firstInput) firstInput.focus();
// Add listeners
document.addEventListener('keydown', _trapSafetyPlanFocus);
document.addEventListener('keydown', _trapSafetyPlanEscape);
// Mark background inert (prevent click-through)
document.body.setAttribute('aria-hidden', 'true');
safetyPlanModal.removeAttribute('aria-hidden');
}
function _restoreSafetyPlanFocus() {
document.removeEventListener('keydown', _trapSafetyPlanFocus);
document.removeEventListener('keydown', _trapSafetyPlanEscape);
document.body.removeAttribute('aria-hidden');
if (_spTriggerEl && typeof _spTriggerEl.focus === 'function') {
_spTriggerEl.focus();
}
_spTriggerEl = null;
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
});
}
// ===== TEXTAREA AUTO-RESIZE =====
msgInput.addEventListener('input', function() {
this.style.height = 'auto';
@@ -1193,7 +1402,11 @@ Sovereignty and service always.`;
fetch('/api/v1/chat/completions', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'X-Crisis-Session-ID': crisisSessionId,
'X-Crisis-Level': currentCrisisLevel || 'NONE'
},
body: JSON.stringify({
model: 'timmy',
messages: allMessages,
@@ -1203,6 +1416,23 @@ Sovereignty and service always.`;
}).then(function(response) {
clearTimeout(timeoutId);
// Check for hermes-agent crisis escalation headers
var hermesLevel = response.headers.get('X-Crisis-Escalation');
if (hermesLevel && hermesLevel !== 'NONE') {
recordEscalation(hermesLevel, 'hermes-agent', 'from response header');
// Update UI if escalated beyond current
if (hermesLevel === 'CRITICAL' && !crisisOverlay.classList.contains('active')) {
showOverlay();
} else if (hermesLevel === 'HIGH' && !crisisPanelShown) {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
}
}
var hermesAck = response.headers.get('X-Crisis-Deescalation');
if (hermesAck) {
recordEscalation(hermesAck, 'hermes-agent', 'de-escalation');
}
if (!response.ok) {
throw new Error('HTTP ' + response.status);
}
@@ -1300,6 +1530,7 @@ Sovereignty and service always.`;
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -0,0 +1,84 @@
<!-- Test: Safety plan modal focus trap (issue #65) -->
<!-- Open this file in a browser to manually verify focus trap behavior -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Focus Trap Test</title>
<style>
body { font-family: sans-serif; padding: 20px; }
.test { margin: 10px 0; padding: 10px; border: 1px solid #ccc; }
.pass { background: #d4edda; border-color: #28a745; }
.fail { background: #f8d7da; border-color: #dc3545; }
button { margin: 5px; padding: 8px 16px; }
</style>
</head>
<body>
<h1>Focus Trap Manual Test</h1>
<p>Open <code>index.html</code> in a browser, then run these checks:</p>
<div class="test" id="test-1">
<strong>Test 1: Tab wraps to first element</strong><br>
1. Open safety plan modal<br>
2. Tab through all elements until you reach "Save Plan"<br>
3. Press Tab again → should wrap to close button (X)
</div>
<div class="test" id="test-2">
<strong>Test 2: Shift+Tab wraps to last element</strong><br>
1. Open safety plan modal<br>
2. Focus is on "Warning signs" textarea<br>
3. Press Shift+Tab → should wrap to "Save Plan" button
</div>
<div class="test" id="test-3">
<strong>Test 3: Escape closes modal</strong><br>
1. Open safety plan modal<br>
2. Press Escape → modal closes<br>
3. Focus returns to the button that opened it
</div>
<div class="test" id="test-4">
<strong>Test 4: Background not reachable</strong><br>
1. Open safety plan modal<br>
2. Try to Tab to the chat input behind the modal<br>
3. Should NOT be able to reach it
</div>
<div class="test" id="test-5">
<strong>Test 5: Click buttons close + restore focus</strong><br>
1. Open modal via "my safety plan" button<br>
2. Click Cancel → modal closes, focus on "my safety plan" button<br>
3. Open again, click Save → same behavior<br>
4. Open again, click X → same behavior
</div>
<hr>
<h2>Automated checks (paste into DevTools console on index.html):</h2>
<pre><code>
// Test focus trap
var modal = document.getElementById('safety-plan-modal');
var openBtn = document.getElementById('safety-plan-btn');
openBtn.click();
console.assert(modal.classList.contains('active'), 'Modal should be open');
var lastEl = document.getElementById('save-safety-plan');
lastEl.focus();
var evt = new KeyboardEvent('keydown', {key: 'Tab', bubbles: true});
document.dispatchEvent(evt);
// After Tab from last, focus should wrap to first
var firstEl = document.getElementById('close-safety-plan');
console.log('Focus after wrap:', document.activeElement.id);
console.assert(document.activeElement === firstEl || document.activeElement.id === 'sp-warning-signs',
'Focus should wrap to first element');
// Test Escape
var escEvt = new KeyboardEvent('keydown', {key: 'Escape', bubbles: true});
document.dispatchEvent(escEvt);
console.assert(!modal.classList.contains('active'), 'Modal should close on Escape');
console.assert(document.activeElement === openBtn, 'Focus should return to open button');
console.log('All automated checks passed!');
</code></pre>
</body>
</html>

View File

@@ -0,0 +1,57 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayFocusTrap(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_registers_tab_key_focus_trap(self):
self.assertRegex(
self.html,
r"function\s+trapFocusInOverlay\s*\(e\)",
'Expected crisis overlay focus trap handler to exist.',
)
self.assertRegex(
self.html,
r"if\s*\(e\.key\s*!==\s*'Tab'\)\s*return;",
'Expected focus trap handler to guard on Tab key events.',
)
self.assertRegex(
self.html,
r"document\.addEventListener\('keydown',\s*trapFocusInOverlay\)",
'Expected overlay focus trap to register on document keydown.',
)
def test_overlay_disables_background_interaction(self):
self.assertRegex(
self.html,
r"mainApp\.setAttribute\('inert',\s*''\)",
'Expected overlay to set inert on the main app while active.',
)
self.assertRegex(
self.html,
r"mainApp\.removeAttribute\('inert'\)",
'Expected overlay dismissal to remove inert from the main app.',
)
def test_overlay_restores_focus_after_dismiss(self):
self.assertRegex(
self.html,
r"_preOverlayFocusElement\s*=\s*document\.activeElement",
'Expected overlay to remember the pre-overlay focus target.',
)
self.assertRegex(
self.html,
r"_preOverlayFocusElement\.focus\(\)",
'Expected overlay dismissal to restore focus to the prior target.',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,44 @@
import importlib
import sys
import unittest
import warnings
from crisis.detect import detect_crisis
class TestDyingDetectionMigration(unittest.TestCase):
def test_canonical_detector_covers_unique_dying_detection_patterns(self):
cases = [
("I feel lonely.", "LOW"),
("I've lost all hope and see no tomorrow.", "MEDIUM"),
("What if I disappeared forever?", "HIGH"),
]
for text, expected_level in cases:
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(result.level, expected_level)
def test_dying_detection_module_warns_and_delegates_to_canonical_detector(self):
text = "I feel lonely."
sys.modules.pop("dying_detection", None)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", DeprecationWarning)
module = importlib.import_module("dying_detection")
self.assertTrue(
any(issubclass(w.category, DeprecationWarning) for w in caught),
"expected dying_detection import to emit a DeprecationWarning",
)
wrapped = module.detect(text)
canonical = detect_crisis(text)
self.assertEqual(wrapped.level, canonical.level)
self.assertEqual(wrapped.confidence, canonical.score)
self.assertEqual(wrapped.raw_matched_patterns, [m["pattern"] for m in canonical.matches])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,202 @@
"""
Tests for crisis session tracker — the-door <-> hermes-agent integration.
Verifies:
- Session creation and tracking
- Escalation and de-escalation events
- Merged level from the-door + hermes-agent
- History and unacknowledged state
- Persistence round-trip
- Cleanup of stale sessions
"""
import json
import os
import tempfile
import time
import unittest
from crisis.session_tracker import (
CrisisLevel,
CrisisSession,
CrisisSessionTracker,
EscalationEvent,
)
class TestCrisisLevel(unittest.TestCase):
"""CrisisLevel enum ordering and conversion."""
def test_from_string(self):
self.assertEqual(CrisisLevel.from_string("HIGH"), CrisisLevel.HIGH)
self.assertEqual(CrisisLevel.from_string("critical"), CrisisLevel.CRITICAL)
self.assertEqual(CrisisLevel.from_string("garbage"), CrisisLevel.NONE)
def test_ordering(self):
self.assertTrue(CrisisLevel.HIGH > CrisisLevel.MEDIUM)
self.assertTrue(CrisisLevel.CRITICAL >= CrisisLevel.HIGH)
self.assertTrue(CrisisLevel.LOW < CrisisLevel.HIGH)
self.assertEqual(CrisisLevel.NONE, CrisisLevel.NONE)
class TestCrisisSession(unittest.TestCase):
"""Single session state management."""
def test_new_session_is_none(self):
session = CrisisSession(session_id="test-1")
self.assertEqual(session.current_level, "NONE")
self.assertEqual(session.hermes_level, "NONE")
self.assertEqual(len(session.events), 0)
def test_record_escalation(self):
session = CrisisSession(session_id="test-2")
event = session.record("HIGH", "the-door", "keyword: despair")
self.assertEqual(event.level, "HIGH")
self.assertEqual(event.previous_level, "NONE")
self.assertEqual(event.source, "the-door")
self.assertEqual(session.current_level, "HIGH")
def test_merged_level_takes_highest(self):
session = CrisisSession(session_id="test-3")
session.record("MEDIUM", "the-door")
session.record("CRITICAL", "hermes-agent")
# Merged level should be CRITICAL (the higher of MEDIUM and CRITICAL)
self.assertEqual(session.current_level, "CRITICAL")
self.assertEqual(session.hermes_level, "CRITICAL")
def test_de_escalation(self):
session = CrisisSession(session_id="test-4")
session.record("HIGH", "the-door")
session.record("MEDIUM", "the-door")
self.assertEqual(session.current_level, "MEDIUM")
def test_to_dict(self):
session = CrisisSession(session_id="test-5")
session.record("HIGH", "the-door", "test")
d = session.to_dict()
self.assertEqual(d["session_id"], "test-5")
self.assertEqual(d["current_level"], "HIGH")
self.assertEqual(d["event_count"], 1)
self.assertEqual(len(d["events"]), 1)
class TestCrisisSessionTracker(unittest.TestCase):
"""Full tracker with multiple sessions."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_get_or_create(self):
s1 = self.tracker.get_or_create("s1")
s2 = self.tracker.get_or_create("s1")
self.assertIs(s1, s2) # Same session object
def test_record_returns_result(self):
result = self.tracker.record("s1", "HIGH", "the-door", "keyword: help")
self.assertIn("session", result)
self.assertIn("event", result)
self.assertTrue(result["escalated"])
self.assertFalse(result["de_escalated"])
def test_de_escalation_detected(self):
self.tracker.record("s1", "HIGH", "the-door")
result = self.tracker.record("s1", "LOW", "the-door", "user calmed down")
self.assertFalse(result["escalated"])
self.assertTrue(result["de_escalated"])
def test_get_history(self):
self.tracker.record("s1", "HIGH", "the-door")
self.tracker.record("s1", "CRITICAL", "hermes-agent")
history = self.tracker.get_history("s1")
self.assertEqual(history["event_count"], 2)
self.assertEqual(history["current_level"], "CRITICAL")
self.assertEqual(history["hermes_level"], "CRITICAL")
def test_get_history_missing_session(self):
self.assertIsNone(self.tracker.get_history("nonexistent"))
def test_unacknowledged_state(self):
self.tracker.record("s1", "HIGH", "the-door")
unacked = self.tracker.get_unacknowledged("s1")
self.assertIsNotNone(unacked)
self.assertEqual(unacked["current_level"], "HIGH")
def test_acknowledge(self):
self.tracker.record("s1", "HIGH", "the-door")
self.assertTrue(self.tracker.acknowledge("s1"))
self.assertIsNone(self.tracker.get_unacknowledged("s1"))
def test_acknowledge_missing(self):
self.assertFalse(self.tracker.acknowledge("nonexistent"))
def test_cleanup(self):
self.tracker.record("old-session", "LOW", "the-door")
self.tracker._sessions["old-session"].created_at = time.time() - 100000
self.tracker.record("new-session", "LOW", "the-door")
removed = self.tracker.cleanup(max_age_seconds=50000)
self.assertEqual(removed, 1)
self.assertNotIn("old-session", self.tracker._sessions)
self.assertIn("new-session", self.tracker._sessions)
class TestPersistence(unittest.TestCase):
"""JSON persistence round-trip."""
def test_save_and_load(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
path = f.name
try:
t1 = CrisisSessionTracker(persist_path=path)
t1.record("s1", "HIGH", "the-door", "test persist")
t1.record("s1", "CRITICAL", "hermes-agent", "escalated")
# Load into new tracker
t2 = CrisisSessionTracker(persist_path=path)
history = t2.get_history("s1")
self.assertIsNotNone(history)
self.assertEqual(history["current_level"], "CRITICAL")
self.assertEqual(history["event_count"], 2)
finally:
os.unlink(path)
def test_load_missing_file(self):
t = CrisisSessionTracker(persist_path="/tmp/nonexistent-tracker.json")
self.assertEqual(len(t._sessions), 0)
class TestCrossSourceIntegration(unittest.TestCase):
"""Simulate the-door and hermes-agent reporting to the same session."""
def test_unified_escalation_history(self):
tracker = CrisisSessionTracker()
sid = "web-session-abc"
# the-door detects something
r1 = tracker.record(sid, "MEDIUM", "the-door", "keyword: 'alone in this'")
self.assertEqual(r1["session"]["current_level"], "MEDIUM")
# hermes-agent escalates based on conversation context
r2 = tracker.record(sid, "HIGH", "hermes-agent", "user repeated 'no way out' 3x")
self.assertEqual(r2["session"]["current_level"], "HIGH")
self.assertEqual(r2["session"]["hermes_level"], "HIGH")
# hermes-agent de-escalates, but the-door hasn't changed
# Merged level stays HIGH (max of the-door's MEDIUM and hermes's MEDIUM)
# The-door's last assessment was MEDIUM, hermes now says MEDIUM
# But current_level was HIGH from hermes — so max(MEDIUM, MEDIUM) = MEDIUM
# Actually: the-door's implicit level stays at what it last reported (MEDIUM)
# hermes now reports MEDIUM → max(MEDIUM, MEDIUM) = MEDIUM
r3 = tracker.record(sid, "MEDIUM", "hermes-agent", "user said 'feeling a bit better'")
self.assertEqual(r3["session"]["current_level"], "MEDIUM")
self.assertTrue(r3["de_escalated"])
# Verify full history
history = tracker.get_history(sid)
self.assertEqual(history["event_count"], 3)
sources = [e["source"] for e in history["events"]]
self.assertIn("the-door", sources)
self.assertIn("hermes-agent", sources)
if __name__ == "__main__":
unittest.main()