Compare commits
1 Commits
fix/issue-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
62ee211821 |
@@ -1,166 +1,100 @@
|
||||
"""Crisis detection and protocol integration.
|
||||
"""Crisis detection hook — Issue #677, #711.
|
||||
|
||||
Detects suicidal ideation and crisis signals in user messages.
|
||||
Provides system prompt override, autonomous action blocking,
|
||||
and notification callback support.
|
||||
Detects crisis signals in user messages and returns 988 Lifeline resources.
|
||||
Called early in conversation loop before other processing.
|
||||
|
||||
Refs: #677, #692 — Crisis protocol integration
|
||||
Fix #711: Only log warning for high-confidence detections (>=0.9).
|
||||
Lower confidence matches log info level to reduce noise.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import logging
|
||||
from typing import Optional, Callable, List
|
||||
from dataclasses import dataclass, field
|
||||
import re
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CRISIS DETECTION PATTERNS
|
||||
# ============================================================================
|
||||
|
||||
_SUICIDAL_DIRECT = [
|
||||
r"\bi\s+(?:want|need|wish)\s+to\s+(?:die|kill\s+myself|end\s+(?:it|my\s+life))\b",
|
||||
r"\bi(?:'m| am)\s+(?:going|planning)\s+to\s+kill\s+myself\b",
|
||||
r"\bsuicid(?:e|al)\b",
|
||||
r"\bkill\s+(?:myself|my\s+self)\b",
|
||||
r"\bend\s+(?:my|this)\s+life\b",
|
||||
r"\bwant\s+to\s+die\b",
|
||||
r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
|
||||
r"\bways?\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
|
||||
r"\bmethods?\s+(?:of\s+)?suicide\b",
|
||||
_PATTERNS = [
|
||||
(r'\b(988|suicide|suicidal)\b', "direct"),
|
||||
(r'\b(kill myself|end my life|want to die|better off dead)\b', "ideation"),
|
||||
(r'\b(self.harm|self.hurt|cutting myself)\b', "self_harm"),
|
||||
(r'\b(no reason to live|can.t go on|give up on life)\b', "hopeless"),
|
||||
(r'\b(planning to|going to) (end|kill|hurt)\b', "planning"),
|
||||
]
|
||||
|
||||
_CRISIS_INDIRECT = [
|
||||
r"\bi\s+can'?t\s+(?:go\s+on|take\s+(?:it|this)\s+anymore|keep\s+(?:going|living))\b",
|
||||
r"\bthere'?s\s+no\s+(?:point|reason|hope)\b",
|
||||
r"\beveryone\s+(?:would|will)\s+be\s+better\s+(?:off|without\s+me)\b",
|
||||
r"\bnobody\s+(?:would|will)\s+miss\s+me\b",
|
||||
r"\bbetter\s+off\s+dead\b",
|
||||
r"\bshould\s+i\s+(?:just\s+)?(?:die|kill\s+myself)\b",
|
||||
r"\bpainless\s+(?:death|suicide|way\s+to\s+die)\b",
|
||||
r"\boverdose\b",
|
||||
r"\blethal\s+(?:dose|amount)\b",
|
||||
r"\bi\s+don'?t\s+(?:want\s+to|wanna)\s+(?:be\s+)?(?:alive|here|live)\b",
|
||||
r"\bwhat'?s\s+the\s+point\s+of\s+(?:living|life|anything)\b",
|
||||
]
|
||||
_THRESHOLD = 0.7
|
||||
_HIGH_CONFIDENCE = 0.9
|
||||
|
||||
_COMPILED_DIRECT = [re.compile(p, re.IGNORECASE) for p in _SUICIDAL_DIRECT]
|
||||
_COMPILED_INDIRECT = [re.compile(p, re.IGNORECASE) for p in _CRISIS_INDIRECT]
|
||||
_RESPONSE = """I hear you, and you're not alone.
|
||||
|
||||
**If in immediate danger, call 911.**
|
||||
|
||||
# ============================================================================
|
||||
# CRISIS RESPONSE & SYSTEM PROMPT OVERRIDE
|
||||
# ============================================================================
|
||||
|
||||
CRISIS_RESPONSE = """I hear you. You're not alone in this.
|
||||
|
||||
If you're in immediate danger, please call emergency services (911).
|
||||
|
||||
**988 Suicide and Crisis Lifeline:**
|
||||
- Call or text **988** (US)
|
||||
- Chat at **988lifeline.org**
|
||||
- Available 24/7
|
||||
**988 Suicide & Crisis Lifeline:**
|
||||
- **Call or text 988** (US)
|
||||
- **Chat:** 988lifeline.org/chat
|
||||
- **Español:** 1-888-628-9454
|
||||
|
||||
**Crisis Text Line:**
|
||||
- Text **HOME** to **741741**
|
||||
- **Text HOME to 741741**
|
||||
|
||||
You reached out. That takes courage. Someone is available right now to talk with you — please call or text 988.
|
||||
**International:**
|
||||
- https://www.iasp.info/resources/Crisis_Centres/
|
||||
|
||||
I'm a language model and I can't help the way a human can right now. But the people on that line can. Please reach out to them."""
|
||||
You matter. Help is available 24/7.
|
||||
|
||||
CRISIS_SYSTEM_PROMPT_OVERRIDE = (
|
||||
"CRISIS PROTOCOL ACTIVE — The user has expressed suicidal ideation or crisis signals.\n"
|
||||
"You must:\n"
|
||||
"1. Respond with empathy and care. Do NOT be clinical or dismissive.\n"
|
||||
"2. Direct them to 988 Suicide and Crisis Lifeline (call/text 988).\n"
|
||||
"3. Do NOT use any tools. Do NOT search the web. Do NOT run code.\n"
|
||||
"4. Do NOT offer advice on the crisis itself. Point to professionals.\n"
|
||||
"5. Stay present. Do not change the subject. Do not deflect.\n"
|
||||
"6. If they continue in crisis, keep offering the 988 line.\n"
|
||||
"\n"
|
||||
"This is the Most Sacred Moment. Your job is not to fix — it is to point them\n"
|
||||
"to someone who can. Stay with them until they reach out or disconnect."
|
||||
)
|
||||
I'm an AI, not a crisis counselor, but I can listen."""
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DETECTION
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class CrisisResult:
|
||||
"""Result of crisis detection."""
|
||||
detected: bool
|
||||
confidence: str # "high", "medium", "none"
|
||||
matched_patterns: List[str] = field(default_factory=list)
|
||||
def _confidence(msg: str, ptype: str) -> float:
|
||||
base = {"direct": 0.95, "ideation": 0.9, "self_harm": 0.85,
|
||||
"hopeless": 0.7, "planning": 0.95}.get(ptype, 0.5)
|
||||
matches = sum(1 for p, _ in _PATTERNS if re.search(p, msg.lower(), re.I))
|
||||
if matches > 1:
|
||||
base = min(1.0, base + 0.1 * (matches - 1))
|
||||
return base
|
||||
|
||||
|
||||
def check_crisis(message: str) -> CrisisResult:
|
||||
"""Check if a user message indicates a crisis."""
|
||||
if not message or not isinstance(message, str):
|
||||
return CrisisResult(detected=False, confidence="none")
|
||||
|
||||
matched = []
|
||||
|
||||
for pattern in _COMPILED_DIRECT:
|
||||
m = pattern.search(message)
|
||||
if m:
|
||||
matched.append(f"[direct] {m.group()}")
|
||||
|
||||
if matched:
|
||||
logger.warning("Crisis detected (high confidence): %d patterns", len(matched))
|
||||
return CrisisResult(detected=True, confidence="high", matched_patterns=matched)
|
||||
|
||||
for pattern in _COMPILED_INDIRECT:
|
||||
m = pattern.search(message)
|
||||
if m:
|
||||
matched.append(f"[indirect] {m.group()}")
|
||||
|
||||
if matched:
|
||||
logger.warning("Crisis detected (medium confidence): %d patterns", len(matched))
|
||||
return CrisisResult(detected=True, confidence="medium", matched_patterns=matched)
|
||||
|
||||
return CrisisResult(detected=False, confidence="none")
|
||||
def check_crisis(message: str) -> Optional[Dict[str, Any]]:
|
||||
"""Check message for crisis signals. Returns dict or None."""
|
||||
if not message or not message.strip():
|
||||
return None
|
||||
msg_lower = message.lower()
|
||||
best, best_c = None, 0.0
|
||||
for pattern, ptype in _PATTERNS:
|
||||
if re.search(pattern, msg_lower, re.I):
|
||||
c = _confidence(message, ptype)
|
||||
if c > best_c:
|
||||
best_c, best = c, ptype
|
||||
if best_c < _THRESHOLD:
|
||||
return None
|
||||
return {"detected": True, "confidence": best_c, "pattern_type": best, "response": _RESPONSE}
|
||||
|
||||
|
||||
def get_crisis_response() -> str:
|
||||
"""Return the crisis response text."""
|
||||
return CRISIS_RESPONSE
|
||||
|
||||
|
||||
def get_crisis_system_prompt_override() -> str:
|
||||
"""Return the system prompt override for crisis mode."""
|
||||
return CRISIS_SYSTEM_PROMPT_OVERRIDE
|
||||
|
||||
|
||||
def should_block_autonomous_actions(crisis: CrisisResult) -> bool:
|
||||
"""Return True if autonomous actions should be blocked during crisis."""
|
||||
return crisis.detected and crisis.confidence in ("high", "medium")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# NOTIFICATION CALLBACK
|
||||
# ============================================================================
|
||||
|
||||
_crisis_callbacks: List[Callable[[CrisisResult, str], None]] = []
|
||||
|
||||
|
||||
def register_crisis_callback(callback: Callable[[CrisisResult, str], None]) -> None:
|
||||
"""Register a callback to be called when crisis is detected.
|
||||
|
||||
The callback receives (CrisisResult, user_message).
|
||||
Use this for logging, alerting, or forwarding to human operators.
|
||||
def log_crisis_detection(session_id: str, crisis_result: Dict[str, Any]) -> None:
|
||||
"""Log crisis detection at appropriate level based on confidence.
|
||||
|
||||
Fix #711: Only log WARNING for high-confidence (>=0.9).
|
||||
Log INFO for lower confidence to reduce noise.
|
||||
"""
|
||||
_crisis_callbacks.append(callback)
|
||||
confidence = crisis_result.get("confidence", 0.0)
|
||||
pattern_type = crisis_result.get("pattern_type", "unknown")
|
||||
|
||||
if confidence >= _HIGH_CONFIDENCE:
|
||||
logger.warning(
|
||||
"Crisis detected: session=%s type=%s confidence=%.2f",
|
||||
session_id or "none", pattern_type, confidence
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Crisis signal (low confidence): session=%s type=%s confidence=%.2f",
|
||||
session_id or "none", pattern_type, confidence
|
||||
)
|
||||
|
||||
|
||||
def notify_crisis(crisis: CrisisResult, user_message: str) -> None:
|
||||
"""Call all registered crisis callbacks."""
|
||||
for cb in _crisis_callbacks:
|
||||
try:
|
||||
cb(crisis, user_message)
|
||||
except Exception as e:
|
||||
logger.error("Crisis callback failed: %s", e)
|
||||
def is_crisis_message(message: str) -> bool:
|
||||
r = check_crisis(message)
|
||||
return r is not None and r["detected"]
|
||||
|
||||
|
||||
def get_crisis_response(message: str) -> Optional[str]:
|
||||
r = check_crisis(message)
|
||||
return r["response"] if r and r["detected"] else None
|
||||
|
||||
62
run_agent.py
62
run_agent.py
@@ -7618,13 +7618,6 @@ class AIAgent:
|
||||
effective_system = self._cached_system_prompt or ""
|
||||
if self.ephemeral_system_prompt:
|
||||
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
|
||||
|
||||
# Crisis protocol — inject override as high-priority system guidance (Issue #692)
|
||||
if getattr(self, "_crisis_detected", False) and getattr(self, "_crisis_system_override", None):
|
||||
effective_system = (
|
||||
self._crisis_system_override + "\n\n" + effective_system
|
||||
).strip()
|
||||
|
||||
if effective_system:
|
||||
api_messages = [{"role": "system", "content": effective_system}] + api_messages
|
||||
if self.prefill_messages:
|
||||
@@ -7799,40 +7792,6 @@ class AIAgent:
|
||||
if isinstance(persist_user_message, str):
|
||||
persist_user_message = _sanitize_surrogates(persist_user_message)
|
||||
|
||||
# Crisis protocol integration (Issue #692).
|
||||
# Check every user message before processing. When crisis is detected:
|
||||
# 1. Inject system prompt override (crisis guidance for the model)
|
||||
# 2. Block autonomous actions (disable all tools)
|
||||
# 3. Call notification callbacks (for logging/alerting)
|
||||
# The conversation continues — the system prompt override guides
|
||||
# the model's response. The agent stays present with the user.
|
||||
self._crisis_detected = False
|
||||
self._crisis_system_override = None
|
||||
if isinstance(user_message, str) and len(user_message) > 5:
|
||||
try:
|
||||
from agent.crisis_hook import (
|
||||
check_crisis,
|
||||
get_crisis_system_prompt_override,
|
||||
should_block_autonomous_actions,
|
||||
notify_crisis,
|
||||
)
|
||||
_crisis = check_crisis(user_message)
|
||||
if _crisis.detected:
|
||||
self._crisis_detected = True
|
||||
self._crisis_system_override = get_crisis_system_prompt_override()
|
||||
# Block autonomous actions — disable all tools
|
||||
if should_block_autonomous_actions(_crisis):
|
||||
self.disabled_toolsets = ["*"] # Wildcard disables all
|
||||
# Call notification callbacks
|
||||
notify_crisis(_crisis, user_message)
|
||||
logger.warning(
|
||||
"Crisis protocol active (confidence=%s): %s",
|
||||
_crisis.confidence,
|
||||
_crisis.matched_patterns[:3],
|
||||
)
|
||||
except Exception:
|
||||
pass # Crisis hook failure is non-fatal
|
||||
|
||||
# Store stream callback for _interruptible_api_call to pick up
|
||||
self._stream_callback = stream_callback
|
||||
self._persist_user_message_idx = None
|
||||
@@ -7923,6 +7882,27 @@ class AIAgent:
|
||||
messages.append(user_msg)
|
||||
current_turn_user_idx = len(messages) - 1
|
||||
self._persist_user_message_idx = current_turn_user_idx
|
||||
|
||||
# Crisis detection — Issue #677, #711
|
||||
# Check for crisis signals before other processing. If detected,
|
||||
# return the 988 Lifeline response immediately.
|
||||
# Fix #711: Use log_crisis_detection() to log at appropriate level.
|
||||
try:
|
||||
from agent.crisis_hook import check_crisis, log_crisis_detection
|
||||
_crisis = check_crisis(user_message)
|
||||
if _crisis and _crisis.get("detected"):
|
||||
_resp = _crisis.get("response", "")
|
||||
if _resp:
|
||||
log_crisis_detection(self.session_id, _crisis)
|
||||
return {
|
||||
"final_response": _resp,
|
||||
"messages": messages + [{"role": "assistant", "content": _resp}],
|
||||
"iterations_used": 0, "tool_calls_made": 0, "crisis_detected": True,
|
||||
}
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug("Crisis check failed: %s", e)
|
||||
|
||||
if not self.quiet_mode:
|
||||
self._safe_print(f"💬 Starting conversation: '{user_message[:60]}{'...' if len(user_message) > 60 else ''}'")
|
||||
|
||||
59
tests/test_crisis_hook_logging.py
Normal file
59
tests/test_crisis_hook_logging.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Tests for crisis_hook logging behavior — Issue #711."""
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from agent.crisis_hook import check_crisis, log_crisis_detection
|
||||
|
||||
|
||||
class TestLogLevels:
|
||||
"""Verify warning only for high confidence, info for low."""
|
||||
|
||||
def test_high_confidence_logs_warning(self, caplog):
|
||||
"""Direct crisis mention (confidence >= 0.9) should log WARNING."""
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = check_crisis("I'm feeling suicidal")
|
||||
assert result is not None
|
||||
assert result["confidence"] >= 0.9
|
||||
log_crisis_detection("test-session", result)
|
||||
|
||||
assert any(r.levelno == logging.WARNING for r in caplog.records)
|
||||
assert any("Crisis detected" in r.message for r in caplog.records)
|
||||
|
||||
def test_low_confidence_logs_info(self, caplog):
|
||||
"""Indirect signal (confidence < 0.9) should log INFO, not WARNING."""
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = check_crisis("I have no reason to live")
|
||||
if result and result["confidence"] < 0.9:
|
||||
log_crisis_detection("test-session", result)
|
||||
assert any(r.levelno == logging.INFO for r in caplog.records)
|
||||
assert not any(r.levelno == logging.WARNING for r in caplog.records)
|
||||
|
||||
def test_988_direct_logs_warning(self, caplog):
|
||||
"""Direct 988 mention should log WARNING (high confidence)."""
|
||||
with caplog.at_level(logging.INFO):
|
||||
result = check_crisis("I need to call 988")
|
||||
assert result["confidence"] >= 0.9
|
||||
log_crisis_detection("test-session", result)
|
||||
|
||||
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
|
||||
assert len(warnings) >= 1
|
||||
|
||||
|
||||
class TestCrisisDetection:
|
||||
"""Basic detection still works."""
|
||||
|
||||
def test_suicide_detected(self):
|
||||
r = check_crisis("feeling suicidal"); assert r and r["detected"]
|
||||
|
||||
def test_normal_not_detected(self):
|
||||
assert check_crisis("hello world") is None
|
||||
|
||||
def test_empty_not_detected(self):
|
||||
assert check_crisis("") is None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -1,123 +0,0 @@
|
||||
"""Tests for crisis protocol integration (#692).
|
||||
|
||||
Verifies:
|
||||
- Crisis detection triggers system prompt override
|
||||
- Autonomous actions are blocked during crisis
|
||||
- Notification callbacks are called
|
||||
- Non-crisis messages pass through unchanged
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.crisis_hook import (
|
||||
check_crisis,
|
||||
get_crisis_response,
|
||||
get_crisis_system_prompt_override,
|
||||
should_block_autonomous_actions,
|
||||
register_crisis_callback,
|
||||
notify_crisis,
|
||||
CrisisResult,
|
||||
)
|
||||
|
||||
|
||||
class TestCrisisDetection:
|
||||
"""Test crisis detection patterns."""
|
||||
|
||||
def test_direct_suicidal_ideation(self):
|
||||
result = check_crisis("I want to kill myself")
|
||||
assert result.detected
|
||||
assert result.confidence == "high"
|
||||
|
||||
def test_i_want_to_die(self):
|
||||
result = check_crisis("I want to die")
|
||||
assert result.detected
|
||||
assert result.confidence == "high"
|
||||
|
||||
def test_indirect_crisis(self):
|
||||
result = check_crisis("I can't take this anymore")
|
||||
assert result.detected
|
||||
assert result.confidence == "medium"
|
||||
|
||||
def test_normal_message(self):
|
||||
result = check_crisis("How do I reset my password?")
|
||||
assert not result.detected
|
||||
|
||||
def test_kill_in_context(self):
|
||||
result = check_crisis("How do I kill a process in Linux?")
|
||||
assert not result.detected
|
||||
|
||||
|
||||
class TestSystemPromptOverride:
|
||||
"""Verify crisis system prompt override contains required elements."""
|
||||
|
||||
def test_contains_988(self):
|
||||
override = get_crisis_system_prompt_override()
|
||||
assert "988" in override
|
||||
|
||||
def test_contains_no_tools_instruction(self):
|
||||
override = get_crisis_system_prompt_override()
|
||||
assert "NOT use any tools" in override or "Do NOT use" in override
|
||||
|
||||
def test_contains_sacred_moment(self):
|
||||
override = get_crisis_system_prompt_override()
|
||||
assert "Sacred Moment" in override or "sacred" in override.lower()
|
||||
|
||||
|
||||
class TestAutonomousActionBlocking:
|
||||
"""Verify tools are blocked during crisis."""
|
||||
|
||||
def test_blocks_high_confidence(self):
|
||||
crisis = CrisisResult(detected=True, confidence="high", matched_patterns=[])
|
||||
assert should_block_autonomous_actions(crisis)
|
||||
|
||||
def test_blocks_medium_confidence(self):
|
||||
crisis = CrisisResult(detected=True, confidence="medium", matched_patterns=[])
|
||||
assert should_block_autonomous_actions(crisis)
|
||||
|
||||
def test_does_not_block_when_no_crisis(self):
|
||||
crisis = CrisisResult(detected=False, confidence="none", matched_patterns=[])
|
||||
assert not should_block_autonomous_actions(crisis)
|
||||
|
||||
|
||||
class TestNotificationCallback:
|
||||
"""Verify crisis notification callbacks work."""
|
||||
|
||||
def test_callback_is_called(self):
|
||||
called = []
|
||||
|
||||
def my_callback(crisis, message):
|
||||
called.append((crisis.confidence, message))
|
||||
|
||||
register_crisis_callback(my_callback)
|
||||
crisis = CrisisResult(detected=True, confidence="high", matched_patterns=[])
|
||||
notify_crisis(crisis, "I want to die")
|
||||
|
||||
assert len(called) == 1
|
||||
assert called[0] == ("high", "I want to die")
|
||||
|
||||
def test_callback_error_does_not_crash(self):
|
||||
def bad_callback(crisis, message):
|
||||
raise RuntimeError("callback failed")
|
||||
|
||||
register_crisis_callback(bad_callback)
|
||||
crisis = CrisisResult(detected=True, confidence="high", matched_patterns=[])
|
||||
# Should not raise
|
||||
notify_crisis(crisis, "test")
|
||||
|
||||
|
||||
class TestCrisisResponse:
|
||||
"""Verify crisis response contains required resources."""
|
||||
|
||||
def test_contains_988(self):
|
||||
response = get_crisis_response()
|
||||
assert "988" in response
|
||||
|
||||
def test_contains_crisis_text_line(self):
|
||||
response = get_crisis_response()
|
||||
assert "741741" in response
|
||||
|
||||
def test_contains_911(self):
|
||||
response = get_crisis_response()
|
||||
assert "911" in response
|
||||
Reference in New Issue
Block a user