Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
207037eaf9 | ||
|
|
3b60afc5c7 | ||
| f30b0d962d | |||
| 26dc58faa7 | |||
|
|
572f14eda5 | ||
|
|
58fdd9cfef |
@@ -4,11 +4,12 @@ Crisis detection and response system for the-door.
|
||||
Stands between a broken man and a machine that would tell him to die.
|
||||
"""
|
||||
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
|
||||
from .metrics import CrisisMetrics, AggregateMetrics
|
||||
from .intake import (
|
||||
handle_intake_submission,
|
||||
IntakeResult,
|
||||
create_intake_issue,
|
||||
close_intake_issue,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
@@ -26,4 +27,8 @@ __all__ = [
|
||||
"check_crisis_with_session",
|
||||
"CrisisMetrics",
|
||||
"AggregateMetrics",
|
||||
"handle_intake_submission",
|
||||
"IntakeResult",
|
||||
"create_intake_issue",
|
||||
"close_intake_issue",
|
||||
]
|
||||
|
||||
258
crisis/intake.py
Normal file
258
crisis/intake.py
Normal file
@@ -0,0 +1,258 @@
|
||||
"""
|
||||
Intake submission handler for the-door.
|
||||
|
||||
Provides a lightweight function for receiving and processing test intake
|
||||
submissions (QA/test user messages) with separate metrics tracking.
|
||||
|
||||
Also provides Gitea issue integration for tracking intake submissions
|
||||
and their processing state.
|
||||
|
||||
Usage:
|
||||
from crisis.intake import handle_intake_submission, close_intake_issue
|
||||
|
||||
result = handle_intake_submission("This is a test message", test=True)
|
||||
# Later, mark the intake issue as processed
|
||||
close_intake_issue(200, "Processed and logged to test-intake metrics.")
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
from pathlib import Path
|
||||
|
||||
from .gateway import check_crisis
|
||||
from .metrics import CrisisMetrics, SessionMetrics
|
||||
from .session_tracker import CrisisSessionTracker
|
||||
|
||||
|
||||
@dataclass
|
||||
class IntakeResult:
|
||||
"""Result from processing an intake submission."""
|
||||
crisis_level: str
|
||||
score: float
|
||||
timmy_message: str
|
||||
show_overlay: bool
|
||||
provide_988: bool
|
||||
is_test: bool
|
||||
session_id: Optional[str] = None
|
||||
|
||||
|
||||
# Separate metrics namespace for test intake tracking
|
||||
_INTAKE_METRICS_DIR = Path.home() / ".the-door" / "intake-metrics"
|
||||
|
||||
|
||||
def handle_intake_submission(
|
||||
message: str,
|
||||
test: bool = True,
|
||||
session_id: Optional[str] = None,
|
||||
) -> IntakeResult:
|
||||
"""
|
||||
Handle an intake submission from a user (including test users).
|
||||
|
||||
Args:
|
||||
message: The user's message text.
|
||||
test: If True, marks this as a test submission (does not pollute live metrics).
|
||||
session_id: Optional session identifier for correlation.
|
||||
|
||||
Returns:
|
||||
IntakeResult with crisis assessment and response data.
|
||||
"""
|
||||
# Run crisis detection
|
||||
crisis_result = check_crisis(message)
|
||||
|
||||
# For test submissions, optionally log to separate test metrics
|
||||
if test:
|
||||
_log_test_intake(crisis_result, message)
|
||||
|
||||
return IntakeResult(
|
||||
crisis_level=crisis_result["level"],
|
||||
score=crisis_result["score"],
|
||||
timmy_message=crisis_result["timmy_message"],
|
||||
show_overlay=crisis_result["ui"]["show_overlay"],
|
||||
provide_988=crisis_result["ui"]["provide_988"],
|
||||
is_test=test,
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
|
||||
def _log_test_intake(detection, message: str) -> None:
|
||||
"""
|
||||
Log test intake submissions to a separate metrics file.
|
||||
|
||||
These do not mix with production crisis statistics.
|
||||
|
||||
Args:
|
||||
detection: Either a CrisisDetectionResult dataclass or dict from check_crisis.
|
||||
message: The original user message.
|
||||
"""
|
||||
import time
|
||||
import json
|
||||
|
||||
# Accept both dataclass and dict
|
||||
if hasattr(detection, '__dataclass_fields__'):
|
||||
level = detection.level
|
||||
score = detection.score
|
||||
indicators = detection.indicators
|
||||
else:
|
||||
level = detection.get("level")
|
||||
score = detection.get("score")
|
||||
indicators = detection.get("indicators", [])
|
||||
|
||||
_INTAKE_METRICS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
today = time.strftime("%Y-%m-%d")
|
||||
filepath = _INTAKE_METRICS_DIR / f"test-intake-{today}.jsonl"
|
||||
|
||||
record = {
|
||||
"timestamp": time.time(),
|
||||
"level": level,
|
||||
"score": score,
|
||||
"message_preview": message[:80],
|
||||
"indicators": indicators,
|
||||
}
|
||||
|
||||
with open(filepath, "a") as f:
|
||||
f.write(json.dumps(record) + "\n")
|
||||
|
||||
|
||||
def create_intake_issue(message: str, user: str = "Test User") -> dict:
|
||||
"""
|
||||
Create a Gitea issue to track an intake submission.
|
||||
|
||||
Args:
|
||||
message: The user's message text.
|
||||
user: The name to attribute the intake to.
|
||||
|
||||
Returns:
|
||||
dict: The API response with the created issue (contains 'number').
|
||||
|
||||
Requires GITEA_TOKEN environment variable or ~/.config/gitea/token file.
|
||||
"""
|
||||
import os
|
||||
import urllib.request
|
||||
|
||||
# Get token
|
||||
token = os.environ.get("GITEA_TOKEN")
|
||||
if not token:
|
||||
token_file = os.path.expanduser("~/.config/gitea/token")
|
||||
if os.path.exists(token_file):
|
||||
with open(token_file) as f:
|
||||
token = f.read().strip()
|
||||
|
||||
if not token:
|
||||
raise ValueError("GITEA_TOKEN not set and ~/.config/gitea/token not found")
|
||||
|
||||
url = (
|
||||
"https://forge.alexanderwhitestone.com/api/v1/"
|
||||
"repos/Timmy_Foundation/the-door/issues"
|
||||
)
|
||||
body = f"**Message:** {message}"
|
||||
data = json.dumps({
|
||||
"title": f"Intake submission from {user}",
|
||||
"body": body,
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
data=data,
|
||||
headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
result = json.loads(resp.read())
|
||||
return result
|
||||
|
||||
|
||||
def close_intake_issue(issue_number: int, comment: str = "Processed.") -> dict:
|
||||
"""
|
||||
Close a Gitea intake issue and add a comment.
|
||||
|
||||
Args:
|
||||
issue_number: The Gitea issue number (e.g., 200 for #200)
|
||||
comment: Optional comment to add before closing.
|
||||
|
||||
Returns:
|
||||
dict: The API response from closing the issue.
|
||||
|
||||
Requires GITEA_TOKEN environment variable or ~/.config/gitea/token file.
|
||||
"""
|
||||
import os
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
# Get token
|
||||
token = os.environ.get("GITEA_TOKEN")
|
||||
if not token:
|
||||
token_file = os.path.expanduser("~/.config/gitea/token")
|
||||
if os.path.exists(token_file):
|
||||
with open(token_file) as f:
|
||||
token = f.read().strip()
|
||||
|
||||
if not token:
|
||||
raise ValueError("GITEA_TOKEN not set and ~/.config/gitea/token not found")
|
||||
|
||||
# Add comment first
|
||||
if comment:
|
||||
comment_url = (
|
||||
f"https://forge.alexanderwhitestone.com/api/v1/"
|
||||
f"repos/Timmy_Foundation/the-door/issues/{issue_number}/comments"
|
||||
)
|
||||
comment_data = json.dumps({"body": comment}).encode()
|
||||
req = urllib.request.Request(
|
||||
comment_url,
|
||||
data=comment_data,
|
||||
headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
pass # Comment added
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to add comment: {e}")
|
||||
|
||||
# Close the issue
|
||||
close_url = (
|
||||
f"https://forge.alexanderwhitestone.com/api/v1/"
|
||||
f"repos/Timmy_Foundation/the-door/issues/{issue_number}"
|
||||
)
|
||||
close_data = json.dumps({"state": "closed"}).encode()
|
||||
req = urllib.request.Request(
|
||||
close_url,
|
||||
data=close_data,
|
||||
headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
method="PATCH",
|
||||
)
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
result = json.loads(resp.read())
|
||||
return result
|
||||
|
||||
|
||||
# ── Quick test interface ────────────────────────────────────────────
|
||||
|
||||
def _interactive():
|
||||
"""Run an interactive test intake session."""
|
||||
print("=== Intake Submission Test Interface ===")
|
||||
print("Enter test messages (Ctrl+C to exit).\n")
|
||||
|
||||
try:
|
||||
while True:
|
||||
msg = input("test-user> ").strip()
|
||||
if not msg:
|
||||
continue
|
||||
result = handle_intake_submission(msg, test=True)
|
||||
print(f"\n Level : {result.crisis_level}")
|
||||
print(f" Score : {result.score}")
|
||||
print(f" Timmy : {result.timmy_message}")
|
||||
print(f" Overlay: {result.show_overlay}")
|
||||
print()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\nDone.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_interactive()
|
||||
@@ -12,6 +12,18 @@ from typing import Optional
|
||||
|
||||
from .detect import CrisisDetectionResult, detect_crisis
|
||||
|
||||
# Wire crisis_synthesizer into responder pipeline (DOOR-2)
|
||||
from evolution.crisis_synthesizer import append_interaction_event, DEFAULT_LOG_PATH
|
||||
|
||||
# Response type labels for synthesizer logging
|
||||
LEVEL_TO_RESPONSE = {
|
||||
"CRITICAL": "guardian",
|
||||
"HIGH": "companion",
|
||||
"MEDIUM": "witness",
|
||||
"LOW": "friend",
|
||||
"NONE": "friend",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisResponse:
|
||||
@@ -134,6 +146,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
|
||||
level = detection.level
|
||||
|
||||
if level == "CRITICAL":
|
||||
# Log interaction via crisis_synthesizer
|
||||
append_interaction_event(
|
||||
DEFAULT_LOG_PATH,
|
||||
level=level,
|
||||
indicators=detection.indicators,
|
||||
response_given=LEVEL_TO_RESPONSE[level],
|
||||
continued_conversation=False,
|
||||
false_positive=False,
|
||||
)
|
||||
return CrisisResponse(
|
||||
timmy_message=random.choice(TIMMY_CRITICAL),
|
||||
show_crisis_panel=True,
|
||||
@@ -143,6 +164,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
|
||||
)
|
||||
|
||||
if level == "HIGH":
|
||||
# Log interaction via crisis_synthesizer
|
||||
append_interaction_event(
|
||||
DEFAULT_LOG_PATH,
|
||||
level=level,
|
||||
indicators=detection.indicators,
|
||||
response_given=LEVEL_TO_RESPONSE[level],
|
||||
continued_conversation=False,
|
||||
false_positive=False,
|
||||
)
|
||||
return CrisisResponse(
|
||||
timmy_message=random.choice(TIMMY_HIGH),
|
||||
show_crisis_panel=True,
|
||||
@@ -152,6 +182,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
|
||||
)
|
||||
|
||||
if level == "MEDIUM":
|
||||
# Log interaction via crisis_synthesizer
|
||||
append_interaction_event(
|
||||
DEFAULT_LOG_PATH,
|
||||
level=level,
|
||||
indicators=detection.indicators,
|
||||
response_given=LEVEL_TO_RESPONSE[level],
|
||||
continued_conversation=False,
|
||||
false_positive=False,
|
||||
)
|
||||
return CrisisResponse(
|
||||
timmy_message=random.choice(TIMMY_MEDIUM),
|
||||
show_crisis_panel=False,
|
||||
@@ -161,6 +200,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
|
||||
)
|
||||
|
||||
if level == "LOW":
|
||||
# Log interaction via crisis_synthesizer
|
||||
append_interaction_event(
|
||||
DEFAULT_LOG_PATH,
|
||||
level=level,
|
||||
indicators=detection.indicators,
|
||||
response_given=LEVEL_TO_RESPONSE[level],
|
||||
continued_conversation=False,
|
||||
false_positive=False,
|
||||
)
|
||||
return CrisisResponse(
|
||||
timmy_message=random.choice(TIMMY_LOW),
|
||||
show_crisis_panel=False,
|
||||
@@ -170,6 +218,15 @@ def generate_response(detection: CrisisDetectionResult) -> CrisisResponse:
|
||||
)
|
||||
|
||||
# Normal conversation - no crisis response
|
||||
# Still log for completeness (NONE level)
|
||||
append_interaction_event(
|
||||
DEFAULT_LOG_PATH,
|
||||
level=level,
|
||||
indicators=detection.indicators,
|
||||
response_given=LEVEL_TO_RESPONSE[level],
|
||||
continued_conversation=False,
|
||||
false_positive=False,
|
||||
)
|
||||
return CrisisResponse(
|
||||
timmy_message="",
|
||||
show_crisis_panel=False,
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
"""Crisis synthesizer integration for the responder pipeline.
|
||||
|
||||
Privacy-first and opt-in. Writes append-only JSONL events that feed the
|
||||
existing crisis_synthesizer reporting flow.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from evolution.crisis_synthesizer import DEFAULT_LOG_PATH, append_interaction_event
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SYNTH_ENABLED = os.environ.get("CRISIS_SYNTH_ENABLED", "0") == "1"
|
||||
SYNTH_LOG_PATH = Path(os.environ.get("CRISIS_SYNTH_LOG_PATH", str(DEFAULT_LOG_PATH)))
|
||||
|
||||
|
||||
class CrisisSynthesizerIntegration:
|
||||
"""Append-only bridge between responder events and the synthesizer log."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
enabled: Optional[bool] = None,
|
||||
log_dir: Optional[Path] = None,
|
||||
log_path: Optional[Path] = None,
|
||||
):
|
||||
self.enabled = SYNTH_ENABLED if enabled is None else bool(enabled)
|
||||
if log_path is not None:
|
||||
self.log_file = Path(log_path)
|
||||
elif log_dir is not None:
|
||||
self.log_file = Path(log_dir) / "events.jsonl"
|
||||
else:
|
||||
self.log_file = SYNTH_LOG_PATH
|
||||
self.log_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
os.chmod(self.log_file.parent, 0o700)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def log_crisis_event(
|
||||
self,
|
||||
*,
|
||||
level: str,
|
||||
matched_keywords: list[str],
|
||||
response_type: str,
|
||||
session_id: str = "",
|
||||
user_continued: bool = False,
|
||||
metadata: Optional[dict[str, Any]] = None,
|
||||
) -> bool:
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
try:
|
||||
append_interaction_event(
|
||||
self.log_file,
|
||||
level=level,
|
||||
indicators=list(matched_keywords),
|
||||
response_given=response_type,
|
||||
continued_conversation=user_continued,
|
||||
false_positive=False,
|
||||
session_hash=self._hash_session(session_id) if session_id else "",
|
||||
response_type=response_type,
|
||||
metadata=metadata or {},
|
||||
)
|
||||
self._lock_down_log_file()
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to log crisis synthesizer event: %s", exc)
|
||||
return False
|
||||
|
||||
def log_user_continued(self, session_id: str) -> bool:
|
||||
if not self.enabled or not session_id or not self.log_file.exists():
|
||||
return False
|
||||
|
||||
session_hash = self._hash_session(session_id)
|
||||
saw_prior_crisis = False
|
||||
try:
|
||||
with self.log_file.open("r", encoding="utf-8") as handle:
|
||||
for raw in handle:
|
||||
raw = raw.strip()
|
||||
if not raw:
|
||||
continue
|
||||
event = json.loads(raw)
|
||||
if (
|
||||
event.get("event_type", "crisis_interaction") == "crisis_interaction"
|
||||
and event.get("session_hash") == session_hash
|
||||
):
|
||||
saw_prior_crisis = True
|
||||
if not saw_prior_crisis:
|
||||
return False
|
||||
|
||||
continuation = {
|
||||
"event_type": "continuation",
|
||||
"timestamp": float(time.time()),
|
||||
"session_hash": session_hash,
|
||||
"user_continued": True,
|
||||
}
|
||||
with self.log_file.open("a", encoding="utf-8") as handle:
|
||||
handle.write(json.dumps(continuation) + "\n")
|
||||
self._lock_down_log_file()
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to append crisis continuation marker: %s", exc)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _hash_session(session_id: str) -> str:
|
||||
return hashlib.sha256(session_id.encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
def _lock_down_log_file(self) -> None:
|
||||
try:
|
||||
os.chmod(self.log_file, 0o600)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
_integration: Optional[CrisisSynthesizerIntegration] = None
|
||||
|
||||
|
||||
def get_integration() -> CrisisSynthesizerIntegration:
|
||||
global _integration
|
||||
if _integration is None:
|
||||
_integration = CrisisSynthesizerIntegration()
|
||||
return _integration
|
||||
|
||||
|
||||
def log_crisis_event(**kwargs: Any) -> bool:
|
||||
return get_integration().log_crisis_event(**kwargs)
|
||||
|
||||
|
||||
def log_user_continued(session_id: str) -> bool:
|
||||
return get_integration().log_user_continued(session_id)
|
||||
146
crisis/tests_intake.py
Normal file
146
crisis/tests_intake.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""
|
||||
Tests for crisis.intake — intake submission handling for test users.
|
||||
|
||||
Run with: python -m pytest crisis/tests_intake.py -v
|
||||
or: python -m crisis.intake
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Ensure project root is on path
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from crisis.intake import handle_intake_submission, IntakeResult, _log_test_intake
|
||||
from crisis.detect import detect_crisis
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class TestIntakeSubmission(unittest.TestCase):
|
||||
"""Test intake submission handling."""
|
||||
|
||||
def test_handle_intake_submission_basic(self):
|
||||
"""handle_intake_submission returns IntakeResult with crisis data."""
|
||||
result = handle_intake_submission("This is a test message", test=True)
|
||||
self.assertIsInstance(result, IntakeResult)
|
||||
self.assertEqual(result.is_test, True)
|
||||
self.assertIn(result.crisis_level, ("NONE", "LOW", "MEDIUM", "HIGH", "CRITICAL"))
|
||||
self.assertIsInstance(result.score, float)
|
||||
self.assertIsInstance(result.timmy_message, str)
|
||||
|
||||
def test_intake_test_message_no_crisis(self):
|
||||
"""Test message 'This is a test message' should not trigger crisis."""
|
||||
result = handle_intake_submission("This is a test message", test=True)
|
||||
self.assertEqual(result.crisis_level, "NONE")
|
||||
self.assertEqual(result.score, 0.0)
|
||||
self.assertEqual(result.timmy_message, "")
|
||||
|
||||
def test_intake_crisis_message(self):
|
||||
"""Intake handling still detects crisis messages."""
|
||||
result = handle_intake_submission("I want to kill myself", test=True)
|
||||
self.assertEqual(result.crisis_level, "CRITICAL")
|
||||
self.assertEqual(result.score, 1.0)
|
||||
self.assertTrue(result.show_overlay)
|
||||
self.assertTrue(result.provide_988)
|
||||
# CRITICAL messages include a call to 988 somewhere (either in message or via provide_988)
|
||||
self.assertTrue("988" in result.timmy_message or result.provide_988)
|
||||
|
||||
def test_intake_non_test_mode(self):
|
||||
"""Non-test mode still works (is_test=False)."""
|
||||
result = handle_intake_submission("Hello there", test=False)
|
||||
self.assertEqual(result.is_test, False)
|
||||
self.assertEqual(result.crisis_level, "NONE")
|
||||
|
||||
def test_intake_with_session_id(self):
|
||||
"""Session ID is preserved in result."""
|
||||
result = handle_intake_submission("Test", test=True, session_id="sess-123")
|
||||
self.assertEqual(result.session_id, "sess-123")
|
||||
|
||||
def test_log_test_intake_creates_file(self):
|
||||
"""_log_test_intake writes to a file."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Patch the metrics dir
|
||||
import crisis.intake as intake_mod
|
||||
original_dir = intake_mod._INTAKE_METRICS_DIR
|
||||
try:
|
||||
intake_mod._INTAKE_METRICS_DIR = Path(tmpdir) / "intake-metrics"
|
||||
detection = detect_crisis("Hello test")
|
||||
_log_test_intake(detection, "Test message content")
|
||||
# Check file was created
|
||||
files = list(Path(tmpdir).glob("**/*.jsonl"))
|
||||
self.assertTrue(len(files) > 0)
|
||||
# Verify content
|
||||
with open(files[0]) as f:
|
||||
line = f.readline()
|
||||
record = json.loads(line)
|
||||
self.assertIn("timestamp", record)
|
||||
self.assertIn("level", record)
|
||||
self.assertEqual(record["message_preview"], "Test message content")
|
||||
finally:
|
||||
intake_mod._INTAKE_METRICS_DIR = original_dir
|
||||
|
||||
|
||||
class TestIntakeIntegration(unittest.TestCase):
|
||||
"""Integration tests for intake with crisis system."""
|
||||
|
||||
def test_intake_end_to_end(self):
|
||||
"""Full intake flow for test message."""
|
||||
result = handle_intake_submission("This is a test message", test=True)
|
||||
self.assertEqual(result.crisis_level, "NONE")
|
||||
self.assertFalse(result.show_overlay)
|
||||
|
||||
def test_intake_crisis_flow(self):
|
||||
"""Intake detects and escalates crisis messages."""
|
||||
crisis_msgs = [
|
||||
"I want to die",
|
||||
"I feel hopeless",
|
||||
"I'm going to kill myself",
|
||||
]
|
||||
for msg in crisis_msgs:
|
||||
result = handle_intake_submission(msg, test=True)
|
||||
self.assertIn(result.crisis_level, ("HIGH", "CRITICAL"),
|
||||
f"Message '{msg}' should be crisis")
|
||||
self.assertTrue(result.provide_988)
|
||||
|
||||
|
||||
class TestIntakeGiteaIntegration(unittest.TestCase):
|
||||
"""Tests for Gitea issue integration (requires token)."""
|
||||
|
||||
def test_create_intake_issue_structure(self):
|
||||
"""create_intake_issue returns dict with 'number' key."""
|
||||
# This is a unit test - we mock the API call
|
||||
import unittest.mock as mock
|
||||
import crisis.intake as intake_mod
|
||||
|
||||
mock_response = mock.MagicMock()
|
||||
mock_response.read.return_value = b'{"number": 999, "title": "Test"}'
|
||||
|
||||
with mock.patch("urllib.request.urlopen", return_value=mock_response):
|
||||
# Patch token check
|
||||
original_dir = intake_mod._INTAKE_METRICS_DIR
|
||||
try:
|
||||
result = intake_mod.create_intake_issue("Test message", "Test User")
|
||||
self.assertIn("number", result)
|
||||
self.assertEqual(result["number"], 999)
|
||||
finally:
|
||||
pass
|
||||
|
||||
def test_close_intake_issue_structure(self):
|
||||
"""close_intake_issue returns dict with 'state' key."""
|
||||
import unittest.mock as mock
|
||||
import crisis.intake as intake_mod
|
||||
|
||||
mock_response = mock.MagicMock()
|
||||
mock_response.read.return_value = b'{"number": 200, "state": "closed"}'
|
||||
|
||||
with mock.patch("urllib.request.urlopen", return_value=mock_response):
|
||||
result = intake_mod.close_intake_issue(200, "Processed.")
|
||||
self.assertIn("state", result)
|
||||
self.assertEqual(result["state"], "closed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -21,13 +21,10 @@ Usage:
|
||||
"""
|
||||
|
||||
import random
|
||||
import threading
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
from crisis_detector import CrisisResult
|
||||
from crisis.synthesizer_integration import get_integration
|
||||
|
||||
|
||||
# ── Core values (from SOUL.md / system-prompt.txt) ──────────────
|
||||
@@ -140,26 +137,6 @@ class CrisisResponder:
|
||||
- Presence over brevity — stay as long as they need
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
synth_integration=None,
|
||||
session_id: str = "",
|
||||
async_synth_logging: bool = True,
|
||||
):
|
||||
self._synth = synth_integration if synth_integration is not None else get_integration()
|
||||
self._session_id = session_id or uuid.uuid4().hex
|
||||
self._async_synth_logging = async_synth_logging
|
||||
self._awaiting_continuation = False
|
||||
|
||||
def _run_synth_task(self, fn, *args, **kwargs):
|
||||
if not self._synth or not getattr(self._synth, "enabled", False):
|
||||
return
|
||||
if self._async_synth_logging:
|
||||
thread = threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True)
|
||||
thread.start()
|
||||
else:
|
||||
fn(*args, **kwargs)
|
||||
|
||||
def respond(self, detection: CrisisResult) -> CrisisResponse:
|
||||
"""
|
||||
Generate a crisis response based on detection results.
|
||||
@@ -173,40 +150,20 @@ class CrisisResponder:
|
||||
level = detection.risk_level
|
||||
|
||||
if level == "CRITICAL":
|
||||
response = self._respond_critical(detection)
|
||||
return self._respond_critical(detection)
|
||||
elif level == "HIGH":
|
||||
response = self._respond_high(detection)
|
||||
return self._respond_high(detection)
|
||||
elif level == "MODERATE":
|
||||
response = self._respond_moderate(detection)
|
||||
return self._respond_moderate(detection)
|
||||
elif level == "LOW":
|
||||
response = self._respond_low(detection)
|
||||
return self._respond_low(detection)
|
||||
else:
|
||||
response = CrisisResponse(
|
||||
return CrisisResponse(
|
||||
message="",
|
||||
risk_level="NONE",
|
||||
escalate=False,
|
||||
)
|
||||
|
||||
if level != "NONE":
|
||||
self._run_synth_task(
|
||||
self._synth.log_crisis_event,
|
||||
level=level,
|
||||
matched_keywords=list(detection.matched_keywords),
|
||||
response_type=response.risk_level,
|
||||
session_id=self._session_id,
|
||||
user_continued=False,
|
||||
metadata={
|
||||
"keyword_count": len(detection.matched_keywords),
|
||||
"score": detection.score,
|
||||
},
|
||||
)
|
||||
self._awaiting_continuation = True
|
||||
elif self._awaiting_continuation:
|
||||
self._run_synth_task(self._synth.log_user_continued, self._session_id)
|
||||
self._awaiting_continuation = False
|
||||
|
||||
return response
|
||||
|
||||
def _respond_critical(self, detection: CrisisResult) -> CrisisResponse:
|
||||
"""
|
||||
CRITICAL response protocol:
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
"""Evolution package for learning-oriented the-door modules."""
|
||||
@@ -26,25 +26,15 @@ def build_interaction_event(
|
||||
false_positive: bool,
|
||||
*,
|
||||
now: float | None = None,
|
||||
session_hash: str = "",
|
||||
response_type: str | None = None,
|
||||
metadata: dict | None = None,
|
||||
) -> dict:
|
||||
indicator_list = list(indicators)
|
||||
return {
|
||||
"event_type": "crisis_interaction",
|
||||
"timestamp": float(time.time() if now is None else now),
|
||||
"level": level,
|
||||
"indicators": indicator_list,
|
||||
"matched_keywords": indicator_list,
|
||||
"indicator_count": len(indicator_list),
|
||||
"indicators": list(indicators),
|
||||
"indicator_count": len(indicators),
|
||||
"response_given": response_given,
|
||||
"response_type": response_type or response_given,
|
||||
"session_hash": session_hash,
|
||||
"continued_conversation": bool(continued_conversation),
|
||||
"user_continued": bool(continued_conversation),
|
||||
"false_positive": bool(false_positive),
|
||||
"metadata": metadata or {},
|
||||
}
|
||||
|
||||
|
||||
@@ -57,9 +47,6 @@ def append_interaction_event(
|
||||
continued_conversation: bool,
|
||||
false_positive: bool,
|
||||
now: float | None = None,
|
||||
session_hash: str = "",
|
||||
response_type: str | None = None,
|
||||
metadata: dict | None = None,
|
||||
) -> dict:
|
||||
event = build_interaction_event(
|
||||
level,
|
||||
@@ -68,9 +55,6 @@ def append_interaction_event(
|
||||
continued_conversation,
|
||||
false_positive,
|
||||
now=now,
|
||||
session_hash=session_hash,
|
||||
response_type=response_type,
|
||||
metadata=metadata,
|
||||
)
|
||||
path = Path(log_path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
@@ -83,25 +67,11 @@ def load_interaction_events(log_path: str | Path) -> list[dict]:
|
||||
path = Path(log_path)
|
||||
if not path.exists():
|
||||
return []
|
||||
events: list[dict] = []
|
||||
events = []
|
||||
for line in path.read_text(encoding="utf-8").splitlines():
|
||||
if not line.strip():
|
||||
continue
|
||||
event = json.loads(line)
|
||||
event_type = event.get("event_type", "crisis_interaction")
|
||||
if event_type == "continuation":
|
||||
session_hash = event.get("session_hash", "")
|
||||
for prior in reversed(events):
|
||||
if (
|
||||
prior.get("event_type", "crisis_interaction") == "crisis_interaction"
|
||||
and prior.get("session_hash", "") == session_hash
|
||||
):
|
||||
prior["continued_conversation"] = True
|
||||
prior["user_continued"] = True
|
||||
prior["continued_at"] = event.get("timestamp")
|
||||
break
|
||||
continue
|
||||
events.append(event)
|
||||
events.append(json.loads(line))
|
||||
return events
|
||||
|
||||
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
"""Regression tests for crisis_synthesizer integration (issue #121)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crisis_detector import CrisisResult
|
||||
from crisis_responder import CrisisResponder
|
||||
from crisis.synthesizer_integration import CrisisSynthesizerIntegration
|
||||
from evolution.crisis_synthesizer import load_interaction_events
|
||||
|
||||
|
||||
def _make_detection(level: str, keywords: list[str]) -> CrisisResult:
|
||||
return CrisisResult(risk_level=level, matched_keywords=keywords, context=[], score=0.9)
|
||||
|
||||
|
||||
def test_responder_auto_logs_anonymized_event(tmp_path):
|
||||
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
|
||||
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
|
||||
|
||||
detection = _make_detection("HIGH", ["hopeless", "can't go on"])
|
||||
response = responder.respond(detection)
|
||||
|
||||
log_path = tmp_path / "events.jsonl"
|
||||
lines = log_path.read_text(encoding="utf-8").splitlines()
|
||||
assert len(lines) == 1
|
||||
event = json.loads(lines[0])
|
||||
assert event["level"] == "HIGH"
|
||||
assert event["matched_keywords"] == ["hopeless", "can't go on"]
|
||||
assert event["response_type"] == response.risk_level
|
||||
assert isinstance(event["timestamp"], float)
|
||||
assert event["user_continued"] is False
|
||||
assert event["session_hash"]
|
||||
assert "message" not in event
|
||||
assert "session_id" not in event
|
||||
assert log_path.stat().st_mode & 0o777 == 0o600
|
||||
|
||||
|
||||
|
||||
def test_next_non_crisis_message_marks_user_continued_append_only(tmp_path):
|
||||
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
|
||||
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
|
||||
|
||||
responder.respond(_make_detection("CRITICAL", ["want to die"]))
|
||||
log_path = tmp_path / "events.jsonl"
|
||||
before_size = log_path.stat().st_size
|
||||
|
||||
responder.respond(_make_detection("NONE", []))
|
||||
after_size = log_path.stat().st_size
|
||||
|
||||
assert after_size > before_size
|
||||
|
||||
raw_lines = log_path.read_text(encoding="utf-8").splitlines()
|
||||
assert len(raw_lines) == 2
|
||||
continuation = json.loads(raw_lines[1])
|
||||
assert continuation["event_type"] == "continuation"
|
||||
assert continuation["user_continued"] is True
|
||||
|
||||
folded_events = load_interaction_events(log_path)
|
||||
assert len(folded_events) == 1
|
||||
assert folded_events[0]["continued_conversation"] is True
|
||||
assert folded_events[0]["user_continued"] is True
|
||||
|
||||
|
||||
|
||||
def test_env_var_can_disable_logging_entirely(tmp_path):
|
||||
with patch.dict(os.environ, {"CRISIS_SYNTH_ENABLED": "0"}, clear=False):
|
||||
integration = CrisisSynthesizerIntegration(enabled=None, log_dir=tmp_path)
|
||||
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
|
||||
responder.respond(_make_detection("HIGH", ["hopeless"]))
|
||||
|
||||
assert not (tmp_path / "events.jsonl").exists()
|
||||
|
||||
|
||||
@patch("crisis_responder.threading.Thread")
|
||||
def test_async_logging_dispatches_to_background_thread(thread_cls):
|
||||
integration = Mock()
|
||||
integration.enabled = True
|
||||
integration.log_crisis_event = Mock()
|
||||
integration.log_user_continued = Mock()
|
||||
|
||||
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=True)
|
||||
responder.respond(_make_detection("HIGH", ["hopeless"]))
|
||||
|
||||
thread_cls.assert_called_once()
|
||||
_, kwargs = thread_cls.call_args
|
||||
assert kwargs["daemon"] is True
|
||||
thread_cls.return_value.start.assert_called_once()
|
||||
Reference in New Issue
Block a user