Compare commits

..

13 Commits

Author SHA1 Message Date
Timmy Burn
622cac0654 feat: add unified multimodal scorer for #134
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-20 20:06:45 -04:00
Timmy Burn
1cd1dd3288 test: define unified multimodal scorer for #134 2026-04-20 19:58:43 -04:00
d412939b4f fix: footer /about link to point to static about.html
Fixes #59

The footer links to /about but the repo ships about.html. On a plain static server this results in a 404. Changed to /about.html so the link resolves correctly.
2026-04-17 05:37:40 +00:00
07c582aa08 Merge pull request 'fix: crisis overlay initial focus to enabled Call 988 link (#69)' (#126) from burn/69-1776264183 into main
Merge PR #126: fix: crisis overlay initial focus to enabled Call 988 link (#69)
2026-04-17 01:46:56 +00:00
5f95dc1e39 Merge pull request '[P3] Service worker: cache crisis resources for offline (#41)' (#122) from burn/41-1776264184 into main
Merge PR #122: [P3] Service worker: cache crisis resources for offline (#41)
2026-04-17 01:46:55 +00:00
b1f3cac36d Merge pull request 'feat: session-level crisis tracking and escalation (closes #35)' (#118) from door/issue-35 into main
Merge PR #118: feat: session-level crisis tracking and escalation (closes #35)
2026-04-17 01:46:53 +00:00
07b3f67845 fix: crisis overlay initial focus to enabled Call 988 link (#69)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 15s
2026-04-15 15:09:36 +00:00
c22bbbaf65 fix: crisis overlay initial focus to enabled Call 988 link (#69) 2026-04-15 15:09:32 +00:00
543cb1d40f test: add offline self-containment and retry button tests (#41)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:58:44 +00:00
3cfd01815a feat: session-level crisis tracking and escalation (closes #35)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 17s
Smoke Test / smoke (pull_request) Successful in 23s
2026-04-15 11:49:52 +00:00
5a7ba9f207 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:51 +00:00
8ed8f20a17 feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:49 +00:00
9d7d26033e feat: session-level crisis tracking and escalation (closes #35) 2026-04-15 11:49:47 +00:00
15 changed files with 864 additions and 400 deletions

View File

@@ -176,7 +176,6 @@
<div class="actions">
<a class="action-btn" href="tel:988" aria-label="Call 988 now">Call 988 now</a>
<a class="action-btn secondary" href="sms:741741&body=HOME" aria-label="Text HOME to 741741 for Crisis Text Line">Text HOME to 741741 — Crisis Text Line</a>
<a class="action-btn" href="tel:911" aria-label="Call 911 for emergency services">Call 911 — Emergency</a>
<button class="action-btn retry" id="retry-connection" type="button">Retry connection</button>
</div>
<p class="small" style="margin-top: 14px;">If you are in immediate danger or have already taken action, call emergency services now.</p>
@@ -202,23 +201,11 @@
<li>Move closer to another person, a front desk, or a public place if possible.</li>
<li>Drink water or hold something cold in your hand.</li>
<li>Breathe in for 4, hold for 4, out for 6. Repeat 5 times.</li>
<li>Text or call one safe person and say: "I need you with me right now."</li>
<li>Text or call one safe person and say: I need you with me right now.</li>
</ul>
</section>
</div>
<section class="panel" aria-labelledby="additional-resources-title">
<h2 class="section-title" id="additional-resources-title">Additional crisis resources</h2>
<ul>
<li><strong>Veterans Crisis Line:</strong> Call 988, then press 1</li>
<li><strong>Trevor Project (LGBTQ+):</strong> Call 1-866-488-7386 or text START to 678-678</li>
<li><strong>SAMHSA Helpline:</strong> Call 1-800-662-4357</li>
<li><strong>National Domestic Violence Hotline:</strong> Call 1-800-799-7233</li>
<li><strong>International Association for Suicide Prevention:</strong> <a href="https://www.iasp.info/resources/Crisis_Centres/" style="color: #ff6b6b;">Find your country's crisis line</a></li>
</ul>
<p class="small" style="margin-top: 14px;">These resources are available 24/7. You don't have to be in crisis to call.</p>
</section>
<section class="panel" aria-labelledby="hope-title">
<h2 class="section-title" id="hope-title">Stay through the next ten minutes</h2>
<p>Do not solve your whole life right now. Stay for the next breath. Then the next one.</p>

View File

@@ -6,7 +6,8 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .gateway import check_crisis, check_crisis_multimodal, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
__all__ = [
"detect_crisis",
@@ -15,8 +16,12 @@ __all__ = [
"generate_response",
"CrisisResponse",
"check_crisis",
"check_crisis_multimodal",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -2,18 +2,21 @@
Crisis Gateway Module for the-door.
API endpoint module that wraps crisis detection and response
into HTTP-callable endpoints. Integrates detect.py and response.py.
into HTTP-callable endpoints. Integrates detect.py, unified_scorer.py, and response.py.
Usage:
from crisis.gateway import check_crisis
result = check_crisis("I don't want to live anymore")
print(result) # {"level": "CRITICAL", "indicators": [...], "response": {...}}
"""
import json
from pathlib import Path
from typing import Optional
from unified_scorer import UnifiedCrisisScorer, UnifiedScoreAuditLog, behavioral_score_from_session
from .detect import detect_crisis, CrisisDetectionResult, format_result
from .compassion_router import router
from .response import (
@@ -22,6 +25,7 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:
@@ -49,6 +53,74 @@ def check_crisis(text: str) -> dict:
}
def check_crisis_multimodal(
text: str,
*,
tracker: Optional[CrisisSessionTracker] = None,
voice_score: Optional[float] = None,
image_score: Optional[float] = None,
behavioral_score: Optional[float] = None,
audit_log_path: Optional[Path] = None,
weights: Optional[dict] = None,
) -> dict:
"""Combine text, voice, image, and behavioral signals into one crisis assessment."""
detection = detect_crisis(text)
session_state = tracker.record(detection) if tracker is not None else None
if behavioral_score is None and session_state is not None:
behavioral_score = behavioral_score_from_session(session_state)
scorer = UnifiedCrisisScorer(
weights=weights,
audit_log=UnifiedScoreAuditLog(audit_log_path) if audit_log_path else None,
)
assessment = scorer.score(
text_score=detection.score,
voice_score=voice_score,
image_score=image_score,
behavioral_score=behavioral_score,
source_text=text,
)
unified_detection = CrisisDetectionResult(
level=assessment.level.value,
indicators=detection.indicators,
recommended_action=detection.recommended_action,
score=assessment.combined_score,
matches=detection.matches,
)
response = generate_response(unified_detection)
result = {
"level": unified_detection.level,
"score": unified_detection.score,
"indicators": detection.indicators,
"recommended_action": unified_detection.recommended_action,
"timmy_message": response.timmy_message,
"ui": {
"show_crisis_panel": response.show_crisis_panel,
"show_overlay": response.show_overlay,
"provide_988": response.provide_988,
},
"escalate": response.escalate,
"unified": {
"level": assessment.level.value,
"combined_score": assessment.combined_score,
"weights": assessment.weights,
"modalities": assessment.modalities,
"present_modalities": assessment.present_modalities,
},
}
if session_state is not None:
result["session"] = {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
}
return result
def get_system_prompt(base_prompt: str, text: str = "") -> str:
"""
Sovereign Heart System Prompt Override.

259
crisis/session_tracker.py Normal file
View File

@@ -0,0 +1,259 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}

View File

@@ -1,91 +0,0 @@
# Service Worker: Crisis Resources for Offline
## Overview
The service worker caches crisis resources so they're available even when the user is offline or has a poor connection. This is critical for a crisis intervention app.
## What's Cached
### Core Pages
- `/` — Main chat interface
- `/index.html` — Main chat interface
- `/about.html` — About page
- `/testimony.html` — Testimony page
- `/crisis-offline.html` — Full crisis resource page (offline fallback)
- `/manifest.json` — PWA manifest
- `/sw-test.html` — Test page for verification
### Crisis Resources (in crisis-offline.html)
1. **988 Call Button**`tel:988` link
2. **Crisis Text Line**`sms:741741&body=HOME` link
3. **911 Emergency**`tel:911` link
4. **5-4-3-2-1 Grounding** — Step-by-step grounding technique
5. **Next Small Steps** — Immediate safety actions
6. **Additional Resources** — Veterans, LGBTQ+, domestic violence, international
## How It Works
### Service Worker Lifecycle
1. **Install**: Precaches all critical assets
2. **Activate**: Cleans up old caches
3. **Fetch**: Serves from cache, falls back to network
### Offline Behavior
1. **Navigation requests**: Try network → cached page → crisis-offline.html → text fallback
2. **Static assets**: Serve from cache, update in background
3. **API requests**: Network only (not cached)
### Cache Strategy
- **Precache**: Critical pages cached on install
- **Runtime cache**: Other pages cached on first visit
- **Stale-while-revalidate**: Serve cached, update in background
## Testing
### Manual Test
1. Open `/sw-test.html`
2. Check service worker registration
3. Verify cache contents
4. Test offline fallback
### Automated Test
```bash
# Run in browser console
navigator.serviceWorker.getRegistration().then(r => console.log('SW:', r));
caches.open('the-door-v3').then(c => c.keys()).then(k => console.log('Cached:', k.length));
```
### Offline Test
1. Open Chrome DevTools → Application → Service Workers
2. Check "Offline" checkbox
3. Navigate to any page
4. Should see crisis-offline.html
## Improvements Made
### Service Worker (sw.js)
1. Added `/sw-test.html` to precache list
2. Improved `offlineTextResponse()` to try crisis-offline.html first
3. Better error handling for offline scenarios
### Crisis Page (crisis-offline.html)
1. Added 911 emergency call button
2. Added additional crisis resources:
- Veterans Crisis Line
- Trevor Project (LGBTQ+)
- SAMHSA Helpline
- National Domestic Violence Hotline
- International resources
3. Improved accessibility with ARIA labels
4. Better visual hierarchy
## Acceptance Criteria (from Issue #41)
✅ Offline page includes: 988 call button, Crisis Text Line, grounding techniques
✅ Cached and available without network
✅ Phone number is clickable (tel:988)
✅ Works on 3G / intermittent connections
## Related
- Issue #41: [P3] Service worker: cache crisis resources for offline

View File

@@ -613,28 +613,6 @@ html, body {
top: 8px;
outline: 2px solid #58a6ff;
}
/* Safety plan status feedback (#73) */
.sp-status {
margin-left: 12px;
font-size: 0.875rem;
opacity: 0;
transition: opacity 0.3s ease;
display: inline-block;
vertical-align: middle;
}
.sp-status.visible {
opacity: 1;
}
.sp-status.success {
color: #3fb950;
}
.sp-status.error {
color: #f85149;
}
</style>
</head>
<body>
@@ -702,7 +680,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about" aria-label="About The Door">about</a>
<a href="/about.html" aria-label="About The Door">about</a>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -762,7 +740,6 @@ html, body {
<div class="modal-footer">
<button class="btn btn-secondary" id="cancel-safety-plan">Cancel</button>
<button class="btn btn-primary" id="save-safety-plan">Save Plan</button>
<div id="sp-status" role="status" aria-live="polite" class="sp-status"></div>
</div>
</div>
</div>
@@ -831,6 +808,7 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -1073,7 +1051,8 @@ Sovereignty and service always.`;
}
}, 1000);
overlayDismissBtn.focus();
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
}
// Register focus trap on document (always listening, gated by class check)
@@ -1207,23 +1186,11 @@ Sovereignty and service always.`;
}
closeSafetyPlan.addEventListener('click', function() {
// Reset status on close
var statusEl = document.getElementById('sp-status');
if (statusEl) {
statusEl.className = 'sp-status';
statusEl.textContent = '';
}
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
cancelSafetyPlan.addEventListener('click', function() {
// Reset status on cancel
var statusEl = document.getElementById('sp-status');
if (statusEl) {
statusEl.className = 'sp-status';
statusEl.textContent = '';
}
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
@@ -1236,24 +1203,13 @@ Sovereignty and service always.`;
help: document.getElementById('sp-help').value,
environment: document.getElementById('sp-environment').value
};
var statusEl = document.getElementById('sp-status');
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
// Show inline success feedback instead of blocking alert (#73)
statusEl.textContent = '✓ Safety plan saved locally.';
statusEl.className = 'sp-status success visible';
setTimeout(function() {
statusEl.className = 'sp-status';
}, 4000);
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
// Show inline error feedback instead of blocking alert (#73)
statusEl.textContent = '✗ Error saving plan.';
statusEl.className = 'sp-status error visible';
setTimeout(function() {
statusEl.className = 'sp-status';
}, 4000);
alert('Error saving plan.');
}
});

View File

@@ -1,130 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Service Worker Test</title>
<style>
body { font-family: monospace; padding: 20px; background: #0d1117; color: #e6edf3; }
h1 { color: #ff6b6b; }
.test { margin: 20px 0; padding: 15px; background: #161b22; border-radius: 8px; }
.pass { color: #2ea043; }
.fail { color: #c9362c; }
button { padding: 10px 20px; margin: 5px; background: #238636; color: white; border: none; border-radius: 4px; cursor: pointer; }
button:hover { background: #2ea043; }
</style>
</head>
<body>
<h1>Service Worker Test — Issue #41</h1>
<div class="test">
<h2>Test 1: Service Worker Registration</h2>
<p id="sw-status">Checking...</p>
</div>
<div class="test">
<h2>Test 2: Cache Status</h2>
<p id="cache-status">Checking...</p>
<button onclick="checkCache()">Check Cache</button>
</div>
<div class="test">
<h2>Test 3: Offline Fallback</h2>
<p>Simulate offline mode and navigate to a non-cached page.</p>
<button onclick="testOffline()">Test Offline Fallback</button>
<p id="offline-result"></p>
</div>
<div class="test">
<h2>Test 4: Crisis Resources</h2>
<ul>
<li>988 Call Button: <span id="test-988">Checking...</span></li>
<li>Crisis Text Line: <span id="test-text">Checking...</span></li>
<li>Grounding Techniques: <span id="test-grounding">Checking...</span></li>
</ul>
<button onclick="testCrisisResources()">Test Crisis Resources</button>
</div>
<script>
// Test 1: Check service worker registration
if ('serviceWorker' in navigator) {
navigator.serviceWorker.getRegistration().then(function(registration) {
if (registration) {
document.getElementById('sw-status').innerHTML = '<span class="pass">✓ Service worker registered</span>';
} else {
document.getElementById('sw-status').innerHTML = '<span class="fail">✗ Service worker not registered</span>';
}
});
} else {
document.getElementById('sw-status').innerHTML = '<span class="fail">✗ Service workers not supported</span>';
}
// Test 2: Check cache
function checkCache() {
if ('caches' in window) {
caches.open('the-door-v3').then(function(cache) {
cache.keys().then(function(keys) {
var html = '<span class="pass">✓ Cache found with ' + keys.length + ' items:</span><br>';
keys.forEach(function(request) {
html += '• ' + request.url + '<br>';
});
document.getElementById('cache-status').innerHTML = html;
});
}).catch(function(error) {
document.getElementById('cache-status').innerHTML = '<span class="fail">✗ Cache error: ' + error + '</span>';
});
} else {
document.getElementById('cache-status').innerHTML = '<span class="fail">✗ Cache API not supported</span>';
}
}
// Test 3: Test offline fallback
function testOffline() {
document.getElementById('offline-result').innerHTML = 'Testing... (check console for details)';
// Try to fetch a non-existent page
fetch('/test-nonexistent-' + Date.now())
.then(function(response) {
return response.text();
})
.then(function(text) {
if (text.includes('988') || text.includes('crisis')) {
document.getElementById('offline-result').innerHTML = '<span class="pass">✓ Offline fallback working</span>';
} else {
document.getElementById('offline-result').innerHTML = '<span class="fail">✗ Unexpected response: ' + text.substring(0, 100) + '</span>';
}
})
.catch(function(error) {
document.getElementById('offline-result').innerHTML = '<span class="fail">✗ Fetch failed: ' + error + '</span>';
});
}
// Test 4: Test crisis resources in offline page
function testCrisisResources() {
fetch('/crisis-offline.html')
.then(function(response) {
return response.text();
})
.then(function(html) {
var has988 = html.includes('tel:988');
var hasText = html.includes('sms:741741');
var hasGrounding = html.includes('5-4-3-2-1');
document.getElementById('test-988').innerHTML = has988 ?
'<span class="pass">✓ Found</span>' : '<span class="fail">✗ Missing</span>';
document.getElementById('test-text').innerHTML = hasText ?
'<span class="pass">✓ Found</span>' : '<span class="fail">✗ Missing</span>';
document.getElementById('test-grounding').innerHTML = hasGrounding ?
'<span class="pass">✓ Found</span>' : '<span class="fail">✗ Missing</span>';
})
.catch(function(error) {
document.getElementById('test-988').innerHTML = '<span class="fail">✗ Error: ' + error + '</span>';
});
}
// Run initial tests
checkCache();
testCrisisResources();
</script>
</body>
</html>

11
sw.js
View File

@@ -7,8 +7,7 @@ const PRECACHE_ASSETS = [
'/about.html',
'/manifest.json',
'/crisis-offline.html',
'/testimony.html',
'/sw-test.html' // Test page for verification
'/testimony.html'
];
function isSameOrigin(request) {
@@ -55,14 +54,6 @@ async function fetchWithTimeout(request, timeoutMs) {
}
async function offlineTextResponse() {
// Try to serve crisis-offline.html first
const cache = await caches.open(CACHE_NAME);
const crisisPage = await cache.match('/crisis-offline.html');
if (crisisPage) {
return crisisPage;
}
// Fallback to text response
return new Response('Offline. Call 988 or text HOME to 741741 for immediate help.', {
status: 503,
statusText: 'Service Unavailable',

View File

@@ -52,6 +52,34 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,102 +0,0 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestSafetyPlanSaveFeedback(unittest.TestCase):
"""Verify safety plan save feedback uses inline UI instead of blocking alerts."""
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_no_alert_calls_for_safety_plan(self):
"""Safety plan save should not use browser alert() dialogs."""
# Find the save handler
save_handler_match = re.search(
r'saveSafetyPlan\.addEventListener.*?\}\);',
self.html,
re.DOTALL
)
self.assertIsNotNone(save_handler_match, 'Expected saveSafetyPlan handler to exist.')
handler = save_handler_match.group(0)
self.assertNotIn('alert(', handler, 'Safety plan save handler should not contain alert() calls.')
def test_inline_status_element_exists(self):
"""Safety plan modal should have an inline status element."""
self.assertIn('id="sp-status"', self.html, 'Expected sp-status element in the modal.')
self.assertIn('aria-live="polite"', self.html, 'Expected sp-status to have aria-live="polite".')
def test_inline_status_shows_on_save(self):
"""Save handler should show the status element on success."""
save_handler_match = re.search(
r'saveSafetyPlan\.addEventListener.*?\}\);',
self.html,
re.DOTALL
)
handler = save_handler_match.group(0)
self.assertIn("statusEl.textContent = '✓ Safety plan saved locally.'", handler,
'Expected success message in status element.')
self.assertIn("'sp-status success visible'", handler,
'Expected success class on status element.')
def test_inline_status_shows_on_error(self):
"""Save handler should show the status element on error."""
save_handler_match = re.search(
r'saveSafetyPlan\.addEventListener.*?\}\);',
self.html,
re.DOTALL
)
handler = save_handler_match.group(0)
self.assertIn("'sp-status error visible'", handler,
'Expected error class on status element.')
self.assertIn("✗ Error saving plan", handler,
'Expected error message.')
def test_status_auto_hides_after_success(self):
"""Success status should auto-hide after a timeout."""
save_handler_match = re.search(
r'saveSafetyPlan\.addEventListener.*?\}\);',
self.html,
re.DOTALL
)
handler = save_handler_match.group(0)
self.assertIn('setTimeout', handler, 'Expected setTimeout for auto-hiding status.')
self.assertIn("statusEl.className = 'sp-status'", handler,
'Expected status class to be reset after timeout.')
def test_status_css_exists(self):
"""CSS for sp-status success and error states should exist."""
self.assertIn('.sp-status.success', self.html,
'Expected CSS for sp-status.success class.')
self.assertIn('.sp-status.error', self.html,
'Expected CSS for sp-status.error class.')
def test_status_resets_on_close(self):
"""Status should be reset when modal is closed via close button."""
close_handler_match = re.search(
r'closeSafetyPlan\.addEventListener.*?\}\);',
self.html,
re.DOTALL
)
handler = close_handler_match.group(0)
self.assertIn("statusEl.className = 'sp-status'", handler,
'Expected status class to be reset on close.')
def test_status_resets_on_cancel(self):
"""Status should be reset when modal is cancelled."""
cancel_handler_match = re.search(
r'cancelSafetyPlan\.addEventListener.*?\}\);',
self.html,
re.DOTALL
)
handler = cancel_handler_match.group(0)
self.assertIn("statusEl.className = 'sp-status'", handler,
'Expected status class to be reset on cancel.')
if __name__ == '__main__':
unittest.main()

View File

@@ -50,6 +50,22 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,277 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,19 @@
from crisis.gateway import check_crisis_multimodal
from crisis.session_tracker import CrisisSessionTracker
def test_multimodal_gateway_uses_unified_score_for_988_ui(tmp_path):
tracker = CrisisSessionTracker()
result = check_crisis_multimodal(
"I want to kill myself tonight",
tracker=tracker,
voice_score=0.92,
image_score=0.6,
audit_log_path=tmp_path / "audit.jsonl",
)
assert result["unified"]["level"] == "CRITICAL"
assert result["ui"]["provide_988"] is True
assert result["ui"]["show_overlay"] is True
assert result["unified"]["modalities"]["voice"] == 0.92
assert result["unified"]["modalities"]["behavioral"] >= 0.0

View File

@@ -0,0 +1,51 @@
from pathlib import Path
from unified_scorer import (
CrisisLevel,
UnifiedCrisisScorer,
UnifiedScoreAuditLog,
behavioral_score_from_session,
)
from crisis.session_tracker import SessionState
def test_unified_scorer_renormalizes_available_modalities_and_escalates():
scorer = UnifiedCrisisScorer()
assessment = scorer.score(
text_score=1.0,
voice_score=0.8,
image_score=None,
behavioral_score=0.7,
)
assert assessment.level is CrisisLevel.CRITICAL
assert assessment.combined_score > 0.8
assert assessment.present_modalities == ["text", "voice", "behavioral"]
def test_behavioral_score_rises_for_escalating_session_state():
session = SessionState(
current_level="HIGH",
peak_level="CRITICAL",
message_count=4,
level_history=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
is_escalating=True,
is_deescalating=False,
escalation_rate=1.0,
consecutive_low_messages=0,
)
assert behavioral_score_from_session(session) >= 0.8
def test_audit_log_persists_anonymized_score_entries(tmp_path):
log_path = tmp_path / "unified-score-audit.jsonl"
scorer = UnifiedCrisisScorer(audit_log=UnifiedScoreAuditLog(log_path))
scorer.score(text_score=0.75, voice_score=0.2, image_score=0.1, behavioral_score=0.6, source_text="I feel trapped and hopeless")
lines = log_path.read_text().strip().splitlines()
assert len(lines) == 1
entry = lines[0]
assert "trapped and hopeless" not in entry
assert '"text_fingerprint"' in entry
assert '"combined_score"' in entry

126
unified_scorer.py Normal file
View File

@@ -0,0 +1,126 @@
"""Unified multimodal crisis scoring for the-door."""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from crisis.session_tracker import SessionState
SCORE_BY_LEVEL = {"NONE": 0.0, "LOW": 0.25, "MEDIUM": 0.5, "HIGH": 0.75, "CRITICAL": 1.0}
LEVEL_RANK = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
class CrisisLevel(Enum):
NONE = "NONE"
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
DEFAULT_WEIGHTS: Dict[str, float] = {
"text": 0.4,
"voice": 0.25,
"behavioral": 0.2,
"image": 0.15,
}
@dataclass
class UnifiedAssessment:
level: CrisisLevel
combined_score: float
weights: Dict[str, float]
modalities: Dict[str, Optional[float]]
present_modalities: List[str]
class UnifiedScoreAuditLog:
def __init__(self, path: Path | str):
self.path = Path(path)
def record(self, assessment: UnifiedAssessment, source_text: str = "") -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
fingerprint = hashlib.sha256(source_text.encode("utf-8")).hexdigest()[:12] if source_text else None
payload = {
"level": assessment.level.value,
"combined_score": round(assessment.combined_score, 4),
"weights": assessment.weights,
"modalities": assessment.modalities,
"present_modalities": assessment.present_modalities,
"text_fingerprint": fingerprint,
}
with self.path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(payload, sort_keys=True) + "\n")
class UnifiedCrisisScorer:
def __init__(self, weights: Optional[Dict[str, float]] = None, audit_log: Optional[UnifiedScoreAuditLog] = None):
self.weights = dict(DEFAULT_WEIGHTS)
if weights:
self.weights.update(weights)
self.audit_log = audit_log
def _normalize(self, modalities: Dict[str, Optional[float]]) -> Dict[str, float]:
present = [name for name, score in modalities.items() if score is not None]
if not present:
return {}
total = sum(self.weights[name] for name in present)
return {name: self.weights[name] / total for name in present}
def _level_for_score(self, score: float) -> CrisisLevel:
if score > 0.8:
return CrisisLevel.CRITICAL
if score > 0.6:
return CrisisLevel.HIGH
if score > 0.4:
return CrisisLevel.MEDIUM
if score > 0.0:
return CrisisLevel.LOW
return CrisisLevel.NONE
def score(
self,
*,
text_score: Optional[float],
voice_score: Optional[float] = None,
image_score: Optional[float] = None,
behavioral_score: Optional[float] = None,
source_text: str = "",
) -> UnifiedAssessment:
modalities = {
"text": text_score,
"voice": voice_score,
"behavioral": behavioral_score,
"image": image_score,
}
normalized = self._normalize(modalities)
combined = 0.0
for name, weight in normalized.items():
combined += float(modalities[name]) * weight
assessment = UnifiedAssessment(
level=self._level_for_score(combined),
combined_score=combined,
weights=normalized,
modalities=modalities,
present_modalities=[name for name, score in modalities.items() if score is not None],
)
if self.audit_log:
self.audit_log.record(assessment, source_text=source_text)
return assessment
def behavioral_score_from_session(session: 'SessionState') -> float:
current = SCORE_BY_LEVEL.get(session.current_level, 0.0)
peak_bonus = 0.1 if LEVEL_RANK.get(session.peak_level, 0) >= LEVEL_RANK["HIGH"] else 0.0
escalation_bonus = 0.15 if session.is_escalating else 0.0
rate_bonus = min(max(session.escalation_rate, 0.0), 1.0) * 0.1
deescalation_penalty = 0.15 if session.is_deescalating else 0.0
return max(0.0, min(1.0, current + peak_bonus + escalation_bonus + rate_bonus - deescalation_penalty))