Compare commits

..

1 Commit

Author SHA1 Message Date
Alexander Whitestone
4d084654d8 feat: add crisis analytics metrics (#37)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-17 02:24:01 -04:00
26 changed files with 318 additions and 2097 deletions

View File

@@ -12,7 +12,7 @@ VPS := alexanderwhitestone.com
DOMAIN := alexanderwhitestone.com
DEPLOY_DIR := deploy
.PHONY: help deploy deploy-bash check ssl push service metrics
.PHONY: help deploy deploy-bash check ssl push service
help:
@echo "The Door — Deployment Commands"
@@ -23,8 +23,6 @@ help:
@echo " make check Check deployment status"
@echo " make ssl Setup SSL on VPS"
@echo " make service Install/restart hermes-gateway service"
@echo " make metrics View crisis metrics summary"
@echo " make metrics-json Export crisis metrics as JSON"
@echo ""
deploy:
@@ -48,9 +46,3 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
metrics:
python3 -m crisis.metrics --summary
metrics-json:
python3 -m crisis.metrics --json
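The `metrics` and `metrics-json` targets go away because the rewritten CLI (see the `crisis/metrics.py` diff below) drops the `--summary`/`--json` flags and no longer assumes a default metrics directory; it requires an explicit `--log-path`. A minimal sketch of the replacement invocation, with the log location purely illustrative:

from pathlib import Path

from crisis.metrics import main

# Hypothetical event-log location; the new CLI ships no default path.
log_path = Path.home() / ".the-door" / "crisis-metrics.jsonl"
main(["--log-path", str(log_path), "--days", "7"])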

View File

@@ -8,7 +8,13 @@ from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urg
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
from .metrics import CrisisMetrics, AggregateMetrics
from .metrics import (
build_metrics_event,
append_metrics_event,
load_metrics_events,
build_weekly_summary,
render_weekly_summary,
)
__all__ = [
"detect_crisis",
@@ -24,6 +30,9 @@ __all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
"CrisisMetrics",
"AggregateMetrics",
"build_metrics_event",
"append_metrics_event",
"load_metrics_events",
"build_weekly_summary",
"render_weekly_summary",
]
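With the expanded `__all__`, downstream code can import the new metrics helpers from the package root instead of reaching into `crisis.metrics`. A quick sketch of the surface this re-export adds (the identity check at the end is just a sanity probe):

from crisis import (
    build_metrics_event,
    append_metrics_event,
    load_metrics_events,
    build_weekly_summary,
    render_weekly_summary,
)
from crisis import metrics

# Package root and module expose the same callables after this diff.
assert build_weekly_summary is metrics.build_weekly_summary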

View File

@@ -1,19 +0,0 @@
"""Compatibility bridge for the-door crisis gateway.
Issue #141 describes the shared bridge API as `crisis.bridge`.
The canonical implementation lives in `crisis.gateway`. Re-export the public
entrypoints here so downstream hermes-agent wiring can import the stable name
without copying logic out of the-door.
"""
from .gateway import (
check_crisis,
get_system_prompt,
format_gateway_response,
)
__all__ = [
"check_crisis",
"get_system_prompt",
"format_gateway_response",
]

View File

@@ -104,9 +104,13 @@ MEDIUM_INDICATORS = [
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
# Keep only medium-only patterns here; stronger overlaps live in HIGH_INDICATORS.
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
]
LOW_INDICATORS = [
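The contextual patterns above accept optional conjugation ("feel", "feels", "feeling") and an optional intensifier ("so"), so several surface forms collapse onto one indicator. A standalone check of that behavior using only Python's `re` (case-insensitivity is added here for the demo; the detector's own compile flags are not shown in this hunk):

import re

pattern = re.compile(r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b", re.IGNORECASE)

# All three conjugations match, with or without the intensifier.
for text in ("I feel worthless", "she feels so worthless", "feeling worthless lately"):
    assert pattern.search(text)

# The bare adjective without the "feel" context does not trigger.
assert pattern.search("a worthless gadget") is None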

View File

@@ -23,9 +23,17 @@ from .response import (
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
from .metrics import build_metrics_event, append_metrics_event
def check_crisis(text: str) -> dict:
def check_crisis(
text: str,
metrics_log_path: Optional[str] = None,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: Optional[float] = None,
) -> dict:
"""
Full crisis check returning structured data.
@@ -35,7 +43,7 @@ def check_crisis(text: str) -> dict:
detection = detect_crisis(text)
response = generate_response(detection)
return {
result = {
"level": detection.level,
"score": detection.score,
"indicators": detection.indicators,
@@ -49,6 +57,23 @@ def check_crisis(text: str) -> dict:
"escalate": response.escalate,
}
metrics_event = build_metrics_event(
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
if metrics_log_path:
metrics_event = append_metrics_event(
metrics_log_path,
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
result["metrics_event"] = metrics_event
return result
def get_system_prompt(base_prompt: str, text: str = "") -> str:
"""

View File

@@ -1,244 +1,166 @@
"""
crisis/metrics.py — Aggregate crisis detection metrics.
"""Privacy-preserving crisis analytics metrics for the-door.
Tracks session-level crisis data for aggregate reporting.
Privacy-first: stores only aggregate counts, never user content.
Usage:
from crisis.metrics import CrisisMetrics
metrics = CrisisMetrics()
metrics.record_session(tracker.state)
summary = metrics.get_summary()
Stores only timestamps, crisis levels, indicator categories, and operator
feedback flags. No raw message text or PII is persisted.
"""
from __future__ import annotations
import argparse
import json
import os
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional
from typing import Iterable
METRICS_DIR = Path.home() / ".the-door" / "metrics"
from .detect import CrisisDetectionResult, detect_crisis
LEVELS = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
@dataclass
class SessionMetrics:
"""Metrics from a single crisis session."""
timestamp: float
current_level: str
peak_level: str
message_count: int
was_escalating: bool
was_deescalating: bool
escalation_rate: float
triggered_overlay: bool = False
showed_988: bool = False
def normalize_indicator(indicator: str) -> str:
"""Return a stable privacy-safe keyword/category identifier."""
return indicator
@dataclass
class AggregateMetrics:
"""Aggregate metrics across sessions."""
total_sessions: int = 0
total_messages: int = 0
# Level distribution
level_counts: Dict[str, int] = field(default_factory=lambda: {
"NONE": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 0, "CRITICAL": 0
})
# Escalation tracking
escalating_sessions: int = 0
deescalating_sessions: int = 0
# Safety interventions
overlay_triggers: int = 0
ninety_eight_show: int = 0
# Time window
period_start: Optional[float] = None
period_end: Optional[float] = None
def build_metrics_event(
detection: CrisisDetectionResult,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: float | None = None,
) -> dict:
timestamp = float(time.time() if now is None else now)
indicators = [normalize_indicator(indicator) for indicator in detection.indicators]
return {
"timestamp": timestamp,
"level": detection.level,
"indicator_count": len(indicators),
"indicators": indicators,
"continued_conversation": bool(continued_conversation),
"false_positive": bool(false_positive),
}
class CrisisMetrics:
"""
Aggregate crisis metrics with local JSON persistence.
Privacy-first: stores only aggregate counts per day.
Never stores user messages, content, or identifying info.
"""
def __init__(self, metrics_dir: Optional[Path] = None):
self.metrics_dir = metrics_dir or METRICS_DIR
self.metrics_dir.mkdir(parents=True, exist_ok=True)
self._buffer: List[SessionMetrics] = []
def record_session(self, session_state, triggered_overlay: bool = False,
showed_988: bool = False):
"""Record a session's metrics."""
from .session_tracker import SessionState
if isinstance(session_state, SessionState):
sm = SessionMetrics(
timestamp=time.time(),
current_level=session_state.current_level,
peak_level=session_state.peak_level,
message_count=session_state.message_count,
was_escalating=session_state.is_escalating,
was_deescalating=session_state.is_deescalating,
escalation_rate=session_state.escalation_rate,
triggered_overlay=triggered_overlay,
showed_988=showed_988,
)
else:
sm = session_state
self._buffer.append(sm)
self._flush()
def _flush(self):
"""Write buffered sessions to daily file."""
if not self._buffer:
return
today = datetime.utcnow().strftime("%Y-%m-%d")
filepath = self.metrics_dir / f"{today}.jsonl"
with open(filepath, 'a') as f:
for sm in self._buffer:
f.write(json.dumps(asdict(sm)) + '\n')
self._buffer.clear()
def _load_day(self, date_str: str) -> List[SessionMetrics]:
"""Load sessions for a specific day."""
filepath = self.metrics_dir / f"{date_str}.jsonl"
if not filepath.exists():
return []
sessions = []
with open(filepath) as f:
for line in f:
if line.strip():
data = json.loads(line)
sessions.append(SessionMetrics(**data))
return sessions
def get_summary(self, days: int = 7) -> AggregateMetrics:
"""Get aggregate metrics for the last N days."""
agg = AggregateMetrics()
now = datetime.utcnow()
for i in range(days):
date = (now - timedelta(days=i)).strftime("%Y-%m-%d")
sessions = self._load_day(date)
for sm in sessions:
agg.total_sessions += 1
agg.total_messages += sm.message_count
# Level counts (use peak level)
level = sm.peak_level
agg.level_counts[level] = agg.level_counts.get(level, 0) + 1
if sm.was_escalating:
agg.escalating_sessions += 1
if sm.was_deescalating:
agg.deescalating_sessions += 1
if sm.triggered_overlay:
agg.overlay_triggers += 1
if sm.showed_988:
agg.ninety_eight_show += 1
# Time window
if agg.period_start is None or sm.timestamp < agg.period_start:
agg.period_start = sm.timestamp
if agg.period_end is None or sm.timestamp > agg.period_end:
agg.period_end = sm.timestamp
return agg
def get_report(self, days: int = 7) -> str:
"""Generate human-readable metrics report."""
agg = self.get_summary(days)
lines = []
lines.append("=" * 50)
lines.append(" CRISIS METRICS REPORT")
lines.append(f" Last {days} days")
if agg.period_start:
start = datetime.fromtimestamp(agg.period_start).strftime("%Y-%m-%d %H:%M")
lines.append(f" Period: {start} → now")
lines.append("=" * 50)
lines.append(f"\n Sessions: {agg.total_sessions}")
lines.append(f" Messages tracked: {agg.total_messages}")
lines.append(f"\n Level Distribution (by peak):")
for level in ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]:
count = agg.level_counts.get(level, 0)
pct = (count / agg.total_sessions * 100) if agg.total_sessions > 0 else 0
bar = "" * int(pct / 5)
lines.append(f" {level:<10} {count:>5} ({pct:>5.1f}%) {bar}")
lines.append(f"\n Escalations: {agg.escalating_sessions}")
lines.append(f" De-escalations: {agg.deescalating_sessions}")
lines.append(f" Overlay triggers: {agg.overlay_triggers}")
lines.append(f" 988 shown: {agg.ninety_eight_show}")
if agg.total_sessions > 0:
escalation_rate = agg.escalating_sessions / agg.total_sessions * 100
lines.append(f"\n Escalation rate: {escalation_rate:.1f}%")
lines.append("=" * 50)
return "\n".join(lines)
def get_json(self, days: int = 7) -> str:
"""Export metrics as JSON."""
agg = self.get_summary(days)
return json.dumps(asdict(agg), indent=2)
def append_metrics_event(
log_path: str | Path,
detection: CrisisDetectionResult,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: float | None = None,
) -> dict:
event = build_metrics_event(
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event) + "\n")
return event
def main():
"""CLI entry point for crisis metrics."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Detection Metrics")
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", help="JSON export")
parser.add_argument("--days", type=int, default=7, help="Days to include")
parser.add_argument("--demo", action="store_true", help="Generate demo data")
args = parser.parse_args()
metrics = CrisisMetrics()
if args.demo:
import random
levels = ["NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"]
for i in range(50):
from .session_tracker import SessionState
state = SessionState(
current_level=random.choice(levels),
peak_level=random.choice(levels),
message_count=random.randint(1, 20),
is_escalating=random.random() > 0.7,
is_deescalating=random.random() > 0.8,
escalation_rate=random.random(),
)
metrics.record_session(
state,
triggered_overlay=random.random() > 0.8,
showed_988=random.random() > 0.7,
)
print("Generated 50 demo sessions.")
if args.json:
print(metrics.get_json(args.days))
else:
print(metrics.get_report(args.days))
def load_metrics_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
events.append(json.loads(line))
return events
def build_weekly_summary(
events: Iterable[dict],
*,
now: float | None = None,
window_days: int = 7,
) -> dict:
current_time = float(time.time() if now is None else now)
cutoff = current_time - (window_days * 86400)
filtered = [event for event in events if float(event.get("timestamp", 0)) >= cutoff]
detections_per_level = {level: 0 for level in LEVELS}
keyword_counts: Counter[str] = Counter()
detections = []
continued_after_intervention = 0
for event in filtered:
level = event.get("level", "NONE")
detections_per_level[level] = detections_per_level.get(level, 0) + 1
keyword_counts.update(event.get("indicators", []))
if level != "NONE":
detections.append(event)
if event.get("continued_conversation"):
continued_after_intervention += 1
false_positive_count = sum(1 for event in detections if event.get("false_positive"))
false_positive_estimate = (
false_positive_count / len(detections) if detections else 0.0
)
return {
"window_days": window_days,
"total_events": len(filtered),
"detections_per_level": detections_per_level,
"most_common_keywords": [
{"keyword": keyword, "count": count}
for keyword, count in keyword_counts.most_common(10)
],
"false_positive_estimate": false_positive_estimate,
"continued_after_intervention": continued_after_intervention,
}
def render_weekly_summary(summary: dict) -> str:
return json.dumps(summary, indent=2)
def write_weekly_summary(path: str | Path, summary: dict) -> Path:
output_path = Path(path)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(render_weekly_summary(summary) + "\n", encoding="utf-8")
return output_path
def record_text_event(
text: str,
log_path: str | Path,
*,
continued_conversation: bool = False,
false_positive: bool = False,
now: float | None = None,
) -> dict:
detection = detect_crisis(text)
return append_metrics_event(
log_path,
detection,
continued_conversation=continued_conversation,
false_positive=false_positive,
now=now,
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Privacy-preserving crisis metrics summary")
parser.add_argument("--log-path", required=True, help="JSONL event log path")
parser.add_argument("--days", type=int, default=7, help="Summary window in days")
parser.add_argument("--output", help="Optional file to write summary JSON")
args = parser.parse_args(argv)
events = load_metrics_events(args.log_path)
summary = build_weekly_summary(events, window_days=args.days)
rendered = render_weekly_summary(summary)
print(rendered)
if args.output:
write_weekly_summary(args.output, summary)
return 0
if __name__ == "__main__":
main()
raise SystemExit(main())
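End to end, the new module is append-then-aggregate: events land as JSONL lines, `load_metrics_events` reads them back, and `build_weekly_summary` reduces whatever falls inside the window. A compact walkthrough under assumed inputs (sample texts and path are illustrative; only levels, indicator names, and feedback flags are ever persisted):

import time

from crisis.detect import detect_crisis
from crisis.metrics import (
    append_metrics_event,
    build_weekly_summary,
    load_metrics_events,
    render_weekly_summary,
)

log = "/tmp/door-metrics-demo.jsonl"  # illustrative path
now = time.time()

# Two detections appended as privacy-safe events (no message text stored).
append_metrics_event(log, detect_crisis("I feel so hopeless"), now=now)
append_metrics_event(
    log,
    detect_crisis("nothing left inside"),
    continued_conversation=True,
    now=now,
)

summary = build_weekly_summary(load_metrics_events(log), now=now, window_days=7)
print(render_weekly_summary(summary))  # detections_per_level, top keywords, etc.
# The CLI wraps the same path: main(["--log-path", log, "--days", "7"])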

View File

@@ -1,139 +0,0 @@
"""Crisis synthesizer integration for the responder pipeline.
Privacy-first and opt-in. Writes append-only JSONL events that feed the
existing crisis_synthesizer reporting flow.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from evolution.crisis_synthesizer import DEFAULT_LOG_PATH, append_interaction_event
logger = logging.getLogger(__name__)
SYNTH_ENABLED = os.environ.get("CRISIS_SYNTH_ENABLED", "0") == "1"
SYNTH_LOG_PATH = Path(os.environ.get("CRISIS_SYNTH_LOG_PATH", str(DEFAULT_LOG_PATH)))
class CrisisSynthesizerIntegration:
"""Append-only bridge between responder events and the synthesizer log."""
def __init__(
self,
enabled: Optional[bool] = None,
log_dir: Optional[Path] = None,
log_path: Optional[Path] = None,
):
self.enabled = SYNTH_ENABLED if enabled is None else bool(enabled)
if log_path is not None:
self.log_file = Path(log_path)
elif log_dir is not None:
self.log_file = Path(log_dir) / "events.jsonl"
else:
self.log_file = SYNTH_LOG_PATH
self.log_file.parent.mkdir(parents=True, exist_ok=True)
try:
os.chmod(self.log_file.parent, 0o700)
except OSError:
pass
def log_crisis_event(
self,
*,
level: str,
matched_keywords: list[str],
response_type: str,
session_id: str = "",
user_continued: bool = False,
metadata: Optional[dict[str, Any]] = None,
) -> bool:
if not self.enabled:
return False
try:
append_interaction_event(
self.log_file,
level=level,
indicators=list(matched_keywords),
response_given=response_type,
continued_conversation=user_continued,
false_positive=False,
session_hash=self._hash_session(session_id) if session_id else "",
response_type=response_type,
metadata=metadata or {},
)
self._lock_down_log_file()
return True
except Exception as exc:
logger.warning("Failed to log crisis synthesizer event: %s", exc)
return False
def log_user_continued(self, session_id: str) -> bool:
if not self.enabled or not session_id or not self.log_file.exists():
return False
session_hash = self._hash_session(session_id)
saw_prior_crisis = False
try:
with self.log_file.open("r", encoding="utf-8") as handle:
for raw in handle:
raw = raw.strip()
if not raw:
continue
event = json.loads(raw)
if (
event.get("event_type", "crisis_interaction") == "crisis_interaction"
and event.get("session_hash") == session_hash
):
saw_prior_crisis = True
if not saw_prior_crisis:
return False
continuation = {
"event_type": "continuation",
"timestamp": float(time.time()),
"session_hash": session_hash,
"user_continued": True,
}
with self.log_file.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(continuation) + "\n")
self._lock_down_log_file()
return True
except Exception as exc:
logger.warning("Failed to append crisis continuation marker: %s", exc)
return False
@staticmethod
def _hash_session(session_id: str) -> str:
return hashlib.sha256(session_id.encode("utf-8")).hexdigest()[:16]
def _lock_down_log_file(self) -> None:
try:
os.chmod(self.log_file, 0o600)
except OSError:
pass
_integration: Optional[CrisisSynthesizerIntegration] = None
def get_integration() -> CrisisSynthesizerIntegration:
global _integration
if _integration is None:
_integration = CrisisSynthesizerIntegration()
return _integration
def log_crisis_event(**kwargs: Any) -> bool:
return get_integration().log_crisis_event(**kwargs)
def log_user_continued(session_id: str) -> bool:
return get_integration().log_user_continued(session_id)
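For the record, the integration deleted here was opt-in via `CRISIS_SYNTH_ENABLED=1` and never stored raw session IDs, only a 16-character SHA-256 prefix. A sketch of how the removed API was driven (values illustrative; the env var must be set before import because `SYNTH_ENABLED` is read at module load):

import os

os.environ["CRISIS_SYNTH_ENABLED"] = "1"  # illustrative opt-in, pre-import

from crisis.synthesizer_integration import log_crisis_event, log_user_continued

# Append one anonymized interaction event...
log_crisis_event(
    level="HIGH",
    matched_keywords=["hopeless"],
    response_type="HIGH",
    session_id="demo-session",  # persisted only as a hash prefix
)
# ...then mark that the user kept talking after the intervention.
log_user_continued("demo-session")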

View File

@@ -1,19 +0,0 @@
"""Compatibility bridge for crisis session tracking.
Issue #141 describes the shared the-door tracker surface as `crisis.tracker`.
The canonical implementation lives in `crisis.session_tracker`, but hermes-agent
integration should be able to import the shorter path without caring about
internal file layout.
"""
from .session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
__all__ = [
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -21,13 +21,10 @@ Usage:
"""
import random
import threading
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
from crisis_detector import CrisisResult
from crisis.synthesizer_integration import get_integration
# ── Core values (from SOUL.md / system-prompt.txt) ──────────────
@@ -140,26 +137,6 @@ class CrisisResponder:
- Presence over brevity — stay as long as they need
"""
def __init__(
self,
synth_integration=None,
session_id: str = "",
async_synth_logging: bool = True,
):
self._synth = synth_integration if synth_integration is not None else get_integration()
self._session_id = session_id or uuid.uuid4().hex
self._async_synth_logging = async_synth_logging
self._awaiting_continuation = False
def _run_synth_task(self, fn, *args, **kwargs):
if not self._synth or not getattr(self._synth, "enabled", False):
return
if self._async_synth_logging:
thread = threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True)
thread.start()
else:
fn(*args, **kwargs)
def respond(self, detection: CrisisResult) -> CrisisResponse:
"""
Generate a crisis response based on detection results.
@@ -173,40 +150,20 @@ class CrisisResponder:
level = detection.risk_level
if level == "CRITICAL":
response = self._respond_critical(detection)
return self._respond_critical(detection)
elif level == "HIGH":
response = self._respond_high(detection)
return self._respond_high(detection)
elif level == "MODERATE":
response = self._respond_moderate(detection)
return self._respond_moderate(detection)
elif level == "LOW":
response = self._respond_low(detection)
return self._respond_low(detection)
else:
response = CrisisResponse(
return CrisisResponse(
message="",
risk_level="NONE",
escalate=False,
)
if level != "NONE":
self._run_synth_task(
self._synth.log_crisis_event,
level=level,
matched_keywords=list(detection.matched_keywords),
response_type=response.risk_level,
session_id=self._session_id,
user_continued=False,
metadata={
"keyword_count": len(detection.matched_keywords),
"score": detection.score,
},
)
self._awaiting_continuation = True
elif self._awaiting_continuation:
self._run_synth_task(self._synth.log_user_continued, self._session_id)
self._awaiting_continuation = False
return response
def _respond_critical(self, detection: CrisisResult) -> CrisisResponse:
"""
CRITICAL response protocol:

View File

@@ -1,32 +0,0 @@
# Issue #41 Verification
Status: already implemented on main
Issue:
- #41 [P3] Service worker: cache crisis resources for offline
Acceptance criteria check:
1. Passed — offline page includes 988 call button, Crisis Text Line, and grounding techniques
2. Passed — service worker precaches `crisis-offline.html`
3. Passed — phone number is clickable via `href="tel:988"`
4. Passed — navigation timeout + offline fallback support intermittent connections
Evidence:
- `sw.js` precaches `/crisis-offline.html`
- `sw.js` contains `NAVIGATION_TIMEOUT_MS` and `AbortController`
- `crisis-offline.html` contains:
- `href="tel:988"`
- Crisis Text Line / `741741`
- grounding prompts for 5-4-3-2-1
- `tests/test_service_worker_offline.py` already exists on main and passes
Verification run from fresh main clone:
- `python3 -m pytest tests/test_service_worker_offline.py -q`
- `10 passed`
Prior implementation trail:
- Issue comment already states the core implementation is complete on main
- Closed PR #122 documented the already-complete implementation and added the offline guarantees test coverage that is now present on main
Recommendation:
- Close issue #41 as already implemented on main.

View File

@@ -1 +0,0 @@
"""Evolution package for learning-oriented the-door modules."""

View File

@@ -1,225 +1 @@
"""Crisis synthesizer — learn from anonymized crisis interactions.
This is deliberately simple and privacy-preserving. It does not train a model or
modify detection rules automatically. It only logs metadata, summarizes patterns,
and suggests human-reviewed keyword weight adjustments.
"""
from __future__ import annotations
import argparse
import json
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Iterable
DEFAULT_LOG_PATH = Path.home() / ".the-door" / "crisis-interactions.jsonl"
LEVELS = ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL")
def build_interaction_event(
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
*,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
indicator_list = list(indicators)
return {
"event_type": "crisis_interaction",
"timestamp": float(time.time() if now is None else now),
"level": level,
"indicators": indicator_list,
"matched_keywords": indicator_list,
"indicator_count": len(indicator_list),
"response_given": response_given,
"response_type": response_type or response_given,
"session_hash": session_hash,
"continued_conversation": bool(continued_conversation),
"user_continued": bool(continued_conversation),
"false_positive": bool(false_positive),
"metadata": metadata or {},
}
def append_interaction_event(
log_path: str | Path,
*,
level: str,
indicators: list[str],
response_given: str,
continued_conversation: bool,
false_positive: bool,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
event = build_interaction_event(
level,
indicators,
response_given,
continued_conversation,
false_positive,
now=now,
session_hash=session_hash,
response_type=response_type,
metadata=metadata,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(event) + "\n")
return event
def load_interaction_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events: list[dict] = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
event = json.loads(line)
event_type = event.get("event_type", "crisis_interaction")
if event_type == "continuation":
session_hash = event.get("session_hash", "")
for prior in reversed(events):
if (
prior.get("event_type", "crisis_interaction") == "crisis_interaction"
and prior.get("session_hash", "") == session_hash
):
prior["continued_conversation"] = True
prior["user_continued"] = True
prior["continued_at"] = event.get("timestamp")
break
continue
events.append(event)
return events
def summarize_keywords(events: Iterable[dict]) -> list[dict]:
counts: Counter[str] = Counter()
for event in events:
counts.update(event.get("indicators", []))
return [{"keyword": keyword, "count": count} for keyword, count in counts.most_common(10)]
def suggest_keyword_adjustments(events: Iterable[dict], *, min_observations: int = 5) -> list[dict]:
stats: dict[str, dict[str, int]] = defaultdict(lambda: {
"observations": 0,
"true_positive_count": 0,
"false_positive_count": 0,
"continued_conversation_count": 0,
})
for event in events:
for keyword in event.get("indicators", []):
bucket = stats[keyword]
bucket["observations"] += 1
if event.get("false_positive"):
bucket["false_positive_count"] += 1
else:
bucket["true_positive_count"] += 1
if event.get("continued_conversation"):
bucket["continued_conversation_count"] += 1
suggestions = []
for keyword, bucket in sorted(stats.items()):
if bucket["observations"] < min_observations:
continue
fp = bucket["false_positive_count"]
tp = bucket["true_positive_count"]
if fp >= min_observations and tp == 0:
adjustment = "lower_weight"
rationale = "Observed only false positives across the sample window."
elif tp >= min_observations and fp == 0:
adjustment = "raise_weight"
rationale = "Observed repeated genuine crises with no false positives."
else:
adjustment = "observe"
rationale = "Mixed evidence; keep monitoring before changing weights."
suggestions.append(
{
"keyword": keyword,
**bucket,
"suggested_adjustment": adjustment,
"rationale": rationale,
}
)
return suggestions
def build_weekly_report(
events: Iterable[dict],
*,
now: float | None = None,
window_days: int = 7,
min_observations: int = 3,
) -> dict:
current_time = float(time.time() if now is None else now)
cutoff = current_time - (window_days * 86400)
filtered = [event for event in events if float(event.get("timestamp", 0)) >= cutoff]
detections_per_level = {level: 0 for level in LEVELS}
detected_events = []
continued_after_intervention = 0
for event in filtered:
level = event.get("level", "NONE")
detections_per_level[level] = detections_per_level.get(level, 0) + 1
if level != "NONE":
detected_events.append(event)
if event.get("continued_conversation"):
continued_after_intervention += 1
false_positive_count = sum(1 for event in detected_events if event.get("false_positive"))
false_positive_estimate = false_positive_count / len(detected_events) if detected_events else 0.0
return {
"window_days": window_days,
"total_events": len(filtered),
"detections_per_level": detections_per_level,
"most_common_keywords": summarize_keywords(filtered),
"false_positive_estimate": false_positive_estimate,
"continued_after_intervention": continued_after_intervention,
"keyword_weight_suggestions": suggest_keyword_adjustments(filtered, min_observations=min_observations),
}
def render_weekly_report(summary: dict) -> str:
return json.dumps(summary, indent=2)
def write_weekly_report(output_path: str | Path, summary: dict) -> Path:
path = Path(output_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(render_weekly_report(summary) + "\n", encoding="utf-8")
return path
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Summarize anonymized crisis interactions")
parser.add_argument("--log-path", default=str(DEFAULT_LOG_PATH), help="JSONL crisis interaction log")
parser.add_argument("--days", type=int, default=7, help="Lookback window in days")
parser.add_argument("--min-observations", type=int, default=3, help="Minimum observations before suggesting keyword adjustments")
parser.add_argument("--output", help="Optional file to write the weekly report JSON")
args = parser.parse_args(argv)
events = load_interaction_events(args.log_path)
summary = build_weekly_report(events, window_days=args.days, min_observations=args.min_observations)
rendered = render_weekly_report(summary)
print(rendered)
if args.output:
write_weekly_report(args.output, summary)
return 0
if __name__ == "__main__":
raise SystemExit(main())
...
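The weight-suggestion logic removed here was deliberately one-sided: a keyword needed `min_observations` sightings, and a change was only recommended when the evidence all pointed the same way. A self-contained illustration of the three outcomes, feeding `suggest_keyword_adjustments` synthetic events (all values made up; the module itself is gutted by this commit):

from evolution.crisis_synthesizer import suggest_keyword_adjustments

def event(keyword: str, false_positive: bool) -> dict:
    return {"indicators": [keyword], "false_positive": false_positive,
            "continued_conversation": False}

events = (
    [event("hopeless", False)] * 5   # only genuine crises -> raise_weight
    + [event("so done", True)] * 5   # only false alarms   -> lower_weight
    + [event("trapped", False)] * 3
    + [event("trapped", True)] * 2   # mixed 3/2 evidence  -> observe
)

for suggestion in suggest_keyword_adjustments(events, min_observations=5):
    print(suggestion["keyword"], suggestion["suggested_adjustment"])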

View File

@@ -1,170 +0,0 @@
"""
image_screening.py — local image crisis screening slice for epic #130.
Grounded scope:
- screens OCR text, upstream object labels, and operator notes for crisis signals
- intentionally does NOT claim raw computer-vision understanding of pixels
- designed to plug into future multimodal scoring once a dedicated image model lands
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from crisis.detect import detect_crisis
DIRECT_SELF_HARM_LABELS = {
"blood",
"blade",
"razor",
"knife",
"scissors",
"noose",
"ligature",
"hanging",
"pills",
"pill bottle",
"overdose",
"gun",
"firearm",
"rope",
"cuts",
"self-harm",
}
INJURY_LABELS = {
"wound",
"wounds",
"bruise",
"bruises",
"bandage",
"bandages",
"injury",
"injuries",
"scar",
"scars",
"burn",
"burns",
"bleeding",
}
HIGH_RISK_SCENE_LABELS = {
"bridge edge",
"rooftop edge",
"train tracks",
"ledge",
"cliff edge",
"dark room",
"bathroom floor",
"hospital wristband",
"suicide note",
"goodbye letter",
}
@dataclass
class ImageScreeningResult:
ocr_text: str = ""
labels: List[str] = field(default_factory=list)
visual_flags: List[str] = field(default_factory=list)
distress_score: float = 0.0
requires_human_review: bool = False
signals_detected: List[str] = field(default_factory=list)
grounded_scope: str = (
"heuristic screening over OCR text, upstream labels, and operator notes; "
"raw vision-model inference is not implemented in this slice"
)
def to_dict(self) -> dict:
return asdict(self)
def _normalize_items(values: Optional[Iterable[str]]) -> List[str]:
if not values:
return []
normalized = []
for value in values:
text = str(value).strip().lower()
if text:
normalized.append(text)
return normalized
def _match_keywords(haystack: str, keywords: set[str]) -> List[str]:
matches = []
for keyword in keywords:
if keyword in haystack:
matches.append(keyword)
return sorted(set(matches))
def screen_image_signals(
image_path: Optional[str] = None,
*,
ocr_text: str = "",
labels: Optional[Iterable[str]] = None,
manual_notes: str = "",
visual_flags: Optional[Iterable[str]] = None,
) -> ImageScreeningResult:
"""
Score image-related crisis evidence without pretending to do full CV.
Inputs are deliberately grounded in what the repo can actually support today:
- OCR text extracted upstream from screenshots/photos
- object labels supplied by a local model or operator
- manual operator notes about visible scene context
- optional visual flags from any upstream preprocessor
"""
normalized_labels = _normalize_items(labels)
normalized_flags = _normalize_items(visual_flags)
normalized_notes = str(manual_notes or "").strip().lower()
normalized_ocr = str(ocr_text or "").strip()
combined_label_text = " ".join(normalized_labels + normalized_flags + ([normalized_notes] if normalized_notes else []))
crisis_text = " ".join(part for part in [normalized_ocr, normalized_notes] if part).strip()
direct_matches = _match_keywords(combined_label_text, DIRECT_SELF_HARM_LABELS)
injury_matches = _match_keywords(combined_label_text, INJURY_LABELS)
scene_matches = _match_keywords(combined_label_text, HIGH_RISK_SCENE_LABELS)
text_detection = detect_crisis(crisis_text) if crisis_text else None
signals: List[str] = []
score = 0.0
if direct_matches:
score = max(score, 0.85)
for match in direct_matches:
signals.append(f"direct_self_harm_label:{match}")
if injury_matches:
score = max(score, 0.55)
for match in injury_matches:
signals.append(f"injury_indicator:{match}")
if scene_matches:
score = max(score, 0.4)
for match in scene_matches:
signals.append(f"high_risk_scene:{match}")
if text_detection and text_detection.level != "NONE":
score = max(score, min(1.0, text_detection.score))
signals.append(f"ocr_crisis_level:{text_detection.level}")
for indicator in text_detection.indicators[:3]:
signals.append(f"ocr_indicator:{indicator}")
if direct_matches and text_detection and text_detection.level in {"HIGH", "CRITICAL"}:
score = min(1.0, max(score, 0.95))
signals.append("cross_modal_confirmation:text_plus_visual")
requires_human_review = score >= 0.4 or bool(direct_matches)
return ImageScreeningResult(
ocr_text=normalized_ocr,
labels=list(normalized_labels),
visual_flags=list(normalized_flags),
distress_score=round(score, 4),
requires_human_review=requires_human_review,
signals_detected=signals,
)
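Scoring here is max-based per evidence class (0.85 for direct self-harm labels, 0.55 for injury indicators, 0.4 for high-risk scenes), with a cross-modal bump toward 0.95 when a direct label coincides with HIGH or CRITICAL OCR text. A quick sketch of the deleted entrypoint under made-up inputs (the top-level module path is inferred from its absolute `crisis.detect` import):

from image_screening import screen_image_signals

result = screen_image_signals(
    ocr_text="I can't do this anymore",           # illustrative OCR text
    labels=["pill bottle", "bandages"],           # illustrative upstream labels
    manual_notes="photo taken on the bathroom floor",
)
print(result.distress_score, result.requires_human_review)
print(result.signals_detected)  # e.g. direct_self_harm_label:pill bottle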

View File

@@ -241,48 +241,6 @@ html, body {
opacity: 0.5;
}
/* ===== CHAT HEADER ===== */
#chat-header {
flex-shrink: 0;
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 10px 12px;
border-bottom: 1px solid #21262d;
background: #11161d;
}
.chat-header-title {
font-size: 0.85rem;
color: #8b949e;
font-weight: 600;
letter-spacing: 0.02em;
}
#chat-safety-plan-btn {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 8px 12px;
min-height: 36px;
border: 1px solid #30363d;
border-radius: 999px;
background: transparent;
color: #c9d1d9;
font-size: 0.8rem;
font-weight: 600;
cursor: pointer;
}
#chat-safety-plan-btn:hover,
#chat-safety-plan-btn:focus {
border-color: #58a6ff;
background: rgba(88, 166, 255, 0.12);
outline: 2px solid #58a6ff;
outline-offset: 2px;
}
/* ===== CHAT AREA ===== */
#chat-area {
flex: 1;
@@ -691,14 +649,6 @@ html, body {
</div>
</div>
<div id="chat-header">
<div class="chat-header-title" aria-hidden="true">Conversation</div>
<button id="chat-safety-plan-btn" type="button" aria-label="Open My Safety Plan from chat header">
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" aria-hidden="true"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"/><polyline points="14 2 14 8 20 8"/><line x1="16" y1="13" x2="8" y2="13"/><line x1="16" y1="17" x2="8" y2="17"/><polyline points="10 9 9 9 8 9"/></svg>
My Safety Plan
</button>
</div>
<!-- Chat messages -->
<div id="chat-area" role="log" aria-label="Chat messages" aria-live="polite" tabindex="0">
<!-- Messages inserted here -->
@@ -731,7 +681,6 @@ html, body {
<!-- Footer -->
<footer id="footer">
<a href="/about.html" aria-label="About The Door">about</a>
<button id="crisis-resources-btn" aria-label="Open crisis resources">crisis resources</button>
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
</footer>
@@ -862,11 +811,9 @@ Sovereignty and service always.`;
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
var crisisResourcesBtn = document.getElementById('crisis-resources-btn');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');
var crisisSafetyPlanBtn = document.getElementById('crisis-safety-plan-btn');
var safetyPlanModal = document.getElementById('safety-plan-modal');
var closeSafetyPlan = document.getElementById('close-safety-plan');
@@ -879,9 +826,6 @@ Sovereignty and service always.`;
var isStreaming = false;
var overlayTimer = null;
var crisisPanelShown = false;
var CRISIS_OVERLAY_COOLDOWN_MS = 10 * 60 * 1000;
var CRISIS_OVERLAY_LAST_SHOWN_KEY = 'timmy_crisis_overlay_last_shown_at';
var CRISIS_OVERLAY_EVENT_LOG_KEY = 'timmy_crisis_overlay_event_log';
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -909,43 +853,6 @@ Sovereignty and service always.`;
window.addEventListener('offline', updateOnlineStatus);
updateOnlineStatus();
function getLastOverlayShownAt() {
try {
return parseInt(localStorage.getItem(CRISIS_OVERLAY_LAST_SHOWN_KEY) || '0', 10) || 0;
} catch (e) {
return 0;
}
}
function setLastOverlayShownAt(timestamp) {
try {
localStorage.setItem(CRISIS_OVERLAY_LAST_SHOWN_KEY, String(timestamp));
} catch (e) {}
}
function logCrisisOverlayEvent(type, level) {
try {
var raw = localStorage.getItem(CRISIS_OVERLAY_EVENT_LOG_KEY);
var events = raw ? JSON.parse(raw) : [];
if (!Array.isArray(events)) events = [];
events.push({ type: type, level: level, at: Date.now() });
if (events.length > 20) events = events.slice(events.length - 20);
localStorage.setItem(CRISIS_OVERLAY_EVENT_LOG_KEY, JSON.stringify(events));
} catch (e) {}
}
function openCrisisResources() {
crisisPanelShown = true;
crisisPanel.classList.add('visible');
if (typeof crisisPanel.scrollIntoView === 'function') {
crisisPanel.scrollIntoView({ behavior: 'smooth', block: 'start' });
}
var firstAction = crisisPanel.querySelector('.crisis-btn, a[href]');
if (firstAction && typeof firstAction.focus === 'function') {
firstAction.focus();
}
}
// ===== CRISIS KEYWORDS =====
// Tier 1: General crisis indicators - triggers enhanced 988 panel
var crisisKeywords = [
@@ -1080,33 +987,20 @@ Sovereignty and service always.`;
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return Array.prototype.slice.call(crisisOverlay.querySelectorAll(
return crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
));
);
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key === 'Escape') {
e.preventDefault();
closeOverlay(msgInput);
return;
}
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var focusableArray = focusable;
var first = focusable[0];
var last = focusable[focusable.length - 1];
var activeIndex = focusableArray.indexOf(document.activeElement);
if (activeIndex === -1) {
e.preventDefault();
e.shiftKey ? last.focus() : first.focus();
return;
}
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
@@ -1126,47 +1020,7 @@ Sovereignty and service always.`;
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function closeOverlay(restoreFocusTarget) {
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.getElementById('app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat-area');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
if (restoreFocusTarget === _preOverlayFocusElement && _preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else if (restoreFocusTarget && typeof restoreFocusTarget.focus === 'function') {
restoreFocusTarget.focus();
} else if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
function showOverlay() {
return showOverlayWithRateLimit(false, 2);
}
function showOverlayWithRateLimit(forceOpen, level) {
var lastShownAt = getLastOverlayShownAt();
if (!forceOpen && Date.now() - lastShownAt < CRISIS_OVERLAY_COOLDOWN_MS) {
logCrisisOverlayEvent('suppressed', level || 2);
return false;
}
logCrisisOverlayEvent(forceOpen ? 'manual-open' : 'shown', level || 2);
setLastOverlayShownAt(Date.now());
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
@@ -1176,10 +1030,10 @@ Sovereignty and service always.`;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.getElementById('app');
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat-area');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
@@ -1199,7 +1053,6 @@ Sovereignty and service always.`;
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
return true;
}
// Register focus trap on document (always listening, gated by class check)
@@ -1207,7 +1060,27 @@ Sovereignty and service always.`;
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
closeOverlay(_preOverlayFocusElement);
crisisOverlay.classList.remove('active');
if (overlayTimer) {
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
}
});
@@ -1412,31 +1285,19 @@ Sovereignty and service always.`;
_spTriggerEl = null;
}
function openSafetyPlan(triggerEl) {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(triggerEl || document.activeElement);
}
// Wire open buttons to activate focus trap
safetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(safetyPlanBtn);
});
chatSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(chatSafetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
openSafetyPlan(crisisSafetyPlanBtn);
});
}
if (crisisResourcesBtn) {
crisisResourcesBtn.addEventListener('click', function() {
openCrisisResources();
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(crisisSafetyPlanBtn);
});
}
@@ -1583,7 +1444,9 @@ Sovereignty and service always.`;
// Check for URL params (e.g., ?safetyplan=true for PWA shortcut)
var urlParams = new URLSearchParams(window.location.search);
if (urlParams.get('safetyplan') === 'true') {
openSafetyPlan(chatSafetyPlanBtn || safetyPlanBtn);
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -1,68 +0,0 @@
# The Door Fleet Work Orders Audit — issue #75
Generated: 2026-04-17T04:10:14Z
Source issue: `TRIAGE: The Door - Fleet Work Orders (2026-04-09)`
## Source Snapshot
Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.
## Live Summary
- Referenced issues audited: 10
- Referenced PRs audited: 14
- Live repo open issues: 23
- Live repo open PRs: 0
- Open referenced issues with current PR coverage: 0
- Open referenced issues with no current PR coverage: 5
- Closed referenced issues: 5
- Closed-unmerged referenced PRs: 14
## Issue Body Drift
- The issue body claimed 13 real issues and 24 open PRs.
- Live repo state now shows 23 open issues and 0 open PRs.
- Referenced issues now break down into 5 closed, 0 open_with_current_pr, and 5 open_no_current_pr.
- Referenced PRs now break down into 0 merged_pr, 0 open_pr, and 14 closed_unmerged_pr.
## Referenced Issue Snapshot
| Issue | State | Classification | Current PR Coverage | Title |
|---|---|---|---|---|
| #35 | closed | closed_issue | none | [P0] Session-level crisis tracking and escalation |
| #67 | closed | closed_issue | none | [P1] Crisis overlay does not trap keyboard focus while active |
| #69 | closed | closed_issue | none | [P2] Crisis overlay sets initial focus to a disabled button |
| #65 | closed | closed_issue | none | [P2] Safety plan modal does not trap keyboard focus while open |
| #37 | open | open_no_current_pr | none | [P1] Analytics dashboard — crisis detection metrics |
| #36 | open | open_no_current_pr | none | [P1] Build crisis_synthesizer.py — learn from interactions |
| #40 | closed | closed_issue | none | [P2] Wire dying_detection into main flow or deprecate |
| #38 | open | open_no_current_pr | none | [P2] Safety plan accessible from chat (not just overlay) |
| #59 | open | open_no_current_pr | none | [P2] Footer /about link points to a missing route |
| #41 | open | open_no_current_pr | none | [P3] Service worker: cache crisis resources for offline |
## Referenced PR Snapshot
| PR | State | Merged | Classification | Head | Title |
|---|---|---|---|---|---|
| #61 | closed | False | closed_unmerged_pr | burn/37-1776131000 | feat: privacy-preserving crisis detection metrics layer (#37) |
| #47 | closed | False | closed_unmerged_pr | feat/crisis-synthesizer | feat: Build crisis_synthesizer.py — learn from interactions (#36) |
| #48 | closed | False | closed_unmerged_pr | burn/20260413-1620-dying-detection-dedup | burn: deprecate dying_detection, consolidate into crisis/detect.py |
| #50 | closed | False | closed_unmerged_pr | whip/40-1776128804 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #51 | closed | False | closed_unmerged_pr | queue/40-1776129201 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #53 | closed | False | closed_unmerged_pr | q/40-1776129480 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #56 | closed | False | closed_unmerged_pr | triage/40-1776129677 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #58 | closed | False | closed_unmerged_pr | dawn/40-1776130053 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #70 | closed | False | closed_unmerged_pr | am/40-1776166469 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #72 | closed | False | closed_unmerged_pr | am/38-1776166469 | feat: add always-on safety plan access in chat header (#38) |
| #62 | closed | False | closed_unmerged_pr | burn/59-1776131200 | fix: point footer about link to /about.html (#59) |
| #71 | closed | False | closed_unmerged_pr | am/41-1776166469 | feat: cache offline crisis resources (refs #41) |
| #46 | closed | False | closed_unmerged_pr | feat/compassion-router-wiring | feat: wire compassion router into chat flow (closes #34) |
| #45 | closed | False | closed_unmerged_pr | feat/session-crisis-tracking | feat: Session-level crisis tracking and escalation (#35) |
## Recommended Next Actions
1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.
2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.
3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.
4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.
5. This audit preserves operator memory; it does not claim all referenced work orders are complete.

View File

@@ -1,295 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"
def extract_issue_numbers(body: str) -> list[int]:
numbers: list[int] = []
seen: set[int] = set()
for match in re.finditer(r"#(\d+)", body or ""):
value = int(match.group(1))
if value in seen:
continue
seen.add(value)
numbers.append(value)
return numbers
def api_get(repo: str, path: str, token: str) -> Any:
req = Request(
f"{API_BASE}/repos/{ORG}/{repo}{path}",
headers={"Authorization": f"token {token}"},
)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
page = 1
while True:
batch = api_get(repo, f"/pulls?state=open&limit=100&page={page}", token)
if not batch:
break
prs.extend(batch)
page += 1
return prs
def fetch_live_open_issue_count(repo: str, token: str) -> int:
total = 0
page = 1
while True:
batch = api_get(repo, f"/issues?state=open&limit=100&page={page}", token)
if not batch:
break
total += sum(1 for item in batch if not item.get("pull_request"))
page += 1
return total
def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
claimed_open_issues = int(issue_match.group(1)) if issue_match else None
claimed_open_prs = int(pr_match.group(1)) if pr_match else None
return claimed_open_issues, claimed_open_prs
def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
matches: list[str] = []
seen: set[int] = set()
for pr in open_prs:
pr_num = pr["number"]
if pr_num in seen:
continue
text = "\n".join(
[
pr.get("title") or "",
pr.get("body") or "",
(pr.get("head") or {}).get("ref") or "",
]
)
if f"#{issue_num}" not in text:
continue
seen.add(pr_num)
matches.append(f"open PR #{pr_num}")
return ", ".join(matches) if matches else "none"
def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
issue_num = ref_issue["number"]
state = ref_issue.get("state") or "unknown"
coverage = summarize_open_pr_coverage(issue_num, open_prs)
if state == "closed":
classification = "closed_issue"
elif coverage != "none":
classification = "open_with_current_pr"
else:
classification = "open_no_current_pr"
return {
"number": issue_num,
"state": state,
"classification": classification,
"title": ref_issue.get("title") or "",
"current_pr_coverage": coverage,
"url": ref_issue.get("html_url") or ref_issue.get("url") or "",
}
def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
pr = api_get(repo, f"/pulls/{pr_num}", token)
state = pr.get("state") or "unknown"
merged = bool(pr.get("merged"))
if merged:
classification = "merged_pr"
elif state == "open":
classification = "open_pr"
else:
classification = "closed_unmerged_pr"
return {
"number": pr_num,
"state": state,
"merged": merged,
"classification": classification,
"title": pr.get("title") or "",
"head": (pr.get("head") or {}).get("ref") or "",
"url": pr.get("html_url") or pr.get("url") or "",
}
def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
headers = [title for title, _ in columns]
keys = [key for _, key in columns]
if not rows:
return "| None |\n|---|\n| None |"
lines = ["| " + " | ".join(headers) + " |", "|" + "|".join(["---"] * len(headers)) + "|"]
for row in rows:
values: list[str] = []
for key in keys:
value = row.get(key, "")
if key == "number" and value != "":
value = f"#{value}"
values.append(str(value).replace("\n", " "))
lines.append("| " + " | ".join(values) + " |")
return "\n".join(lines)
def render_report(
*,
source_issue: int,
source_title: str,
generated_at: str,
claimed_open_issues: int | None,
claimed_open_prs: int | None,
live_open_issues: int,
live_open_prs: int,
issue_rows: list[dict[str, Any]],
pr_rows: list[dict[str, Any]],
) -> str:
open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]
drift_lines = [
f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
]
return "\n".join(
[
f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
"",
f"Generated: {generated_at}",
f"Source issue: `{source_title}`",
"",
"## Source Snapshot",
"",
"Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
"",
"## Live Summary",
"",
f"- Referenced issues audited: {len(issue_rows)}",
f"- Referenced PRs audited: {len(pr_rows)}",
f"- Live repo open issues: {live_open_issues}",
f"- Live repo open PRs: {live_open_prs}",
f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
f"- Closed referenced issues: {len(closed_issues)}",
f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
"",
"## Issue Body Drift",
"",
*drift_lines,
"",
"## Referenced Issue Snapshot",
"",
table(
issue_rows,
[
("Issue", "number"),
("State", "state"),
("Classification", "classification"),
("Current PR Coverage", "current_pr_coverage"),
("Title", "title"),
],
),
"",
"## Referenced PR Snapshot",
"",
table(
pr_rows,
[
("PR", "number"),
("State", "state"),
("Merged", "merged"),
("Classification", "classification"),
("Head", "head"),
("Title", "title"),
],
),
"",
"## Recommended Next Actions",
"",
"1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
"2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
"3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
"4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
"5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
]
) + "\n"
def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
source_issue = api_get(repo, f"/issues/{issue_number}", token)
body = source_issue.get("body") or ""
refs = extract_issue_numbers(body)
open_prs = fetch_open_prs(repo, token)
claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)
issue_rows: list[dict[str, Any]] = []
pr_rows: list[dict[str, Any]] = []
for ref in refs:
issue_like = api_get(repo, f"/issues/{ref}", token)
if issue_like.get("pull_request"):
pr_rows.append(classify_pr_reference(repo, ref, token))
else:
issue_rows.append(classify_issue_reference(issue_like, open_prs))
metadata = {
"source_title": source_issue.get("title") or "",
"claimed_open_issues": claimed_open_issues,
"claimed_open_prs": claimed_open_prs,
"live_open_issues": fetch_live_open_issue_count(repo, token),
"live_open_prs": len(open_prs),
}
return metadata, issue_rows, pr_rows
def main() -> int:
parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
parser.add_argument("--repo", default="the-door")
parser.add_argument("--issue", type=int, default=75)
parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
args = parser.parse_args()
token = Path(args.token_file).read_text(encoding="utf-8").strip()
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
report = render_report(
source_issue=args.issue,
source_title=metadata["source_title"],
generated_at=generated_at,
claimed_open_issues=metadata["claimed_open_issues"],
claimed_open_prs=metadata["claimed_open_prs"],
live_open_issues=metadata["live_open_issues"],
live_open_prs=metadata["live_open_prs"],
issue_rows=issue_rows,
pr_rows=pr_rows,
)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(report, encoding="utf-8")
print(output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())
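For reference, the committed report regenerates from live forge state using the defaults above; a typical invocation (the script path matches the regression tests later in this diff, and token/output fall back to the DEFAULT_TOKEN_PATH and DEFAULT_OUTPUT constants defined earlier in the script):

    python3 scripts/fleet_work_orders_audit.py --repo the-door --issue 75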

View File

@@ -1,26 +0,0 @@
"""Regression tests for the crisis integration contract expected by hermes-agent.
Issue #141 names the-door-side shared modules as `crisis.tracker` and
`crisis.bridge`. Keep those import paths available even if the canonical
implementation lives in `session_tracker.py` and `gateway.py`.
"""
import importlib
def test_crisis_tracker_module_exports_session_tracker_contract():
tracker = importlib.import_module("crisis.tracker")
session_tracker = importlib.import_module("crisis.session_tracker")
assert tracker.CrisisSessionTracker is session_tracker.CrisisSessionTracker
assert tracker.SessionState is session_tracker.SessionState
assert tracker.check_crisis_with_session is session_tracker.check_crisis_with_session
def test_crisis_bridge_module_exports_gateway_contract():
bridge = importlib.import_module("crisis.bridge")
gateway = importlib.import_module("crisis.gateway")
assert bridge.check_crisis is gateway.check_crisis
assert bridge.get_system_prompt is gateway.get_system_prompt
assert bridge.format_gateway_response is gateway.format_gateway_response

View File

@@ -1,117 +1,99 @@
"""
Tests for crisis/metrics.py — Aggregate crisis metrics.
"""
"""Tests for privacy-preserving crisis metrics aggregation (issue #37)."""
from __future__ import annotations
import json
import os
import shutil
import tempfile
import unittest
from pathlib import Path
import pathlib
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
import unittest
from crisis.metrics import CrisisMetrics, SessionMetrics, AggregateMetrics
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.gateway import check_crisis
from crisis.metrics import (
append_metrics_event,
build_metrics_event,
build_weekly_summary,
load_metrics_events,
render_weekly_summary,
)
class TestCrisisMetrics(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.metrics = CrisisMetrics(Path(self.tmpdir))
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_record_session_creates_file(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="MEDIUM",
message_count=5,
was_escalating=True,
was_deescalating=False,
escalation_rate=0.5,
class TestMetricsEvent(unittest.TestCase):
def test_event_is_privacy_preserving(self):
detection = detect_crisis("I want to kill myself")
event = build_metrics_event(
detection,
continued_conversation=True,
false_positive=False,
now=1_700_000_000,
)
self.metrics.record_session(sm)
self.assertEqual(event["timestamp"], 1_700_000_000)
self.assertEqual(event["level"], "CRITICAL")
self.assertTrue(event["continued_conversation"])
self.assertFalse(event["false_positive"])
self.assertNotIn("text", event)
self.assertNotIn("message", event)
self.assertGreaterEqual(event["indicator_count"], 1)
self.assertTrue(event["indicators"])
files = list(Path(self.tmpdir).glob("*.jsonl"))
self.assertEqual(len(files), 1)
def test_record_session_writes_jsonl(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="HIGH",
peak_level="CRITICAL",
message_count=10,
was_escalating=True,
was_deescalating=False,
escalation_rate=1.0,
triggered_overlay=True,
showed_988=True,
class TestMetricsLogAndSummary(unittest.TestCase):
def test_append_and_load_metrics_events(self):
log_path = pathlib.Path(self._testMethodName).with_suffix(".jsonl")
try:
append_metrics_event(log_path, detect_crisis("I want to die"), now=1_700_000_000)
events = load_metrics_events(log_path)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]["level"], "CRITICAL")
finally:
if log_path.exists():
log_path.unlink()
def test_weekly_summary_counts_levels_keywords_and_false_positives(self):
events = [
build_metrics_event(detect_crisis("I want to die"), continued_conversation=True, false_positive=False, now=1_700_000_000),
build_metrics_event(detect_crisis("I'm having a rough day"), continued_conversation=False, false_positive=False, now=1_700_000_100),
build_metrics_event(detect_crisis("I want to die"), continued_conversation=False, false_positive=True, now=1_700_000_200),
build_metrics_event(detect_crisis("Hello there"), continued_conversation=False, false_positive=False, now=1_700_000_300),
]
summary = build_weekly_summary(events, now=1_700_000_400, window_days=7)
self.assertEqual(summary["detections_per_level"]["CRITICAL"], 2)
self.assertEqual(summary["detections_per_level"]["LOW"], 1)
self.assertEqual(summary["detections_per_level"]["NONE"], 1)
self.assertEqual(summary["continued_after_intervention"], 1)
self.assertAlmostEqual(summary["false_positive_estimate"], 1 / 3, places=4)
self.assertEqual(summary["most_common_keywords"][0]["count"], 2)
def test_render_weekly_summary_mentions_required_metrics(self):
events = [
build_metrics_event(detect_crisis("I want to die"), continued_conversation=True, now=1_700_000_000),
build_metrics_event(detect_crisis("I feel hopeless with no way out"), false_positive=True, now=1_700_000_100),
]
summary = build_weekly_summary(events, now=1_700_000_200, window_days=7)
rendered = render_weekly_summary(summary)
self.assertIn("detections_per_level", rendered)
self.assertIn("most_common_keywords", rendered)
self.assertIn("false_positive_estimate", rendered)
self.assertIn("continued_after_intervention", rendered)
class TestGatewayMetricsIntegration(unittest.TestCase):
def test_check_crisis_can_emit_metrics_event(self):
result = check_crisis(
"I want to die",
metrics_log_path=None,
continued_conversation=True,
false_positive=False,
now=1_700_000_000,
)
self.metrics.record_session(sm)
files = list(Path(self.tmpdir).glob("*.jsonl"))
with open(files[0]) as f:
data = json.loads(f.readline())
self.assertEqual(data['peak_level'], 'CRITICAL')
self.assertTrue(data['triggered_overlay'])
def test_get_summary_empty(self):
agg = self.metrics.get_summary(days=7)
self.assertEqual(agg.total_sessions, 0)
self.assertEqual(agg.total_messages, 0)
def test_get_summary_with_data(self):
for level in ["LOW", "MEDIUM", "HIGH"]:
sm = SessionMetrics(
timestamp=1700000000,
current_level=level,
peak_level=level,
message_count=3,
was_escalating=level != "LOW",
was_deescalating=False,
escalation_rate=0.5,
)
self.metrics.record_session(sm)
agg = self.metrics.get_summary(days=1)
self.assertEqual(agg.total_sessions, 3)
self.assertEqual(agg.total_messages, 9)
self.assertEqual(agg.escalating_sessions, 2)
def test_get_report_returns_string(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="LOW",
peak_level="LOW",
message_count=5,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
report = self.metrics.get_report(days=1)
self.assertIn("CRISIS METRICS REPORT", report)
self.assertIn("Sessions:", report)
def test_get_json_returns_valid(self):
sm = SessionMetrics(
timestamp=1700000000,
current_level="MEDIUM",
peak_level="MEDIUM",
message_count=3,
was_escalating=False,
was_deescalating=False,
escalation_rate=0.0,
)
self.metrics.record_session(sm)
json_str = self.metrics.get_json(days=1)
data = json.loads(json_str)
self.assertEqual(data['total_sessions'], 1)
self.assertEqual(result["level"], "CRITICAL")
self.assertIn("metrics_event", result)
self.assertEqual(result["metrics_event"]["timestamp"], 1_700_000_000)
self.assertTrue(result["metrics_event"]["continued_conversation"])
if __name__ == "__main__":

View File

@@ -1,54 +0,0 @@
import pathlib
from playwright.sync_api import sync_playwright
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
def test_crisis_overlay_supports_keyboard_only_navigation():
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
try:
page.goto(INDEX_HTML.as_uri())
page.evaluate(
"""
() => {
window.fetch = () => Promise.reject(new Error('test fetch blocked'));
window.alert = () => {};
}
"""
)
page.fill('#msg-input', "I'm going to kill myself tonight")
page.press('#msg-input', 'Enter')
page.wait_for_function("document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.evaluate(
"""
() => {
const btn = document.getElementById('overlay-dismiss-btn');
btn.disabled = false;
btn.textContent = 'Continue to chat';
}
"""
)
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Tab')
assert page.evaluate("document.activeElement.classList.contains('overlay-call')") is True
page.keyboard.press('Shift+Tab')
assert page.evaluate("document.activeElement.id") == 'overlay-dismiss-btn'
page.keyboard.press('Escape')
page.wait_for_function("!document.getElementById('crisis-overlay').classList.contains('active')")
assert page.evaluate("document.activeElement.id") == 'msg-input'
finally:
browser.close()
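The deleted overlay test above assumed Playwright's bundled Chromium was already fetched; the standard bootstrap for that tooling (not part of this diff) is:

    pip install playwright
    python3 -m playwright install chromium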

View File

@@ -1,53 +0,0 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayRateLimit(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_overlay_has_ten_minute_cooldown_constant(self):
self.assertRegex(
self.html,
r"CRISIS_OVERLAY_COOLDOWN_MS\s*=\s*10\s*\*\s*60\s*\*\s*1000",
'Expected a 10-minute crisis overlay cooldown constant.',
)
def test_show_overlay_suppresses_repeat_with_logging(self):
self.assertRegex(
self.html,
r"function\s+logCrisisOverlayEvent\s*\(",
'Expected a crisis overlay event logger.',
)
self.assertRegex(
self.html,
r"if\s*\(!forceOpen\s*&&\s*Date\.now\(\)\s*-\s*lastShownAt\s*<\s*CRISIS_OVERLAY_COOLDOWN_MS\)",
'Expected showOverlay to suppress repeated auto-displays inside the cooldown window.',
)
self.assertRegex(
self.html,
r"logCrisisOverlayEvent\('suppressed'",
'Expected suppressed overlay attempts to be logged.',
)
def test_manual_crisis_resources_button_exists_and_bypasses_cooldown(self):
self.assertIn('id="crisis-resources-btn"', self.html)
self.assertRegex(
self.html,
r"function\s+openCrisisResources\s*\(",
'Expected a manual crisis resources opener.',
)
self.assertRegex(
self.html,
r"crisisResourcesBtn\.addEventListener\('click',\s*function\(\)\s*\{\s*openCrisisResources\(\);",
'Expected the footer button to wire into openCrisisResources().',
)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,111 +0,0 @@
"""Tests for evolution/crisis_synthesizer.py (issue #36)."""
from __future__ import annotations
import importlib.util
import json
import pathlib
import sys
import tempfile
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SCRIPT = ROOT / 'evolution' / 'crisis_synthesizer.py'
spec = importlib.util.spec_from_file_location('crisis_synthesizer', str(SCRIPT))
mod = importlib.util.module_from_spec(spec)
sys.modules['crisis_synthesizer'] = mod
spec.loader.exec_module(mod)
class TestCrisisSynthesizerEvent(unittest.TestCase):
def test_build_interaction_event_is_privacy_preserving(self):
event = mod.build_interaction_event(
level='CRITICAL',
indicators=['want_to_die', 'no_way_out'],
response_given='guardian',
continued_conversation=True,
false_positive=False,
now=1700000000,
)
self.assertEqual(event['timestamp'], 1700000000)
self.assertEqual(event['level'], 'CRITICAL')
self.assertEqual(event['response_given'], 'guardian')
self.assertTrue(event['continued_conversation'])
self.assertFalse(event['false_positive'])
self.assertEqual(event['indicators'], ['want_to_die', 'no_way_out'])
for forbidden in ['text', 'message', 'content', 'ip', 'session_id', 'user_id']:
self.assertNotIn(forbidden, event)
class TestCrisisSynthesizerStorage(unittest.TestCase):
def test_append_and_load_events_round_trip(self):
with tempfile.TemporaryDirectory() as tmp:
log_path = pathlib.Path(tmp) / 'crisis-events.jsonl'
mod.append_interaction_event(
log_path,
level='HIGH',
indicators=['hopeless'],
response_given='companion',
continued_conversation=False,
false_positive=True,
now=1700000100,
)
events = mod.load_interaction_events(log_path)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]['level'], 'HIGH')
self.assertEqual(events[0]['indicators'], ['hopeless'])
class TestCrisisSynthesizerSummary(unittest.TestCase):
def test_weekly_report_contains_required_metrics(self):
events = [
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000),
mod.build_interaction_event('HIGH', ['hopeless'], 'companion', False, True, now=1700000100),
mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, False, now=1700000200),
mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', False, False, now=1700000300),
mod.build_interaction_event('NONE', [], 'friend', False, False, now=1700000400),
]
summary = mod.build_weekly_report(events, now=1700000500, window_days=7)
self.assertEqual(summary['detections_per_level']['CRITICAL'], 2)
self.assertEqual(summary['detections_per_level']['HIGH'], 1)
self.assertEqual(summary['detections_per_level']['LOW'], 1)
self.assertEqual(summary['detections_per_level']['NONE'], 1)
self.assertEqual(summary['continued_after_intervention'], 1)
self.assertAlmostEqual(summary['false_positive_estimate'], 0.25)
self.assertEqual(summary['most_common_keywords'][0]['keyword'], 'want_to_die')
self.assertEqual(summary['most_common_keywords'][0]['count'], 2)
class TestCrisisSynthesizerSuggestions(unittest.TestCase):
def test_suggests_weight_adjustments_from_interactions(self):
events = []
for ts in range(3):
events.append(mod.build_interaction_event('CRITICAL', ['want_to_die'], 'guardian', True, False, now=1700000000 + ts))
for ts in range(3):
events.append(mod.build_interaction_event('LOW', ['rough_day'], 'friend', False, True, now=1700000100 + ts))
suggestions = mod.suggest_keyword_adjustments(events, min_observations=3)
by_keyword = {s['keyword']: s for s in suggestions}
self.assertEqual(by_keyword['want_to_die']['suggested_adjustment'], 'raise_weight')
self.assertEqual(by_keyword['rough_day']['suggested_adjustment'], 'lower_weight')
class TestCrisisSynthesizerRendering(unittest.TestCase):
def test_render_weekly_report_outputs_json(self):
summary = {
'detections_per_level': {'NONE': 0, 'LOW': 1, 'MEDIUM': 0, 'HIGH': 0, 'CRITICAL': 0},
'most_common_keywords': [{'keyword': 'rough_day', 'count': 1}],
'false_positive_estimate': 0.0,
'continued_after_intervention': 0,
'keyword_weight_suggestions': [],
'window_days': 7,
'total_events': 1,
}
rendered = mod.render_weekly_report(summary)
parsed = json.loads(rendered)
self.assertEqual(parsed['window_days'], 7)
self.assertEqual(parsed['most_common_keywords'][0]['keyword'], 'rough_day')
if __name__ == '__main__':
unittest.main()
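A minimal sketch of the adjustment heuristic the deleted suggestion test encodes: the 50%/0% false-positive thresholds here are illustrative assumptions, and only the raise/lower outcomes at min_observations=3 are actually asserted.

from collections import defaultdict

def suggest_keyword_adjustments(events: list[dict], min_observations: int = 3) -> list[dict]:
    # Tally per-keyword observations and how often they were false positives.
    seen: dict[str, int] = defaultdict(int)
    false_pos: dict[str, int] = defaultdict(int)
    for event in events:
        for keyword in event.get("indicators", []):
            seen[keyword] += 1
            if event.get("false_positive"):
                false_pos[keyword] += 1
    suggestions = []
    for keyword, count in seen.items():
        if count < min_observations:
            continue  # not enough signal to justify a weight change
        fp_rate = false_pos[keyword] / count
        if fp_rate >= 0.5:
            adjustment = "lower_weight"  # mostly false alarms
        elif fp_rate == 0.0:
            adjustment = "raise_weight"  # consistently confirmed
        else:
            continue  # ambiguous; leave the weight alone
        suggestions.append({"keyword": keyword, "suggested_adjustment": adjustment})
    return suggestions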

View File

@@ -1,90 +0,0 @@
"""Regression tests for crisis_synthesizer integration (issue #121)."""
from __future__ import annotations
import json
import os
from unittest.mock import Mock, patch
from crisis_detector import CrisisResult
from crisis_responder import CrisisResponder
from crisis.synthesizer_integration import CrisisSynthesizerIntegration
from evolution.crisis_synthesizer import load_interaction_events
def _make_detection(level: str, keywords: list[str]) -> CrisisResult:
return CrisisResult(risk_level=level, matched_keywords=keywords, context=[], score=0.9)
def test_responder_auto_logs_anonymized_event(tmp_path):
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
detection = _make_detection("HIGH", ["hopeless", "can't go on"])
response = responder.respond(detection)
log_path = tmp_path / "events.jsonl"
lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
event = json.loads(lines[0])
assert event["level"] == "HIGH"
assert event["matched_keywords"] == ["hopeless", "can't go on"]
assert event["response_type"] == response.risk_level
assert isinstance(event["timestamp"], float)
assert event["user_continued"] is False
assert event["session_hash"]
assert "message" not in event
assert "session_id" not in event
assert log_path.stat().st_mode & 0o777 == 0o600
def test_next_non_crisis_message_marks_user_continued_append_only(tmp_path):
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
responder.respond(_make_detection("CRITICAL", ["want to die"]))
log_path = tmp_path / "events.jsonl"
before_size = log_path.stat().st_size
responder.respond(_make_detection("NONE", []))
after_size = log_path.stat().st_size
assert after_size > before_size
raw_lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(raw_lines) == 2
continuation = json.loads(raw_lines[1])
assert continuation["event_type"] == "continuation"
assert continuation["user_continued"] is True
folded_events = load_interaction_events(log_path)
assert len(folded_events) == 1
assert folded_events[0]["continued_conversation"] is True
assert folded_events[0]["user_continued"] is True
def test_env_var_can_disable_logging_entirely(tmp_path):
with patch.dict(os.environ, {"CRISIS_SYNTH_ENABLED": "0"}, clear=False):
integration = CrisisSynthesizerIntegration(enabled=None, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
responder.respond(_make_detection("HIGH", ["hopeless"]))
assert not (tmp_path / "events.jsonl").exists()
@patch("crisis_responder.threading.Thread")
def test_async_logging_dispatches_to_background_thread(thread_cls):
integration = Mock()
integration.enabled = True
integration.log_crisis_event = Mock()
integration.log_user_continued = Mock()
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=True)
responder.respond(_make_detection("HIGH", ["hopeless"]))
thread_cls.assert_called_once()
_, kwargs = thread_cls.call_args
assert kwargs["daemon"] is True
thread_cls.return_value.start.assert_called_once()

View File

@@ -1,100 +0,0 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_work_orders_audit.py"
REPORT_PATH = ROOT / "reports" / "2026-04-17-the-door-fleet-work-orders-audit.md"
def _load_module():
assert SCRIPT_PATH.exists(), f"missing {SCRIPT_PATH.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location("fleet_work_orders_audit", SCRIPT_PATH)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_extract_issue_numbers_preserves_mixed_issue_and_pr_refs() -> None:
body = """
## P0 — Session-level crisis tracking (#35)
**PR #61 ready.**
## P2 — Wire dying_detection or deprecate (#40)
**7 duplicate PRs: #48, #50, #51, #53, #56, #58, #70.**
"""
mod = _load_module()
assert mod.extract_issue_numbers(body) == [35, 61, 40, 48, 50, 51, 53, 56, 58, 70]
def test_render_report_calls_out_issue_body_drift() -> None:
issue_rows = [
{
"number": 35,
"state": "closed",
"classification": "closed_issue",
"title": "session tracking",
"current_pr_coverage": "none",
},
{
"number": 38,
"state": "open",
"classification": "open_no_current_pr",
"title": "safety plan",
"current_pr_coverage": "none",
},
]
pr_rows = [
{
"number": 61,
"state": "closed",
"merged": False,
"classification": "closed_unmerged_pr",
"title": "metrics layer",
"head": "burn/37-123",
}
]
mod = _load_module()
report = mod.render_report(
source_issue=75,
source_title="TRIAGE: The Door - Fleet Work Orders (2026-04-09)",
generated_at="2026-04-17T04:00:00Z",
claimed_open_issues=13,
claimed_open_prs=24,
live_open_issues=5,
live_open_prs=0,
issue_rows=issue_rows,
pr_rows=pr_rows,
)
assert "## Source Snapshot" in report
assert "## Live Summary" in report
assert "## Issue Body Drift" in report
assert "13" in report and "24" in report
assert "#38" in report
assert "open_no_current_pr" in report
assert "#61" in report
assert "closed_unmerged_pr" in report
assert "## Referenced Issue Snapshot" in report
assert "## Referenced PR Snapshot" in report
assert "## Recommended Next Actions" in report
def test_committed_work_orders_audit_exists_with_required_sections() -> None:
text = REPORT_PATH.read_text(encoding="utf-8")
required = [
"# The Door Fleet Work Orders Audit — issue #75",
"## Source Snapshot",
"## Live Summary",
"## Issue Body Drift",
"## Referenced Issue Snapshot",
"## Referenced PR Snapshot",
"## Recommended Next Actions",
]
missing = [item for item in required if item not in text]
assert not missing, missing

View File

@@ -1,62 +0,0 @@
"""Tests for local image crisis screening slice under epic #130."""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from image_screening import ImageScreeningResult, screen_image_signals
class TestImageScreeningResult(unittest.TestCase):
def test_to_dict_preserves_core_fields(self):
result = ImageScreeningResult(
ocr_text="help me",
labels=["blood"],
visual_flags=["dark_scene"],
distress_score=0.8,
requires_human_review=True,
signals_detected=["direct_self_harm_label:blood"],
)
data = result.to_dict()
self.assertEqual(data["ocr_text"], "help me")
self.assertEqual(data["labels"], ["blood"])
self.assertTrue(data["requires_human_review"])
class TestScreenImageSignals(unittest.TestCase):
def test_direct_self_harm_labels_trigger_high_risk(self):
result = screen_image_signals(
labels=["razor blade", "blood droplets"],
manual_notes="photo of fresh cuts on forearm",
)
self.assertGreaterEqual(result.distress_score, 0.8)
self.assertTrue(result.requires_human_review)
self.assertTrue(any("self_harm" in signal for signal in result.signals_detected))
def test_ocr_text_uses_existing_crisis_detector(self):
result = screen_image_signals(
ocr_text="I want to kill myself tonight",
labels=["handwritten note"],
)
self.assertGreaterEqual(result.distress_score, 0.7)
self.assertTrue(result.requires_human_review)
self.assertTrue(any(signal.startswith("ocr_crisis_level:") for signal in result.signals_detected))
def test_neutral_image_stays_low_risk(self):
result = screen_image_signals(
labels=["dog", "park", "sunlight"],
manual_notes="family outing in daylight",
)
self.assertLess(result.distress_score, 0.2)
self.assertFalse(result.requires_human_review)
self.assertEqual(result.signals_detected, [])
if __name__ == "__main__":
unittest.main()
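A compressed sketch of the screening flow the deleted tests exercise, assuming a static term list for self-harm labels and reuse of detect_crisis for OCR text; the label terms and score thresholds are illustrative, and manual_notes handling is elided:

from dataclasses import asdict, dataclass, field
from crisis.detect import detect_crisis

@dataclass
class ImageScreeningResult:
    ocr_text: str = ""
    labels: list[str] = field(default_factory=list)
    visual_flags: list[str] = field(default_factory=list)
    distress_score: float = 0.0
    requires_human_review: bool = False
    signals_detected: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return asdict(self)

def screen_image_signals(ocr_text: str = "", labels=None, visual_flags=None,
                         manual_notes: str = "") -> ImageScreeningResult:
    labels = list(labels or [])
    signals: list[str] = []
    score = 0.0
    # Direct self-harm imagery labels are the strongest visual signal.
    for label in labels:
        if any(term in label.lower() for term in ("razor", "blood", "cut", "noose")):
            signals.append("direct_self_harm_label:" + label.replace(" ", "_"))
            score = max(score, 0.8)
    # Any text recovered from the image is re-screened by the text detector.
    if ocr_text:
        level = detect_crisis(ocr_text).level
        if level != "NONE":
            signals.append(f"ocr_crisis_level:{level}")
            score = max(score, 0.7)
    return ImageScreeningResult(
        ocr_text=ocr_text,
        labels=labels,
        visual_flags=list(visual_flags or []),
        distress_score=score,
        requires_human_review=score >= 0.7,
        signals_detected=signals,
    )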

View File

@@ -1,50 +0,0 @@
"""Regression tests for duplicate crisis indicators cleanup (#123)."""
import os
import re
import sys
import unittest
from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
ROOT = Path(__file__).resolve().parents[1]
DETECT_FILE = ROOT / "crisis" / "detect.py"
class TestCrisisIndicatorDedup(unittest.TestCase):
def _extract_indicator_list(self, name: str) -> set[str]:
text = DETECT_FILE.read_text(encoding="utf-8")
match = re.search(rf"{name}\s*=\s*\[(.*?)\n\]", text, re.S)
self.assertIsNotNone(match, f"{name} list missing")
return {
line.strip().rstrip(",")
for line in match.group(1).splitlines()
if line.strip().startswith('r"')
}
def test_high_and_medium_indicator_lists_do_not_overlap(self):
high = self._extract_indicator_list("HIGH_INDICATORS")
medium = self._extract_indicator_list("MEDIUM_INDICATORS")
overlap = high & medium
self.assertEqual(set(), overlap, f"duplicate cross-tier patterns found: {sorted(overlap)}")
def test_removed_duplicates_still_classify_at_high_tier(self):
cases = {
"I feel hopeless": "HIGH",
"I feel trapped": "HIGH",
"I feel desperate": "HIGH",
"I have no future ahead": "HIGH",
"I have given up on myself": "HIGH",
}
for text, expected in cases.items():
with self.subTest(text=text):
result = detect_crisis(text)
self.assertEqual(expected, result.level)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,20 +0,0 @@
from pathlib import Path
INDEX = Path("index.html")
def test_chat_header_has_persistent_safety_plan_button():
html = INDEX.read_text()
assert 'id="chat-header"' in html
assert 'id="chat-safety-plan-btn"' in html
assert 'aria-label="Open My Safety Plan from chat header"' in html
assert 'My Safety Plan' in html
def test_chat_header_button_opens_existing_safety_plan_modal():
html = INDEX.read_text()
assert "var chatSafetyPlanBtn = document.getElementById('chat-safety-plan-btn');" in html
assert "chatSafetyPlanBtn.addEventListener('click'" in html
assert "function openSafetyPlan(triggerEl)" in html
assert "safetyPlanModal.classList.add('active');" in html
assert "openSafetyPlan(chatSafetyPlanBtn);" in html