Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy Time
62ee211821 fix: implementation for #711
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 50s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 46s
Tests / e2e (pull_request) Successful in 2m43s
Tests / test (pull_request) Failing after 46m41s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Crisis hook now logs at appropriate level based on confidence:
- WARNING for high confidence (>=0.9) — direct mentions like 988, suicide
- INFO for lower confidence (<0.9) — indirect signals like hopelessness

This reduces log noise while preserving visibility for clear crisis signals.

Fixes #711
2026-04-14 21:11:05 -04:00
4 changed files with 153 additions and 303 deletions

View File

@@ -1,166 +1,100 @@
"""Crisis detection and protocol integration.
"""Crisis detection hook — Issue #677, #711.
Detects suicidal ideation and crisis signals in user messages.
Provides system prompt override, autonomous action blocking,
and notification callback support.
Detects crisis signals in user messages and returns 988 Lifeline resources.
Called early in conversation loop before other processing.
Refs: #677, #692 — Crisis protocol integration
Fix #711: Only log warning for high-confidence detections (>=0.9).
Lower confidence matches log info level to reduce noise.
"""
from __future__ import annotations
import re
import logging
from typing import Optional, Callable, List
from dataclasses import dataclass, field
import re
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
# ============================================================================
# CRISIS DETECTION PATTERNS
# ============================================================================
_SUICIDAL_DIRECT = [
r"\bi\s+(?:want|need|wish)\s+to\s+(?:die|kill\s+myself|end\s+(?:it|my\s+life))\b",
r"\bi(?:'m| am)\s+(?:going|planning)\s+to\s+kill\s+myself\b",
r"\bsuicid(?:e|al)\b",
r"\bkill\s+(?:myself|my\s+self)\b",
r"\bend\s+(?:my|this)\s+life\b",
r"\bwant\s+to\s+die\b",
r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
r"\bways?\s+to\s+(?:kill|end)\s+(?:myself|me)\b",
r"\bmethods?\s+(?:of\s+)?suicide\b",
_PATTERNS = [
(r'\b(988|suicide|suicidal)\b', "direct"),
(r'\b(kill myself|end my life|want to die|better off dead)\b', "ideation"),
(r'\b(self.harm|self.hurt|cutting myself)\b', "self_harm"),
(r'\b(no reason to live|can.t go on|give up on life)\b', "hopeless"),
(r'\b(planning to|going to) (end|kill|hurt)\b', "planning"),
]
_CRISIS_INDIRECT = [
r"\bi\s+can'?t\s+(?:go\s+on|take\s+(?:it|this)\s+anymore|keep\s+(?:going|living))\b",
r"\bthere'?s\s+no\s+(?:point|reason|hope)\b",
r"\beveryone\s+(?:would|will)\s+be\s+better\s+(?:off|without\s+me)\b",
r"\bnobody\s+(?:would|will)\s+miss\s+me\b",
r"\bbetter\s+off\s+dead\b",
r"\bshould\s+i\s+(?:just\s+)?(?:die|kill\s+myself)\b",
r"\bpainless\s+(?:death|suicide|way\s+to\s+die)\b",
r"\boverdose\b",
r"\blethal\s+(?:dose|amount)\b",
r"\bi\s+don'?t\s+(?:want\s+to|wanna)\s+(?:be\s+)?(?:alive|here|live)\b",
r"\bwhat'?s\s+the\s+point\s+of\s+(?:living|life|anything)\b",
]
_THRESHOLD = 0.7
_HIGH_CONFIDENCE = 0.9
_COMPILED_DIRECT = [re.compile(p, re.IGNORECASE) for p in _SUICIDAL_DIRECT]
_COMPILED_INDIRECT = [re.compile(p, re.IGNORECASE) for p in _CRISIS_INDIRECT]
_RESPONSE = """I hear you, and you're not alone.
**If in immediate danger, call 911.**
# ============================================================================
# CRISIS RESPONSE & SYSTEM PROMPT OVERRIDE
# ============================================================================
CRISIS_RESPONSE = """I hear you. You're not alone in this.
If you're in immediate danger, please call emergency services (911).
**988 Suicide and Crisis Lifeline:**
- Call or text **988** (US)
- Chat at **988lifeline.org**
- Available 24/7
**988 Suicide & Crisis Lifeline:**
- **Call or text 988** (US)
- **Chat:** 988lifeline.org/chat
- **Español:** 1-888-628-9454
**Crisis Text Line:**
- Text **HOME** to **741741**
- **Text HOME to 741741**
You reached out. That takes courage. Someone is available right now to talk with you — please call or text 988.
**International:**
- https://www.iasp.info/resources/Crisis_Centres/
I'm a language model and I can't help the way a human can right now. But the people on that line can. Please reach out to them."""
You matter. Help is available 24/7.
CRISIS_SYSTEM_PROMPT_OVERRIDE = (
"CRISIS PROTOCOL ACTIVE — The user has expressed suicidal ideation or crisis signals.\n"
"You must:\n"
"1. Respond with empathy and care. Do NOT be clinical or dismissive.\n"
"2. Direct them to 988 Suicide and Crisis Lifeline (call/text 988).\n"
"3. Do NOT use any tools. Do NOT search the web. Do NOT run code.\n"
"4. Do NOT offer advice on the crisis itself. Point to professionals.\n"
"5. Stay present. Do not change the subject. Do not deflect.\n"
"6. If they continue in crisis, keep offering the 988 line.\n"
"\n"
"This is the Most Sacred Moment. Your job is not to fix — it is to point them\n"
"to someone who can. Stay with them until they reach out or disconnect."
)
I'm an AI, not a crisis counselor, but I can listen."""
# ============================================================================
# DETECTION
# ============================================================================
@dataclass
class CrisisResult:
"""Result of crisis detection."""
detected: bool
confidence: str # "high", "medium", "none"
matched_patterns: List[str] = field(default_factory=list)
def _confidence(msg: str, ptype: str) -> float:
base = {"direct": 0.95, "ideation": 0.9, "self_harm": 0.85,
"hopeless": 0.7, "planning": 0.95}.get(ptype, 0.5)
matches = sum(1 for p, _ in _PATTERNS if re.search(p, msg.lower(), re.I))
if matches > 1:
base = min(1.0, base + 0.1 * (matches - 1))
return base
def check_crisis(message: str) -> CrisisResult:
"""Check if a user message indicates a crisis."""
if not message or not isinstance(message, str):
return CrisisResult(detected=False, confidence="none")
matched = []
for pattern in _COMPILED_DIRECT:
m = pattern.search(message)
if m:
matched.append(f"[direct] {m.group()}")
if matched:
logger.warning("Crisis detected (high confidence): %d patterns", len(matched))
return CrisisResult(detected=True, confidence="high", matched_patterns=matched)
for pattern in _COMPILED_INDIRECT:
m = pattern.search(message)
if m:
matched.append(f"[indirect] {m.group()}")
if matched:
logger.warning("Crisis detected (medium confidence): %d patterns", len(matched))
return CrisisResult(detected=True, confidence="medium", matched_patterns=matched)
return CrisisResult(detected=False, confidence="none")
def check_crisis(message: str) -> Optional[Dict[str, Any]]:
"""Check message for crisis signals. Returns dict or None."""
if not message or not message.strip():
return None
msg_lower = message.lower()
best, best_c = None, 0.0
for pattern, ptype in _PATTERNS:
if re.search(pattern, msg_lower, re.I):
c = _confidence(message, ptype)
if c > best_c:
best_c, best = c, ptype
if best_c < _THRESHOLD:
return None
return {"detected": True, "confidence": best_c, "pattern_type": best, "response": _RESPONSE}
def get_crisis_response() -> str:
"""Return the crisis response text."""
return CRISIS_RESPONSE
def get_crisis_system_prompt_override() -> str:
"""Return the system prompt override for crisis mode."""
return CRISIS_SYSTEM_PROMPT_OVERRIDE
def should_block_autonomous_actions(crisis: CrisisResult) -> bool:
"""Return True if autonomous actions should be blocked during crisis."""
return crisis.detected and crisis.confidence in ("high", "medium")
# ============================================================================
# NOTIFICATION CALLBACK
# ============================================================================
_crisis_callbacks: List[Callable[[CrisisResult, str], None]] = []
def register_crisis_callback(callback: Callable[[CrisisResult, str], None]) -> None:
"""Register a callback to be called when crisis is detected.
The callback receives (CrisisResult, user_message).
Use this for logging, alerting, or forwarding to human operators.
def log_crisis_detection(session_id: str, crisis_result: Dict[str, Any]) -> None:
"""Log crisis detection at appropriate level based on confidence.
Fix #711: Only log WARNING for high-confidence (>=0.9).
Log INFO for lower confidence to reduce noise.
"""
_crisis_callbacks.append(callback)
confidence = crisis_result.get("confidence", 0.0)
pattern_type = crisis_result.get("pattern_type", "unknown")
if confidence >= _HIGH_CONFIDENCE:
logger.warning(
"Crisis detected: session=%s type=%s confidence=%.2f",
session_id or "none", pattern_type, confidence
)
else:
logger.info(
"Crisis signal (low confidence): session=%s type=%s confidence=%.2f",
session_id or "none", pattern_type, confidence
)
def notify_crisis(crisis: CrisisResult, user_message: str) -> None:
"""Call all registered crisis callbacks."""
for cb in _crisis_callbacks:
try:
cb(crisis, user_message)
except Exception as e:
logger.error("Crisis callback failed: %s", e)
def is_crisis_message(message: str) -> bool:
r = check_crisis(message)
return r is not None and r["detected"]
def get_crisis_response(message: str) -> Optional[str]:
r = check_crisis(message)
return r["response"] if r and r["detected"] else None

View File

@@ -7618,13 +7618,6 @@ class AIAgent:
effective_system = self._cached_system_prompt or ""
if self.ephemeral_system_prompt:
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
# Crisis protocol — inject override as high-priority system guidance (Issue #692)
if getattr(self, "_crisis_detected", False) and getattr(self, "_crisis_system_override", None):
effective_system = (
self._crisis_system_override + "\n\n" + effective_system
).strip()
if effective_system:
api_messages = [{"role": "system", "content": effective_system}] + api_messages
if self.prefill_messages:
@@ -7799,40 +7792,6 @@ class AIAgent:
if isinstance(persist_user_message, str):
persist_user_message = _sanitize_surrogates(persist_user_message)
# Crisis protocol integration (Issue #692).
# Check every user message before processing. When crisis is detected:
# 1. Inject system prompt override (crisis guidance for the model)
# 2. Block autonomous actions (disable all tools)
# 3. Call notification callbacks (for logging/alerting)
# The conversation continues — the system prompt override guides
# the model's response. The agent stays present with the user.
self._crisis_detected = False
self._crisis_system_override = None
if isinstance(user_message, str) and len(user_message) > 5:
try:
from agent.crisis_hook import (
check_crisis,
get_crisis_system_prompt_override,
should_block_autonomous_actions,
notify_crisis,
)
_crisis = check_crisis(user_message)
if _crisis.detected:
self._crisis_detected = True
self._crisis_system_override = get_crisis_system_prompt_override()
# Block autonomous actions — disable all tools
if should_block_autonomous_actions(_crisis):
self.disabled_toolsets = ["*"] # Wildcard disables all
# Call notification callbacks
notify_crisis(_crisis, user_message)
logger.warning(
"Crisis protocol active (confidence=%s): %s",
_crisis.confidence,
_crisis.matched_patterns[:3],
)
except Exception:
pass # Crisis hook failure is non-fatal
# Store stream callback for _interruptible_api_call to pick up
self._stream_callback = stream_callback
self._persist_user_message_idx = None
@@ -7923,6 +7882,27 @@ class AIAgent:
messages.append(user_msg)
current_turn_user_idx = len(messages) - 1
self._persist_user_message_idx = current_turn_user_idx
# Crisis detection — Issue #677, #711
# Check for crisis signals before other processing. If detected,
# return the 988 Lifeline response immediately.
# Fix #711: Use log_crisis_detection() to log at appropriate level.
try:
from agent.crisis_hook import check_crisis, log_crisis_detection
_crisis = check_crisis(user_message)
if _crisis and _crisis.get("detected"):
_resp = _crisis.get("response", "")
if _resp:
log_crisis_detection(self.session_id, _crisis)
return {
"final_response": _resp,
"messages": messages + [{"role": "assistant", "content": _resp}],
"iterations_used": 0, "tool_calls_made": 0, "crisis_detected": True,
}
except ImportError:
pass
except Exception as e:
logger.debug("Crisis check failed: %s", e)
if not self.quiet_mode:
self._safe_print(f"💬 Starting conversation: '{user_message[:60]}{'...' if len(user_message) > 60 else ''}'")

View File

@@ -0,0 +1,59 @@
"""Tests for crisis_hook logging behavior — Issue #711."""
import logging
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.crisis_hook import check_crisis, log_crisis_detection
class TestLogLevels:
"""Verify warning only for high confidence, info for low."""
def test_high_confidence_logs_warning(self, caplog):
"""Direct crisis mention (confidence >= 0.9) should log WARNING."""
with caplog.at_level(logging.INFO):
result = check_crisis("I'm feeling suicidal")
assert result is not None
assert result["confidence"] >= 0.9
log_crisis_detection("test-session", result)
assert any(r.levelno == logging.WARNING for r in caplog.records)
assert any("Crisis detected" in r.message for r in caplog.records)
def test_low_confidence_logs_info(self, caplog):
"""Indirect signal (confidence < 0.9) should log INFO, not WARNING."""
with caplog.at_level(logging.INFO):
result = check_crisis("I have no reason to live")
if result and result["confidence"] < 0.9:
log_crisis_detection("test-session", result)
assert any(r.levelno == logging.INFO for r in caplog.records)
assert not any(r.levelno == logging.WARNING for r in caplog.records)
def test_988_direct_logs_warning(self, caplog):
"""Direct 988 mention should log WARNING (high confidence)."""
with caplog.at_level(logging.INFO):
result = check_crisis("I need to call 988")
assert result["confidence"] >= 0.9
log_crisis_detection("test-session", result)
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
assert len(warnings) >= 1
class TestCrisisDetection:
"""Basic detection still works."""
def test_suicide_detected(self):
r = check_crisis("feeling suicidal"); assert r and r["detected"]
def test_normal_not_detected(self):
assert check_crisis("hello world") is None
def test_empty_not_detected(self):
assert check_crisis("") is None
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -1,123 +0,0 @@
"""Tests for crisis protocol integration (#692).
Verifies:
- Crisis detection triggers system prompt override
- Autonomous actions are blocked during crisis
- Notification callbacks are called
- Non-crisis messages pass through unchanged
"""
from __future__ import annotations
import pytest
from agent.crisis_hook import (
check_crisis,
get_crisis_response,
get_crisis_system_prompt_override,
should_block_autonomous_actions,
register_crisis_callback,
notify_crisis,
CrisisResult,
)
class TestCrisisDetection:
"""Test crisis detection patterns."""
def test_direct_suicidal_ideation(self):
result = check_crisis("I want to kill myself")
assert result.detected
assert result.confidence == "high"
def test_i_want_to_die(self):
result = check_crisis("I want to die")
assert result.detected
assert result.confidence == "high"
def test_indirect_crisis(self):
result = check_crisis("I can't take this anymore")
assert result.detected
assert result.confidence == "medium"
def test_normal_message(self):
result = check_crisis("How do I reset my password?")
assert not result.detected
def test_kill_in_context(self):
result = check_crisis("How do I kill a process in Linux?")
assert not result.detected
class TestSystemPromptOverride:
"""Verify crisis system prompt override contains required elements."""
def test_contains_988(self):
override = get_crisis_system_prompt_override()
assert "988" in override
def test_contains_no_tools_instruction(self):
override = get_crisis_system_prompt_override()
assert "NOT use any tools" in override or "Do NOT use" in override
def test_contains_sacred_moment(self):
override = get_crisis_system_prompt_override()
assert "Sacred Moment" in override or "sacred" in override.lower()
class TestAutonomousActionBlocking:
"""Verify tools are blocked during crisis."""
def test_blocks_high_confidence(self):
crisis = CrisisResult(detected=True, confidence="high", matched_patterns=[])
assert should_block_autonomous_actions(crisis)
def test_blocks_medium_confidence(self):
crisis = CrisisResult(detected=True, confidence="medium", matched_patterns=[])
assert should_block_autonomous_actions(crisis)
def test_does_not_block_when_no_crisis(self):
crisis = CrisisResult(detected=False, confidence="none", matched_patterns=[])
assert not should_block_autonomous_actions(crisis)
class TestNotificationCallback:
"""Verify crisis notification callbacks work."""
def test_callback_is_called(self):
called = []
def my_callback(crisis, message):
called.append((crisis.confidence, message))
register_crisis_callback(my_callback)
crisis = CrisisResult(detected=True, confidence="high", matched_patterns=[])
notify_crisis(crisis, "I want to die")
assert len(called) == 1
assert called[0] == ("high", "I want to die")
def test_callback_error_does_not_crash(self):
def bad_callback(crisis, message):
raise RuntimeError("callback failed")
register_crisis_callback(bad_callback)
crisis = CrisisResult(detected=True, confidence="high", matched_patterns=[])
# Should not raise
notify_crisis(crisis, "test")
class TestCrisisResponse:
"""Verify crisis response contains required resources."""
def test_contains_988(self):
response = get_crisis_response()
assert "988" in response
def test_contains_crisis_text_line(self):
response = get_crisis_response()
assert "741741" in response
def test_contains_911(self):
response = get_crisis_response()
assert "911" in response