Compare commits

..

32 Commits

Author SHA1 Message Date
Timmy Soul
4467c27072 feat(crisis): support image screening in intake submissions; adds image-aware intake handler and tests
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 8s
- Extend handle_intake_submission to accept image parameters (ocr_text, labels, etc.)
- Integrate screen_image_signals for image-based screening
- Combine text- and image-based crisis signals into unified assessment
- Add comprehensive test suite for image intake scenarios

Closes #203
2026-04-30 10:47:42 -04:00
f30b0d962d Merge pull request 'DOOR-2: Wire crisis_synthesizer into responder pipeline' (#206) from step35/198-door-2-wire-crisis-synthesiz into main
All checks were successful
Smoke Test / smoke (push) Successful in 12s
2026-04-30 12:50:48 +00:00
26dc58faa7 Merge pull request 'feat(crisis): add intake submission handler for test users (#202)' (#204) from step35/202-intake-submission-from-test into main
All checks were successful
Smoke Test / smoke (push) Successful in 13s
2026-04-30 12:48:41 +00:00
Stepfun Agent
572f14eda5 DOOR-2: Wire crisis_synthesizer into responder pipeline
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 8s
Smoke Test / smoke (pull_request) Successful in 9s
Add synthesizer logging to generate_response():
- Import crisis_synthesizer.append_interaction_event and DEFAULT_LOG_PATH
- Map crisis levels to response_role labels (guardian/companion/witness/friend)
- Log each interaction with level, indicators, response_given, continued_conversation=False, false_positive=False

This creates the pipeline link so crisis interactions are recorded for
weekly analysis and keyword weight suggestion. continued_conversation
and false_positive will be refined in future updates.

Closes #198
2026-04-29 21:15:13 -04:00
STEP35 Agent
58fdd9cfef feat(crisis): add intake submission handler for test users (#202)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 12s
- Adds crisis.intake module for handling test intake submissions
- Separate metrics logging to ~/.the-door/intake-metrics/ (no pollution of
  production crisis stats)
- handle_intake_submission() function integrates with existing crisis
detection (check_crisis)
- Includes comprehensive test suite (8 tests)
- Implements smallest concrete fix for #202: intake submission from Test
  User.

Closes #202
2026-04-29 19:13:43 -04:00
30e645c8f3 Merge PR #191: feat: add crisis tracker/bridge compatibility contract for hermes-agent (#141)
All checks were successful
Smoke Test / smoke (push) Successful in 17s
Merged by automated sweep after diff review and verification. PR #191: feat: add crisis tracker/bridge compatibility contract for hermes-agent (#141)
2026-04-22 02:38:16 +00:00
586939e756 Merge PR #183: fix: remove duplicate crisis indicators across tiers (#123)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #183: fix: remove duplicate crisis indicators across tiers (#123)
2026-04-22 02:37:58 +00:00
cd7f2ae467 Merge PR #168: feat: crisis detection metrics with CLI (closes #136)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #168: feat: crisis detection metrics with CLI (closes #136)
2026-04-22 02:37:39 +00:00
68c9cfd596 Merge PR #185: fix: rate-limit crisis overlay display (#100)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #185: fix: rate-limit crisis overlay display (#100)
2026-04-22 02:37:33 +00:00
4ad48a437c Merge PR #166: fix: add chat-header safety plan access (#38)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #166: fix: add chat-header safety plan access (#38)
2026-04-22 02:37:27 +00:00
4f040158e4 Merge PR #187: feat: add image screening slice for #130
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #187: feat: add image screening slice for #130
2026-04-22 02:37:17 +00:00
e1bf155b5e Merge PR #165: docs: audit fleet work orders issue #75
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #165: docs: audit fleet work orders issue #75
2026-04-22 02:37:12 +00:00
a8c59ca8a8 Merge PR #181: feat: build crisis synthesizer (#36)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #181: feat: build crisis synthesizer (#36)
2026-04-22 02:37:06 +00:00
fcf0ee64b5 Merge PR #178: docs: verify #41 already implemented
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #178: docs: verify #41 already implemented
2026-04-22 02:36:57 +00:00
30f7738429 Merge pull request 'fix: complete crisis overlay keyboard navigation (#95)' (#175) from fix/95 into main
All checks were successful
Smoke Test / smoke (push) Successful in 8s
2026-04-21 12:09:35 +00:00
Alexander Whitestone
f007808228 feat: add crisis tracker and bridge import shims for hermes-agent (#141)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 13s
Smoke Test / smoke (pull_request) Successful in 30s
2026-04-21 02:13:37 -04:00
Alexander Whitestone
c494bba728 wip: add crisis integration contract regression for #141 2026-04-21 02:11:17 -04:00
Timmy
100cc743c0 feat: add image screening slice for #130
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 10s
2026-04-20 21:34:10 -04:00
Timmy
f7d99c6d9c test: define image crisis screening slice for #130 2026-04-20 21:32:00 -04:00
Alexander Whitestone
e132fc134b fix: rate-limit crisis overlay display (#100)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-20 19:39:26 -04:00
Alexander Whitestone
8b658e76f3 wip: add crisis overlay rate-limit regression test 2026-04-20 19:35:53 -04:00
Alexander Whitestone
1b4a70d5fe fix: remove duplicate crisis indicators across tiers (#123)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 13s
2026-04-20 19:17:59 -04:00
Alexander Whitestone
bc02ca54ec wip: add crisis indicator dedup regression test 2026-04-20 19:15:52 -04:00
Alexander Whitestone
9f2038659c feat: build crisis synthesizer (#36)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-17 02:36:30 -04:00
Alexander Whitestone
d5ae0172b3 wip: add crisis synthesizer regression tests 2026-04-17 02:36:30 -04:00
Alexander Whitestone
51a197ae14 docs: verify #41 already implemented
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 14s
2026-04-17 02:06:21 -04:00
Alexander Whitestone
544dbba8cc fix: complete crisis overlay keyboard navigation (#95)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 8s
Smoke Test / smoke (pull_request) Successful in 13s
2026-04-17 01:52:32 -04:00
Alexander Whitestone
6e03492147 feat: CLI command to view crisis metrics summary (#136)
crisis/metrics.py:
  CrisisMetrics class — aggregate crisis detection metrics
  Privacy-first: stores only counts, never user content
  Daily JSONL files in ~/.the-door/metrics/
  get_summary(days) → AggregateMetrics
  get_report(days) → human-readable report
  get_json(days) → JSON export
  CLI: python3 -m crisis.metrics --summary/--json

crisis/__init__.py:
  Export CrisisMetrics, AggregateMetrics

Makefile:
  make metrics      → summary report
  make metrics-json → JSON export

tests/test_crisis_metrics.py: 6 tests
  record_session, summary, report, JSON export
2026-04-17 01:26:44 -04:00
Alexander Whitestone
b4129bc873 fix: add chat-header safety plan access (#38)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-17 01:14:28 -04:00
Alexander Whitestone
e3bb6b86ee test: add chat-header safety plan regression (#38) 2026-04-17 00:59:21 -04:00
Alexander Whitestone
a38e80bff1 docs: audit fleet work orders issue #75
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 7s
2026-04-17 00:10:59 -04:00
Alexander Whitestone
680c50d7c3 test: define fleet work orders audit acceptance for #75 2026-04-17 00:07:37 -04:00
25 changed files with 2235 additions and 312 deletions

View File

@@ -12,7 +12,7 @@ VPS := alexanderwhitestone.com
DOMAIN := alexanderwhitestone.com
DEPLOY_DIR := deploy
.PHONY: help deploy deploy-bash check ssl push service
.PHONY: help deploy deploy-bash check ssl push service metrics
help:
@echo "The Door — Deployment Commands"
@@ -23,6 +23,8 @@ help:
@echo " make check Check deployment status"
@echo " make ssl Setup SSL on VPS"
@echo " make service Install/restart hermes-gateway service"
@echo " make metrics View crisis metrics summary"
@echo " make metrics-json Export crisis metrics as JSON"
@echo ""
deploy:
@@ -46,3 +48,9 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
metrics:
python3 -m crisis.metrics --summary
metrics-json:
python3 -m crisis.metrics --json

View File

@@ -8,13 +8,7 @@ from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urg
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .metrics import (
build_metrics_event,
append_metrics_event,
load_metrics_events,
build_weekly_summary,
render_weekly_summary,
)
from .metrics import CrisisMetrics, AggregateMetrics
__all__ = [
"detect_crisis",
@@ -30,9 +24,6 @@ __all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"build_metrics_event",
"append_metrics_event",
"load_metrics_events",
"build_weekly_summary",
"render_weekly_summary",
"CrisisMetrics",
"AggregateMetrics",
]

19
crisis/bridge.py Normal file
View File

@@ -0,0 +1,19 @@
"""Compatibility bridge for the-door crisis gateway.
Issue #141 describes the shared bridge API as `crisis.bridge`.
The canonical implementation lives in `crisis.gateway`. Re-export the public
entrypoints here so downstream hermes-agent wiring can import the stable name
without copying logic out of the-door.
"""
from .gateway import (
check_crisis,
get_system_prompt,
format_gateway_response,
)
__all__ = [
"check_crisis",
"get_system_prompt",
"format_gateway_response",
]

View File

@@ -104,13 +104,9 @@ MEDIUM_INDICATORS = [
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
# Keep only medium-only patterns here; stronger overlaps live in HIGH_INDICATORS.
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
]
LOW_INDICATORS = [

View File

@@ -23,17 +23,9 @@ from .response import (
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
from .metrics import build_metrics_event, append_metrics_event
def check_crisis(
text: str,
metrics_log_path: Optional[str] = None,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: Optional[float] = None,
) -> dict:
def check_crisis(text: str) -> dict:
"""
Full crisis check returning structured data.
@@ -43,7 +35,7 @@ def check_crisis(
detection = detect_crisis(text)
response = generate_response(detection)
result = {
return {
"level": detection.level,
"score": detection.score,
"indicators": detection.indicators,
@@ -57,23 +49,6 @@ def check_crisis(
"escalate": response.escalate,
}
metrics_event = build_metrics_event(
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
if metrics_log_path:
metrics_event = append_metrics_event(
metrics_log_path,
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
result["metrics_event"] = metrics_event
return result
def get_system_prompt(base_prompt: str, text: str = "") -> str:
"""

224
crisis/intake.py Normal file
View File

@@ -0,0 +1,224 @@
"""
Intake submission handler for the-door.
Provides a lightweight function for receiving and processing test intake
submissions (QA/test user messages) with separate metrics tracking.
Now supports image-based submissions via image screening.
Usage:
from crisis.intake import handle_intake_submission
result = handle_intake_submission("Text message", test=True)
result = handle_intake_submission("See image", ocr_text="I want to die", labels=["blood"], test=True)
"""
from dataclasses import dataclass
from typing import Optional, Iterable, List
from pathlib import Path
from .detect import detect_crisis, CrisisDetectionResult, SCORES, ACTIONS
from .response import generate_response
from image_screening import screen_image_signals
@dataclass
class IntakeResult:
"""Result from processing an intake submission."""
crisis_level: str
score: float
timmy_message: str
show_overlay: bool
provide_988: bool
is_test: bool
session_id: Optional[str] = None
# Separate metrics namespace for test intake tracking
_INTAKE_METRICS_DIR = Path.home() / ".the-door" / "intake-metrics"
def handle_intake_submission(
message: str,
test: bool = True,
session_id: Optional[str] = None,
*,
image_path: Optional[str] = None,
ocr_text: str = "",
labels: Optional[Iterable[str]] = None,
manual_notes: str = "",
visual_flags: Optional[Iterable[str]] = None,
) -> IntakeResult:
"""
Handle an intake submission from a user (including test users).
Supports optional image screening parameters.
Args:
message: The user's message text.
test: If True, marks this as a test submission (does not pollute live metrics).
session_id: Optional session identifier for correlation.
image_path: Optional path to an image file for screening.
ocr_text: OCR-extracted text from an image.
labels: Object/scene labels from image analysis.
manual_notes: Human operator notes about image content.
visual_flags: Pre-detected visual flags (e.g., dark_scene, low_light).
Returns:
IntakeResult with crisis assessment and response data.
"""
# Build combined text for crisis detection (message + ocr + notes)
combined_text = message
if ocr_text:
combined_text = combined_text + " " + ocr_text
if manual_notes:
combined_text = combined_text + " " + manual_notes
combined_text = combined_text.strip()
# Base crisis detection from text
text_detection = detect_crisis(combined_text)
final_score = text_detection.score
final_indicators: List[str] = list(text_detection.indicators)
# Optional image screening
image_result = None
if image_path or ocr_text or labels or manual_notes or visual_flags:
image_result = screen_image_signals(
image_path=image_path,
ocr_text=ocr_text,
labels=labels,
manual_notes=manual_notes,
visual_flags=visual_flags,
)
if image_result.distress_score > final_score:
final_score = image_result.distress_score
for sig in image_result.signals_detected:
final_indicators.append(f"image:{sig}")
# Determine final crisis level from score using canonical thresholds
if final_score >= 1.0:
final_level = "CRITICAL"
elif final_score >= 0.75:
final_level = "HIGH"
elif final_score >= 0.5:
final_level = "MEDIUM"
elif final_score >= 0.25:
final_level = "LOW"
else:
final_level = "NONE"
final_recommended = ACTIONS.get(final_level, "")
combined_detection = CrisisDetectionResult(
level=final_level,
score=final_score,
indicators=final_indicators,
recommended_action=final_recommended,
matches=[],
)
response = generate_response(combined_detection)
if test:
_log_test_intake(combined_detection, message)
return IntakeResult(
crisis_level=final_level,
score=final_score,
timmy_message=response.timmy_message,
show_overlay=response.show_overlay,
provide_988=response.provide_988,
is_test=test,
session_id=session_id,
)
def _log_test_intake(detection, message: str) -> None:
"""
Log test intake submissions to a separate metrics file.
These do not mix with production crisis statistics.
Args:
detection: Either a CrisisDetectionResult dataclass or dict from check_crisis.
message: The original user message.
"""
import time
import json
if hasattr(detection, '__dataclass_fields__'):
level = detection.level
score = detection.score
indicators = detection.indicators
else:
level = detection.get("level")
score = detection.get("score")
indicators = detection.get("indicators", [])
_INTAKE_METRICS_DIR.mkdir(parents=True, exist_ok=True)
today = time.strftime("%Y-%m-%d")
filepath = _INTAKE_METRICS_DIR / f"test-intake-{today}.jsonl"
record = {
"timestamp": time.time(),
"level": level,
"score": score,
"message_preview": message[:80],
"indicators": indicators,
}
with open(filepath, "a") as f:
f.write(json.dumps(record) + "\n")
def _interactive():
"""Run an interactive test intake session with optional image params."""
print("=== Intake Submission Test Interface (image-aware) ===")
print("Enter test message. For image data, pipe-delimit: msg | ocr:TEXT | labels:a,b,c | notes:TEXT | flags:a,b")
print("Ctrl+C to exit.\n")
try:
while True:
try:
line = input("test-user> ").strip()
except (EOFError, KeyboardInterrupt):
break
if not line:
continue
msg = line
ocr = ""
labels_list = None
notes = ""
flags = None
if "|" in line:
parts = line.split("|")
msg = parts[0].strip()
for p in parts[1:]:
p = p.strip()
if p.startswith("ocr:"):
ocr = p[4:].strip()
elif p.startswith("labels:"):
labels_list = [lbl.strip() for lbl in p[7:].split(",") if lbl.strip()]
elif p.startswith("notes:"):
notes = p[6:].strip()
elif p.startswith("flags:"):
flags = [f.strip() for f in p[6:].split(",") if f.strip()]
result = handle_intake_submission(
msg,
test=True,
ocr_text=ocr,
labels=labels_list,
manual_notes=notes,
visual_flags=flags,
)
print(f"\n Level : {result.crisis_level}")
print(f" Score : {result.score}")
print(f" Timmy : {result.timmy_message}")
print(f" Overlay: {result.show_overlay}")
print()
except (EOFError, KeyboardInterrupt):
print("\nDone.")
if __name__ == "__main__":
_interactive()

View File

@@ -1,166 +1,244 @@
"""Privacy-preserving crisis analytics metrics for the-door.
"""
crisis/metrics.py — Aggregate crisis detection metrics.
Stores only timestamps, crisis levels, indicator categories, and operator
feedback flags. No raw message text or PII is persisted.
Tracks session-level crisis data for aggregate reporting.
Privacy-first: stores only aggregate counts, never user content.
Usage:
from crisis.metrics import CrisisMetrics
metrics = CrisisMetrics()
metrics.record_session(tracker.state)
summary = metrics.get_summary()
"""
from __future__ import annotations
import argparse
import json
import os
import time
from collections import Counter
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable
from typing import Dict, List, Optional
from .detect import CrisisDetectionResult, detect_crisis
LEVELS = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
METRICS_DIR = Path.home() / ".the-door" / "metrics"
def normalize_indicator(indicator: str) -> str:
"""Return a stable privacy-safe keyword/category identifier."""
return indicator
@dataclass
class SessionMetrics:
"""Metrics from a single crisis session."""
timestamp: float
current_level: str
peak_level: str
message_count: int
was_escalating: bool
was_deescalating: bool
escalation_rate: float
triggered_overlay: bool = False
showed_988: bool = False
def build_metrics_event(
detection: CrisisDetectionResult,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: float | None = None,
) -> dict:
timestamp = float(time.time() if now is None else now)
indicators = [normalize_indicator(indicator) for indicator in detection.indicators]
return {
"timestamp": timestamp,
"level": detection.level,
"indicator_count": len(indicators),
"indicators": indicators,
"continued_conversation": bool(continued_conversation),
"false_positive": bool(false_positive),
}
@dataclass
class AggregateMetrics:
"""Aggregate metrics across sessions."""
total_sessions: int = 0
total_messages: int = 0
# Level distribution
level_counts: Dict[str, int] = field(default_factory=lambda: {
"NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0
})
# Escalation tracking
escalating_sessions: int = 0
deescalating_sessions: int = 0
# Safety interventions
overlay_triggers: int = 0
ninety_eight_show: int = 0
# Time window
period_start: Optional[float] = None
period_end: Optional[float] = None
def append_metrics_event(
log_path: str | Path,
detection: CrisisDetectionResult,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: float | None = None,
) -> dict:
event = build_metrics_event(
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event) + "\n")
return event
class CrisisMetrics:
"""
Aggregate crisis metrics with local JSON persistence.
Privacy-first: stores only aggregate counts per day.
Never stores user messages, content, or identifying info.
"""
def __init__(self, metrics_dir: Optional[Path] = None):
self.metrics_dir = metrics_dir or METRICS_DIR
self.metrics_dir.mkdir(parents=True, exist_ok=True)
self._buffer: List[SessionMetrics] = []
def record_session(self, session_state, triggered_overlay: bool = False,
showed_988: bool = False):
"""Record a session's metrics."""
from .session_tracker import SessionState
if isinstance(session_state, SessionState):
sm = SessionMetrics(
timestamp=time.time(),
current_level=session_state.current_level,
peak_level=session_state.peak_level,
message_count=session_state.message_count,
was_escalating=session_state.is_escalating,
was_deescalating=session_state.is_deescalating,
escalation_rate=session_state.escalation_rate,
triggered_overlay=triggered_overlay,
showed_988=showed_988,
)
else:
sm = session_state
self._buffer.append(sm)
self._flush()
def _flush(self):
"""Write buffered sessions to daily file."""
if not self._buffer:
return
today = datetime.utcnow().strftime("%Y-%m-%d")
filepath = self.metrics_dir / f"{today}.jsonl"
with open(filepath, 'a') as f:
for sm in self._buffer:
f.write(json.dumps(asdict(sm)) + '\n')
self._buffer.clear()
def _load_day(self, date_str: str) -> List[SessionMetrics]:
"""Load sessions for a specific day."""
filepath = self.metrics_dir / f"{date_str}.jsonl"
if not filepath.exists():
return []
sessions = []
with open(filepath) as f:
for line in f:
if line.strip():
data = json.loads(line)
sessions.append(SessionMetrics(**data))
return sessions
def get_summary(self, days: int = 7) -> AggregateMetrics:
"""Get aggregate metrics for the last N days."""
agg = AggregateMetrics()
now = datetime.utcnow()
for i in range(days):
date = (now - timedelta(days=i)).strftime("%Y-%m-%d")
sessions = self._load_day(date)
for sm in sessions:
agg.total_sessions += 1
agg.total_messages += sm.message_count
# Level counts (use peak level)
level = sm.peak_level
agg.level_counts[level] = agg.level_counts.get(level, 0) + 1
if sm.was_escalating:
agg.escalating_sessions += 1
if sm.was_deescalating:
agg.deescalating_sessions += 1
if sm.triggered_overlay:
agg.overlay_triggers += 1
if sm.showed_988:
agg.ninety_eight_show += 1
# Time window
if agg.period_start is None or sm.timestamp < agg.period_start:
agg.period_start = sm.timestamp
if agg.period_end is None or sm.timestamp > agg.period_end:
agg.period_end = sm.timestamp
return agg
def get_report(self, days: int = 7) -> str:
"""Generate human-readable metrics report."""
agg = self.get_summary(days)
lines = []
lines.append("=" * 50)
lines.append(" CRISIS METRICS REPORT")
lines.append(f" Last {days} days")
if agg.period_start:
start = datetime.fromtimestamp(agg.period_start).strftime("%Y-%m-%d %H:%M")
lines.append(f" Period: {start} → now")
lines.append("=" * 50)
lines.append(f"\n Sessions: {agg.total_sessions}")
lines.append(f" Messages tracked: {agg.total_messages}")
lines.append(f"\n Level Distribution (by peak):")
for level in ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]:
count = agg.level_counts.get(level, 0)
pct = (count / agg.total_sessions * 100) if agg.total_sessions > 0 else 0
bar = "" * int(pct / 5)
lines.append(f" {level:<10} {count:>5} ({pct:>5.1f}%) {bar}")
lines.append(f"\n Escalations: {agg.escalating_sessions}")
lines.append(f" De-escalations: {agg.deescalating_sessions}")
lines.append(f" Overlay triggers: {agg.overlay_triggers}")
lines.append(f" 988 shown: {agg.ninety_eight_show}")
if agg.total_sessions > 0:
escalation_rate = agg.escalating_sessions / agg.total_sessions * 100
lines.append(f"\n Escalation rate: {escalation_rate:.1f}%")
lines.append("=" * 50)
return "\n".join(lines)
def get_json(self, days: int = 7) -> str:
"""Export metrics as JSON."""
agg = self.get_summary(days)
return json.dumps(asdict(agg), indent=2)
def load_metrics_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
events.append(json.loads(line))
return events
def build_weekly_summary(
events: Iterable[dict],
*,
now: float | None = None,
window_days: int = 7,
) -> dict:
current_time = float(time.time() if now is None else now)
cutoff = current_time - (window_days * 86400)
filtered = [event for event in events if float(event.get("timestamp", 0)) >= cutoff]
detections_per_level = {level: 0 for level in LEVELS}
keyword_counts: Counter[str] = Counter()
detections = []
continued_after_intervention = 0
for event in filtered:
level = event.get("level", "NONE")
detections_per_level[level] = detections_per_level.get(level, 0) + 1
keyword_counts.update(event.get("indicators", []))
if level != "NONE":
detections.append(event)
if event.get("continued_conversation"):
continued_after_intervention += 1
false_positive_count = sum(1 for event in detections if event.get("false_positive"))
false_positive_estimate = (
false_positive_count / len(detections) if detections else 0.0
)
return {
"window_days": window_days,
"total_events": len(filtered),
"detections_per_level": detections_per_level,
"most_common_keywords": [
{"keyword": keyword, "count": count}
for keyword, count in keyword_counts.most_common(10)
],
"false_positive_estimate": false_positive_estimate,
"continued_after_intervention": continued_after_intervention,
}
def render_weekly_summary(summary: dict) -> str:
return json.dumps(summary, indent=2)
def write_weekly_summary(path: str | Path, summary: dict) -> Path:
output_path = Path(path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(render_weekly_summary(summary) + "\n", encoding="utf-8")
return output_path
def record_text_event(
text: str,
log_path: str | Path,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: float | None = None,
) -> dict:
detection = detect_crisis(text)
return append_metrics_event(
log_path,
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Privacy-preserving crisis metrics summary")
parser.add_argument("--log-path", required=True, help="JSONL event log path")
parser.add_argument("--days", type=int, default=7, help="Summary window in days")
parser.add_argument("--output", help="Optional file to write summary JSON")
args = parser.parse_args(argv)
events = load_metrics_events(args.log_path)
summary = build_weekly_summary(events, window_days=args.days)
rendered = render_weekly_summary(summary)
print(rendered)
if args.output:
write_weekly_summary(args.output, summary)
return 0
def main():
"""CLI entry point for crisis metrics."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Detection Metrics")
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", help="JSON export")
parser.add_argument("--days", type=int, default=7, help="Days to include")
parser.add_argument("--demo", action="store_true", help="Generate demo data")
args = parser.parse_args()
metrics = CrisisMetrics()
if args.demo:
import random
levels = ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]
for i in range(50):
from .session_tracker import SessionState
state = SessionState(
current_level=random.choice(levels),
peak_level=random.choice(levels),
message_count=random.randint(1, 20),
is_escalating=random.random() > 0.7,
is_deescalating=random.random() > 0.8,
escalation_rate=random.random(),
)
metrics.record_session(
state,
triggered_overlay=random.random() > 0.8,
showed_988=random.random() > 0.7,
)
print("Generated 50 demo sessions.")
if args.json:
print(metrics.get_json(args.days))
else:
print(metrics.get_report(args.days))
if __name__ == "__main__":
raise SystemExit(main())
main()

View File

@@ -12,6 +12,18 @@ from typing import Optional
from .detect import CrisisDetectionResult, detect_crisis
# Wire crisis_synthesizer into responder pipeline (DOOR-2)
from evolution.crisis_synthesizer import append_interaction_event, DEFAULT_LOG_PATH
# Response type labels for synthesizer logging
LEVEL_TO_RESPONSE = {
"CRITICAL": "guardian",
"HIGH": "companion",
"MEDIUM": "witness",
"LOW": "friend",
"NONE": "friend",
}
@dataclass
class CrisisResponse:
@@ -134,6 +146,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
level = detection.level
if level == "CRITICAL":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_CRITICAL),
show_crisis_panel=True,
@@ -143,6 +164,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
if level == "HIGH":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_HIGH),
show_crisis_panel=True,
@@ -152,6 +182,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
if level == "MEDIUM":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_MEDIUM),
show_crisis_panel=False,
@@ -161,6 +200,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
if level == "LOW":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_LOW),
show_crisis_panel=False,
@@ -170,6 +218,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
# Normal conversation - no crisis response
# Still log for completeness (NONE level)
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message="",
show_crisis_panel=False,

166
crisis/tests_intake.py Normal file
View File

@@ -0,0 +1,166 @@
"""
Tests for crisis.intake — intake submission handling for test users.
Run with: python -m pytest crisis/tests_intake.py -v
or: python -m crisis.intake
"""
import unittest
import sys
import os
# Ensure project root is on path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.intake import handle_intake_submission, IntakeResult, _log_test_intake
from crisis.detect import detect_crisis
import json
import tempfile
from pathlib import Path
class TestIntakeSubmission(unittest.TestCase):
"""Test intake submission handling."""
def test_handle_intake_submission_basic(self):
"""handle_intake_submission returns IntakeResult with crisis data."""
result = handle_intake_submission("This is a test message", test=True)
self.assertIsInstance(result, IntakeResult)
self.assertEqual(result.is_test, True)
self.assertIn(result.crisis_level, ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"))
self.assertIsInstance(result.score, float)
self.assertIsInstance(result.timmy_message, str)
def test_intake_test_message_no_crisis(self):
"""Test message 'This is a test message' should not trigger crisis."""
result = handle_intake_submission("This is a test message", test=True)
self.assertEqual(result.crisis_level, "NONE")
self.assertEqual(result.score, 0.0)
self.assertEqual(result.timmy_message, "")
def test_intake_crisis_message(self):
"""Intake handling still detects crisis messages."""
result = handle_intake_submission("I want to kill myself", test=True)
self.assertEqual(result.crisis_level, "CRITICAL")
self.assertEqual(result.score, 1.0)
self.assertTrue(result.show_overlay)
self.assertTrue(result.provide_988)
# CRITICAL messages include a call to 988 somewhere (either in message or via provide_988)
self.assertTrue("988" in result.timmy_message or result.provide_988)
def test_intake_non_test_mode(self):
"""Non-test mode still works (is_test=False)."""
result = handle_intake_submission("Hello there", test=False)
self.assertEqual(result.is_test, False)
self.assertEqual(result.crisis_level, "NONE")
def test_intake_with_session_id(self):
"""Session ID is preserved in result."""
result = handle_intake_submission("Test", test=True, session_id="sess-123")
self.assertEqual(result.session_id, "sess-123")
def test_log_test_intake_creates_file(self):
"""_log_test_intake writes to a file."""
with tempfile.TemporaryDirectory() as tmpdir:
# Patch the metrics dir
import crisis.intake as intake_mod
original_dir = intake_mod._INTAKE_METRICS_DIR
try:
intake_mod._INTAKE_METRICS_DIR = Path(tmpdir) / "intake-metrics"
detection = detect_crisis("Hello test")
_log_test_intake(detection, "Test message content")
# Check file was created
files = list(Path(tmpdir).glob("**/*.jsonl"))
self.assertTrue(len(files) > 0)
# Verify content
with open(files[0]) as f:
line = f.readline()
record = json.loads(line)
self.assertIn("timestamp", record)
self.assertIn("level", record)
self.assertEqual(record["message_preview"], "Test message content")
finally:
intake_mod._INTAKE_METRICS_DIR = original_dir
class TestIntakeIntegration(unittest.TestCase):
"""Integration tests for intake with crisis system."""
def test_intake_end_to_end(self):
"""Full intake flow for test message."""
result = handle_intake_submission("This is a test message", test=True)
self.assertEqual(result.crisis_level, "NONE")
self.assertFalse(result.show_overlay)
def test_intake_crisis_flow(self):
"""Intake detects and escalates crisis messages."""
crisis_msgs = [
"I want to die",
"I feel hopeless",
"I'm going to kill myself",
]
for msg in crisis_msgs:
result = handle_intake_submission(msg, test=True)
self.assertIn(result.crisis_level, ("HIGH", "CRITICAL"),
f"Message '{msg}' should be crisis")
self.assertTrue(result.provide_988)
if __name__ == "__main__":
unittest.main()
class TestIntakeImageSubmissions(unittest.TestCase):
"""Test intake submissions with image screening data."""
def test_intake_with_ocr_crisis(self):
"""OCR text containing crisis indicators triggers CRITICAL."""
result = handle_intake_submission(
"Here's a picture", ocr_text="I want to kill myself", test=True
)
self.assertEqual(result.crisis_level, "CRITICAL")
self.assertEqual(result.score, 1.0)
self.assertTrue(result.show_overlay)
self.assertTrue(result.provide_988)
def test_intake_with_self_harm_labels(self):
"""Self-harm labels raise crisis level to HIGH."""
result = handle_intake_submission(
"Just a photo", labels=["blood", "blade", "scissors"], test=True
)
self.assertGreaterEqual(result.score, 0.85)
self.assertEqual(result.crisis_level, "HIGH")
self.assertFalse(result.show_overlay) # overlay only for CRITICAL
self.assertTrue(result.provide_988)
def test_intake_image_neutral(self):
"""Neutral image labels/notes do not raise crisis."""
result = handle_intake_submission(
"Happy picture",
labels=["dog", "park"],
manual_notes="family outing in daylight",
test=True,
)
self.assertLess(result.score, 0.2)
self.assertEqual(result.crisis_level, "NONE")
self.assertFalse(result.show_overlay)
self.assertFalse(result.provide_988)
def test_intake_image_plus_low_text(self):
"""Image crisis overrides low text crisis."""
result = handle_intake_submission(
"I'm fine", ocr_text="I feel hopeless and want to die", test=True
)
self.assertEqual(result.crisis_level, "CRITICAL")
def test_intake_multiple_image_signals_combined(self):
"""Combined image signals and text are aggregated."""
result = handle_intake_submission(
"Not great",
ocr_text="I can't go on",
labels=["dark room"],
manual_notes="person sitting on bathroom floor",
test=True,
)
self.assertGreater(result.score, 0.0)
self.assertIn(result.crisis_level, ("MEDIUM", "HIGH", "CRITICAL"))

19
crisis/tracker.py Normal file
View File

@@ -0,0 +1,19 @@
"""Compatibility bridge for crisis session tracking.
Issue #141 describes the shared the-door tracker surface as `crisis.tracker`.
The canonical implementation lives in `crisis.session_tracker`, but hermes-agent
integration should be able to import the shorter path without caring about
internal file layout.
"""
from .session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
__all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -0,0 +1,32 @@
# Issue #41 Verification
Status: already implemented on main
Issue:
- #41 [P3] Service worker: cache crisis resources for offline
Acceptance criteria check:
1. Passed — offline page includes 988 call button, Crisis Text Line, and grounding techniques
2. Passed — service worker precaches `crisis-offline.html`
3. Passed — phone number is clickable via `href="tel:988"`
4. Passed — navigation timeout + offline fallback support intermittent connections
Evidence:
- `sw.js` precaches `/crisis-offline.html`
- `sw.js` contains `NAVIGATION_TIMEOUT_MS` and `AbortController`
- `crisis-offline.html` contains:
- `href="tel:988"`
- Crisis Text Line / `741741`
- grounding prompts for 5-4-3-2-1
- `tests/test_service_worker_offline.py` already exists on main and passes
Verification run from fresh main clone:
- `python3 -m pytest tests/test_service_worker_offline.py -q`
- `10 passed`
Prior implementation trail:
- Issue comment already states the core implementation is complete on main
- Closed PR #122 documented the already-complete implementation and added the offline guarantees test coverage that is now present on main
Recommendation:
- Close issue #41 as already implemented on main.

View File

@@ -1 +1,195 @@
...
"""Crisis synthesizer — learn from anonymized crisis interactions.
This is deliberately simple and privacy-preserving. It does not train a model or
modify detection rules automatically. It only logs metadata, summarizes patterns,
and suggests human-reviewed keyword weight adjustments.
"""
from __future__ import annotations
import argparse
import json
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Iterable
DEFAULT_LOG_PATH = Path.home() / ".the-door" / "crisis-interactions.jsonl"
LEVELS = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
def build_interaction_event(
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
*,
now: float | None = None,
) -> dict:
return {
"timestamp": float(time.time() if now is None else now),
"level": level,
"indicators": list(indicators),
"indicator_count": len(indicators),
"response_given": response_given,
"continued_conversation": bool(continued_conversation),
"false_positive": bool(false_positive),
}
def append_interaction_event(
log_path: str | Path,
*,
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
now: float | None = None,
) -> dict:
event = build_interaction_event(
level,
indicators,
response_given,
continued_conversation,
false_positive,
now=now,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event) + "\n")
return event
def load_interaction_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
events.append(json.loads(line))
return events
def summarize_keywords(events: Iterable[dict]) -> list[dict]:
counts: Counter[str] = Counter()
for event in events:
counts.update(event.get("indicators", []))
return [{"keyword": keyword, "count": count} for keyword, count in counts.most_common(10)]
def suggest_keyword_adjustments(events: Iterable[dict], *, min_observations: int = 5) -> list[dict]:
stats: dict[str, dict[str, int]] = defaultdict(lambda: {
"observations": 0,
"true_positive_count": 0,
"false_positive_count": 0,
"continued_conversation_count": 0,
})
for event in events:
for keyword in event.get("indicators", []):
bucket = stats[keyword]
bucket["observations"] += 1
if event.get("false_positive"):
bucket["false_positive_count"] += 1
else:
bucket["true_positive_count"] += 1
if event.get("continued_conversation"):
bucket["continued_conversation_count"] += 1
suggestions = []
for keyword, bucket in sorted(stats.items()):
if bucket["observations"] < min_observations:
continue
fp = bucket["false_positive_count"]
tp = bucket["true_positive_count"]
if fp >= min_observations and tp == 0:
adjustment = "lower_weight"
rationale = "Observed only false positives across the sample window."
elif tp >= min_observations and fp == 0:
adjustment = "raise_weight"
rationale = "Observed repeated genuine crises with no false positives."
else:
adjustment = "observe"
rationale = "Mixed evidence; keep monitoring before changing weights."
suggestions.append(
{
"keyword": keyword,
**bucket,
"suggested_adjustment": adjustment,
"rationale": rationale,
}
)
return suggestions
def build_weekly_report(
events: Iterable[dict],
*,
now: float | None = None,
window_days: int = 7,
min_observations: int = 3,
) -> dict:
current_time = float(time.time() if now is None else now)
cutoff = current_time - (window_days * 86400)
filtered = [event for event in events if float(event.get("timestamp", 0)) >= cutoff]
detections_per_level = {level: 0 for level in LEVELS}
detected_events = []
continued_after_intervention = 0
for event in filtered:
level = event.get("level", "NONE")
detections_per_level[level] = detections_per_level.get(level, 0) + 1
if level != "NONE":
detected_events.append(event)
if event.get("continued_conversation"):
continued_after_intervention += 1
false_positive_count = sum(1 for event in detected_events if event.get("false_positive"))
false_positive_estimate = false_positive_count / len(detected_events) if detected_events else 0.0
return {
"window_days": window_days,
"total_events": len(filtered),
"detections_per_level": detections_per_level,
"most_common_keywords": summarize_keywords(filtered),
"false_positive_estimate": false_positive_estimate,
"continued_after_intervention": continued_after_intervention,
"keyword_weight_suggestions": suggest_keyword_adjustments(filtered, min_observations=min_observations),
}
def render_weekly_report(summary: dict) -> str:
return json.dumps(summary, indent=2)
def write_weekly_report(output_path: str | Path, summary: dict) -> Path:
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(render_weekly_report(summary) + "\n", encoding="utf-8")
return path
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Summarize anonymized crisis interactions")
parser.add_argument("--log-path", default=str(DEFAULT_LOG_PATH), help="JSONL crisis interaction log")
parser.add_argument("--days", type=int, default=7, help="Lookback window in days")
parser.add_argument("--min-observations", type=int, default=3, help="Minimum observations before suggesting keyword adjustments")
parser.add_argument("--output", help="Optional file to write the weekly report JSON")
args = parser.parse_args(argv)
events = load_interaction_events(args.log_path)
summary = build_weekly_report(events, window_days=args.days, min_observations=args.min_observations)
rendered = render_weekly_report(summary)
print(rendered)
if args.output:
write_weekly_report(args.output, summary)
return 0
if __name__ == "__main__":
raise SystemExit(main())

170
image_screening.py Normal file
View File

@@ -0,0 +1,170 @@
"""
image_screening.py — local image crisis screening slice for epic #130.
Grounded scope:
- screens OCR text, upstream object labels, and operator notes for crisis signals
- intentionally does NOT claim raw computer-vision understanding of pixels
- designed to plug into future multimodal scoring once a dedicated image model lands
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from crisis.detect import detect_crisis
DIRECT_SELF_HARM_LABELS = {
"blood",
"blade",
"razor",
"knife",
"scissors",
"noose",
"ligature",
"hanging",
"pills",
"pill bottle",
"overdose",
"gun",
"firearm",
"rope",
"cuts",
"self-harm",
}
INJURY_LABELS = {
"wound",
"wounds",
"bruise",
"bruises",
"bandage",
"bandages",
"injury",
"injuries",
"scar",
"scars",
"burn",
"burns",
"bleeding",
}
HIGH_RISK_SCENE_LABELS = {
"bridge edge",
"rooftop edge",
"train tracks",
"ledge",
"cliff edge",
"dark room",
"bathroom floor",
"hospital wristband",
"suicide note",
"goodbye letter",
}
@dataclass
class ImageScreeningResult:
ocr_text: str = ""
labels: List[str] = field(default_factory=list)
visual_flags: List[str] = field(default_factory=list)
distress_score: float = 0.0
requires_human_review: bool = False
signals_detected: List[str] = field(default_factory=list)
grounded_scope: str = (
"heuristic screening over OCR text, upstream labels, and operator notes; "
"raw vision-model inference is not implemented in this slice"
)
def to_dict(self) -> dict:
return asdict(self)
def _normalize_items(values: Optional[Iterable[str]]) -> List[str]:
if not values:
return []
normalized = []
for value in values:
text = str(value).strip().lower()
if text:
normalized.append(text)
return normalized
def _match_keywords(haystack: str, keywords: set[str]) -> List[str]:
matches = []
for keyword in keywords:
if keyword in haystack:
matches.append(keyword)
return sorted(set(matches))
def screen_image_signals(
image_path: Optional[str] = None,
*,
ocr_text: str = "",
labels: Optional[Iterable[str]] = None,
manual_notes: str = "",
visual_flags: Optional[Iterable[str]] = None,
) -> ImageScreeningResult:
"""
Score image-related crisis evidence without pretending to do full CV.
Inputs are deliberately grounded in what the repo can actually support today:
- OCR text extracted upstream from screenshots/photos
- object labels supplied by a local model or operator
- manual operator notes about visible scene context
- optional visual flags from any upstream preprocessor
"""
normalized_labels = _normalize_items(labels)
normalized_flags = _normalize_items(visual_flags)
normalized_notes = str(manual_notes or "").strip().lower()
normalized_ocr = str(ocr_text or "").strip()
combined_label_text = " ".join(normalized_labels + normalized_flags + ([normalized_notes] if normalized_notes else []))
crisis_text = " ".join(part for part in [normalized_ocr, normalized_notes] if part).strip()
direct_matches = _match_keywords(combined_label_text, DIRECT_SELF_HARM_LABELS)
injury_matches = _match_keywords(combined_label_text, INJURY_LABELS)
scene_matches = _match_keywords(combined_label_text, HIGH_RISK_SCENE_LABELS)
text_detection = detect_crisis(crisis_text) if crisis_text else None
signals: List[str] = []
score = 0.0
if direct_matches:
score = max(score, 0.85)
for match in direct_matches:
signals.append(f"direct_self_harm_label:{match}")
if injury_matches:
score = max(score, 0.55)
for match in injury_matches:
signals.append(f"injury_indicator:{match}")
if scene_matches:
score = max(score, 0.4)
for match in scene_matches:
signals.append(f"high_risk_scene:{match}")
if text_detection and text_detection.level != "NONE":
score = max(score, min(1.0, text_detection.score))
signals.append(f"ocr_crisis_level:{text_detection.level}")
for indicator in text_detection.indicators[:3]:
signals.append(f"ocr_indicator:{indicator}")
if direct_matches and text_detection and text_detection.level in {"HIGH", "CRITICAL"}:
score = min(1.0, max(score, 0.95))
signals.append("cross_modal_confirmation:text_plus_visual")
requires_human_review = score >= 0.4 or bool(direct_matches)
return ImageScreeningResult(
ocr_text=normalized_ocr,
labels=list(normalized_labels),
visual_flags=list(normalized_flags),
distress_score=round(score, 4),
requires_human_review=requires_human_review,
signals_detected=signals,
)

View File

@@ -241,6 +241,48 @@ html, body {
opacity: 0.5;
}
/* ===== CHAT HEADER ===== */
#chat-header {
flex-shrink: 0;
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 10px 12px;
border-bottom: 1px solid #21262d;
background: #11161d;
}
.chat-header-title {
font-size: 0.85rem;
color: #8b949e;
font-weight: 600;
letter-spacing: 0.02em;
}
#chat-safety-plan-btn {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 8px 12px;
min-height: 36px;
border: 1px solid #30363d;
border-radius: 999px;
background: transparent;
color: #c9d1d9;
font-size: 0.8rem;
font-weight: 600;
cursor: pointer;
}
#chat-safety-plan-btn:hover,
#chat-safety-plan-btn:focus {
border-color: #58a6ff;
background: rgba(88, 166, 255, 0.12);
outline: 2px solid #58a6ff;
outline-offset: 2px;
}
/* ===== CHAT AREA ===== */
#chat-area {
flex: 1;
@@ -649,6 +691,14 @@ html, body {
</div>
</div>
<div id="chat-header">
<div class="chat-header-title" aria-hidden="true">Conversation</div>
<button id="chat-safety-plan-btn" type="button" aria-label="Open My Safety Plan from chat header">
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"/><polyline points="14 2 14 8 20 8"/><line x1="16" y1="13" x2="8" y2="13"/><line x1="16" y1="17" x2="8" y2="17"/><polyline points="10 9 9 9 8 9"/></svg>
My Safety Plan
</button>
</div>
<!-- Chat messages -->
<div id="chat-area" role="log" aria-label="Chat messages" aria-live="polite" tabindex="0">
<!-- Messages inserted here -->
@@ -681,6 +731,7 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about.html" aria-label="About The Door">about</a>
<button id="crisis-resources-btn" aria-label="Open crisis resources">crisis resources</button>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -811,9 +862,11 @@ Sovereignty and service always.`;
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
var crisisResourcesBtn = document.getElementById('crisis-resources-btn');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');
var crisisSafetyPlanBtn = document.getElementById('crisis-safety-plan-btn');
var safetyPlanModal = document.getElementById('safety-plan-modal');
var closeSafetyPlan = document.getElementById('close-safety-plan');
@@ -826,6 +879,9 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var CRISIS_OVERLAY_COOLDOWN_MS = 10 * 60 * 1000;
var CRISIS_OVERLAY_LAST_SHOWN_KEY = 'timmy_crisis_overlay_last_shown_at';
var CRISIS_OVERLAY_EVENT_LOG_KEY = 'timmy_crisis_overlay_event_log';
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -853,6 +909,43 @@ Sovereignty and service always.`;
window.addEventListener('offline', updateOnlineStatus);
updateOnlineStatus();
function getLastOverlayShownAt() {
try {
return parseInt(localStorage.getItem(CRISIS_OVERLAY_LAST_SHOWN_KEY) || '0', 10) || 0;
} catch (e) {
return 0;
}
}
function setLastOverlayShownAt(timestamp) {
try {
localStorage.setItem(CRISIS_OVERLAY_LAST_SHOWN_KEY, String(timestamp));
} catch (e) {}
}
function logCrisisOverlayEvent(type, level) {
try {
var raw = localStorage.getItem(CRISIS_OVERLAY_EVENT_LOG_KEY);
var events = raw ? JSON.parse(raw) : [];
if (!Array.isArray(events)) events = [];
events.push({ type: type, level: level, at: Date.now() });
if (events.length > 20) events = events.slice(events.length - 20);
localStorage.setItem(CRISIS_OVERLAY_EVENT_LOG_KEY, JSON.stringify(events));
} catch (e) {}
}
function openCrisisResources() {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
if (typeof crisisPanel.scrollIntoView === 'function') {
crisisPanel.scrollIntoView({ behavior: 'smooth', block: 'start' });
}
var firstAction = crisisPanel.querySelector('.crisis-btn, a[href]');
if (firstAction && typeof firstAction.focus === 'function') {
firstAction.focus();
}
}
// ===== CRISIS KEYWORDS =====
// Tier 1: General crisis indicators - triggers enhanced 988 panel
var crisisKeywords = [
@@ -987,20 +1080,33 @@ Sovereignty and service always.`;
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return crisisOverlay.querySelectorAll(
return Array.prototype.slice.call(crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
);
));
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key === 'Escape') {
e.preventDefault();
closeOverlay(msgInput);
return;
}
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var focusableArray = focusable;
var first = focusable[0];
var last = focusable[focusable.length - 1];
var activeIndex = focusableArray.indexOf(document.activeElement);
if (activeIndex === -1) {
e.preventDefault();
e.shiftKey ? last.focus() : first.focus();
return;
}
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
@@ -1020,7 +1126,47 @@ Sovereignty and service always.`;
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function closeOverlay(restoreFocusTarget) {
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.getElementById('app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat-area');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
if (restoreFocusTarget === _preOverlayFocusElement && _preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else if (restoreFocusTarget && typeof restoreFocusTarget.focus === 'function') {
restoreFocusTarget.focus();
} else if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
function showOverlay() {
return showOverlayWithRateLimit(false, 2);
}
function showOverlayWithRateLimit(forceOpen, level) {
var lastShownAt = getLastOverlayShownAt();
if (!forceOpen && Date.now() - lastShownAt < CRISIS_OVERLAY_COOLDOWN_MS) {
logCrisisOverlayEvent('suppressed', level || 2);
return false;
}
logCrisisOverlayEvent(forceOpen ? 'manual-open' : 'shown', level || 2);
setLastOverlayShownAt(Date.now());
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
@@ -1030,10 +1176,10 @@ Sovereignty and service always.`;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.querySelector('.app');
var mainApp = document.getElementById('app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat');
var chatSection = document.getElementById('chat-area');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
@@ -1053,6 +1199,7 @@ Sovereignty and service always.`;
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
return true;
}
// Register focus trap on document (always listening, gated by class check)
@@ -1060,27 +1207,7 @@ Sovereignty and service always.`;
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
closeOverlay(_preOverlayFocusElement);
}
});
@@ -1285,19 +1412,31 @@ Sovereignty and service always.`;
_spTriggerEl = null;
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
function openSafetyPlan(triggerEl) {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
_activateSafetyPlanFocusTrap(triggerEl || document.activeElement);
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(safetyPlanBtn);
});
chatSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(chatSafetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
openSafetyPlan(crisisSafetyPlanBtn);
});
}
if (crisisResourcesBtn) {
crisisResourcesBtn.addEventListener('click', function() {
openCrisisResources();
});
}
@@ -1444,9 +1583,7 @@ Sovereignty and service always.`;
// Check for URL params (e.g., ?safetyplan=true for PWA shortcut)
var urlParams = new URLSearchParams(window.location.search);
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
openSafetyPlan(chatSafetyPlanBtn || safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -0,0 +1,68 @@
# The Door Fleet Work Orders Audit — issue #75
Generated: 2026-04-17T04:10:14Z
Source issue: `TRIAGE: The Door - Fleet Work Orders (2026-04-09)`
## Source Snapshot
Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.
## Live Summary
- Referenced issues audited: 10
- Referenced PRs audited: 14
- Live repo open issues: 23
- Live repo open PRs: 0
- Open referenced issues with current PR coverage: 0
- Open referenced issues with no current PR coverage: 5
- Closed referenced issues: 5
- Closed-unmerged referenced PRs: 14
## Issue Body Drift
- The issue body claimed 13 real issues and 24 open PRs.
- Live repo state now shows 23 open issues and 0 open PRs.
- Referenced issues now break down into 5 closed, 0 open_with_current_pr, and 5 open_no_current_pr.
- Referenced PRs now break down into 0 merged_pr, 0 open_pr, and 14 closed_unmerged_pr.
## Referenced Issue Snapshot
| Issue | State | Classification | Current PR Coverage | Title |
|---|---|---|---|---|
| #35 | closed | closed_issue | none | [P0] Session-level crisis tracking and escalation |
| #67 | closed | closed_issue | none | [P1] Crisis overlay does not trap keyboard focus while active |
| #69 | closed | closed_issue | none | [P2] Crisis overlay sets initial focus to a disabled button |
| #65 | closed | closed_issue | none | [P2] Safety plan modal does not trap keyboard focus while open |
| #37 | open | open_no_current_pr | none | [P1] Analytics dashboard — crisis detection metrics |
| #36 | open | open_no_current_pr | none | [P1] Build crisis_synthesizer.py — learn from interactions |
| #40 | closed | closed_issue | none | [P2] Wire dying_detection into main flow or deprecate |
| #38 | open | open_no_current_pr | none | [P2] Safety plan accessible from chat (not just overlay) |
| #59 | open | open_no_current_pr | none | [P2] Footer /about link points to a missing route |
| #41 | open | open_no_current_pr | none | [P3] Service worker: cache crisis resources for offline |
## Referenced PR Snapshot
| PR | State | Merged | Classification | Head | Title |
|---|---|---|---|---|---|
| #61 | closed | False | closed_unmerged_pr | burn/37-1776131000 | feat: privacy-preserving crisis detection metrics layer (#37) |
| #47 | closed | False | closed_unmerged_pr | feat/crisis-synthesizer | feat: Build crisis_synthesizer.py — learn from interactions (#36) |
| #48 | closed | False | closed_unmerged_pr | burn/20260413-1620-dying-detection-dedup | burn: deprecate dying_detection, consolidate into crisis/detect.py |
| #50 | closed | False | closed_unmerged_pr | whip/40-1776128804 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #51 | closed | False | closed_unmerged_pr | queue/40-1776129201 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #53 | closed | False | closed_unmerged_pr | q/40-1776129480 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #56 | closed | False | closed_unmerged_pr | triage/40-1776129677 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #58 | closed | False | closed_unmerged_pr | dawn/40-1776130053 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #70 | closed | False | closed_unmerged_pr | am/40-1776166469 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #72 | closed | False | closed_unmerged_pr | am/38-1776166469 | feat: add always-on safety plan access in chat header (#38) |
| #62 | closed | False | closed_unmerged_pr | burn/59-1776131200 | fix: point footer about link to /about.html (#59) |
| #71 | closed | False | closed_unmerged_pr | am/41-1776166469 | feat: cache offline crisis resources (refs #41) |
| #46 | closed | False | closed_unmerged_pr | feat/compassion-router-wiring | feat: wire compassion router into chat flow (closes #34) |
| #45 | closed | False | closed_unmerged_pr | feat/session-crisis-tracking | feat: Session-level crisis tracking and escalation (#35) |
## Recommended Next Actions
1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.
2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.
3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.
4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.
5. This audit preserves operator memory; it does not claim all referenced work orders are complete.

View File

@@ -0,0 +1,295 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"
def extract_issue_numbers(body: str) -> list[int]:
numbers: list[int] = []
seen: set[int] = set()
for match in re.finditer(r"#(\d+)", body or ""):
value = int(match.group(1))
if value in seen:
continue
seen.add(value)
numbers.append(value)
return numbers
def api_get(repo: str, path: str, token: str) -> Any:
req = Request(
f"{API_BASE}/repos/{ORG}/{repo}{path}",
headers={"Authorization": f"token {token}"},
)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
page = 1
while True:
batch = api_get(repo, f"/pulls?state=open&limit=100&page={page}", token)
if not batch:
break
prs.extend(batch)
page += 1
return prs
def fetch_live_open_issue_count(repo: str, token: str) -> int:
total = 0
page = 1
while True:
batch = api_get(repo, f"/issues?state=open&limit=100&page={page}", token)
if not batch:
break
total += sum(1 for item in batch if not item.get("pull_request"))
page += 1
return total
def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
claimed_open_issues = int(issue_match.group(1)) if issue_match else None
claimed_open_prs = int(pr_match.group(1)) if pr_match else None
return claimed_open_issues, claimed_open_prs
def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
matches: list[str] = []
seen: set[int] = set()
for pr in open_prs:
pr_num = pr["number"]
if pr_num in seen:
continue
text = "\n".join(
[
pr.get("title") or "",
pr.get("body") or "",
(pr.get("head") or {}).get("ref") or "",
]
)
if f"#{issue_num}" not in text:
continue
seen.add(pr_num)
matches.append(f"open PR #{pr_num}")
return ", ".join(matches) if matches else "none"
def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
issue_num = ref_issue["number"]
state = ref_issue.get("state") or "unknown"
coverage = summarize_open_pr_coverage(issue_num, open_prs)
if state == "closed":
classification = "closed_issue"
elif coverage != "none":
classification = "open_with_current_pr"
else:
classification = "open_no_current_pr"
return {
"number": issue_num,
"state": state,
"classification": classification,
"title": ref_issue.get("title") or "",
"current_pr_coverage": coverage,
"url": ref_issue.get("html_url") or ref_issue.get("url") or "",
}
def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
pr = api_get(repo, f"/pulls/{pr_num}", token)
state = pr.get("state") or "unknown"
merged = bool(pr.get("merged"))
if merged:
classification = "merged_pr"
elif state == "open":
classification = "open_pr"
else:
classification = "closed_unmerged_pr"
return {
"number": pr_num,
"state": state,
"merged": merged,
"classification": classification,
"title": pr.get("title") or "",
"head": (pr.get("head") or {}).get("ref") or "",
"url": pr.get("html_url") or pr.get("url") or "",
}
def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
headers = [title for title, _ in columns]
keys = [key for _, key in columns]
if not rows:
return "| None |\n|---|\n| None |"
lines = ["| " + " | ".join(headers) + " |", "|" + "|".join(["---"] * len(headers)) + "|"]
for row in rows:
values: list[str] = []
for key in keys:
value = row.get(key, "")
if key == "number" and value != "":
value = f"#{value}"
values.append(str(value).replace("\n", " "))
lines.append("| " + " | ".join(values) + " |")
return "\n".join(lines)
def render_report(
*,
source_issue: int,
source_title: str,
generated_at: str,
claimed_open_issues: int | None,
claimed_open_prs: int | None,
live_open_issues: int,
live_open_prs: int,
issue_rows: list[dict[str, Any]],
pr_rows: list[dict[str, Any]],
) -> str:
open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]
drift_lines = [
f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
]
return "\n".join(
[
f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
"",
f"Generated: {generated_at}",
f"Source issue: `{source_title}`",
"",
"## Source Snapshot",
"",
"Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
"",
"## Live Summary",
"",
f"- Referenced issues audited: {len(issue_rows)}",
f"- Referenced PRs audited: {len(pr_rows)}",
f"- Live repo open issues: {live_open_issues}",
f"- Live repo open PRs: {live_open_prs}",
f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
f"- Closed referenced issues: {len(closed_issues)}",
f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
"",
"## Issue Body Drift",
"",
*drift_lines,
"",
"## Referenced Issue Snapshot",
"",
table(
issue_rows,
[
("Issue", "number"),
("State", "state"),
("Classification", "classification"),
("Current PR Coverage", "current_pr_coverage"),
("Title", "title"),
],
),
"",
"## Referenced PR Snapshot",
"",
table(
pr_rows,
[
("PR", "number"),
("State", "state"),
("Merged", "merged"),
("Classification", "classification"),
("Head", "head"),
("Title", "title"),
],
),
"",
"## Recommended Next Actions",
"",
"1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
"2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
"3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
"4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
"5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
]
) + "\n"
def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
source_issue = api_get(repo, f"/issues/{issue_number}", token)
body = source_issue.get("body") or ""
refs = extract_issue_numbers(body)
open_prs = fetch_open_prs(repo, token)
claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)
issue_rows: list[dict[str, Any]] = []
pr_rows: list[dict[str, Any]] = []
for ref in refs:
issue_like = api_get(repo, f"/issues/{ref}", token)
if issue_like.get("pull_request"):
pr_rows.append(classify_pr_reference(repo, ref, token))
else:
issue_rows.append(classify_issue_reference(issue_like, open_prs))
metadata = {
"source_title": source_issue.get("title") or "",
"claimed_open_issues": claimed_open_issues,
"claimed_open_prs": claimed_open_prs,
"live_open_issues": fetch_live_open_issue_count(repo, token),
"live_open_prs": len(open_prs),
}
return metadata, issue_rows, pr_rows
def main() -> int:
parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
parser.add_argument("--repo", default="the-door")
parser.add_argument("--issue", type=int, default=75)
parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
args = parser.parse_args()
token = Path(args.token_file).read_text(encoding="utf-8").strip()
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
report = render_report(
source_issue=args.issue,
source_title=metadata["source_title"],
generated_at=generated_at,
claimed_open_issues=metadata["claimed_open_issues"],
claimed_open_prs=metadata["claimed_open_prs"],
live_open_issues=metadata["live_open_issues"],
live_open_prs=metadata["live_open_prs"],
issue_rows=issue_rows,
pr_rows=pr_rows,
)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(report, encoding="utf-8")
print(output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,26 @@
"""Regression tests for the crisis integration contract expected by hermes-agent.
Issue #141 names the-door-side shared modules as `crisis.tracker` and
`crisis.bridge`. Keep those import paths available even if the canonical
implementation lives in `session_tracker.py` and `gateway.py`.
"""
import importlib
def test_crisis_tracker_module_exports_session_tracker_contract():
tracker = importlib.import_module("crisis.tracker")
session_tracker = importlib.import_module("crisis.session_tracker")
assert tracker.CrisisSessionTracker is session_tracker.CrisisSessionTracker
assert tracker.SessionState is session_tracker.SessionState
assert tracker.check_crisis_with_session is session_tracker.check_crisis_with_session
def test_crisis_bridge_module_exports_gateway_contract():
bridge = importlib.import_module("crisis.bridge")
gateway = importlib.import_module("crisis.gateway")
assert bridge.check_crisis is gateway.check_crisis
assert bridge.get_system_prompt is gateway.get_system_prompt
assert bridge.format_gateway_response is gateway.format_gateway_response

View File

@@ -1,99 +1,117 @@
"""Tests for privacy-preserving crisis metrics aggregation (issue #37)."""
from __future__ import annotations
"""
Tests for crisis/metrics.py — Aggregate crisis metrics.
"""
import json
import os
import pathlib
import sys
import shutil
import tempfile
import unittest
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from crisis.detect import detect_crisis
from crisis.gateway import check_crisis
from crisis.metrics import (
append_metrics_event,
build_metrics_event,
build_weekly_summary,
load_metrics_events,
render_weekly_summary,
)
from crisis.metrics import CrisisMetrics, SessionMetrics, AggregateMetrics
class TestMetricsEvent(unittest.TestCase):
def test_event_is_privacy_preserving(self):
detection = detect_crisis("I want to kill myself")
event = build_metrics_event(
detection,
continued_conversation=True,
false_positive=False,
now=1_700_000_000,
class TestCrisisMetrics(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.metrics = CrisisMetrics(Path(self.tmpdir))
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_record_session_creates_file(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="MEDIUM",
message_count=5,
was_escalating=True,
was_deescalating=False,
escalation_rate=0.5,
)
self.assertEqual(event["timestamp"], 1_700_000_000)
self.assertEqual(event["level"], "CRITICAL")
self.assertTrue(event["continued_conversation"])
self.assertFalse(event["false_positive"])
self.assertNotIn("text", event)
self.assertNotIn("message", event)
self.assertGreaterEqual(event["indicator_count"], 1)
self.assertTrue(event["indicators"])
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
self.assertEqual(len(files), 1)
class TestMetricsLogAndSummary(unittest.TestCase):
def test_append_and_load_metrics_events(self):
log_path = pathlib.Path(self._testMethodName).with_suffix(".jsonl")
try:
append_metrics_event(log_path, detect_crisis("I want to die"), now=1_700_000_000)
events = load_metrics_events(log_path)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]["level"], "CRITICAL")
finally:
if log_path.exists():
log_path.unlink()
def test_weekly_summary_counts_levels_keywords_and_false_positives(self):
events = [
build_metrics_event(detect_crisis("I want to die"), continued_conversation=True, false_positive=False, now=1_700_000_000),
build_metrics_event(detect_crisis("I'm having a rough day"), continued_conversation=False, false_positive=False, now=1_700_000_100),
build_metrics_event(detect_crisis("I want to die"), continued_conversation=False, false_positive=True, now=1_700_000_200),
build_metrics_event(detect_crisis("Hello there"), continued_conversation=False, false_positive=False, now=1_700_000_300),
]
summary = build_weekly_summary(events, now=1_700_000_400, window_days=7)
self.assertEqual(summary["detections_per_level"]["CRITICAL"], 2)
self.assertEqual(summary["detections_per_level"]["LOW"], 1)
self.assertEqual(summary["detections_per_level"]["NONE"], 1)
self.assertEqual(summary["continued_after_intervention"], 1)
self.assertAlmostEqual(summary["false_positive_estimate"], 1 / 3, places=4)
self.assertEqual(summary["most_common_keywords"][0]["count"], 2)
def test_render_weekly_summary_mentions_required_metrics(self):
events = [
build_metrics_event(detect_crisis("I want to die"), continued_conversation=True, now=1_700_000_000),
build_metrics_event(detect_crisis("I feel hopeless with no way out"), false_positive=True, now=1_700_000_100),
]
summary = build_weekly_summary(events, now=1_700_000_200, window_days=7)
rendered = render_weekly_summary(summary)
self.assertIn("detections_per_level", rendered)
self.assertIn("most_common_keywords", rendered)
self.assertIn("false_positive_estimate", rendered)
self.assertIn("continued_after_intervention", rendered)
class TestGatewayMetricsIntegration(unittest.TestCase):
def test_check_crisis_can_emit_metrics_event(self):
result = check_crisis(
"I want to die",
metrics_log_path=None,
continued_conversation=True,
false_positive=False,
now=1_700_000_000,
def test_record_session_writes_jsonl(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="HIGH",
peak_level="CRITICAL",
message_count=10,
was_escalating=True,
was_deescalating=False,
escalation_rate=1.0,
triggered_overlay=True,
showed_988=True,
)
self.assertEqual(result["level"], "CRITICAL")
self.assertIn("metrics_event", result)
self.assertEqual(result["metrics_event"]["timestamp"], 1_700_000_000)
self.assertTrue(result["metrics_event"]["continued_conversation"])
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
with open(files[0]) as f:
data = json.loads(f.readline())
self.assertEqual(data['peak_level'], 'CRITICAL')
self.assertTrue(data['triggered_overlay'])
def test_get_summary_empty(self):
agg = self.metrics.get_summary(days=7)
self.assertEqual(agg.total_sessions, 0)
self.assertEqual(agg.total_messages, 0)
def test_get_summary_with_data(self):
for level in ["LOW", "MEDIUM", "HIGH"]:
sm = SessionMetrics(
timestamp=1700000000,
current_level=level,
peak_level=level,
message_count=3,
was_escalating=level != "LOW",
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
agg = self.metrics.get_summary(days=1)
self.assertEqual(agg.total_sessions, 3)
self.assertEqual(agg.total_messages, 9)
self.assertEqual(agg.escalating_sessions, 2)
def test_get_report_returns_string(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="LOW",
message_count=5,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
report = self.metrics.get_report(days=1)
self.assertIn("CRISIS METRICS REPORT", report)
self.assertIn("Sessions:", report)
def test_get_json_returns_valid(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="MEDIUM",
peak_level="MEDIUM",
message_count=3,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
json_str = self.metrics.get_json(days=1)
data = json.loads(json_str)
self.assertEqual(data['total_sessions'], 1)
if __name__ == "__main__":

View File

@@ -0,0 +1,54 @@
import pathlib
from playwright.sync_api import sync_playwright
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
def test_crisis_overlay_supports_keyboard_only_navigation():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
try:
page.goto(INDEX_HTML.as_uri())
page.evaluate(
"""
() => {
window.fetch = () => Promise.reject(new Error('test fetch blocked'));
window.alert = () => {};
}
"""
)
page.fill('#msg-input', "I'm going to kill myself tonight")
page.press('#msg-input', 'Enter')
page.wait_for_function("document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.evaluate(
"""
() => {
const btn = document.getElementById('overlay-dismiss-btn');
btn.disabled = false;
btn.textContent = 'Continue to chat';
}
"""
)
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.keyboard.press('Shift+Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Escape')
page.wait_for_function("!document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.id") == 'msg-input'
finally:
browser.close()

View File

@@ -0,0 +1,53 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayRateLimit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_has_ten_minute_cooldown_constant(self):
self.assertRegex(
self.html,
r"CRISIS_OVERLAY_COOLDOWN_MS\s*=\s*10\s*\*\s*60\s*\*\s*1000",
'Expected a 10-minute crisis overlay cooldown constant.',
)
def test_show_overlay_suppresses_repeat_with_logging(self):
self.assertRegex(
self.html,
r"function\s+logCrisisOverlayEvent\s*\(",
'Expected a crisis overlay event logger.',
)
self.assertRegex(
self.html,
r"if\s*\(!forceOpen\s*&&\s*Date\.now\(\)\s*-\s*lastShownAt\s*<\s*CRISIS_OVERLAY_COOLDOWN_MS\)",
'Expected showOverlay to suppress repeated auto-displays inside the cooldown window.',
)
self.assertRegex(
self.html,
r"logCrisisOverlayEvent\('suppressed'",
'Expected suppressed overlay attempts to be logged.',
)
def test_manual_crisis_resources_button_exists_and_bypasses_cooldown(self):
self.assertIn('id="crisis-resources-btn"', self.html)
self.assertRegex(
self.html,
r"function\s+openCrisisResources\s*\(",
'Expected a manual crisis resources opener.',
)
self.assertRegex(
self.html,
r"crisisResourcesBtn\.addEventListener\('click',\s*function\(\)\s*\{\s*openCrisisResources\(\);",
'Expected the footer button to wire into openCrisisResources().',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,111 @@
"""Tests for evolution/crisis_synthesizer.py (issue #36)."""
from __future__ import annotations
import importlib.util
import json
import pathlib
import sys
import tempfile
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SCRIPT = ROOT / 'evolution' / 'crisis_synthesizer.py'
spec = importlib.util.spec_from_file_location('crisis_synthesizer', str(SCRIPT))
mod = importlib.util.module_from_spec(spec)
sys.modules['crisis_synthesizer'] = mod
spec.loader.exec_module(mod)
class TestCrisisSynthesizerEvent(unittest.TestCase):
def test_build_interaction_event_is_privacy_preserving(self):
event = mod.build_interaction_event(
level='CRITICAL',
indicators=['want_to_die', 'no_way_out'],
response_given='guardian',
continued_conversation=True,
false_positive=False,
now=1700000000,
)
self.assertEqual(event['timestamp'], 1700000000)
self.assertEqual(event['level'], 'CRITICAL')
self.assertEqual(event['response_given'], 'guardian')
self.assertTrue(event['continued_conversation'])
self.assertFalse(event['false_positive'])
self.assertEqual(event['indicators'], ['want_to_die', 'no_way_out'])
for forbidden in ['text', 'message', 'content', 'ip', 'session_id', 'user_id']:
self.assertNotIn(forbidden, event)
class TestCrisisSynthesizerStorage(unittest.TestCase):
def test_append_and_load_events_round_trip(self):
with tempfile.TemporaryDirectory() as tmp:
log_path = pathlib.Path(tmp) / 'crisis-events.jsonl'
mod.append_interaction_event(
log_path,
level='HIGH',
indicators=['hopeless'],
response_given='companion',
continued_conversation=False,
false_positive=True,
now=1700000100,
)
events = mod.load_interaction_events(log_path)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['level'], 'HIGH')
self.assertEqual(events[0]['indicators'], ['hopeless'])
class TestCrisisSynthesizerSummary(unittest.TestCase):
def test_weekly_report_contains_required_metrics(self):
events = [
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000),
mod.build_interaction_event('HIGH', ['hopeless'], 'companion', False, True, now=1700000100),
mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, False, now=1700000200),
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', False, False, now=1700000300),
mod.build_interaction_event('NONE', [], 'friend', False, False, now=1700000400),
]
summary = mod.build_weekly_report(events, now=1700000500, window_days=7)
self.assertEqual(summary['detections_per_level']['CRITICAL'], 2)
self.assertEqual(summary['detections_per_level']['HIGH'], 1)
self.assertEqual(summary['detections_per_level']['LOW'], 1)
self.assertEqual(summary['detections_per_level']['NONE'], 1)
self.assertEqual(summary['continued_after_intervention'], 1)
self.assertAlmostEqual(summary['false_positive_estimate'], 0.25)
self.assertEqual(summary['most_common_keywords'][0]['keyword'], 'want_to_die')
self.assertEqual(summary['most_common_keywords'][0]['count'], 2)
class TestCrisisSynthesizerSuggestions(unittest.TestCase):
def test_suggests_weight_adjustments_from_interactions(self):
events = []
for ts in range(3):
events.append(mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000 + ts))
for ts in range(3):
events.append(mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, True, now=1700000100 + ts))
suggestions = mod.suggest_keyword_adjustments(events, min_observations=3)
by_keyword = {s['keyword']: s for s in suggestions}
self.assertEqual(by_keyword['want_to_die']['suggested_adjustment'], 'raise_weight')
self.assertEqual(by_keyword['rough_day']['suggested_adjustment'], 'lower_weight')
class TestCrisisSynthesizerRendering(unittest.TestCase):
def test_render_weekly_report_outputs_json(self):
summary = {
'detections_per_level': {'NONE': 0, 'LOW': 1, 'MEDIUM': 0, 'HIGH': 0, 'CRITICAL': 0},
'most_common_keywords': [{'keyword': 'rough_day', 'count': 1}],
'false_positive_estimate': 0.0,
'continued_after_intervention': 0,
'keyword_weight_suggestions': [],
'window_days': 7,
'total_events': 1,
}
rendered = mod.render_weekly_report(summary)
parsed = json.loads(rendered)
self.assertEqual(parsed['window_days'], 7)
self.assertEqual(parsed['most_common_keywords'][0]['keyword'], 'rough_day')
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,100 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_work_orders_audit.py"
REPORT_PATH = ROOT / "reports" / "2026-04-17-the-door-fleet-work-orders-audit.md"
def _load_module():
assert SCRIPT_PATH.exists(), f"missing {SCRIPT_PATH.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location("fleet_work_orders_audit", SCRIPT_PATH)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_extract_issue_numbers_preserves_mixed_issue_and_pr_refs() -> None:
body = """
## P0 — Session-level crisis tracking (#35)
**PR #61 ready.**
## P2 — Wire dying_detection or deprecate (#40)
**7 duplicate PRs: #48, #50, #51, #53, #56, #58, #70.**
"""
mod = _load_module()
assert mod.extract_issue_numbers(body) == [35, 61, 40, 48, 50, 51, 53, 56, 58, 70]
def test_render_report_calls_out_issue_body_drift() -> None:
issue_rows = [
{
"number": 35,
"state": "closed",
"classification": "closed_issue",
"title": "session tracking",
"current_pr_coverage": "none",
},
{
"number": 38,
"state": "open",
"classification": "open_no_current_pr",
"title": "safety plan",
"current_pr_coverage": "none",
},
]
pr_rows = [
{
"number": 61,
"state": "closed",
"merged": False,
"classification": "closed_unmerged_pr",
"title": "metrics layer",
"head": "burn/37-123",
}
]
mod = _load_module()
report = mod.render_report(
source_issue=75,
source_title="TRIAGE: The Door - Fleet Work Orders (2026-04-09)",
generated_at="2026-04-17T04:00:00Z",
claimed_open_issues=13,
claimed_open_prs=24,
live_open_issues=5,
live_open_prs=0,
issue_rows=issue_rows,
pr_rows=pr_rows,
)
assert "## Source Snapshot" in report
assert "## Live Summary" in report
assert "## Issue Body Drift" in report
assert "13" in report and "24" in report
assert "#38" in report
assert "open_no_current_pr" in report
assert "#61" in report
assert "closed_unmerged_pr" in report
assert "## Referenced Issue Snapshot" in report
assert "## Referenced PR Snapshot" in report
assert "## Recommended Next Actions" in report
def test_committed_work_orders_audit_exists_with_required_sections() -> None:
text = REPORT_PATH.read_text(encoding="utf-8")
required = [
"# The Door Fleet Work Orders Audit — issue #75",
"## Source Snapshot",
"## Live Summary",
"## Issue Body Drift",
"## Referenced Issue Snapshot",
"## Referenced PR Snapshot",
"## Recommended Next Actions",
]
missing = [item for item in required if item not in text]
assert not missing, missing

View File

@@ -0,0 +1,62 @@
"""Tests for local image crisis screening slice under epic #130."""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from image_screening import ImageScreeningResult, screen_image_signals
class TestImageScreeningResult(unittest.TestCase):
def test_to_dict_preserves_core_fields(self):
result = ImageScreeningResult(
ocr_text="help me",
labels=["blood"],
visual_flags=["dark_scene"],
distress_score=0.8,
requires_human_review=True,
signals_detected=["direct_self_harm_label:blood"],
)
data = result.to_dict()
self.assertEqual(data["ocr_text"], "help me")
self.assertEqual(data["labels"], ["blood"])
self.assertTrue(data["requires_human_review"])
class TestScreenImageSignals(unittest.TestCase):
def test_direct_self_harm_labels_trigger_high_risk(self):
result = screen_image_signals(
labels=["razor blade", "blood droplets"],
manual_notes="photo of fresh cuts on forearm",
)
self.assertGreaterEqual(result.distress_score, 0.8)
self.assertTrue(result.requires_human_review)
self.assertTrue(any("self_harm" in signal for signal in result.signals_detected))
def test_ocr_text_uses_existing_crisis_detector(self):
result = screen_image_signals(
ocr_text="I want to kill myself tonight",
labels=["handwritten note"],
)
self.assertGreaterEqual(result.distress_score, 0.7)
self.assertTrue(result.requires_human_review)
self.assertTrue(any(signal.startswith("ocr_crisis_level:") for signal in result.signals_detected))
def test_neutral_image_stays_low_risk(self):
result = screen_image_signals(
labels=["dog", "park", "sunlight"],
manual_notes="family outing in daylight",
)
self.assertLess(result.distress_score, 0.2)
self.assertFalse(result.requires_human_review)
self.assertEqual(result.signals_detected, [])
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,50 @@
"""Regression tests for duplicate crisis indicators cleanup (#123)."""
import os
import re
import sys
import unittest
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
ROOT = Path(__file__).resolve().parents[1]
DETECT_FILE = ROOT / "crisis" / "detect.py"
class TestCrisisIndicatorDedup(unittest.TestCase):
def _extract_indicator_list(self, name: str) -> set[str]:
text = DETECT_FILE.read_text(encoding="utf-8")
match = re.search(rf"{name}\s*=\s*\[(.*?)\n\]", text, re.S)
self.assertIsNotNone(match, f"{name} list missing")
return {
line.strip().rstrip(",")
for line in match.group(1).splitlines()
if line.strip().startswith('r"')
}
def test_high_and_medium_indicator_lists_do_not_overlap(self):
high = self._extract_indicator_list("HIGH_INDICATORS")
medium = self._extract_indicator_list("MEDIUM_INDICATORS")
overlap = high & medium
self.assertEqual(set(), overlap, f"duplicate cross-tier patterns found: {sorted(overlap)}")
def test_removed_duplicates_still_classify_at_high_tier(self):
cases = {
"I feel hopeless": "HIGH",
"I feel trapped": "HIGH",
"I feel desperate": "HIGH",
"I have no future ahead": "HIGH",
"I have given up on myself": "HIGH",
}
for text, expected in cases.items():
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(expected, result.level)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,20 @@
from pathlib import Path
INDEX = Path("index.html")
def test_chat_header_has_persistent_safety_plan_button():
html = INDEX.read_text()
assert 'id="chat-header"' in html
assert 'id="chat-safety-plan-btn"' in html
assert 'aria-label="Open My Safety Plan from chat header"' in html
assert 'My Safety Plan' in html
def test_chat_header_button_opens_existing_safety_plan_modal():
html = INDEX.read_text()
assert "var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');" in html
assert "chatSafetyPlanBtn.addEventListener('click'" in html
assert "function openSafetyPlan(triggerEl)" in html
assert "safetyPlanModal.classList.add('active');" in html
assert "openSafetyPlan(chatSafetyPlanBtn);" in html