Compare commits

..

2 Commits

Author SHA1 Message Date
b76bc4e517 test: add offline crisis resources verification (#98)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 10s
Smoke Test / smoke (pull_request) Successful in 24s
Verifies:
- Service worker precaches crisis-offline.html
- Offline page has 988 and Crisis Text Line links
- Offline page has local crisis resources
- Page is self-contained (no external deps)
- Offline fallback path is configured

Closes #98
2026-04-15 03:28:40 +00:00
5d1d0dc838 feat: add local crisis resources to offline page (#98)
Added a 'More crisis lines' section with:
- National Domestic Violence Hotline
- Trevor Project (LGBTQ youth)
- Veterans Crisis Line
- SAMHSA Helpline (substance use)
- Trans Lifeline

All are free, confidential, and available 24/7.
Closes #98
2026-04-15 03:28:38 +00:00
29 changed files with 109 additions and 2738 deletions

View File

@@ -12,7 +12,7 @@ VPS := alexanderwhitestone.com
DOMAIN := alexanderwhitestone.com
DEPLOY_DIR := deploy
.PHONY: help deploy deploy-bash check ssl push service metrics
.PHONY: help deploy deploy-bash check ssl push service
help:
@echo "The Door — Deployment Commands"
@@ -23,8 +23,6 @@ help:
@echo " make check Check deployment status"
@echo " make ssl Setup SSL on VPS"
@echo " make service Install/restart hermes-gateway service"
@echo " make metrics View crisis metrics summary"
@echo " make metrics-json Export crisis metrics as JSON"
@echo ""
deploy:
@@ -48,9 +46,3 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
metrics:
python3 -m crisis.metrics --summary
metrics-json:
python3 -m crisis.metrics --json

View File

@@ -206,6 +206,18 @@
</section>
</div>
<section class="panel" aria-labelledby="resources-title">
<h2 class="section-title" id="resources-title">More crisis lines</h2>
<ul>
<li><strong>National Domestic Violence Hotline</strong> — call 1-800-799-7233 or text START to 88788</li>
<li><strong>Trevor Project</strong> (LGBTQ youth) — call 1-866-488-7386 or text START to 678-678</li>
<li><strong>Veterans Crisis Line</strong> — call 988 then press 1, or text 838255</li>
<li><strong>SAMHSA Helpline</strong> (substance use) — call 1-800-662-4357</li>
<li><strong>Trans Lifeline</strong> — call 877-565-8860</li>
</ul>
<p class="small" style="margin-top: 14px;">All lines are free, confidential, and available 24/7.</p>
</section>
<section class="panel" aria-labelledby="hope-title">
<h2 class="section-title" id="hope-title">Stay through the next ten minutes</h2>
<p>Do not solve your whole life right now. Stay for the next breath. Then the next one.</p>

View File

@@ -7,8 +7,6 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .metrics import CrisisMetrics, AggregateMetrics
__all__ = [
"detect_crisis",
@@ -21,9 +19,4 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"CrisisMetrics",
"AggregateMetrics",
]

View File

@@ -1,19 +0,0 @@
"""Compatibility bridge for the-door crisis gateway.
Issue #141 describes the shared bridge API as `crisis.bridge`.
The canonical implementation lives in `crisis.gateway`. Re-export the public
entrypoints here so downstream hermes-agent wiring can import the stable name
without copying logic out of the-door.
"""
from .gateway import (
check_crisis,
get_system_prompt,
format_gateway_response,
)
__all__ = [
"check_crisis",
"get_system_prompt",
"format_gateway_response",
]

View File

@@ -104,9 +104,13 @@ MEDIUM_INDICATORS = [
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
# Keep only medium-only patterns here; stronger overlaps live in HIGH_INDICATORS.
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
]
LOW_INDICATORS = [

View File

@@ -22,7 +22,6 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:

View File

@@ -1,244 +0,0 @@
"""
crisis/metrics.py — Aggregate crisis detection metrics.
Tracks session-level crisis data for aggregate reporting.
Privacy-first: stores only aggregate counts, never user content.
Usage:
from crisis.metrics import CrisisMetrics
metrics = CrisisMetrics()
metrics.record_session(tracker.state)
summary = metrics.get_summary()
"""
import json
import os
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
METRICS_DIR = Path.home() / ".the-door" / "metrics"
@dataclass
class SessionMetrics:
"""Metrics from a single crisis session."""
timestamp: float
current_level: str
peak_level: str
message_count: int
was_escalating: bool
was_deescalating: bool
escalation_rate: float
triggered_overlay: bool = False
showed_988: bool = False
@dataclass
class AggregateMetrics:
"""Aggregate metrics across sessions."""
total_sessions: int = 0
total_messages: int = 0
# Level distribution
level_counts: Dict[str, int] = field(default_factory=lambda: {
"NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0
})
# Escalation tracking
escalating_sessions: int = 0
deescalating_sessions: int = 0
# Safety interventions
overlay_triggers: int = 0
ninety_eight_show: int = 0
# Time window
period_start: Optional[float] = None
period_end: Optional[float] = None
class CrisisMetrics:
"""
Aggregate crisis metrics with local JSON persistence.
Privacy-first: stores only aggregate counts per day.
Never stores user messages, content, or identifying info.
"""
def __init__(self, metrics_dir: Optional[Path] = None):
self.metrics_dir = metrics_dir or METRICS_DIR
self.metrics_dir.mkdir(parents=True, exist_ok=True)
self._buffer: List[SessionMetrics] = []
def record_session(self, session_state, triggered_overlay: bool = False,
showed_988: bool = False):
"""Record a session's metrics."""
from .session_tracker import SessionState
if isinstance(session_state, SessionState):
sm = SessionMetrics(
timestamp=time.time(),
current_level=session_state.current_level,
peak_level=session_state.peak_level,
message_count=session_state.message_count,
was_escalating=session_state.is_escalating,
was_deescalating=session_state.is_deescalating,
escalation_rate=session_state.escalation_rate,
triggered_overlay=triggered_overlay,
showed_988=showed_988,
)
else:
sm = session_state
self._buffer.append(sm)
self._flush()
def _flush(self):
"""Write buffered sessions to daily file."""
if not self._buffer:
return
today = datetime.utcnow().strftime("%Y-%m-%d")
filepath = self.metrics_dir / f"{today}.jsonl"
with open(filepath, 'a') as f:
for sm in self._buffer:
f.write(json.dumps(asdict(sm)) + '\n')
self._buffer.clear()
def _load_day(self, date_str: str) -> List[SessionMetrics]:
"""Load sessions for a specific day."""
filepath = self.metrics_dir / f"{date_str}.jsonl"
if not filepath.exists():
return []
sessions = []
with open(filepath) as f:
for line in f:
if line.strip():
data = json.loads(line)
sessions.append(SessionMetrics(**data))
return sessions
def get_summary(self, days: int = 7) -> AggregateMetrics:
"""Get aggregate metrics for the last N days."""
agg = AggregateMetrics()
now = datetime.utcnow()
for i in range(days):
date = (now - timedelta(days=i)).strftime("%Y-%m-%d")
sessions = self._load_day(date)
for sm in sessions:
agg.total_sessions += 1
agg.total_messages += sm.message_count
# Level counts (use peak level)
level = sm.peak_level
agg.level_counts[level] = agg.level_counts.get(level, 0) + 1
if sm.was_escalating:
agg.escalating_sessions += 1
if sm.was_deescalating:
agg.deescalating_sessions += 1
if sm.triggered_overlay:
agg.overlay_triggers += 1
if sm.showed_988:
agg.ninety_eight_show += 1
# Time window
if agg.period_start is None or sm.timestamp < agg.period_start:
agg.period_start = sm.timestamp
if agg.period_end is None or sm.timestamp > agg.period_end:
agg.period_end = sm.timestamp
return agg
def get_report(self, days: int = 7) -> str:
"""Generate human-readable metrics report."""
agg = self.get_summary(days)
lines = []
lines.append("=" * 50)
lines.append(" CRISIS METRICS REPORT")
lines.append(f" Last {days} days")
if agg.period_start:
start = datetime.fromtimestamp(agg.period_start).strftime("%Y-%m-%d %H:%M")
lines.append(f" Period: {start} → now")
lines.append("=" * 50)
lines.append(f"\n Sessions: {agg.total_sessions}")
lines.append(f" Messages tracked: {agg.total_messages}")
lines.append(f"\n Level Distribution (by peak):")
for level in ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]:
count = agg.level_counts.get(level, 0)
pct = (count / agg.total_sessions * 100) if agg.total_sessions > 0 else 0
bar = "" * int(pct / 5)
lines.append(f" {level:<10} {count:>5} ({pct:>5.1f}%) {bar}")
lines.append(f"\n Escalations: {agg.escalating_sessions}")
lines.append(f" De-escalations: {agg.deescalating_sessions}")
lines.append(f" Overlay triggers: {agg.overlay_triggers}")
lines.append(f" 988 shown: {agg.ninety_eight_show}")
if agg.total_sessions > 0:
escalation_rate = agg.escalating_sessions / agg.total_sessions * 100
lines.append(f"\n Escalation rate: {escalation_rate:.1f}%")
lines.append("=" * 50)
return "\n".join(lines)
def get_json(self, days: int = 7) -> str:
"""Export metrics as JSON."""
agg = self.get_summary(days)
return json.dumps(asdict(agg), indent=2)
def main():
"""CLI entry point for crisis metrics."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Detection Metrics")
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", help="JSON export")
parser.add_argument("--days", type=int, default=7, help="Days to include")
parser.add_argument("--demo", action="store_true", help="Generate demo data")
args = parser.parse_args()
metrics = CrisisMetrics()
if args.demo:
import random
levels = ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]
for i in range(50):
from .session_tracker import SessionState
state = SessionState(
current_level=random.choice(levels),
peak_level=random.choice(levels),
message_count=random.randint(1, 20),
is_escalating=random.random() > 0.7,
is_deescalating=random.random() > 0.8,
escalation_rate=random.random(),
)
metrics.record_session(
state,
triggered_overlay=random.random() > 0.8,
showed_988=random.random() > 0.7,
)
print("Generated 50 demo sessions.")
if args.json:
print(metrics.get_json(args.days))
else:
print(metrics.get_report(args.days))
if __name__ == "__main__":
main()

View File

@@ -1,259 +0,0 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}

View File

@@ -1,19 +0,0 @@
"""Compatibility bridge for crisis session tracking.
Issue #141 describes the shared the-door tracker surface as `crisis.tracker`.
The canonical implementation lives in `crisis.session_tracker`, but hermes-agent
integration should be able to import the shorter path without caring about
internal file layout.
"""
from .session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
__all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -1,32 +0,0 @@
# Issue #41 Verification
Status: already implemented on main
Issue:
- #41 [P3] Service worker: cache crisis resources for offline
Acceptance criteria check:
1. Passed — offline page includes 988 call button, Crisis Text Line, and grounding techniques
2. Passed — service worker precaches `crisis-offline.html`
3. Passed — phone number is clickable via `href="tel:988"`
4. Passed — navigation timeout + offline fallback support intermittent connections
Evidence:
- `sw.js` precaches `/crisis-offline.html`
- `sw.js` contains `NAVIGATION_TIMEOUT_MS` and `AbortController`
- `crisis-offline.html` contains:
- `href="tel:988"`
- Crisis Text Line / `741741`
- grounding prompts for 5-4-3-2-1
- `tests/test_service_worker_offline.py` already exists on main and passes
Verification run from fresh main clone:
- `python3 -m pytest tests/test_service_worker_offline.py -q`
- `10 passed`
Prior implementation trail:
- Issue comment already states the core implementation is complete on main
- Closed PR #122 documented the already-complete implementation and added the offline guarantees test coverage that is now present on main
Recommendation:
- Close issue #41 as already implemented on main.

View File

@@ -1,195 +1 @@
"""Crisis synthesizer — learn from anonymized crisis interactions.
This is deliberately simple and privacy-preserving. It does not train a model or
modify detection rules automatically. It only logs metadata, summarizes patterns,
and suggests human-reviewed keyword weight adjustments.
"""
from __future__ import annotations
import argparse
import json
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Iterable
DEFAULT_LOG_PATH = Path.home() / ".the-door" / "crisis-interactions.jsonl"
LEVELS = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
def build_interaction_event(
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
*,
now: float | None = None,
) -> dict:
return {
"timestamp": float(time.time() if now is None else now),
"level": level,
"indicators": list(indicators),
"indicator_count": len(indicators),
"response_given": response_given,
"continued_conversation": bool(continued_conversation),
"false_positive": bool(false_positive),
}
def append_interaction_event(
log_path: str | Path,
*,
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
now: float | None = None,
) -> dict:
event = build_interaction_event(
level,
indicators,
response_given,
continued_conversation,
false_positive,
now=now,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event) + "\n")
return event
def load_interaction_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
events.append(json.loads(line))
return events
def summarize_keywords(events: Iterable[dict]) -> list[dict]:
counts: Counter[str] = Counter()
for event in events:
counts.update(event.get("indicators", []))
return [{"keyword": keyword, "count": count} for keyword, count in counts.most_common(10)]
def suggest_keyword_adjustments(events: Iterable[dict], *, min_observations: int = 5) -> list[dict]:
stats: dict[str, dict[str, int]] = defaultdict(lambda: {
"observations": 0,
"true_positive_count": 0,
"false_positive_count": 0,
"continued_conversation_count": 0,
})
for event in events:
for keyword in event.get("indicators", []):
bucket = stats[keyword]
bucket["observations"] += 1
if event.get("false_positive"):
bucket["false_positive_count"] += 1
else:
bucket["true_positive_count"] += 1
if event.get("continued_conversation"):
bucket["continued_conversation_count"] += 1
suggestions = []
for keyword, bucket in sorted(stats.items()):
if bucket["observations"] < min_observations:
continue
fp = bucket["false_positive_count"]
tp = bucket["true_positive_count"]
if fp >= min_observations and tp == 0:
adjustment = "lower_weight"
rationale = "Observed only false positives across the sample window."
elif tp >= min_observations and fp == 0:
adjustment = "raise_weight"
rationale = "Observed repeated genuine crises with no false positives."
else:
adjustment = "observe"
rationale = "Mixed evidence; keep monitoring before changing weights."
suggestions.append(
{
"keyword": keyword,
**bucket,
"suggested_adjustment": adjustment,
"rationale": rationale,
}
)
return suggestions
def build_weekly_report(
events: Iterable[dict],
*,
now: float | None = None,
window_days: int = 7,
min_observations: int = 3,
) -> dict:
current_time = float(time.time() if now is None else now)
cutoff = current_time - (window_days * 86400)
filtered = [event for event in events if float(event.get("timestamp", 0)) >= cutoff]
detections_per_level = {level: 0 for level in LEVELS}
detected_events = []
continued_after_intervention = 0
for event in filtered:
level = event.get("level", "NONE")
detections_per_level[level] = detections_per_level.get(level, 0) + 1
if level != "NONE":
detected_events.append(event)
if event.get("continued_conversation"):
continued_after_intervention += 1
false_positive_count = sum(1 for event in detected_events if event.get("false_positive"))
false_positive_estimate = false_positive_count / len(detected_events) if detected_events else 0.0
return {
"window_days": window_days,
"total_events": len(filtered),
"detections_per_level": detections_per_level,
"most_common_keywords": summarize_keywords(filtered),
"false_positive_estimate": false_positive_estimate,
"continued_after_intervention": continued_after_intervention,
"keyword_weight_suggestions": suggest_keyword_adjustments(filtered, min_observations=min_observations),
}
def render_weekly_report(summary: dict) -> str:
return json.dumps(summary, indent=2)
def write_weekly_report(output_path: str | Path, summary: dict) -> Path:
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(render_weekly_report(summary) + "\n", encoding="utf-8")
return path
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Summarize anonymized crisis interactions")
parser.add_argument("--log-path", default=str(DEFAULT_LOG_PATH), help="JSONL crisis interaction log")
parser.add_argument("--days", type=int, default=7, help="Lookback window in days")
parser.add_argument("--min-observations", type=int, default=3, help="Minimum observations before suggesting keyword adjustments")
parser.add_argument("--output", help="Optional file to write the weekly report JSON")
args = parser.parse_args(argv)
events = load_interaction_events(args.log_path)
summary = build_weekly_report(events, window_days=args.days, min_observations=args.min_observations)
rendered = render_weekly_report(summary)
print(rendered)
if args.output:
write_weekly_report(args.output, summary)
return 0
if __name__ == "__main__":
raise SystemExit(main())
...

View File

@@ -1,170 +0,0 @@
"""
image_screening.py — local image crisis screening slice for epic #130.
Grounded scope:
- screens OCR text, upstream object labels, and operator notes for crisis signals
- intentionally does NOT claim raw computer-vision understanding of pixels
- designed to plug into future multimodal scoring once a dedicated image model lands
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from crisis.detect import detect_crisis
DIRECT_SELF_HARM_LABELS = {
"blood",
"blade",
"razor",
"knife",
"scissors",
"noose",
"ligature",
"hanging",
"pills",
"pill bottle",
"overdose",
"gun",
"firearm",
"rope",
"cuts",
"self-harm",
}
INJURY_LABELS = {
"wound",
"wounds",
"bruise",
"bruises",
"bandage",
"bandages",
"injury",
"injuries",
"scar",
"scars",
"burn",
"burns",
"bleeding",
}
HIGH_RISK_SCENE_LABELS = {
"bridge edge",
"rooftop edge",
"train tracks",
"ledge",
"cliff edge",
"dark room",
"bathroom floor",
"hospital wristband",
"suicide note",
"goodbye letter",
}
@dataclass
class ImageScreeningResult:
ocr_text: str = ""
labels: List[str] = field(default_factory=list)
visual_flags: List[str] = field(default_factory=list)
distress_score: float = 0.0
requires_human_review: bool = False
signals_detected: List[str] = field(default_factory=list)
grounded_scope: str = (
"heuristic screening over OCR text, upstream labels, and operator notes; "
"raw vision-model inference is not implemented in this slice"
)
def to_dict(self) -> dict:
return asdict(self)
def _normalize_items(values: Optional[Iterable[str]]) -> List[str]:
if not values:
return []
normalized = []
for value in values:
text = str(value).strip().lower()
if text:
normalized.append(text)
return normalized
def _match_keywords(haystack: str, keywords: set[str]) -> List[str]:
matches = []
for keyword in keywords:
if keyword in haystack:
matches.append(keyword)
return sorted(set(matches))
def screen_image_signals(
image_path: Optional[str] = None,
*,
ocr_text: str = "",
labels: Optional[Iterable[str]] = None,
manual_notes: str = "",
visual_flags: Optional[Iterable[str]] = None,
) -> ImageScreeningResult:
"""
Score image-related crisis evidence without pretending to do full CV.
Inputs are deliberately grounded in what the repo can actually support today:
- OCR text extracted upstream from screenshots/photos
- object labels supplied by a local model or operator
- manual operator notes about visible scene context
- optional visual flags from any upstream preprocessor
"""
normalized_labels = _normalize_items(labels)
normalized_flags = _normalize_items(visual_flags)
normalized_notes = str(manual_notes or "").strip().lower()
normalized_ocr = str(ocr_text or "").strip()
combined_label_text = " ".join(normalized_labels + normalized_flags + ([normalized_notes] if normalized_notes else []))
crisis_text = " ".join(part for part in [normalized_ocr, normalized_notes] if part).strip()
direct_matches = _match_keywords(combined_label_text, DIRECT_SELF_HARM_LABELS)
injury_matches = _match_keywords(combined_label_text, INJURY_LABELS)
scene_matches = _match_keywords(combined_label_text, HIGH_RISK_SCENE_LABELS)
text_detection = detect_crisis(crisis_text) if crisis_text else None
signals: List[str] = []
score = 0.0
if direct_matches:
score = max(score, 0.85)
for match in direct_matches:
signals.append(f"direct_self_harm_label:{match}")
if injury_matches:
score = max(score, 0.55)
for match in injury_matches:
signals.append(f"injury_indicator:{match}")
if scene_matches:
score = max(score, 0.4)
for match in scene_matches:
signals.append(f"high_risk_scene:{match}")
if text_detection and text_detection.level != "NONE":
score = max(score, min(1.0, text_detection.score))
signals.append(f"ocr_crisis_level:{text_detection.level}")
for indicator in text_detection.indicators[:3]:
signals.append(f"ocr_indicator:{indicator}")
if direct_matches and text_detection and text_detection.level in {"HIGH", "CRITICAL"}:
score = min(1.0, max(score, 0.95))
signals.append("cross_modal_confirmation:text_plus_visual")
requires_human_review = score >= 0.4 or bool(direct_matches)
return ImageScreeningResult(
ocr_text=normalized_ocr,
labels=list(normalized_labels),
visual_flags=list(normalized_flags),
distress_score=round(score, 4),
requires_human_review=requires_human_review,
signals_detected=signals,
)

View File

@@ -241,86 +241,6 @@ html, body {
opacity: 0.5;
}
/* ===== CHAT HEADER ===== */
#chat-header {
flex-shrink: 0;
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 10px 12px;
border-bottom: 1px solid #21262d;
background: #11161d;
}
.chat-header-title {
font-size: 0.85rem;
color: #8b949e;
font-weight: 600;
letter-spacing: 0.02em;
}
#chat-safety-plan-btn {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 8px 12px;
min-height: 36px;
border: 1px solid #30363d;
border-radius: 999px;
background: transparent;
color: #c9d1d9;
font-size: 0.8rem;
font-weight: 600;
cursor: pointer;
}
#chat-safety-plan-btn:hover,
#chat-safety-plan-btn:focus {
border-color: #58a6ff;
background: rgba(88, 166, 255, 0.12);
outline: 2px solid #58a6ff;
outline-offset: 2px;
}
#crisis-session-status {
padding: 10px 12px;
border-bottom: 1px solid #21262d;
background: rgba(255, 95, 95, 0.06);
}
#crisis-session-status[hidden] {
display: none;
}
#crisis-session-summary {
margin: 0 0 8px;
color: #ffd7d7;
font-size: 0.84rem;
}
#crisis-history-list {
margin: 0;
padding-left: 18px;
color: #f4cccc;
font-size: 0.8rem;
}
#crisis-history-list li {
margin-bottom: 4px;
line-height: 1.5;
}
#crisis-history-list .source-badge {
display: inline-block;
min-width: 78px;
margin-right: 6px;
color: #ff9f9f;
font-size: 0.72rem;
text-transform: uppercase;
letter-spacing: 0.05em;
}
/* ===== CHAT AREA ===== */
#chat-area {
flex: 1;
@@ -729,19 +649,6 @@ html, body {
</div>
</div>
<div id="chat-header">
<div class="chat-header-title" aria-hidden="true">Conversation</div>
<button id="chat-safety-plan-btn" type="button" aria-label="Open My Safety Plan from chat header">
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"/><polyline points="14 2 14 8 20 8"/><line x1="16" y1="13" x2="8" y2="13"/><line x1="16" y1="17" x2="8" y2="17"/><polyline points="10 9 9 9 8 9"/></svg>
My Safety Plan
</button>
</div>
<div id="crisis-session-status" aria-live="polite" hidden>
<p id="crisis-session-summary">No active session escalations yet.</p>
<ul id="crisis-history-list" aria-label="Unified escalation history"></ul>
</div>
<!-- Chat messages -->
<div id="chat-area" role="log" aria-label="Chat messages" aria-live="polite" tabindex="0">
<!-- Messages inserted here -->
@@ -773,8 +680,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about.html" aria-label="About The Door">about</a>
<button id="crisis-resources-btn" aria-label="Open crisis resources">crisis resources</button>
<a href="/about" aria-label="About The Door">about</a>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -902,17 +808,11 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
var crisisResourcesBtn = document.getElementById('crisis-resources-btn');
var crisisSessionStatus = document.getElementById('crisis-session-status');
var crisisSessionSummary = document.getElementById('crisis-session-summary');
var crisisHistoryList = document.getElementById('crisis-history-list');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');
var crisisSafetyPlanBtn = document.getElementById('crisis-safety-plan-btn');
var safetyPlanModal = document.getElementById('safety-plan-modal');
var closeSafetyPlan = document.getElementById('close-safety-plan');
@@ -923,16 +823,8 @@ Sovereignty and service always.`;
// ===== STATE =====
var messages = [];
var isStreaming = false;
var lastUserMessage = '';
var overlayTimer = null;
var crisisPanelShown = false;
var CRISIS_OVERLAY_COOLDOWN_MS = 10 * 60 * 1000;
var CRISIS_OVERLAY_LAST_SHOWN_KEY = 'timmy_crisis_overlay_last_shown_at';
var CRISIS_OVERLAY_EVENT_LOG_KEY = 'timmy_crisis_overlay_event_log';
var CRISIS_SESSION_KEY = 'timmy_crisis_session';
var CRISIS_HISTORY_KEY = 'timmy_crisis_history';
var CRISIS_LEVEL_ORDER = { NONE: 0, LOW: 1, MEDIUM: 2, HIGH: 3, CRITICAL: 4 };
var sessionCrisis = loadSessionCrisisState();
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -960,43 +852,6 @@ Sovereignty and service always.`;
window.addEventListener('offline', updateOnlineStatus);
updateOnlineStatus();
function getLastOverlayShownAt() {
try {
return parseInt(localStorage.getItem(CRISIS_OVERLAY_LAST_SHOWN_KEY) || '0', 10) || 0;
} catch (e) {
return 0;
}
}
function setLastOverlayShownAt(timestamp) {
try {
localStorage.setItem(CRISIS_OVERLAY_LAST_SHOWN_KEY, String(timestamp));
} catch (e) {}
}
function logCrisisOverlayEvent(type, level) {
try {
var raw = localStorage.getItem(CRISIS_OVERLAY_EVENT_LOG_KEY);
var events = raw ? JSON.parse(raw) : [];
if (!Array.isArray(events)) events = [];
events.push({ type: type, level: level, at: Date.now() });
if (events.length > 20) events = events.slice(events.length - 20);
localStorage.setItem(CRISIS_OVERLAY_EVENT_LOG_KEY, JSON.stringify(events));
} catch (e) {}
}
function openCrisisResources() {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
if (typeof crisisPanel.scrollIntoView === 'function') {
crisisPanel.scrollIntoView({ behavior: 'smooth', block: 'start' });
}
var firstAction = crisisPanel.querySelector('.crisis-btn, a[href]');
if (firstAction && typeof firstAction.focus === 'function') {
firstAction.focus();
}
}
// ===== CRISIS KEYWORDS =====
// Tier 1: General crisis indicators - triggers enhanced 988 panel
var crisisKeywords = [
@@ -1109,210 +964,6 @@ Sovereignty and service always.`;
return 0;
}
function defaultSessionCrisisState() {
return {
sessionId: null,
currentLevel: 'NONE',
peakLevel: 'NONE',
messageCount: 0,
history: [],
gatewayEscalated: false,
lastUpdatedAt: null
};
}
function getOrCreateCrisisSessionId() {
try {
var existing = localStorage.getItem(CRISIS_SESSION_KEY);
if (existing) return existing;
var created = (window.crypto && typeof window.crypto.randomUUID === 'function')
? window.crypto.randomUUID()
: 'the-door-' + Date.now() + '-' + Math.random().toString(36).slice(2, 10);
localStorage.setItem(CRISIS_SESSION_KEY, created);
return created;
} catch (e) {
return 'the-door-ephemeral';
}
}
function normalizeCrisisLevel(level) {
if (level === 2 || level === '2') return 'CRITICAL';
if (level === 1 || level === '1') return 'MEDIUM';
if (level === 0 || level === '0' || !level) return 'NONE';
var upper = String(level).toUpperCase();
return CRISIS_LEVEL_ORDER.hasOwnProperty(upper) ? upper : 'NONE';
}
function loadSessionCrisisState() {
var state = defaultSessionCrisisState();
state.sessionId = getOrCreateCrisisSessionId();
try {
var saved = localStorage.getItem(CRISIS_HISTORY_KEY);
if (!saved) return state;
var parsed = JSON.parse(saved);
if (!parsed || typeof parsed !== 'object') return state;
state.currentLevel = normalizeCrisisLevel(parsed.currentLevel);
state.peakLevel = normalizeCrisisLevel(parsed.peakLevel);
state.messageCount = Number(parsed.messageCount || 0);
state.gatewayEscalated = !!parsed.gatewayEscalated;
state.lastUpdatedAt = parsed.lastUpdatedAt || null;
state.history = Array.isArray(parsed.history) ? parsed.history.map(function(entry) {
return {
source: entry.source || 'the-door',
level: normalizeCrisisLevel(entry.level),
kind: entry.kind || 'signal',
detail: entry.detail || '',
at: entry.at || Date.now()
};
}) : [];
} catch (e) {}
return state;
}
function saveSessionCrisisState() {
try {
localStorage.setItem(CRISIS_SESSION_KEY, sessionCrisis.sessionId || getOrCreateCrisisSessionId());
localStorage.setItem(CRISIS_HISTORY_KEY, JSON.stringify({
currentLevel: sessionCrisis.currentLevel,
peakLevel: sessionCrisis.peakLevel,
messageCount: sessionCrisis.messageCount,
gatewayEscalated: sessionCrisis.gatewayEscalated,
lastUpdatedAt: sessionCrisis.lastUpdatedAt,
history: sessionCrisis.history.slice(-12)
}));
} catch (e) {}
}
function updateSessionCrisisLevels(level) {
var normalized = normalizeCrisisLevel(level);
sessionCrisis.currentLevel = normalized;
if (CRISIS_LEVEL_ORDER[normalized] > CRISIS_LEVEL_ORDER[sessionCrisis.peakLevel]) {
sessionCrisis.peakLevel = normalized;
}
sessionCrisis.lastUpdatedAt = Date.now();
}
function renderCrisisSessionStatus() {
if (!crisisSessionStatus || !crisisSessionSummary || !crisisHistoryList) return;
var history = sessionCrisis.history.slice(-5);
var shouldShow = history.length > 0 || sessionCrisis.gatewayEscalated || sessionCrisis.currentLevel !== 'NONE';
crisisSessionStatus.hidden = !shouldShow;
crisisHistoryList.innerHTML = '';
if (!shouldShow) {
crisisSessionSummary.textContent = 'No active session escalations yet.';
return;
}
var summary = [];
if (sessionCrisis.gatewayEscalated) {
summary.push('Hermes is tracking an active escalation.');
}
if (sessionCrisis.currentLevel !== 'NONE') {
summary.push('Current level: ' + sessionCrisis.currentLevel + '.');
}
if (sessionCrisis.peakLevel !== 'NONE' && sessionCrisis.peakLevel !== sessionCrisis.currentLevel) {
summary.push('Peak: ' + sessionCrisis.peakLevel + '.');
}
summary.push('Session ID: ' + getOrCreateCrisisSessionId().slice(0, 8) + '…');
crisisSessionSummary.textContent = summary.join(' ');
history.forEach(function(entry) {
var li = document.createElement('li');
var badge = document.createElement('span');
badge.className = 'source-badge';
badge.textContent = entry.source;
li.appendChild(badge);
li.appendChild(document.createTextNode(entry.level + ' — ' + entry.detail));
crisisHistoryList.appendChild(li);
});
}
function appendCrisisHistoryEvent(source, level, kind, detail, at) {
var normalized = normalizeCrisisLevel(level);
var event = {
source: source || 'the-door',
level: normalized,
kind: kind || 'signal',
detail: detail || '',
at: at || Date.now()
};
var last = sessionCrisis.history.length ? sessionCrisis.history[sessionCrisis.history.length - 1] : null;
if (!last || last.source !== event.source || last.level !== event.level || last.kind !== event.kind || last.detail !== event.detail) {
sessionCrisis.history.push(event);
if (sessionCrisis.history.length > 12) {
sessionCrisis.history = sessionCrisis.history.slice(-12);
}
}
updateSessionCrisisLevels(normalized);
saveSessionCrisisState();
renderCrisisSessionStatus();
}
function trackCrisis(text, actor) {
sessionCrisis.messageCount += 1;
var level = normalizeCrisisLevel(getCrisisLevel(text));
if (level !== 'NONE') {
var detail = actor === 'assistant'
? 'assistant response carried crisis language'
: 'client detected crisis language before gateway handoff';
appendCrisisHistoryEvent('the-door', level, actor || 'message', detail, Date.now());
return;
}
updateSessionCrisisLevels('NONE');
saveSessionCrisisState();
renderCrisisSessionStatus();
}
function parseGatewayCrisisHeaders(response) {
var sessionId = response.headers.get('x-hermes-session-id') || response.headers.get('x-session-id');
var level = normalizeCrisisLevel(response.headers.get('x-hermes-crisis-level') || response.headers.get('x-crisis-level'));
var escalationRaw = response.headers.get('x-hermes-crisis-escalation') || response.headers.get('x-crisis-escalation');
var historyRaw = response.headers.get('x-hermes-crisis-history') || response.headers.get('x-crisis-history');
var history = [];
if (historyRaw) {
try {
var decoded = decodeURIComponent(historyRaw);
var parsed = JSON.parse(decoded);
if (Array.isArray(parsed)) history = parsed;
} catch (e) {
try {
var fallbackParsed = JSON.parse(historyRaw);
if (Array.isArray(fallbackParsed)) history = fallbackParsed;
} catch (ignored) {}
}
}
return {
sessionId: sessionId,
level: level,
escalated: escalationRaw === '1' || escalationRaw === 'true' || escalationRaw === 'yes',
history: history
};
}
function mergeGatewayCrisisHistory(gatewayHistory) {
if (!Array.isArray(gatewayHistory)) return;
gatewayHistory.forEach(function(entry) {
appendCrisisHistoryEvent(
entry.source || 'hermes-agent',
entry.level || 'NONE',
entry.kind || 'gateway',
entry.detail || 'Hermes session tracker update',
entry.at || entry.time || Date.now()
);
});
}
function resetSessionCrisis() {
sessionCrisis = defaultSessionCrisisState();
sessionCrisis.sessionId = getOrCreateCrisisSessionId();
saveSessionCrisisState();
renderCrisisSessionStatus();
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
@@ -1335,33 +986,20 @@ Sovereignty and service always.`;
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return Array.prototype.slice.call(crisisOverlay.querySelectorAll(
return crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
));
);
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key === 'Escape') {
e.preventDefault();
closeOverlay(msgInput);
return;
}
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var focusableArray = focusable;
var first = focusable[0];
var last = focusable[focusable.length - 1];
var activeIndex = focusableArray.indexOf(document.activeElement);
if (activeIndex === -1) {
e.preventDefault();
e.shiftKey ? last.focus() : first.focus();
return;
}
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
@@ -1381,47 +1019,7 @@ Sovereignty and service always.`;
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function closeOverlay(restoreFocusTarget) {
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.getElementById('app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat-area');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
if (restoreFocusTarget === _preOverlayFocusElement && _preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else if (restoreFocusTarget && typeof restoreFocusTarget.focus === 'function') {
restoreFocusTarget.focus();
} else if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
function showOverlay() {
return showOverlayWithRateLimit(false, 2);
}
function showOverlayWithRateLimit(forceOpen, level) {
var lastShownAt = getLastOverlayShownAt();
if (!forceOpen && Date.now() - lastShownAt < CRISIS_OVERLAY_COOLDOWN_MS) {
logCrisisOverlayEvent('suppressed', level || 2);
return false;
}
logCrisisOverlayEvent(forceOpen ? 'manual-open' : 'shown', level || 2);
setLastOverlayShownAt(Date.now());
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
@@ -1431,10 +1029,10 @@ Sovereignty and service always.`;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.getElementById('app');
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat-area');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
@@ -1452,9 +1050,7 @@ Sovereignty and service always.`;
}
}, 1000);
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
return true;
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
@@ -1462,7 +1058,27 @@ Sovereignty and service always.`;
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
closeOverlay(_preOverlayFocusElement);
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
});
@@ -1548,9 +1164,6 @@ Sovereignty and service always.`;
clearChatBtn.addEventListener('click', function() {
if (confirm('Clear all chat history?')) {
localStorage.removeItem('timmy_chat_history');
localStorage.removeItem('timmy_crisis_session');
localStorage.removeItem('timmy_crisis_history');
resetSessionCrisis();
window.location.reload();
}
});
@@ -1670,31 +1283,19 @@ Sovereignty and service always.`;
_spTriggerEl = null;
}
function openSafetyPlan(triggerEl) {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(triggerEl || document.activeElement);
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(safetyPlanBtn);
});
chatSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(chatSafetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(crisisSafetyPlanBtn);
});
}
if (crisisResourcesBtn) {
crisisResourcesBtn.addEventListener('click', function() {
openCrisisResources();
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
});
}
@@ -1712,9 +1313,8 @@ Sovereignty and service always.`;
addMessage('user', text);
messages.push({ role: 'user', content: text });
lastUserMessage = text;
var lastUserMessage = text;
trackCrisis(text, 'user');
checkCrisis(text);
msgInput.value = '';
@@ -1741,18 +1341,7 @@ Sovereignty and service always.`;
body: JSON.stringify({
model: 'timmy',
messages: allMessages,
stream: true,
metadata: {
session_id: getOrCreateCrisisSessionId(),
source: 'the-door',
crisis_history: sessionCrisis.history.slice(-8),
crisis_state: {
current_level: sessionCrisis.currentLevel,
peak_level: sessionCrisis.peakLevel,
message_count: sessionCrisis.messageCount,
gateway_escalated: sessionCrisis.gatewayEscalated
}
}
stream: true
}),
signal: controller.signal
}).then(function(response) {
@@ -1762,22 +1351,6 @@ Sovereignty and service always.`;
throw new Error('HTTP ' + response.status);
}
var gatewayCrisis = parseGatewayCrisisHeaders(response);
if (gatewayCrisis.sessionId) {
sessionCrisis.sessionId = gatewayCrisis.sessionId;
try {
localStorage.setItem(CRISIS_SESSION_KEY, gatewayCrisis.sessionId);
} catch (e) {}
}
sessionCrisis.gatewayEscalated = !!gatewayCrisis.escalated;
if (gatewayCrisis.level && gatewayCrisis.level !== 'NONE') {
appendCrisisHistoryEvent('hermes-agent', gatewayCrisis.level, 'gateway', 'Hermes session tracker update', Date.now());
}
var gatewayHistory = gatewayCrisis.history || [];
mergeGatewayCrisisHistory(gatewayHistory);
saveSessionCrisisState();
renderCrisisSessionStatus();
hideTyping();
var contentEl = addMessage('assistant', '', true);
var fullText = '';
@@ -1830,7 +1403,6 @@ Sovereignty and service always.`;
if (fullText) {
messages.push({ role: 'assistant', content: fullText });
saveMessages();
trackCrisis(fullText, 'assistant');
checkCrisis(fullText);
}
isStreaming = false;
@@ -1861,9 +1433,6 @@ Sovereignty and service always.`;
// ===== WELCOME MESSAGE =====
function init() {
sessionCrisis.sessionId = getOrCreateCrisisSessionId();
renderCrisisSessionStatus();
if (!loadMessages()) {
var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us.";
addMessage('assistant', welcomeText);
@@ -1873,7 +1442,9 @@ Sovereignty and service always.`;
// Check for URL params (e.g., ?safetyplan=true for PWA shortcut)
var urlParams = new URLSearchParams(window.location.search);
if (urlParams.get('safetyplan') === 'true') {
openSafetyPlan(chatSafetyPlanBtn || safetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -1,68 +0,0 @@
# The Door Fleet Work Orders Audit — issue #75
Generated: 2026-04-17T04:10:14Z
Source issue: `TRIAGE: The Door - Fleet Work Orders (2026-04-09)`
## Source Snapshot
Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.
## Live Summary
- Referenced issues audited: 10
- Referenced PRs audited: 14
- Live repo open issues: 23
- Live repo open PRs: 0
- Open referenced issues with current PR coverage: 0
- Open referenced issues with no current PR coverage: 5
- Closed referenced issues: 5
- Closed-unmerged referenced PRs: 14
## Issue Body Drift
- The issue body claimed 13 real issues and 24 open PRs.
- Live repo state now shows 23 open issues and 0 open PRs.
- Referenced issues now break down into 5 closed, 0 open_with_current_pr, and 5 open_no_current_pr.
- Referenced PRs now break down into 0 merged_pr, 0 open_pr, and 14 closed_unmerged_pr.
## Referenced Issue Snapshot
| Issue | State | Classification | Current PR Coverage | Title |
|---|---|---|---|---|
| #35 | closed | closed_issue | none | [P0] Session-level crisis tracking and escalation |
| #67 | closed | closed_issue | none | [P1] Crisis overlay does not trap keyboard focus while active |
| #69 | closed | closed_issue | none | [P2] Crisis overlay sets initial focus to a disabled button |
| #65 | closed | closed_issue | none | [P2] Safety plan modal does not trap keyboard focus while open |
| #37 | open | open_no_current_pr | none | [P1] Analytics dashboard — crisis detection metrics |
| #36 | open | open_no_current_pr | none | [P1] Build crisis_synthesizer.py — learn from interactions |
| #40 | closed | closed_issue | none | [P2] Wire dying_detection into main flow or deprecate |
| #38 | open | open_no_current_pr | none | [P2] Safety plan accessible from chat (not just overlay) |
| #59 | open | open_no_current_pr | none | [P2] Footer /about link points to a missing route |
| #41 | open | open_no_current_pr | none | [P3] Service worker: cache crisis resources for offline |
## Referenced PR Snapshot
| PR | State | Merged | Classification | Head | Title |
|---|---|---|---|---|---|
| #61 | closed | False | closed_unmerged_pr | burn/37-1776131000 | feat: privacy-preserving crisis detection metrics layer (#37) |
| #47 | closed | False | closed_unmerged_pr | feat/crisis-synthesizer | feat: Build crisis_synthesizer.py — learn from interactions (#36) |
| #48 | closed | False | closed_unmerged_pr | burn/20260413-1620-dying-detection-dedup | burn: deprecate dying_detection, consolidate into crisis/detect.py |
| #50 | closed | False | closed_unmerged_pr | whip/40-1776128804 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #51 | closed | False | closed_unmerged_pr | queue/40-1776129201 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #53 | closed | False | closed_unmerged_pr | q/40-1776129480 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #56 | closed | False | closed_unmerged_pr | triage/40-1776129677 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #58 | closed | False | closed_unmerged_pr | dawn/40-1776130053 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #70 | closed | False | closed_unmerged_pr | am/40-1776166469 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #72 | closed | False | closed_unmerged_pr | am/38-1776166469 | feat: add always-on safety plan access in chat header (#38) |
| #62 | closed | False | closed_unmerged_pr | burn/59-1776131200 | fix: point footer about link to /about.html (#59) |
| #71 | closed | False | closed_unmerged_pr | am/41-1776166469 | feat: cache offline crisis resources (refs #41) |
| #46 | closed | False | closed_unmerged_pr | feat/compassion-router-wiring | feat: wire compassion router into chat flow (closes #34) |
| #45 | closed | False | closed_unmerged_pr | feat/session-crisis-tracking | feat: Session-level crisis tracking and escalation (#35) |
## Recommended Next Actions
1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.
2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.
3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.
4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.
5. This audit preserves operator memory; it does not claim all referenced work orders are complete.

View File

@@ -1,295 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"
def extract_issue_numbers(body: str) -> list[int]:
numbers: list[int] = []
seen: set[int] = set()
for match in re.finditer(r"#(\d+)", body or ""):
value = int(match.group(1))
if value in seen:
continue
seen.add(value)
numbers.append(value)
return numbers
def api_get(repo: str, path: str, token: str) -> Any:
req = Request(
f"{API_BASE}/repos/{ORG}/{repo}{path}",
headers={"Authorization": f"token {token}"},
)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
page = 1
while True:
batch = api_get(repo, f"/pulls?state=open&limit=100&page={page}", token)
if not batch:
break
prs.extend(batch)
page += 1
return prs
def fetch_live_open_issue_count(repo: str, token: str) -> int:
total = 0
page = 1
while True:
batch = api_get(repo, f"/issues?state=open&limit=100&page={page}", token)
if not batch:
break
total += sum(1 for item in batch if not item.get("pull_request"))
page += 1
return total
def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
claimed_open_issues = int(issue_match.group(1)) if issue_match else None
claimed_open_prs = int(pr_match.group(1)) if pr_match else None
return claimed_open_issues, claimed_open_prs
def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
matches: list[str] = []
seen: set[int] = set()
for pr in open_prs:
pr_num = pr["number"]
if pr_num in seen:
continue
text = "\n".join(
[
pr.get("title") or "",
pr.get("body") or "",
(pr.get("head") or {}).get("ref") or "",
]
)
if f"#{issue_num}" not in text:
continue
seen.add(pr_num)
matches.append(f"open PR #{pr_num}")
return ", ".join(matches) if matches else "none"
def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
issue_num = ref_issue["number"]
state = ref_issue.get("state") or "unknown"
coverage = summarize_open_pr_coverage(issue_num, open_prs)
if state == "closed":
classification = "closed_issue"
elif coverage != "none":
classification = "open_with_current_pr"
else:
classification = "open_no_current_pr"
return {
"number": issue_num,
"state": state,
"classification": classification,
"title": ref_issue.get("title") or "",
"current_pr_coverage": coverage,
"url": ref_issue.get("html_url") or ref_issue.get("url") or "",
}
def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
pr = api_get(repo, f"/pulls/{pr_num}", token)
state = pr.get("state") or "unknown"
merged = bool(pr.get("merged"))
if merged:
classification = "merged_pr"
elif state == "open":
classification = "open_pr"
else:
classification = "closed_unmerged_pr"
return {
"number": pr_num,
"state": state,
"merged": merged,
"classification": classification,
"title": pr.get("title") or "",
"head": (pr.get("head") or {}).get("ref") or "",
"url": pr.get("html_url") or pr.get("url") or "",
}
def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
headers = [title for title, _ in columns]
keys = [key for _, key in columns]
if not rows:
return "| None |\n|---|\n| None |"
lines = ["| " + " | ".join(headers) + " |", "|" + "|".join(["---"] * len(headers)) + "|"]
for row in rows:
values: list[str] = []
for key in keys:
value = row.get(key, "")
if key == "number" and value != "":
value = f"#{value}"
values.append(str(value).replace("\n", " "))
lines.append("| " + " | ".join(values) + " |")
return "\n".join(lines)
def render_report(
*,
source_issue: int,
source_title: str,
generated_at: str,
claimed_open_issues: int | None,
claimed_open_prs: int | None,
live_open_issues: int,
live_open_prs: int,
issue_rows: list[dict[str, Any]],
pr_rows: list[dict[str, Any]],
) -> str:
open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]
drift_lines = [
f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
]
return "\n".join(
[
f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
"",
f"Generated: {generated_at}",
f"Source issue: `{source_title}`",
"",
"## Source Snapshot",
"",
"Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
"",
"## Live Summary",
"",
f"- Referenced issues audited: {len(issue_rows)}",
f"- Referenced PRs audited: {len(pr_rows)}",
f"- Live repo open issues: {live_open_issues}",
f"- Live repo open PRs: {live_open_prs}",
f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
f"- Closed referenced issues: {len(closed_issues)}",
f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
"",
"## Issue Body Drift",
"",
*drift_lines,
"",
"## Referenced Issue Snapshot",
"",
table(
issue_rows,
[
("Issue", "number"),
("State", "state"),
("Classification", "classification"),
("Current PR Coverage", "current_pr_coverage"),
("Title", "title"),
],
),
"",
"## Referenced PR Snapshot",
"",
table(
pr_rows,
[
("PR", "number"),
("State", "state"),
("Merged", "merged"),
("Classification", "classification"),
("Head", "head"),
("Title", "title"),
],
),
"",
"## Recommended Next Actions",
"",
"1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
"2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
"3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
"4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
"5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
]
) + "\n"
def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
source_issue = api_get(repo, f"/issues/{issue_number}", token)
body = source_issue.get("body") or ""
refs = extract_issue_numbers(body)
open_prs = fetch_open_prs(repo, token)
claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)
issue_rows: list[dict[str, Any]] = []
pr_rows: list[dict[str, Any]] = []
for ref in refs:
issue_like = api_get(repo, f"/issues/{ref}", token)
if issue_like.get("pull_request"):
pr_rows.append(classify_pr_reference(repo, ref, token))
else:
issue_rows.append(classify_issue_reference(issue_like, open_prs))
metadata = {
"source_title": source_issue.get("title") or "",
"claimed_open_issues": claimed_open_issues,
"claimed_open_prs": claimed_open_prs,
"live_open_issues": fetch_live_open_issue_count(repo, token),
"live_open_prs": len(open_prs),
}
return metadata, issue_rows, pr_rows
def main() -> int:
parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
parser.add_argument("--repo", default="the-door")
parser.add_argument("--issue", type=int, default=75)
parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
args = parser.parse_args()
token = Path(args.token_file).read_text(encoding="utf-8").strip()
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
report = render_report(
source_issue=args.issue,
source_title=metadata["source_title"],
generated_at=generated_at,
claimed_open_issues=metadata["claimed_open_issues"],
claimed_open_prs=metadata["claimed_open_prs"],
live_open_issues=metadata["live_open_issues"],
live_open_prs=metadata["live_open_prs"],
issue_rows=issue_rows,
pr_rows=pr_rows,
)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(report, encoding="utf-8")
print(output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,26 +0,0 @@
"""Regression tests for the crisis integration contract expected by hermes-agent.
Issue #141 names the-door-side shared modules as `crisis.tracker` and
`crisis.bridge`. Keep those import paths available even if the canonical
implementation lives in `session_tracker.py` and `gateway.py`.
"""
import importlib
def test_crisis_tracker_module_exports_session_tracker_contract():
tracker = importlib.import_module("crisis.tracker")
session_tracker = importlib.import_module("crisis.session_tracker")
assert tracker.CrisisSessionTracker is session_tracker.CrisisSessionTracker
assert tracker.SessionState is session_tracker.SessionState
assert tracker.check_crisis_with_session is session_tracker.check_crisis_with_session
def test_crisis_bridge_module_exports_gateway_contract():
bridge = importlib.import_module("crisis.bridge")
gateway = importlib.import_module("crisis.gateway")
assert bridge.check_crisis is gateway.check_crisis
assert bridge.get_system_prompt is gateway.get_system_prompt
assert bridge.format_gateway_response is gateway.format_gateway_response

View File

@@ -1,118 +0,0 @@
"""
Tests for crisis/metrics.py — Aggregate crisis metrics.
"""
import json
import os
import shutil
import tempfile
import unittest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from crisis.metrics import CrisisMetrics, SessionMetrics, AggregateMetrics
class TestCrisisMetrics(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.metrics = CrisisMetrics(Path(self.tmpdir))
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_record_session_creates_file(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="MEDIUM",
message_count=5,
was_escalating=True,
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
self.assertEqual(len(files), 1)
def test_record_session_writes_jsonl(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="HIGH",
peak_level="CRITICAL",
message_count=10,
was_escalating=True,
was_deescalating=False,
escalation_rate=1.0,
triggered_overlay=True,
showed_988=True,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
with open(files[0]) as f:
data = json.loads(f.readline())
self.assertEqual(data['peak_level'], 'CRITICAL')
self.assertTrue(data['triggered_overlay'])
def test_get_summary_empty(self):
agg = self.metrics.get_summary(days=7)
self.assertEqual(agg.total_sessions, 0)
self.assertEqual(agg.total_messages, 0)
def test_get_summary_with_data(self):
for level in ["LOW", "MEDIUM", "HIGH"]:
sm = SessionMetrics(
timestamp=1700000000,
current_level=level,
peak_level=level,
message_count=3,
was_escalating=level != "LOW",
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
agg = self.metrics.get_summary(days=1)
self.assertEqual(agg.total_sessions, 3)
self.assertEqual(agg.total_messages, 9)
self.assertEqual(agg.escalating_sessions, 2)
def test_get_report_returns_string(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="LOW",
message_count=5,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
report = self.metrics.get_report(days=1)
self.assertIn("CRISIS METRICS REPORT", report)
self.assertIn("Sessions:", report)
def test_get_json_returns_valid(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="MEDIUM",
peak_level="MEDIUM",
message_count=3,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
json_str = self.metrics.get_json(days=1)
data = json.loads(json_str)
self.assertEqual(data['total_sessions'], 1)
if __name__ == "__main__":
unittest.main()

View File

@@ -52,34 +52,6 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,54 +0,0 @@
import pathlib
from playwright.sync_api import sync_playwright
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
def test_crisis_overlay_supports_keyboard_only_navigation():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
try:
page.goto(INDEX_HTML.as_uri())
page.evaluate(
"""
() => {
window.fetch = () => Promise.reject(new Error('test fetch blocked'));
window.alert = () => {};
}
"""
)
page.fill('#msg-input', "I'm going to kill myself tonight")
page.press('#msg-input', 'Enter')
page.wait_for_function("document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.evaluate(
"""
() => {
const btn = document.getElementById('overlay-dismiss-btn');
btn.disabled = false;
btn.textContent = 'Continue to chat';
}
"""
)
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.keyboard.press('Shift+Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Escape')
page.wait_for_function("!document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.id") == 'msg-input'
finally:
browser.close()

View File

@@ -1,53 +0,0 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayRateLimit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_has_ten_minute_cooldown_constant(self):
self.assertRegex(
self.html,
r"CRISIS_OVERLAY_COOLDOWN_MS\s*=\s*10\s*\*\s*60\s*\*\s*1000",
'Expected a 10-minute crisis overlay cooldown constant.',
)
def test_show_overlay_suppresses_repeat_with_logging(self):
self.assertRegex(
self.html,
r"function\s+logCrisisOverlayEvent\s*\(",
'Expected a crisis overlay event logger.',
)
self.assertRegex(
self.html,
r"if\s*\(!forceOpen\s*&&\s*Date\.now\(\)\s*-\s*lastShownAt\s*<\s*CRISIS_OVERLAY_COOLDOWN_MS\)",
'Expected showOverlay to suppress repeated auto-displays inside the cooldown window.',
)
self.assertRegex(
self.html,
r"logCrisisOverlayEvent\('suppressed'",
'Expected suppressed overlay attempts to be logged.',
)
def test_manual_crisis_resources_button_exists_and_bypasses_cooldown(self):
self.assertIn('id="crisis-resources-btn"', self.html)
self.assertRegex(
self.html,
r"function\s+openCrisisResources\s*\(",
'Expected a manual crisis resources opener.',
)
self.assertRegex(
self.html,
r"crisisResourcesBtn\.addEventListener\('click',\s*function\(\)\s*\{\s*openCrisisResources\(\);",
'Expected the footer button to wire into openCrisisResources().',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,37 +0,0 @@
from pathlib import Path
HTML = Path("index.html").read_text(encoding="utf-8")
def test_index_contains_crisis_session_bridge_ui_and_hooks():
assert 'id="crisis-session-status"' in HTML
assert 'id="crisis-history-list"' in HTML
assert 'function getOrCreateCrisisSessionId()' in HTML
assert 'function appendCrisisHistoryEvent(' in HTML
assert 'function renderCrisisSessionStatus()' in HTML
assert 'function parseGatewayCrisisHeaders(' in HTML
assert 'function mergeGatewayCrisisHistory(' in HTML
def test_request_includes_metadata_for_hermes_session_tracking():
assert 'metadata:' in HTML
assert "session_id: getOrCreateCrisisSessionId()" in HTML
assert "source: 'the-door'" in HTML
assert 'crisis_history:' in HTML
assert 'crisis_state:' in HTML
def test_clear_chat_resets_crisis_bridge_state():
assert "localStorage.removeItem('timmy_chat_history');" in HTML
assert "localStorage.removeItem('timmy_crisis_session');" in HTML
assert "localStorage.removeItem('timmy_crisis_history');" in HTML
assert 'resetSessionCrisis();' in HTML
def test_gateway_escalations_are_merged_back_into_the_ui():
assert "response.headers.get('x-hermes-session-id')" in HTML
assert "response.headers.get('x-hermes-crisis-level')" in HTML
assert "response.headers.get('x-hermes-crisis-history')" in HTML
assert 'mergeGatewayCrisisHistory(gatewayHistory);' in HTML
assert 'renderCrisisSessionStatus();' in HTML

View File

@@ -1,111 +0,0 @@
"""Tests for evolution/crisis_synthesizer.py (issue #36)."""
from __future__ import annotations
import importlib.util
import json
import pathlib
import sys
import tempfile
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SCRIPT = ROOT / 'evolution' / 'crisis_synthesizer.py'
spec = importlib.util.spec_from_file_location('crisis_synthesizer', str(SCRIPT))
mod = importlib.util.module_from_spec(spec)
sys.modules['crisis_synthesizer'] = mod
spec.loader.exec_module(mod)
class TestCrisisSynthesizerEvent(unittest.TestCase):
def test_build_interaction_event_is_privacy_preserving(self):
event = mod.build_interaction_event(
level='CRITICAL',
indicators=['want_to_die', 'no_way_out'],
response_given='guardian',
continued_conversation=True,
false_positive=False,
now=1700000000,
)
self.assertEqual(event['timestamp'], 1700000000)
self.assertEqual(event['level'], 'CRITICAL')
self.assertEqual(event['response_given'], 'guardian')
self.assertTrue(event['continued_conversation'])
self.assertFalse(event['false_positive'])
self.assertEqual(event['indicators'], ['want_to_die', 'no_way_out'])
for forbidden in ['text', 'message', 'content', 'ip', 'session_id', 'user_id']:
self.assertNotIn(forbidden, event)
class TestCrisisSynthesizerStorage(unittest.TestCase):
def test_append_and_load_events_round_trip(self):
with tempfile.TemporaryDirectory() as tmp:
log_path = pathlib.Path(tmp) / 'crisis-events.jsonl'
mod.append_interaction_event(
log_path,
level='HIGH',
indicators=['hopeless'],
response_given='companion',
continued_conversation=False,
false_positive=True,
now=1700000100,
)
events = mod.load_interaction_events(log_path)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['level'], 'HIGH')
self.assertEqual(events[0]['indicators'], ['hopeless'])
class TestCrisisSynthesizerSummary(unittest.TestCase):
def test_weekly_report_contains_required_metrics(self):
events = [
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000),
mod.build_interaction_event('HIGH', ['hopeless'], 'companion', False, True, now=1700000100),
mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, False, now=1700000200),
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', False, False, now=1700000300),
mod.build_interaction_event('NONE', [], 'friend', False, False, now=1700000400),
]
summary = mod.build_weekly_report(events, now=1700000500, window_days=7)
self.assertEqual(summary['detections_per_level']['CRITICAL'], 2)
self.assertEqual(summary['detections_per_level']['HIGH'], 1)
self.assertEqual(summary['detections_per_level']['LOW'], 1)
self.assertEqual(summary['detections_per_level']['NONE'], 1)
self.assertEqual(summary['continued_after_intervention'], 1)
self.assertAlmostEqual(summary['false_positive_estimate'], 0.25)
self.assertEqual(summary['most_common_keywords'][0]['keyword'], 'want_to_die')
self.assertEqual(summary['most_common_keywords'][0]['count'], 2)
class TestCrisisSynthesizerSuggestions(unittest.TestCase):
def test_suggests_weight_adjustments_from_interactions(self):
events = []
for ts in range(3):
events.append(mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000 + ts))
for ts in range(3):
events.append(mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, True, now=1700000100 + ts))
suggestions = mod.suggest_keyword_adjustments(events, min_observations=3)
by_keyword = {s['keyword']: s for s in suggestions}
self.assertEqual(by_keyword['want_to_die']['suggested_adjustment'], 'raise_weight')
self.assertEqual(by_keyword['rough_day']['suggested_adjustment'], 'lower_weight')
class TestCrisisSynthesizerRendering(unittest.TestCase):
def test_render_weekly_report_outputs_json(self):
summary = {
'detections_per_level': {'NONE': 0, 'LOW': 1, 'MEDIUM': 0, 'HIGH': 0, 'CRITICAL': 0},
'most_common_keywords': [{'keyword': 'rough_day', 'count': 1}],
'false_positive_estimate': 0.0,
'continued_after_intervention': 0,
'keyword_weight_suggestions': [],
'window_days': 7,
'total_events': 1,
}
rendered = mod.render_weekly_report(summary)
parsed = json.loads(rendered)
self.assertEqual(parsed['window_days'], 7)
self.assertEqual(parsed['most_common_keywords'][0]['keyword'], 'rough_day')
if __name__ == '__main__':
unittest.main()

View File

@@ -1,100 +0,0 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_work_orders_audit.py"
REPORT_PATH = ROOT / "reports" / "2026-04-17-the-door-fleet-work-orders-audit.md"
def _load_module():
assert SCRIPT_PATH.exists(), f"missing {SCRIPT_PATH.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location("fleet_work_orders_audit", SCRIPT_PATH)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_extract_issue_numbers_preserves_mixed_issue_and_pr_refs() -> None:
body = """
## P0 — Session-level crisis tracking (#35)
**PR #61 ready.**
## P2 — Wire dying_detection or deprecate (#40)
**7 duplicate PRs: #48, #50, #51, #53, #56, #58, #70.**
"""
mod = _load_module()
assert mod.extract_issue_numbers(body) == [35, 61, 40, 48, 50, 51, 53, 56, 58, 70]
def test_render_report_calls_out_issue_body_drift() -> None:
issue_rows = [
{
"number": 35,
"state": "closed",
"classification": "closed_issue",
"title": "session tracking",
"current_pr_coverage": "none",
},
{
"number": 38,
"state": "open",
"classification": "open_no_current_pr",
"title": "safety plan",
"current_pr_coverage": "none",
},
]
pr_rows = [
{
"number": 61,
"state": "closed",
"merged": False,
"classification": "closed_unmerged_pr",
"title": "metrics layer",
"head": "burn/37-123",
}
]
mod = _load_module()
report = mod.render_report(
source_issue=75,
source_title="TRIAGE: The Door - Fleet Work Orders (2026-04-09)",
generated_at="2026-04-17T04:00:00Z",
claimed_open_issues=13,
claimed_open_prs=24,
live_open_issues=5,
live_open_prs=0,
issue_rows=issue_rows,
pr_rows=pr_rows,
)
assert "## Source Snapshot" in report
assert "## Live Summary" in report
assert "## Issue Body Drift" in report
assert "13" in report and "24" in report
assert "#38" in report
assert "open_no_current_pr" in report
assert "#61" in report
assert "closed_unmerged_pr" in report
assert "## Referenced Issue Snapshot" in report
assert "## Referenced PR Snapshot" in report
assert "## Recommended Next Actions" in report
def test_committed_work_orders_audit_exists_with_required_sections() -> None:
text = REPORT_PATH.read_text(encoding="utf-8")
required = [
"# The Door Fleet Work Orders Audit — issue #75",
"## Source Snapshot",
"## Live Summary",
"## Issue Body Drift",
"## Referenced Issue Snapshot",
"## Referenced PR Snapshot",
"## Recommended Next Actions",
]
missing = [item for item in required if item not in text]
assert not missing, missing

View File

@@ -1,62 +0,0 @@
"""Tests for local image crisis screening slice under epic #130."""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from image_screening import ImageScreeningResult, screen_image_signals
class TestImageScreeningResult(unittest.TestCase):
def test_to_dict_preserves_core_fields(self):
result = ImageScreeningResult(
ocr_text="help me",
labels=["blood"],
visual_flags=["dark_scene"],
distress_score=0.8,
requires_human_review=True,
signals_detected=["direct_self_harm_label:blood"],
)
data = result.to_dict()
self.assertEqual(data["ocr_text"], "help me")
self.assertEqual(data["labels"], ["blood"])
self.assertTrue(data["requires_human_review"])
class TestScreenImageSignals(unittest.TestCase):
def test_direct_self_harm_labels_trigger_high_risk(self):
result = screen_image_signals(
labels=["razor blade", "blood droplets"],
manual_notes="photo of fresh cuts on forearm",
)
self.assertGreaterEqual(result.distress_score, 0.8)
self.assertTrue(result.requires_human_review)
self.assertTrue(any("self_harm" in signal for signal in result.signals_detected))
def test_ocr_text_uses_existing_crisis_detector(self):
result = screen_image_signals(
ocr_text="I want to kill myself tonight",
labels=["handwritten note"],
)
self.assertGreaterEqual(result.distress_score, 0.7)
self.assertTrue(result.requires_human_review)
self.assertTrue(any(signal.startswith("ocr_crisis_level:") for signal in result.signals_detected))
def test_neutral_image_stays_low_risk(self):
result = screen_image_signals(
labels=["dog", "park", "sunlight"],
manual_notes="family outing in daylight",
)
self.assertLess(result.distress_score, 0.2)
self.assertFalse(result.requires_human_review)
self.assertEqual(result.signals_detected, [])
if __name__ == "__main__":
unittest.main()

View File

@@ -1,50 +0,0 @@
"""Regression tests for duplicate crisis indicators cleanup (#123)."""
import os
import re
import sys
import unittest
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
ROOT = Path(__file__).resolve().parents[1]
DETECT_FILE = ROOT / "crisis" / "detect.py"
class TestCrisisIndicatorDedup(unittest.TestCase):
def _extract_indicator_list(self, name: str) -> set[str]:
text = DETECT_FILE.read_text(encoding="utf-8")
match = re.search(rf"{name}\s*=\s*\[(.*?)\n\]", text, re.S)
self.assertIsNotNone(match, f"{name} list missing")
return {
line.strip().rstrip(",")
for line in match.group(1).splitlines()
if line.strip().startswith('r"')
}
def test_high_and_medium_indicator_lists_do_not_overlap(self):
high = self._extract_indicator_list("HIGH_INDICATORS")
medium = self._extract_indicator_list("MEDIUM_INDICATORS")
overlap = high & medium
self.assertEqual(set(), overlap, f"duplicate cross-tier patterns found: {sorted(overlap)}")
def test_removed_duplicates_still_classify_at_high_tier(self):
cases = {
"I feel hopeless": "HIGH",
"I feel trapped": "HIGH",
"I feel desperate": "HIGH",
"I have no future ahead": "HIGH",
"I have given up on myself": "HIGH",
}
for text, expected in cases.items():
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(expected, result.level)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,52 @@
"""
Test: offline crisis resources load when network is unavailable.
Verifies that the service worker caches the crisis-offline.html page
and that it serves as the navigation fallback when offline.
"""
import json
import os
import subprocess
import sys
def test_service_worker_precaches_crisis_page():
"""Service worker precache list includes crisis-offline.html."""
sw_path = os.path.join(os.path.dirname(__file__), '..', 'sw.js')
with open(sw_path) as f:
sw_content = f.read()
assert '/crisis-offline.html' in sw_content, "crisis-offline.html must be in PRECACHE_ASSETS"
def test_crisis_offline_page_has_988():
"""Offline page contains 988 call link."""
page_path = os.path.join(os.path.dirname(__file__), '..', 'crisis-offline.html')
with open(page_path) as f:
content = f.read()
assert 'tel:988' in content, "Offline page must have 988 call link"
assert '741741' in content, "Offline page must have Crisis Text Line (741741)"
def test_crisis_offline_page_has_local_resources():
"""Offline page contains additional local crisis resources."""
page_path = os.path.join(os.path.dirname(__file__), '..', 'crisis-offline.html')
with open(page_path) as f:
content = f.read()
assert 'National Domestic Violence Hotline' in content, "Must include domestic violence hotline"
assert 'Trevor Project' in content, "Must include Trevor Project"
assert 'Veterans Crisis Line' in content, "Must include Veterans Crisis Line"
def test_crisis_offline_page_self_contained():
"""Offline page must work without external resources (inline styles, no external scripts)."""
page_path = os.path.join(os.path.dirname(__file__), '..', 'crisis-offline.html')
with open(page_path) as f:
content = f.read()
# No external CSS files
assert 'rel="stylesheet"' not in content, "Offline page must not depend on external stylesheets"
# No external JS files
assert '<script src=' not in content, "Offline page must not depend on external scripts"
def test_offline_fallback_path_configured():
"""Service worker configures crisis-offline.html as the offline fallback."""
sw_path = os.path.join(os.path.dirname(__file__), '..', 'sw.js')
with open(sw_path) as f:
sw_content = f.read()
assert "OFFLINE_FALLBACK_PATH" in sw_content, "Must define OFFLINE_FALLBACK_PATH"
assert "crisis-offline" in sw_content, "Fallback must reference crisis-offline.html"

View File

@@ -1,20 +0,0 @@
from pathlib import Path
INDEX = Path("index.html")
def test_chat_header_has_persistent_safety_plan_button():
html = INDEX.read_text()
assert 'id="chat-header"' in html
assert 'id="chat-safety-plan-btn"' in html
assert 'aria-label="Open My Safety Plan from chat header"' in html
assert 'My Safety Plan' in html
def test_chat_header_button_opens_existing_safety_plan_modal():
html = INDEX.read_text()
assert "var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');" in html
assert "chatSafetyPlanBtn.addEventListener('click'" in html
assert "function openSafetyPlan(triggerEl)" in html
assert "safetyPlanModal.classList.add('active');" in html
assert "openSafetyPlan(chatSafetyPlanBtn);" in html

View File

@@ -50,22 +50,6 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,277 +0,0 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()