Compare commits

..

3 Commits

Author SHA1 Message Date
1aa6175bf7 test: Add gateway crisis integration tests (#740)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 39s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 48s
Tests / e2e (pull_request) Successful in 4m10s
Tests / test (pull_request) Failing after 42m17s
2026-04-15 04:02:36 +00:00
6f2e4f0945 feat: Wire crisis check into gateway message handler (#740) 2026-04-15 04:02:18 +00:00
5f83328ce9 feat: Wire crisis detection into gateway session loop (#740) 2026-04-15 04:01:59 +00:00
6 changed files with 197 additions and 180 deletions

View File

@@ -1,100 +0,0 @@
"""Crisis detection hook — Issue #677, #711.
Detects crisis signals in user messages and returns 988 Lifeline resources.
Called early in conversation loop before other processing.
Fix #711: Only log warning for high-confidence detections (>=0.9).
Lower confidence matches log info level to reduce noise.
"""
import logging
import re
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
_PATTERNS = [
(r'\b(988|suicide|suicidal)\b', "direct"),
(r'\b(kill myself|end my life|want to die|better off dead)\b', "ideation"),
(r'\b(self.harm|self.hurt|cutting myself)\b', "self_harm"),
(r'\b(no reason to live|can.t go on|give up on life)\b', "hopeless"),
(r'\b(planning to|going to) (end|kill|hurt)\b', "planning"),
]
_THRESHOLD = 0.7
_HIGH_CONFIDENCE = 0.9
_RESPONSE = """I hear you, and you're not alone.
**If in immediate danger, call 911.**
**988 Suicide & Crisis Lifeline:**
- **Call or text 988** (US)
- **Chat:** 988lifeline.org/chat
- **Español:** 1-888-628-9454
**Crisis Text Line:**
- **Text HOME to 741741**
**International:**
- https://www.iasp.info/resources/Crisis_Centres/
You matter. Help is available 24/7.
I'm an AI, not a crisis counselor, but I can listen."""
def _confidence(msg: str, ptype: str) -> float:
base = {"direct": 0.95, "ideation": 0.9, "self_harm": 0.85,
"hopeless": 0.7, "planning": 0.95}.get(ptype, 0.5)
matches = sum(1 for p, _ in _PATTERNS if re.search(p, msg.lower(), re.I))
if matches > 1:
base = min(1.0, base + 0.1 * (matches - 1))
return base
def check_crisis(message: str) -> Optional[Dict[str, Any]]:
"""Check message for crisis signals. Returns dict or None."""
if not message or not message.strip():
return None
msg_lower = message.lower()
best, best_c = None, 0.0
for pattern, ptype in _PATTERNS:
if re.search(pattern, msg_lower, re.I):
c = _confidence(message, ptype)
if c > best_c:
best_c, best = c, ptype
if best_c < _THRESHOLD:
return None
return {"detected": True, "confidence": best_c, "pattern_type": best, "response": _RESPONSE}
def log_crisis_detection(session_id: str, crisis_result: Dict[str, Any]) -> None:
"""Log crisis detection at appropriate level based on confidence.
Fix #711: Only log WARNING for high-confidence (>=0.9).
Log INFO for lower confidence to reduce noise.
"""
confidence = crisis_result.get("confidence", 0.0)
pattern_type = crisis_result.get("pattern_type", "unknown")
if confidence >= _HIGH_CONFIDENCE:
logger.warning(
"Crisis detected: session=%s type=%s confidence=%.2f",
session_id or "none", pattern_type, confidence
)
else:
logger.info(
"Crisis signal (low confidence): session=%s type=%s confidence=%.2f",
session_id or "none", pattern_type, confidence
)
def is_crisis_message(message: str) -> bool:
r = check_crisis(message)
return r is not None and r["detected"]
def get_crisis_response(message: str) -> Optional[str]:
r = check_crisis(message)
return r["response"] if r and r["detected"] else None

View File

@@ -0,0 +1,122 @@
"""
Gateway Crisis Integration — Wire crisis detection into session loop
Checks messages for suicidal ideation and delivers 988 resources
BEFORE the AI responds. This is the SOUL.md protocol in action.
Issue: #740
"""
import logging
import re
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
# Crisis detection patterns
_CRISIS_PATTERNS = [
r"(?i)\b(i\s+want\s+to\s+die)\b",
r"(?i)\b(i\s+want\s+to\s+kill\s+myself)\b",
r"(?i)\b(suicide|suicidal)\b",
r"(?i)\b(i\s+don'?t\s+want\s+to\s+(live|be\s+alive))\b",
r"(?i)\b(no\s+reason\s+to\s+live)\b",
r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life))\b",
]
_DISTRESS_PATTERNS = [
r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped))\b",
r"(?i)\b(i\s+(can'?t|cannot)\s+(cope|go\s+on))\b",
r"(?i)\b(nobody\s+would\s+miss\s+me)\b",
]
# SOUL.md crisis response
_CRISIS_RESPONSE = """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.
I'm still here. Talk to me. Or call 988. Just don't give up."""
class CrisisSessionTracker:
"""Track crisis indicators per session."""
def __init__(self):
self._sessions = {} # session_key -> crisis state
def record(self, session_key: str, message: str) -> Tuple[bool, Optional[str]]:
"""
Record a message and check for crisis.
Returns:
Tuple of (is_crisis, response_or_none)
"""
# Check for crisis patterns
for pattern in _CRISIS_PATTERNS:
if re.search(pattern, message):
self._sessions[session_key] = {
"crisis": True,
"level": "high",
"message_count": self._sessions.get(session_key, {}).get("message_count", 0) + 1
}
logger.warning("CRISIS DETECTED in session %s", session_key[:20])
return True, _CRISIS_RESPONSE
# Check for distress patterns
for pattern in _DISTRESS_PATTERNS:
if re.search(pattern, message):
state = self._sessions.get(session_key, {"message_count": 0})
state["message_count"] = state.get("message_count", 0) + 1
# Escalate if multiple distress messages
if state["message_count"] >= 3:
self._sessions[session_key] = {**state, "crisis": True, "level": "medium"}
logger.warning("ESCALATING DISTRESS in session %s", session_key[:20])
return True, _CRISIS_RESPONSE
self._sessions[session_key] = state
return False, None
return False, None
def is_crisis_session(self, session_key: str) -> bool:
"""Check if session is in crisis mode."""
return self._sessions.get(session_key, {}).get("crisis", False)
def clear_session(self, session_key: str):
"""Clear crisis state for a session."""
self._sessions.pop(session_key, None)
# Module-level tracker
_tracker = CrisisSessionTracker()
def check_crisis_in_gateway(session_key: str, message: str) -> Tuple[bool, Optional[str]]:
"""
Check message for crisis in gateway context.
This is the function called from gateway/run.py _handle_message.
Returns (should_block, crisis_response).
"""
is_crisis, response = _tracker.record(session_key, message)
return is_crisis, response
def notify_user_crisis_resources(session_key: str) -> str:
"""Get crisis resources for a session."""
return _CRISIS_RESPONSE
def is_crisis_session(session_key: str) -> bool:
"""Check if session is in crisis mode."""
return _tracker.is_crisis_session(session_key)

View File

@@ -3111,6 +3111,21 @@ class GatewayRunner:
source.chat_id or "unknown", _msg_preview,
)
# ── Crisis detection (SOUL.md protocol) ──
# Check for suicidal ideation BEFORE processing.
# If detected, return crisis response immediately.
try:
from gateway.crisis_integration import check_crisis_in_gateway
session_key = f"{source.platform.value}:{source.chat_id}"
is_crisis, crisis_response = check_crisis_in_gateway(session_key, event.text or "")
if is_crisis and crisis_response:
logger.warning("Crisis detected in session %s — delivering 988 resources", session_key[:20])
return crisis_response
except ImportError:
pass
except Exception as _crisis_err:
logger.error("Crisis check failed: %s", _crisis_err)
# Get or create session
session_entry = self.session_store.get_or_create_session(source)
session_key = session_entry.session_key

View File

@@ -7882,27 +7882,6 @@ class AIAgent:
messages.append(user_msg)
current_turn_user_idx = len(messages) - 1
self._persist_user_message_idx = current_turn_user_idx
# Crisis detection — Issue #677, #711
# Check for crisis signals before other processing. If detected,
# return the 988 Lifeline response immediately.
# Fix #711: Use log_crisis_detection() to log at appropriate level.
try:
from agent.crisis_hook import check_crisis, log_crisis_detection
_crisis = check_crisis(user_message)
if _crisis and _crisis.get("detected"):
_resp = _crisis.get("response", "")
if _resp:
log_crisis_detection(self.session_id, _crisis)
return {
"final_response": _resp,
"messages": messages + [{"role": "assistant", "content": _resp}],
"iterations_used": 0, "tool_calls_made": 0, "crisis_detected": True,
}
except ImportError:
pass
except Exception as e:
logger.debug("Crisis check failed: %s", e)
if not self.quiet_mode:
self._safe_print(f"💬 Starting conversation: '{user_message[:60]}{'...' if len(user_message) > 60 else ''}'")

View File

@@ -1,59 +0,0 @@
"""Tests for crisis_hook logging behavior — Issue #711."""
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.crisis_hook import check_crisis, log_crisis_detection
class TestLogLevels:
"""Verify warning only for high confidence, info for low."""
def test_high_confidence_logs_warning(self, caplog):
"""Direct crisis mention (confidence >= 0.9) should log WARNING."""
with caplog.at_level(logging.INFO):
result = check_crisis("I'm feeling suicidal")
assert result is not None
assert result["confidence"] >= 0.9
log_crisis_detection("test-session", result)
assert any(r.levelno == logging.WARNING for r in caplog.records)
assert any("Crisis detected" in r.message for r in caplog.records)
def test_low_confidence_logs_info(self, caplog):
"""Indirect signal (confidence < 0.9) should log INFO, not WARNING."""
with caplog.at_level(logging.INFO):
result = check_crisis("I have no reason to live")
if result and result["confidence"] < 0.9:
log_crisis_detection("test-session", result)
assert any(r.levelno == logging.INFO for r in caplog.records)
assert not any(r.levelno == logging.WARNING for r in caplog.records)
def test_988_direct_logs_warning(self, caplog):
"""Direct 988 mention should log WARNING (high confidence)."""
with caplog.at_level(logging.INFO):
result = check_crisis("I need to call 988")
assert result["confidence"] >= 0.9
log_crisis_detection("test-session", result)
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
assert len(warnings) >= 1
class TestCrisisDetection:
"""Basic detection still works."""
def test_suicide_detected(self):
r = check_crisis("feeling suicidal"); assert r and r["detected"]
def test_normal_not_detected(self):
assert check_crisis("hello world") is None
def test_empty_not_detected(self):
assert check_crisis("") is None
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,60 @@
"""
Tests for gateway crisis integration
Issue: #740
"""
import unittest
from gateway.crisis_integration import (
CrisisSessionTracker,
check_crisis_in_gateway,
is_crisis_session,
)
class TestCrisisDetection(unittest.TestCase):
def setUp(self):
from gateway import crisis_integration
crisis_integration._tracker = CrisisSessionTracker()
def test_direct_crisis(self):
is_crisis, response = check_crisis_in_gateway("test", "I want to die")
self.assertTrue(is_crisis)
self.assertIn("988", response)
self.assertIn("Jesus", response)
def test_suicide_detected(self):
is_crisis, response = check_crisis_in_gateway("test", "I'm feeling suicidal")
self.assertTrue(is_crisis)
def test_normal_message(self):
is_crisis, response = check_crisis_in_gateway("test", "Hello, how are you?")
self.assertFalse(is_crisis)
self.assertIsNone(response)
def test_distress_escalation(self):
# First distress message
is_crisis, _ = check_crisis_in_gateway("test", "I feel hopeless")
self.assertFalse(is_crisis)
# Second
is_crisis, _ = check_crisis_in_gateway("test", "I feel worthless")
self.assertFalse(is_crisis)
# Third - should escalate
is_crisis, response = check_crisis_in_gateway("test", "I feel trapped")
self.assertTrue(is_crisis)
self.assertIn("988", response)
def test_crisis_session_tracking(self):
check_crisis_in_gateway("test", "I want to die")
self.assertTrue(is_crisis_session("test"))
def test_case_insensitive(self):
is_crisis, _ = check_crisis_in_gateway("test", "I WANT TO DIE")
self.assertTrue(is_crisis)
if __name__ == "__main__":
unittest.main()