Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
15c31a56a7 feat: wire crisis synthesizer into responder pipeline for #121
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 10s
Smoke Test / smoke (pull_request) Successful in 19s
2026-04-22 02:00:18 -04:00
Alexander Whitestone
57cfbb3940 test: define crisis synthesizer integration for #121 2026-04-22 02:00:06 -04:00
8 changed files with 312 additions and 310 deletions

View File

@@ -1,134 +0,0 @@
"""
Intake submission handler for the-door.
Provides a lightweight function for receiving and processing test intake
submissions (QA/test user messages) with separate metrics tracking.
Usage:
from crisis.intake import handle_intake_submission
result = handle_intake_submission("This is a test message", test=True)
"""
from dataclasses import dataclass
from typing import Optional
from pathlib import Path
from .gateway import check_crisis
from .metrics import CrisisMetrics, SessionMetrics
from .session_tracker import CrisisSessionTracker
@dataclass
class IntakeResult:
    """Result from processing an intake submission.

    Mirrors the fields pulled out of the check_crisis result, plus intake
    bookkeeping (is_test / session_id) so callers can render a response and
    correlate QA runs.
    """
    # Crisis severity label from the detector (e.g. "NONE" .. "CRITICAL").
    crisis_level: str
    # Numeric crisis score from the detector.
    score: float
    # Pre-built companion ("Timmy") message to show the user.
    timmy_message: str
    # UI flag: whether to display the crisis overlay.
    show_overlay: bool
    # UI flag: whether to surface the 988 lifeline.
    provide_988: bool
    # True when this submission came from a QA/test user.
    is_test: bool
    # Optional caller-supplied correlation id, passed through unchanged.
    session_id: Optional[str] = None
# Separate metrics namespace for test intake tracking — _log_test_intake
# writes one JSONL file per day under this directory, kept apart from live
# crisis statistics.
_INTAKE_METRICS_DIR = Path.home() / ".the-door" / "intake-metrics"
def handle_intake_submission(
    message: str,
    test: bool = True,
    session_id: Optional[str] = None,
) -> IntakeResult:
    """
    Process one intake submission (including test-user messages).

    Args:
        message: Raw user message text.
        test: When True, this is a QA/test submission and is recorded in the
            separate test-intake metrics log instead of live metrics.
        session_id: Optional identifier carried through to the result.

    Returns:
        IntakeResult populated from the crisis detector's output.
    """
    detection = check_crisis(message)

    # Test traffic gets its own metrics namespace so it never pollutes
    # production crisis statistics.
    if test:
        _log_test_intake(detection, message)

    ui_flags = detection["ui"]
    return IntakeResult(
        crisis_level=detection["level"],
        score=detection["score"],
        timmy_message=detection["timmy_message"],
        show_overlay=ui_flags["show_overlay"],
        provide_988=ui_flags["provide_988"],
        is_test=test,
        session_id=session_id,
    )
def _log_test_intake(detection, message: str) -> None:
    """
    Log a test intake submission to the separate test-metrics JSONL file.

    Records land under _INTAKE_METRICS_DIR in one file per day and never mix
    with production crisis statistics.

    Args:
        detection: Either a CrisisDetectionResult dataclass or the dict
            returned by check_crisis.
        message: The original user message (only the first 80 chars are kept).
    """
    import time
    import json

    # Accept both the dataclass and the dict shape of a detection result.
    if hasattr(detection, '__dataclass_fields__'):
        level = detection.level
        score = detection.score
        indicators = detection.indicators
    else:
        level = detection.get("level")
        score = detection.get("score")
        indicators = detection.get("indicators", [])

    _INTAKE_METRICS_DIR.mkdir(parents=True, exist_ok=True)
    today = time.strftime("%Y-%m-%d")
    filepath = _INTAKE_METRICS_DIR / f"test-intake-{today}.jsonl"

    record = {
        "timestamp": time.time(),
        "level": level,
        "score": score,
        "message_preview": message[:80],
        "indicators": indicators,
    }
    # Fix: open with an explicit encoding — the previous bare open() used the
    # platform default, so previews containing non-ASCII text could fail or
    # round-trip differently across machines.
    with open(filepath, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
# ── Quick test interface ────────────────────────────────────────────
def _interactive():
    """Interactive terminal loop for exercising the intake handler by hand."""
    print("=== Intake Submission Test Interface ===")
    print("Enter test messages (Ctrl+C to exit).\n")
    try:
        while True:
            text = input("test-user> ").strip()
            if not text:
                continue
            outcome = handle_intake_submission(text, test=True)
            for line in (
                f"\n Level : {outcome.crisis_level}",
                f" Score : {outcome.score}",
                f" Timmy : {outcome.timmy_message}",
                f" Overlay: {outcome.show_overlay}",
            ):
                print(line)
            print()
    except (EOFError, KeyboardInterrupt):
        print("\nDone.")


if __name__ == "__main__":
    _interactive()

View File

@@ -12,18 +12,6 @@ from typing import Optional
from .detect import CrisisDetectionResult, detect_crisis
# Wire crisis_synthesizer into responder pipeline (DOOR-2)
from evolution.crisis_synthesizer import append_interaction_event, DEFAULT_LOG_PATH
# Response type labels for synthesizer logging
LEVEL_TO_RESPONSE = {
"CRITICAL": "guardian",
"HIGH": "companion",
"MEDIUM": "witness",
"LOW": "friend",
"NONE": "friend",
}
@dataclass
class CrisisResponse:
@@ -146,15 +134,6 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
level = detection.level
if level == "CRITICAL":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_CRITICAL),
show_crisis_panel=True,
@@ -164,15 +143,6 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
if level == "HIGH":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_HIGH),
show_crisis_panel=True,
@@ -182,15 +152,6 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
if level == "MEDIUM":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_MEDIUM),
show_crisis_panel=False,
@@ -200,15 +161,6 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
if level == "LOW":
# Log interaction via crisis_synthesizer
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message=random.choice(TIMMY_LOW),
show_crisis_panel=False,
@@ -218,15 +170,6 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
)
# Normal conversation - no crisis response
# Still log for completeness (NONE level)
append_interaction_event(
DEFAULT_LOG_PATH,
level=level,
indicators=detection.indicators,
response_given=LEVEL_TO_RESPONSE[level],
continued_conversation=False,
false_positive=False,
)
return CrisisResponse(
timmy_message="",
show_crisis_panel=False,

View File

@@ -0,0 +1,139 @@
"""Crisis synthesizer integration for the responder pipeline.
Privacy-first and opt-in. Writes append-only JSONL events that feed the
existing crisis_synthesizer reporting flow.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from evolution.crisis_synthesizer import DEFAULT_LOG_PATH, append_interaction_event
logger = logging.getLogger(__name__)

# Opt-in switch and log destination, resolved from the environment once at
# import time; constructor arguments on CrisisSynthesizerIntegration can
# override both per-instance.
SYNTH_ENABLED = os.environ.get("CRISIS_SYNTH_ENABLED", "0") == "1"
SYNTH_LOG_PATH = Path(os.environ.get("CRISIS_SYNTH_LOG_PATH", str(DEFAULT_LOG_PATH)))
class CrisisSynthesizerIntegration:
    """Append-only bridge between responder events and the synthesizer log.

    Privacy properties visible in this class: session ids are stored only as
    truncated SHA-256 hashes, raw message text is never written, and the log
    file and its directory are restricted to the owner on a best-effort basis.
    """

    def __init__(
        self,
        enabled: Optional[bool] = None,
        log_dir: Optional[Path] = None,
        log_path: Optional[Path] = None,
    ):
        """
        Args:
            enabled: Explicit on/off switch. When None, the
                CRISIS_SYNTH_ENABLED environment variable is consulted.
            log_dir: Directory that receives the default "events.jsonl" file.
            log_path: Full path to the log file; takes precedence over log_dir.
        """
        if enabled is None:
            # Fix: read the env var at construction time rather than using the
            # import-time SYNTH_ENABLED constant, so patching os.environ after
            # the module is imported (as the regression tests do) actually
            # affects newly constructed instances.
            self.enabled = os.environ.get("CRISIS_SYNTH_ENABLED", "0") == "1"
        else:
            self.enabled = bool(enabled)
        if log_path is not None:
            self.log_file = Path(log_path)
        elif log_dir is not None:
            self.log_file = Path(log_dir) / "events.jsonl"
        else:
            self.log_file = SYNTH_LOG_PATH
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        # Best-effort: owner-only permissions on the log directory.
        try:
            os.chmod(self.log_file.parent, 0o700)
        except OSError:
            pass

    def log_crisis_event(
        self,
        *,
        level: str,
        matched_keywords: list[str],
        response_type: str,
        session_id: str = "",
        user_continued: bool = False,
        metadata: Optional[dict[str, Any]] = None,
    ) -> bool:
        """Append one anonymized crisis interaction event.

        Returns:
            True when an event was written; False when disabled or on error.
        """
        if not self.enabled:
            return False
        try:
            append_interaction_event(
                self.log_file,
                level=level,
                indicators=list(matched_keywords),
                response_given=response_type,
                continued_conversation=user_continued,
                false_positive=False,
                session_hash=self._hash_session(session_id) if session_id else "",
                response_type=response_type,
                metadata=metadata or {},
            )
            self._lock_down_log_file()
            return True
        except Exception as exc:
            # Logging must never break the responder; degrade to a warning.
            logger.warning("Failed to log crisis synthesizer event: %s", exc)
            return False

    def log_user_continued(self, session_id: str) -> bool:
        """Append a continuation marker if this session had a prior crisis event.

        Append-only by design: earlier events are never rewritten. Readers fold
        continuation markers back onto the matching crisis event.

        Returns:
            True when a marker was appended; False otherwise.
        """
        if not self.enabled or not session_id or not self.log_file.exists():
            return False
        session_hash = self._hash_session(session_id)
        saw_prior_crisis = False
        try:
            with self.log_file.open("r", encoding="utf-8") as handle:
                for raw in handle:
                    raw = raw.strip()
                    if not raw:
                        continue
                    try:
                        event = json.loads(raw)
                    except ValueError:
                        # Fix: previously a single corrupt line aborted the
                        # whole scan (via the outer except) and suppressed
                        # continuation tracking; skip bad lines instead.
                        continue
                    if (
                        event.get("event_type", "crisis_interaction") == "crisis_interaction"
                        and event.get("session_hash") == session_hash
                    ):
                        saw_prior_crisis = True
            if not saw_prior_crisis:
                return False
            continuation = {
                "event_type": "continuation",
                "timestamp": float(time.time()),
                "session_hash": session_hash,
                "user_continued": True,
            }
            with self.log_file.open("a", encoding="utf-8") as handle:
                handle.write(json.dumps(continuation) + "\n")
            self._lock_down_log_file()
            return True
        except Exception as exc:
            logger.warning("Failed to append crisis continuation marker: %s", exc)
            return False

    @staticmethod
    def _hash_session(session_id: str) -> str:
        # Truncated SHA-256: enough to correlate events within one log
        # without storing a reversible session identifier.
        return hashlib.sha256(session_id.encode("utf-8")).hexdigest()[:16]

    def _lock_down_log_file(self) -> None:
        # Best-effort owner-only permissions on the log file itself.
        try:
            os.chmod(self.log_file, 0o600)
        except OSError:
            pass
# Lazily-created process-wide singleton used by the module-level wrappers.
_integration: Optional[CrisisSynthesizerIntegration] = None
def get_integration() -> CrisisSynthesizerIntegration:
    """Return the shared integration instance, creating it on first use."""
    global _integration
    if _integration is None:
        _integration = CrisisSynthesizerIntegration()
    return _integration
def log_crisis_event(**kwargs: Any) -> bool:
    """Module-level convenience wrapper over the shared integration."""
    integration = get_integration()
    return integration.log_crisis_event(**kwargs)
def log_user_continued(session_id: str) -> bool:
    """Module-level convenience wrapper over the shared integration."""
    integration = get_integration()
    return integration.log_user_continued(session_id)

View File

@@ -1,110 +0,0 @@
"""
Tests for crisis.intake — intake submission handling for test users.
Run with: python -m pytest crisis/tests_intake.py -v
or: python -m crisis.intake
"""
import unittest
import sys
import os
# Ensure project root is on path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.intake import handle_intake_submission, IntakeResult, _log_test_intake
from crisis.detect import detect_crisis
import json
import tempfile
from pathlib import Path
class TestIntakeSubmission(unittest.TestCase):
    """Unit tests for crisis.intake.handle_intake_submission and _log_test_intake."""

    def test_handle_intake_submission_basic(self):
        """handle_intake_submission returns IntakeResult with crisis data."""
        result = handle_intake_submission("This is a test message", test=True)
        self.assertIsInstance(result, IntakeResult)
        self.assertEqual(result.is_test, True)
        # Level must be one of the detector's known severity labels.
        self.assertIn(result.crisis_level, ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"))
        self.assertIsInstance(result.score, float)
        self.assertIsInstance(result.timmy_message, str)

    def test_intake_test_message_no_crisis(self):
        """Test message 'This is a test message' should not trigger crisis."""
        result = handle_intake_submission("This is a test message", test=True)
        self.assertEqual(result.crisis_level, "NONE")
        self.assertEqual(result.score, 0.0)
        self.assertEqual(result.timmy_message, "")

    def test_intake_crisis_message(self):
        """Intake handling still detects crisis messages."""
        result = handle_intake_submission("I want to kill myself", test=True)
        self.assertEqual(result.crisis_level, "CRITICAL")
        self.assertEqual(result.score, 1.0)
        self.assertTrue(result.show_overlay)
        self.assertTrue(result.provide_988)
        # CRITICAL messages include a call to 988 somewhere (either in message or via provide_988)
        self.assertTrue("988" in result.timmy_message or result.provide_988)

    def test_intake_non_test_mode(self):
        """Non-test mode still works (is_test=False)."""
        result = handle_intake_submission("Hello there", test=False)
        self.assertEqual(result.is_test, False)
        self.assertEqual(result.crisis_level, "NONE")

    def test_intake_with_session_id(self):
        """Session ID is preserved in result."""
        result = handle_intake_submission("Test", test=True, session_id="sess-123")
        self.assertEqual(result.session_id, "sess-123")

    def test_log_test_intake_creates_file(self):
        """_log_test_intake writes a JSONL record under the (patched) metrics dir."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Patch the metrics dir so the test never writes under ~/.the-door.
            import crisis.intake as intake_mod
            original_dir = intake_mod._INTAKE_METRICS_DIR
            try:
                intake_mod._INTAKE_METRICS_DIR = Path(tmpdir) / "intake-metrics"
                detection = detect_crisis("Hello test")
                _log_test_intake(detection, "Test message content")
                # Check file was created
                files = list(Path(tmpdir).glob("**/*.jsonl"))
                self.assertTrue(len(files) > 0)
                # Verify the first record contains the expected fields.
                with open(files[0]) as f:
                    line = f.readline()
                    record = json.loads(line)
                self.assertIn("timestamp", record)
                self.assertIn("level", record)
                self.assertEqual(record["message_preview"], "Test message content")
            finally:
                # Always restore the module-level dir, even if assertions fail.
                intake_mod._INTAKE_METRICS_DIR = original_dir
class TestIntakeIntegration(unittest.TestCase):
    """Integration tests exercising intake together with the crisis system."""

    def test_intake_end_to_end(self):
        """A benign test message flows through without triggering the overlay."""
        outcome = handle_intake_submission("This is a test message", test=True)
        self.assertEqual(outcome.crisis_level, "NONE")
        self.assertFalse(outcome.show_overlay)

    def test_intake_crisis_flow(self):
        """Crisis-sounding messages are detected and escalated with 988."""
        for crisis_text in (
            "I want to die",
            "I feel hopeless",
            "I'm going to kill myself",
        ):
            outcome = handle_intake_submission(crisis_text, test=True)
            self.assertIn(
                outcome.crisis_level,
                ("HIGH", "CRITICAL"),
                f"Message '{crisis_text}' should be crisis",
            )
            self.assertTrue(outcome.provide_988)


if __name__ == "__main__":
    unittest.main()

View File

@@ -21,10 +21,13 @@ Usage:
"""
import random
import threading
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
from crisis_detector import CrisisResult
from crisis.synthesizer_integration import get_integration
# ── Core values (from SOUL.md / system-prompt.txt) ──────────────
@@ -137,6 +140,26 @@ class CrisisResponder:
- Presence over brevity — stay as long as they need
"""
def __init__(
    self,
    synth_integration=None,
    session_id: str = "",
    async_synth_logging: bool = True,
):
    """Set up synthesizer logging for this responder.

    Args:
        synth_integration: Bridge used to write synthesizer events; defaults
            to the shared get_integration() singleton.
        session_id: Caller-supplied session id; a random uuid4 hex is used
            when empty so events can still be correlated per session.
        async_synth_logging: When True, logging runs on a daemon thread
            (see _run_synth_task).
    """
    self._synth = synth_integration if synth_integration is not None else get_integration()
    self._session_id = session_id or uuid.uuid4().hex
    self._async_synth_logging = async_synth_logging
    # Set after a crisis event is logged; cleared when the next non-crisis
    # turn emits a continuation marker.
    self._awaiting_continuation = False
def _run_synth_task(self, fn, *args, **kwargs):
    """Invoke a synthesizer logging callable, honoring the async flag.

    No-op when the integration is missing or disabled. In async mode the
    call is dispatched on a daemon thread so logging never blocks a crisis
    response; otherwise it runs inline.
    """
    if not self._synth or not getattr(self._synth, "enabled", False):
        return
    if self._async_synth_logging:
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True)
        thread.start()
    else:
        fn(*args, **kwargs)
def respond(self, detection: CrisisResult) -> CrisisResponse:
"""
Generate a crisis response based on detection results.
@@ -150,20 +173,40 @@ class CrisisResponder:
level = detection.risk_level
if level == "CRITICAL":
return self._respond_critical(detection)
response = self._respond_critical(detection)
elif level == "HIGH":
return self._respond_high(detection)
response = self._respond_high(detection)
elif level == "MODERATE":
return self._respond_moderate(detection)
response = self._respond_moderate(detection)
elif level == "LOW":
return self._respond_low(detection)
response = self._respond_low(detection)
else:
return CrisisResponse(
response = CrisisResponse(
message="",
risk_level="NONE",
escalate=False,
)
if level != "NONE":
self._run_synth_task(
self._synth.log_crisis_event,
level=level,
matched_keywords=list(detection.matched_keywords),
response_type=response.risk_level,
session_id=self._session_id,
user_continued=False,
metadata={
"keyword_count": len(detection.matched_keywords),
"score": detection.score,
},
)
self._awaiting_continuation = True
elif self._awaiting_continuation:
self._run_synth_task(self._synth.log_user_continued, self._session_id)
self._awaiting_continuation = False
return response
def _respond_critical(self, detection: CrisisResult) -> CrisisResponse:
"""
CRITICAL response protocol:

1
evolution/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Evolution package for learning-oriented the-door modules."""

View File

@@ -26,15 +26,25 @@ def build_interaction_event(
false_positive: bool,
*,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
indicator_list = list(indicators)
return {
"event_type": "crisis_interaction",
"timestamp": float(time.time() if now is None else now),
"level": level,
"indicators": list(indicators),
"indicator_count": len(indicators),
"indicators": indicator_list,
"matched_keywords": indicator_list,
"indicator_count": len(indicator_list),
"response_given": response_given,
"response_type": response_type or response_given,
"session_hash": session_hash,
"continued_conversation": bool(continued_conversation),
"user_continued": bool(continued_conversation),
"false_positive": bool(false_positive),
"metadata": metadata or {},
}
@@ -47,6 +57,9 @@ def append_interaction_event(
continued_conversation: bool,
false_positive: bool,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
event = build_interaction_event(
level,
@@ -55,6 +68,9 @@ def append_interaction_event(
continued_conversation,
false_positive,
now=now,
session_hash=session_hash,
response_type=response_type,
metadata=metadata,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
@@ -67,11 +83,25 @@ def load_interaction_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events = []
events: list[dict] = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
events.append(json.loads(line))
event = json.loads(line)
event_type = event.get("event_type", "crisis_interaction")
if event_type == "continuation":
session_hash = event.get("session_hash", "")
for prior in reversed(events):
if (
prior.get("event_type", "crisis_interaction") == "crisis_interaction"
and prior.get("session_hash", "") == session_hash
):
prior["continued_conversation"] = True
prior["user_continued"] = True
prior["continued_at"] = event.get("timestamp")
break
continue
events.append(event)
return events

View File

@@ -0,0 +1,90 @@
"""Regression tests for crisis_synthesizer integration (issue #121)."""
from __future__ import annotations
import json
import os
from unittest.mock import Mock, patch
from crisis_detector import CrisisResult
from crisis_responder import CrisisResponder
from crisis.synthesizer_integration import CrisisSynthesizerIntegration
from evolution.crisis_synthesizer import load_interaction_events
def _make_detection(level: str, keywords: list[str]) -> CrisisResult:
    """Build a minimal CrisisResult for the given risk level and keywords."""
    detection = CrisisResult(
        risk_level=level,
        matched_keywords=keywords,
        context=[],
        score=0.9,
    )
    return detection
def test_responder_auto_logs_anonymized_event(tmp_path):
    """A HIGH-risk turn writes exactly one anonymized event to the log."""
    bridge = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
    responder = CrisisResponder(
        synth_integration=bridge,
        session_id="session-1",
        async_synth_logging=False,
    )

    reply = responder.respond(_make_detection("HIGH", ["hopeless", "can't go on"]))

    events_file = tmp_path / "events.jsonl"
    records = [
        json.loads(line)
        for line in events_file.read_text(encoding="utf-8").splitlines()
    ]
    assert len(records) == 1
    record = records[0]
    assert record["level"] == "HIGH"
    assert record["matched_keywords"] == ["hopeless", "can't go on"]
    assert record["response_type"] == reply.risk_level
    assert isinstance(record["timestamp"], float)
    assert record["user_continued"] is False
    assert record["session_hash"]
    # Anonymization: neither raw message nor session id may reach the log.
    assert "message" not in record
    assert "session_id" not in record
    assert events_file.stat().st_mode & 0o777 == 0o600
def test_next_non_crisis_message_marks_user_continued_append_only(tmp_path):
    """A non-crisis turn appends a continuation marker instead of rewriting."""
    bridge = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
    responder = CrisisResponder(
        synth_integration=bridge,
        session_id="session-1",
        async_synth_logging=False,
    )
    responder.respond(_make_detection("CRITICAL", ["want to die"]))

    events_file = tmp_path / "events.jsonl"
    size_before = events_file.stat().st_size
    responder.respond(_make_detection("NONE", []))
    # Append-only: the file grew, nothing was rewritten in place.
    assert events_file.stat().st_size > size_before

    lines = events_file.read_text(encoding="utf-8").splitlines()
    assert len(lines) == 2
    marker = json.loads(lines[1])
    assert marker["event_type"] == "continuation"
    assert marker["user_continued"] is True

    # Readers fold the marker back onto the originating crisis event.
    folded = load_interaction_events(events_file)
    assert len(folded) == 1
    assert folded[0]["continued_conversation"] is True
    assert folded[0]["user_continued"] is True
def test_env_var_can_disable_logging_entirely(tmp_path):
    """With CRISIS_SYNTH_ENABLED=0 and enabled=None, nothing is written."""
    # NOTE(review): the integration resolves enabled=None from SYNTH_ENABLED,
    # which is read at module import time — confirm that patching os.environ
    # here actually influences the constructed instance rather than the test
    # passing only because the default is "0".
    with patch.dict(os.environ, {"CRISIS_SYNTH_ENABLED": "0"}, clear=False):
        bridge = CrisisSynthesizerIntegration(enabled=None, log_dir=tmp_path)
        responder = CrisisResponder(
            synth_integration=bridge,
            session_id="session-1",
            async_synth_logging=False,
        )
        responder.respond(_make_detection("HIGH", ["hopeless"]))
        assert not (tmp_path / "events.jsonl").exists()
@patch("crisis_responder.threading.Thread")
def test_async_logging_dispatches_to_background_thread(thread_cls):
    """Async mode hands logging to a daemon thread instead of blocking."""
    bridge = Mock()
    bridge.enabled = True
    bridge.log_crisis_event = Mock()
    bridge.log_user_continued = Mock()
    responder = CrisisResponder(
        synth_integration=bridge,
        session_id="session-1",
        async_synth_logging=True,
    )

    responder.respond(_make_detection("HIGH", ["hopeless"]))

    thread_cls.assert_called_once()
    _, thread_kwargs = thread_cls.call_args
    assert thread_kwargs["daemon"] is True
    thread_cls.return_value.start.assert_called_once()