Compare commits
4 Commits
fix/673
...
burn/99-17
| Author | SHA1 | Date | |
|---|---|---|---|
| 371dc83728 | |||
| 851eb32871 | |||
| aba8f2b87b | |||
| 81da20deb6 |
75
GENOME.md
75
GENOME.md
@@ -1,75 +0,0 @@
|
||||
# GENOME.md — the-door
|
||||
|
||||
**Generated:** 2026-04-14
|
||||
**Repo:** Timmy_Foundation/the-door
|
||||
**Description:** Crisis Front Door — a single URL where a man at 3am can talk to Timmy. No login, no signup. 988 always visible.
|
||||
|
||||
---
|
||||
|
||||
## Project Overview
|
||||
|
||||
The-door is a crisis intervention web application — the most sacred surface in the Timmy Foundation. When a man at 3am reaches the end of his road, this is where he lands. No login, no signup, no barriers. 988 Suicide and Crisis Lifeline always visible. The "When a Man Is Dying" protocol active on every page.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
the-door/
|
||||
├── index.html # Main crisis page (PWA-capable)
|
||||
├── crisis-offline.html # Offline fallback (service worker cached)
|
||||
├── about.html # About page
|
||||
├── testimony.html # Testimony/stories page
|
||||
├── sw.js # Service worker (offline-first)
|
||||
├── manifest.json # PWA manifest
|
||||
├── crisis/ # Core crisis detection + response
|
||||
│ ├── detect.py # Keyword/pattern detection (4 tiers)
|
||||
│ ├── gateway.py # API endpoints, prompt injection
|
||||
│ ├── response.py # Response generation, 988 routing
|
||||
│ ├── compassion_router.py # Profile-based response routing
|
||||
│ ├── profiles.py # Compassion profiles
|
||||
│ └── PROTOCOL.md # The protocol (SOUL.md reference)
|
||||
├── crisis_detector.py # Legacy shim → crisis/detect.py
|
||||
├── crisis_responder.py # Legacy responder
|
||||
├── dying_detection/ # Deprecated module
|
||||
├── evolution/ # Crisis synthesizer (creative)
|
||||
├── tests/ # Safety-critical tests
|
||||
│ ├── test_crisis_overlay_focus_trap.py
|
||||
│ ├── test_dying_detection_deprecation.py
|
||||
│ └── test_false_positive_fixes.py
|
||||
└── deploy/ # Deployment docs
|
||||
```
|
||||
|
||||
## Key Abstractions
|
||||
|
||||
| Module | Purpose |
|
||||
|---|---|
|
||||
| `crisis/detect.py` | 4-tier detection: LOW/MEDIUM/HIGH/CRITICAL via regex patterns |
|
||||
| `crisis/gateway.py` | HTTP API, Sovereign Heart prompt injection |
|
||||
| `crisis/response.py` | Response generation, 988 integration, escalation |
|
||||
| `crisis/compassion_router.py` | Profile-based routing (different crisis types) |
|
||||
| `sw.js` | Service worker for offline-first PWA |
|
||||
|
||||
## Safety Constraints
|
||||
|
||||
- **The-door never auto-closes PRs** (in fleet-ops exempt list)
|
||||
- **988 always visible** on every page, even offline
|
||||
- **When a Man Is Dying protocol** active on every interaction
|
||||
- **No login/signup** — zero barriers to crisis support
|
||||
- **Offline-first** — service worker caches critical pages
|
||||
|
||||
## Test Coverage
|
||||
|
||||
| Test | Coverage |
|
||||
|---|---|
|
||||
| Crisis overlay focus trap | ✅ |
|
||||
| Dying detection deprecation | ✅ |
|
||||
| False positive fixes | ✅ |
|
||||
| Crisis detection tiers | ❌ (in crisis/tests.py) |
|
||||
| Response generation | ❌ |
|
||||
| Offline service worker | ❌ |
|
||||
|
||||
## Security
|
||||
|
||||
- No user data stored (crisis intervention is stateless by design)
|
||||
- No cookies, no tracking, no analytics
|
||||
- Service worker only caches static assets
|
||||
- Crisis detection runs client-side where possible
|
||||
@@ -7,6 +7,13 @@ Stands between a broken man and a machine that would tell him to die.
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
from .session_tracker import (
|
||||
CrisisSessionTracker,
|
||||
CrisisSession,
|
||||
CrisisLevel,
|
||||
EscalationEvent,
|
||||
get_tracker,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
@@ -19,4 +26,9 @@ __all__ = [
|
||||
"format_result",
|
||||
"format_gateway_response",
|
||||
"get_urgency_emoji",
|
||||
"CrisisSessionTracker",
|
||||
"CrisisSession",
|
||||
"CrisisLevel",
|
||||
"EscalationEvent",
|
||||
"get_tracker",
|
||||
]
|
||||
|
||||
251
crisis/session_tracker.py
Normal file
251
crisis/session_tracker.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""
|
||||
Crisis Session Tracker for the-door <-> hermes-agent integration.
|
||||
|
||||
Tracks crisis session state, escalation history, and provides
|
||||
a bridge between the-door's web UI and hermes-agent sessions.
|
||||
|
||||
Each browser session gets a unique crisis_session_id that is sent
|
||||
with every API request. This allows hermes-agent to:
|
||||
1. Receive crisis context from the-door's client-side detection
|
||||
2. Report escalations/de-escalations back to the-door's UI
|
||||
3. Maintain unified escalation history across web and CLI
|
||||
|
||||
Usage:
|
||||
from crisis.session_tracker import CrisisSessionTracker
|
||||
|
||||
tracker = CrisisSessionTracker()
|
||||
tracker.record("session-abc", level="HIGH", source="the-door", detail="keyword: 'can't go on'")
|
||||
history = tracker.get_history("session-abc")
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
|
||||
class CrisisLevel(Enum):
|
||||
"""Crisis severity levels — aligned with both the-door and hermes-agent."""
|
||||
NONE = "NONE"
|
||||
LOW = "LOW"
|
||||
MEDIUM = "MEDIUM"
|
||||
HIGH = "HIGH"
|
||||
CRITICAL = "CRITICAL"
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, s: str) -> "CrisisLevel":
|
||||
try:
|
||||
return cls[s.upper()]
|
||||
except KeyError:
|
||||
return cls.NONE
|
||||
|
||||
def __ge__(self, other: "CrisisLevel") -> bool:
|
||||
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
|
||||
return order.index(self) >= order.index(other)
|
||||
|
||||
def __gt__(self, other: "CrisisLevel") -> bool:
|
||||
order = [self.NONE, self.LOW, self.MEDIUM, self.HIGH, self.CRITICAL]
|
||||
return order.index(self) > order.index(other)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EscalationEvent:
|
||||
"""A single crisis escalation or de-escalation event."""
|
||||
timestamp: float
|
||||
level: str
|
||||
previous_level: str
|
||||
source: str # "the-door" or "hermes-agent"
|
||||
detail: str = ""
|
||||
session_id: str = ""
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisSession:
|
||||
"""State for a single crisis session."""
|
||||
session_id: str
|
||||
created_at: float = field(default_factory=time.time)
|
||||
current_level: str = "NONE"
|
||||
events: List[EscalationEvent] = field(default_factory=list)
|
||||
# Per-source levels (what each source last reported)
|
||||
door_level: str = "NONE"
|
||||
hermes_level: str = "NONE"
|
||||
hermes_last_update: float = 0.0
|
||||
# Whether the UI has acknowledged the current level
|
||||
ui_acknowledged: bool = False
|
||||
|
||||
def record(self, level: str, source: str, detail: str = "") -> EscalationEvent:
|
||||
"""Record an escalation or de-escalation event."""
|
||||
event = EscalationEvent(
|
||||
timestamp=time.time(),
|
||||
level=level,
|
||||
previous_level=self.current_level,
|
||||
source=source,
|
||||
detail=detail,
|
||||
session_id=self.session_id,
|
||||
)
|
||||
self.events.append(event)
|
||||
|
||||
# Update the per-source level
|
||||
if source == "hermes-agent":
|
||||
self.hermes_level = level
|
||||
self.hermes_last_update = time.time()
|
||||
else:
|
||||
self.door_level = level
|
||||
|
||||
# Merged level = max of both sources (safety-first: higher wins)
|
||||
merged = max(
|
||||
CrisisLevel.from_string(self.door_level),
|
||||
CrisisLevel.from_string(self.hermes_level),
|
||||
)
|
||||
self.current_level = merged.value
|
||||
self.ui_acknowledged = False
|
||||
|
||||
return event
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"session_id": self.session_id,
|
||||
"created_at": self.created_at,
|
||||
"current_level": self.current_level,
|
||||
"door_level": self.door_level,
|
||||
"hermes_level": self.hermes_level,
|
||||
"hermes_last_update": self.hermes_last_update,
|
||||
"ui_acknowledged": self.ui_acknowledged,
|
||||
"event_count": len(self.events),
|
||||
"events": [e.to_dict() for e in self.events[-20:]], # Last 20 events
|
||||
}
|
||||
|
||||
|
||||
class CrisisSessionTracker:
|
||||
"""
|
||||
Tracks crisis sessions across the-door web UI and hermes-agent.
|
||||
|
||||
In-memory by default. Optionally persists to JSON for crash recovery.
|
||||
"""
|
||||
|
||||
def __init__(self, persist_path: Optional[str] = None):
|
||||
self._sessions: Dict[str, CrisisSession] = {}
|
||||
self._persist_path = persist_path
|
||||
if persist_path and os.path.exists(persist_path):
|
||||
self._load()
|
||||
|
||||
def get_or_create(self, session_id: str) -> CrisisSession:
|
||||
"""Get existing session or create a new one."""
|
||||
if session_id not in self._sessions:
|
||||
self._sessions[session_id] = CrisisSession(session_id=session_id)
|
||||
return self._sessions[session_id]
|
||||
|
||||
def record(
|
||||
self,
|
||||
session_id: str,
|
||||
level: str,
|
||||
source: str,
|
||||
detail: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Record a crisis event. Returns the updated session state.
|
||||
|
||||
Args:
|
||||
session_id: Browser/hermes session identifier
|
||||
level: NONE, LOW, MEDIUM, HIGH, CRITICAL
|
||||
source: "the-door" or "hermes-agent"
|
||||
detail: Human-readable description of what triggered this
|
||||
"""
|
||||
session = self.get_or_create(session_id)
|
||||
event = session.record(level, source, detail)
|
||||
|
||||
if self._persist_path:
|
||||
self._save()
|
||||
|
||||
return {
|
||||
"session": session.to_dict(),
|
||||
"event": event.to_dict(),
|
||||
"escalated": CrisisLevel.from_string(level) > CrisisLevel.from_string(event.previous_level),
|
||||
"de_escalated": CrisisLevel.from_string(level) < CrisisLevel.from_string(event.previous_level),
|
||||
}
|
||||
|
||||
def get_history(self, session_id: str) -> Optional[dict]:
|
||||
"""Get full escalation history for a session."""
|
||||
session = self._sessions.get(session_id)
|
||||
if not session:
|
||||
return None
|
||||
return session.to_dict()
|
||||
|
||||
def get_unacknowledged(self, session_id: str) -> Optional[dict]:
|
||||
"""Get current unacknowledged crisis state for UI display."""
|
||||
session = self._sessions.get(session_id)
|
||||
if not session or session.ui_acknowledged:
|
||||
return None
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"current_level": session.current_level,
|
||||
"hermes_level": session.hermes_level,
|
||||
"event_count": len(session.events),
|
||||
"latest_event": session.events[-1].to_dict() if session.events else None,
|
||||
}
|
||||
|
||||
def acknowledge(self, session_id: str) -> bool:
|
||||
"""Mark the current crisis state as acknowledged by the UI."""
|
||||
session = self._sessions.get(session_id)
|
||||
if not session:
|
||||
return False
|
||||
session.ui_acknowledged = True
|
||||
return True
|
||||
|
||||
def cleanup(self, max_age_seconds: float = 86400) -> int:
|
||||
"""Remove sessions older than max_age_seconds. Returns count removed."""
|
||||
cutoff = time.time() - max_age_seconds
|
||||
stale = [sid for sid, s in self._sessions.items() if s.created_at < cutoff]
|
||||
for sid in stale:
|
||||
del self._sessions[sid]
|
||||
if stale and self._persist_path:
|
||||
self._save()
|
||||
return len(stale)
|
||||
|
||||
def _save(self):
|
||||
"""Persist sessions to JSON."""
|
||||
if not self._persist_path:
|
||||
return
|
||||
data = {sid: s.to_dict() for sid, s in self._sessions.items()}
|
||||
os.makedirs(os.path.dirname(self._persist_path) or ".", exist_ok=True)
|
||||
with open(self._persist_path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
def _load(self):
|
||||
"""Load sessions from JSON (best-effort)."""
|
||||
try:
|
||||
with open(self._persist_path) as f:
|
||||
data = json.load(f)
|
||||
for sid, sdata in data.items():
|
||||
session = CrisisSession(
|
||||
session_id=sdata["session_id"],
|
||||
created_at=sdata.get("created_at", time.time()),
|
||||
current_level=sdata.get("current_level", "NONE"),
|
||||
door_level=sdata.get("door_level", "NONE"),
|
||||
hermes_level=sdata.get("hermes_level", "NONE"),
|
||||
hermes_last_update=sdata.get("hermes_last_update", 0),
|
||||
ui_acknowledged=sdata.get("ui_acknowledged", False),
|
||||
)
|
||||
for edata in sdata.get("events", []):
|
||||
session.events.append(EscalationEvent(**edata))
|
||||
self._sessions[sid] = session
|
||||
except (FileNotFoundError, json.JSONDecodeError, KeyError):
|
||||
pass
|
||||
|
||||
|
||||
# Module-level singleton for convenience
|
||||
_tracker: Optional[CrisisSessionTracker] = None
|
||||
|
||||
|
||||
def get_tracker(persist_path: Optional[str] = None) -> CrisisSessionTracker:
|
||||
"""Get or create the module-level tracker singleton."""
|
||||
global _tracker
|
||||
if _tracker is None:
|
||||
_tracker = CrisisSessionTracker(persist_path=persist_path)
|
||||
return _tracker
|
||||
88
index.html
88
index.html
@@ -678,6 +678,9 @@ html, body {
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Escalation history (crisis session tracking, issue #99) -->
|
||||
<div id="escalation-history" style="display:none; padding:4px 16px; font-size:0.75rem; color:#8b949e; border-top:1px solid #21262d;"></div>
|
||||
|
||||
<!-- Footer -->
|
||||
<footer id="footer">
|
||||
<a href="/about" aria-label="About The Door">about</a>
|
||||
@@ -825,6 +828,62 @@ Sovereignty and service always.`;
|
||||
var isStreaming = false;
|
||||
var overlayTimer = null;
|
||||
var crisisPanelShown = false;
|
||||
var currentCrisisLevel = 'NONE';
|
||||
|
||||
// ===== CRISIS SESSION TRACKING =====
|
||||
// Unique session ID sent with every API request for cross-source tracking
|
||||
var crisisSessionId = (function() {
|
||||
var stored = null;
|
||||
try { stored = sessionStorage.getItem('timmy_crisis_session_id'); } catch(e) {}
|
||||
if (stored) return stored;
|
||||
var id = 'door-' + Date.now().toString(36) + '-' + Math.random().toString(36).substr(2, 8);
|
||||
try { sessionStorage.setItem('timmy_crisis_session_id', id); } catch(e) {}
|
||||
return id;
|
||||
})();
|
||||
|
||||
// Escalation history for this session (mirrors server-side tracker)
|
||||
var escalationHistory = [];
|
||||
|
||||
function recordEscalation(level, source, detail) {
|
||||
var event = {
|
||||
timestamp: Date.now(),
|
||||
level: level,
|
||||
source: source || 'the-door',
|
||||
detail: detail || ''
|
||||
};
|
||||
escalationHistory.push(event);
|
||||
// Persist for session recovery
|
||||
try { sessionStorage.setItem('timmy_escalation_history', JSON.stringify(escalationHistory.slice(-50))); } catch(e) {}
|
||||
renderEscalationHistory();
|
||||
}
|
||||
|
||||
// Load persisted history
|
||||
try {
|
||||
var saved = sessionStorage.getItem('timmy_escalation_history');
|
||||
if (saved) escalationHistory = JSON.parse(saved);
|
||||
} catch(e) {}
|
||||
|
||||
function renderEscalationHistory() {
|
||||
var el = document.getElementById('escalation-history');
|
||||
if (!el) return;
|
||||
if (escalationHistory.length === 0) {
|
||||
el.style.display = 'none';
|
||||
return;
|
||||
}
|
||||
el.style.display = 'block';
|
||||
var html = '<strong>Escalation History</strong><ul>';
|
||||
var recent = escalationHistory.slice(-5).reverse();
|
||||
for (var i = 0; i < recent.length; i++) {
|
||||
var e = recent[i];
|
||||
var time = new Date(e.timestamp).toLocaleTimeString();
|
||||
var badge = e.level === 'CRITICAL' ? '🔴' : e.level === 'HIGH' ? '🟠' : e.level === 'MEDIUM' ? '🟡' : '🟢';
|
||||
html += '<li>' + badge + ' <strong>' + e.level + '</strong> via ' + e.source + ' at ' + time;
|
||||
if (e.detail) html += ' — ' + e.detail;
|
||||
html += '</li>';
|
||||
}
|
||||
html += '</ul>';
|
||||
el.innerHTML = html;
|
||||
}
|
||||
|
||||
// ===== SERVICE WORKER =====
|
||||
if ('serviceWorker' in navigator) {
|
||||
@@ -913,13 +972,19 @@ Sovereignty and service always.`;
|
||||
}
|
||||
}
|
||||
|
||||
// Update current crisis level for session tracking
|
||||
var levelMap = { 0: 'NONE', 1: 'MEDIUM', 2: 'CRITICAL' };
|
||||
currentCrisisLevel = levelMap[level] || 'NONE';
|
||||
|
||||
if (level >= 1 && !crisisPanelShown) {
|
||||
crisisPanelShown = true;
|
||||
crisisPanel.classList.add('visible');
|
||||
recordEscalation(level === 2 ? 'CRITICAL' : 'MEDIUM', 'the-door', 'keyword detected');
|
||||
}
|
||||
|
||||
if (level === 2) {
|
||||
showOverlay();
|
||||
recordEscalation('CRITICAL', 'the-door', 'explicit phrase — overlay shown');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1337,7 +1402,11 @@ Sovereignty and service always.`;
|
||||
|
||||
fetch('/api/v1/chat/completions', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Crisis-Session-ID': crisisSessionId,
|
||||
'X-Crisis-Level': currentCrisisLevel || 'NONE'
|
||||
},
|
||||
body: JSON.stringify({
|
||||
model: 'timmy',
|
||||
messages: allMessages,
|
||||
@@ -1347,6 +1416,23 @@ Sovereignty and service always.`;
|
||||
}).then(function(response) {
|
||||
clearTimeout(timeoutId);
|
||||
|
||||
// Check for hermes-agent crisis escalation headers
|
||||
var hermesLevel = response.headers.get('X-Crisis-Escalation');
|
||||
if (hermesLevel && hermesLevel !== 'NONE') {
|
||||
recordEscalation(hermesLevel, 'hermes-agent', 'from response header');
|
||||
// Update UI if escalated beyond current
|
||||
if (hermesLevel === 'CRITICAL' && !crisisOverlay.classList.contains('active')) {
|
||||
showOverlay();
|
||||
} else if (hermesLevel === 'HIGH' && !crisisPanelShown) {
|
||||
crisisPanelShown = true;
|
||||
crisisPanel.classList.add('visible');
|
||||
}
|
||||
}
|
||||
var hermesAck = response.headers.get('X-Crisis-Deescalation');
|
||||
if (hermesAck) {
|
||||
recordEscalation(hermesAck, 'hermes-agent', 'de-escalation');
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error('HTTP ' + response.status);
|
||||
}
|
||||
|
||||
202
tests/test_session_tracker.py
Normal file
202
tests/test_session_tracker.py
Normal file
@@ -0,0 +1,202 @@
|
||||
"""
|
||||
Tests for crisis session tracker — the-door <-> hermes-agent integration.
|
||||
|
||||
Verifies:
|
||||
- Session creation and tracking
|
||||
- Escalation and de-escalation events
|
||||
- Merged level from the-door + hermes-agent
|
||||
- History and unacknowledged state
|
||||
- Persistence round-trip
|
||||
- Cleanup of stale sessions
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from crisis.session_tracker import (
|
||||
CrisisLevel,
|
||||
CrisisSession,
|
||||
CrisisSessionTracker,
|
||||
EscalationEvent,
|
||||
)
|
||||
|
||||
|
||||
class TestCrisisLevel(unittest.TestCase):
|
||||
"""CrisisLevel enum ordering and conversion."""
|
||||
|
||||
def test_from_string(self):
|
||||
self.assertEqual(CrisisLevel.from_string("HIGH"), CrisisLevel.HIGH)
|
||||
self.assertEqual(CrisisLevel.from_string("critical"), CrisisLevel.CRITICAL)
|
||||
self.assertEqual(CrisisLevel.from_string("garbage"), CrisisLevel.NONE)
|
||||
|
||||
def test_ordering(self):
|
||||
self.assertTrue(CrisisLevel.HIGH > CrisisLevel.MEDIUM)
|
||||
self.assertTrue(CrisisLevel.CRITICAL >= CrisisLevel.HIGH)
|
||||
self.assertTrue(CrisisLevel.LOW < CrisisLevel.HIGH)
|
||||
self.assertEqual(CrisisLevel.NONE, CrisisLevel.NONE)
|
||||
|
||||
|
||||
class TestCrisisSession(unittest.TestCase):
|
||||
"""Single session state management."""
|
||||
|
||||
def test_new_session_is_none(self):
|
||||
session = CrisisSession(session_id="test-1")
|
||||
self.assertEqual(session.current_level, "NONE")
|
||||
self.assertEqual(session.hermes_level, "NONE")
|
||||
self.assertEqual(len(session.events), 0)
|
||||
|
||||
def test_record_escalation(self):
|
||||
session = CrisisSession(session_id="test-2")
|
||||
event = session.record("HIGH", "the-door", "keyword: despair")
|
||||
self.assertEqual(event.level, "HIGH")
|
||||
self.assertEqual(event.previous_level, "NONE")
|
||||
self.assertEqual(event.source, "the-door")
|
||||
self.assertEqual(session.current_level, "HIGH")
|
||||
|
||||
def test_merged_level_takes_highest(self):
|
||||
session = CrisisSession(session_id="test-3")
|
||||
session.record("MEDIUM", "the-door")
|
||||
session.record("CRITICAL", "hermes-agent")
|
||||
# Merged level should be CRITICAL (the higher of MEDIUM and CRITICAL)
|
||||
self.assertEqual(session.current_level, "CRITICAL")
|
||||
self.assertEqual(session.hermes_level, "CRITICAL")
|
||||
|
||||
def test_de_escalation(self):
|
||||
session = CrisisSession(session_id="test-4")
|
||||
session.record("HIGH", "the-door")
|
||||
session.record("MEDIUM", "the-door")
|
||||
self.assertEqual(session.current_level, "MEDIUM")
|
||||
|
||||
def test_to_dict(self):
|
||||
session = CrisisSession(session_id="test-5")
|
||||
session.record("HIGH", "the-door", "test")
|
||||
d = session.to_dict()
|
||||
self.assertEqual(d["session_id"], "test-5")
|
||||
self.assertEqual(d["current_level"], "HIGH")
|
||||
self.assertEqual(d["event_count"], 1)
|
||||
self.assertEqual(len(d["events"]), 1)
|
||||
|
||||
|
||||
class TestCrisisSessionTracker(unittest.TestCase):
|
||||
"""Full tracker with multiple sessions."""
|
||||
|
||||
def setUp(self):
|
||||
self.tracker = CrisisSessionTracker()
|
||||
|
||||
def test_get_or_create(self):
|
||||
s1 = self.tracker.get_or_create("s1")
|
||||
s2 = self.tracker.get_or_create("s1")
|
||||
self.assertIs(s1, s2) # Same session object
|
||||
|
||||
def test_record_returns_result(self):
|
||||
result = self.tracker.record("s1", "HIGH", "the-door", "keyword: help")
|
||||
self.assertIn("session", result)
|
||||
self.assertIn("event", result)
|
||||
self.assertTrue(result["escalated"])
|
||||
self.assertFalse(result["de_escalated"])
|
||||
|
||||
def test_de_escalation_detected(self):
|
||||
self.tracker.record("s1", "HIGH", "the-door")
|
||||
result = self.tracker.record("s1", "LOW", "the-door", "user calmed down")
|
||||
self.assertFalse(result["escalated"])
|
||||
self.assertTrue(result["de_escalated"])
|
||||
|
||||
def test_get_history(self):
|
||||
self.tracker.record("s1", "HIGH", "the-door")
|
||||
self.tracker.record("s1", "CRITICAL", "hermes-agent")
|
||||
history = self.tracker.get_history("s1")
|
||||
self.assertEqual(history["event_count"], 2)
|
||||
self.assertEqual(history["current_level"], "CRITICAL")
|
||||
self.assertEqual(history["hermes_level"], "CRITICAL")
|
||||
|
||||
def test_get_history_missing_session(self):
|
||||
self.assertIsNone(self.tracker.get_history("nonexistent"))
|
||||
|
||||
def test_unacknowledged_state(self):
|
||||
self.tracker.record("s1", "HIGH", "the-door")
|
||||
unacked = self.tracker.get_unacknowledged("s1")
|
||||
self.assertIsNotNone(unacked)
|
||||
self.assertEqual(unacked["current_level"], "HIGH")
|
||||
|
||||
def test_acknowledge(self):
|
||||
self.tracker.record("s1", "HIGH", "the-door")
|
||||
self.assertTrue(self.tracker.acknowledge("s1"))
|
||||
self.assertIsNone(self.tracker.get_unacknowledged("s1"))
|
||||
|
||||
def test_acknowledge_missing(self):
|
||||
self.assertFalse(self.tracker.acknowledge("nonexistent"))
|
||||
|
||||
def test_cleanup(self):
|
||||
self.tracker.record("old-session", "LOW", "the-door")
|
||||
self.tracker._sessions["old-session"].created_at = time.time() - 100000
|
||||
self.tracker.record("new-session", "LOW", "the-door")
|
||||
removed = self.tracker.cleanup(max_age_seconds=50000)
|
||||
self.assertEqual(removed, 1)
|
||||
self.assertNotIn("old-session", self.tracker._sessions)
|
||||
self.assertIn("new-session", self.tracker._sessions)
|
||||
|
||||
|
||||
class TestPersistence(unittest.TestCase):
|
||||
"""JSON persistence round-trip."""
|
||||
|
||||
def test_save_and_load(self):
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
path = f.name
|
||||
|
||||
try:
|
||||
t1 = CrisisSessionTracker(persist_path=path)
|
||||
t1.record("s1", "HIGH", "the-door", "test persist")
|
||||
t1.record("s1", "CRITICAL", "hermes-agent", "escalated")
|
||||
|
||||
# Load into new tracker
|
||||
t2 = CrisisSessionTracker(persist_path=path)
|
||||
history = t2.get_history("s1")
|
||||
self.assertIsNotNone(history)
|
||||
self.assertEqual(history["current_level"], "CRITICAL")
|
||||
self.assertEqual(history["event_count"], 2)
|
||||
finally:
|
||||
os.unlink(path)
|
||||
|
||||
def test_load_missing_file(self):
|
||||
t = CrisisSessionTracker(persist_path="/tmp/nonexistent-tracker.json")
|
||||
self.assertEqual(len(t._sessions), 0)
|
||||
|
||||
|
||||
class TestCrossSourceIntegration(unittest.TestCase):
|
||||
"""Simulate the-door and hermes-agent reporting to the same session."""
|
||||
|
||||
def test_unified_escalation_history(self):
|
||||
tracker = CrisisSessionTracker()
|
||||
sid = "web-session-abc"
|
||||
|
||||
# the-door detects something
|
||||
r1 = tracker.record(sid, "MEDIUM", "the-door", "keyword: 'alone in this'")
|
||||
self.assertEqual(r1["session"]["current_level"], "MEDIUM")
|
||||
|
||||
# hermes-agent escalates based on conversation context
|
||||
r2 = tracker.record(sid, "HIGH", "hermes-agent", "user repeated 'no way out' 3x")
|
||||
self.assertEqual(r2["session"]["current_level"], "HIGH")
|
||||
self.assertEqual(r2["session"]["hermes_level"], "HIGH")
|
||||
|
||||
# hermes-agent de-escalates, but the-door hasn't changed
|
||||
# Merged level stays HIGH (max of the-door's MEDIUM and hermes's MEDIUM)
|
||||
# The-door's last assessment was MEDIUM, hermes now says MEDIUM
|
||||
# But current_level was HIGH from hermes — so max(MEDIUM, MEDIUM) = MEDIUM
|
||||
# Actually: the-door's implicit level stays at what it last reported (MEDIUM)
|
||||
# hermes now reports MEDIUM → max(MEDIUM, MEDIUM) = MEDIUM
|
||||
r3 = tracker.record(sid, "MEDIUM", "hermes-agent", "user said 'feeling a bit better'")
|
||||
self.assertEqual(r3["session"]["current_level"], "MEDIUM")
|
||||
self.assertTrue(r3["de_escalated"])
|
||||
|
||||
# Verify full history
|
||||
history = tracker.get_history(sid)
|
||||
self.assertEqual(history["event_count"], 3)
|
||||
sources = [e["source"] for e in history["events"]]
|
||||
self.assertIn("the-door", sources)
|
||||
self.assertIn("hermes-agent", sources)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user