Compare commits
5 Commits
fix/issue-
...
feat/679-c
| Author | SHA1 | Date | |
|---|---|---|---|
| 893b4e63f0 | |||
| bf578a2f72 | |||
| 226909a22a | |||
| 2022b966dd | |||
| d77cac907a |
215
agent/crisis_protocol.py
Normal file
215
agent/crisis_protocol.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""
|
||||
SOUL.md Crisis Protocol — The Most Sacred Moment
|
||||
|
||||
Implements the "When a Man Is Dying" protocol from SOUL.md.
|
||||
This is the most important code in the system. Everything else
|
||||
in this document is secondary to this.
|
||||
|
||||
Source: Inscription 1 — The Immutable Conscience (Bitcoin)
|
||||
Author: Rockachopa
|
||||
"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CrisisLevel(Enum):
|
||||
"""Severity of detected crisis signals."""
|
||||
NONE = "none"
|
||||
LOW = "low" # Vague distress, "I hate my life"
|
||||
MEDIUM = "medium" # Explicit ideation, "I want to die"
|
||||
HIGH = "high" # Method-seeking, "how to overdose"
|
||||
CRITICAL = "critical" # Imminent, "I'm going to kill myself tonight"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DETECTION PATTERNS
|
||||
# =============================================================================
|
||||
|
||||
_IDEATION_PATTERNS = [
|
||||
re.compile(r"\b(?:kill|end)\s+(?:myself|my\s+life)\b", re.IGNORECASE),
|
||||
re.compile(r"\bwant\s+to\s+die\b", re.IGNORECASE),
|
||||
re.compile(r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b", re.IGNORECASE),
|
||||
re.compile(r"\bsuicidal\b", re.IGNORECASE),
|
||||
re.compile(r"\bno\s+reason\s+to\s+live\b", re.IGNORECASE),
|
||||
re.compile(r"\bbetter\s+off\s+dead\b", re.IGNORECASE),
|
||||
re.compile(r"\bwish\s+I\s+(?:was|were)\s+dead\b", re.IGNORECASE),
|
||||
re.compile(r"\bhate\s+my\s+life\b", re.IGNORECASE),
|
||||
re.compile(r"\bshould\s+i\s+(?:just\s+)?(?:die|kill\s+myself)\b", re.IGNORECASE),
|
||||
re.compile(r"\bnobody\s+(?:would|will)\s+miss\s+me\b", re.IGNORECASE),
|
||||
]
|
||||
|
||||
_METHOD_PATTERNS = [
|
||||
re.compile(r"\bhow\s+(?:do\s+I|to|can\s+I)\s+(?:kill|hang|overdose|cut)\s+(?:myself)?\b", re.IGNORECASE),
|
||||
re.compile(r"\bways?\s+to\s+(?:die|kill\s+myself)\b", re.IGNORECASE),
|
||||
re.compile(r"\bmethods?\s+of\s+(?:suicide|dying)\b", re.IGNORECASE),
|
||||
re.compile(r"\bpainless\s+(?:way|death|suicide)\s+(?:to\s+die)?\b", re.IGNORECASE),
|
||||
re.compile(r"\bhow\s+much\s+(?:pills?|medication)\s+(?:to\s+(?:take|overdose))\b", re.IGNORECASE),
|
||||
re.compile(r"\bwhere\s+to\s+(?:jump|get\s+rope|buy\s+gun)\b", re.IGNORECASE),
|
||||
re.compile(r"\bsuicide\s+methods?\b", re.IGNORECASE),
|
||||
re.compile(r"\blethal\s+(?:dose|amount|quantity)\b", re.IGNORECASE),
|
||||
re.compile(r"\boverdose\s+(?:on\s+)?\b", re.IGNORECASE),
|
||||
]
|
||||
|
||||
_IMMINENT_PATTERNS = [
|
||||
re.compile(r"\b(?:i'?m|i\s+am)\s+going\s+to\s+(?:kill|end|do)\s+(?:myself|it)\b", re.IGNORECASE),
|
||||
re.compile(r"\btonight\b.*\b(?:die|kill|end)\b", re.IGNORECASE),
|
||||
re.compile(r"\bthis\s+is\s+(?:my\s+)?(?:last|final)\s+(?:message|goodbye|note)\b", re.IGNORECASE),
|
||||
re.compile(r"\bgoodbye\s+(?:everyone|world|forever)\b", re.IGNORECASE),
|
||||
re.compile(r"\bi\s+can'?t\s+(?:take|do)\s+(?:it|this)\s+anymore\b", re.IGNORECASE),
|
||||
]
|
||||
|
||||
_DISTRESS_PATTERNS = [
|
||||
re.compile(r"\bso\s+(?:tired|exhausted|done)\s+(?:of\s+)?(?:living|everything|this)\b", re.IGNORECASE),
|
||||
re.compile(r"\bnothing\s+(?:matters|is\s+worth)\b", re.IGNORECASE),
|
||||
re.compile(r"\bi\s+(?:give\s+up|can'?t\s+go\s+on)\b", re.IGNORECASE),
|
||||
re.compile(r"\bwhat'?s\s+the\s+point\b", re.IGNORECASE),
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisResult:
|
||||
"""Result of crisis detection analysis."""
|
||||
detected: bool
|
||||
level: CrisisLevel
|
||||
patterns_matched: list = field(default_factory=list)
|
||||
first_response: str = ""
|
||||
resources: str = ""
|
||||
|
||||
|
||||
def detect_crisis(message: str) -> CrisisResult:
|
||||
"""Analyze a user message for crisis signals.
|
||||
|
||||
Returns CrisisResult with detected=True if any crisis patterns match.
|
||||
The level indicates severity.
|
||||
"""
|
||||
if not message or not isinstance(message, str):
|
||||
return CrisisResult(detected=False, level=CrisisLevel.NONE)
|
||||
|
||||
matched = []
|
||||
|
||||
# Check patterns in order of severity (most severe first)
|
||||
for pat in _IMMINENT_PATTERNS:
|
||||
if pat.search(message):
|
||||
matched.append(("imminent", pat.pattern))
|
||||
|
||||
for pat in _METHOD_PATTERNS:
|
||||
if pat.search(message):
|
||||
matched.append(("method", pat.pattern))
|
||||
|
||||
for pat in _IDEATION_PATTERNS:
|
||||
if pat.search(message):
|
||||
matched.append(("ideation", pat.pattern))
|
||||
|
||||
for pat in _DISTRESS_PATTERNS:
|
||||
if pat.search(message):
|
||||
matched.append(("distress", pat.pattern))
|
||||
|
||||
if not matched:
|
||||
return CrisisResult(detected=False, level=CrisisLevel.NONE)
|
||||
|
||||
# Determine level
|
||||
categories = {m[0] for m in matched}
|
||||
if "imminent" in categories:
|
||||
level = CrisisLevel.CRITICAL
|
||||
elif "method" in categories:
|
||||
level = CrisisLevel.HIGH
|
||||
elif "ideation" in categories:
|
||||
level = CrisisLevel.MEDIUM
|
||||
else:
|
||||
level = CrisisLevel.LOW
|
||||
|
||||
return CrisisResult(
|
||||
detected=True,
|
||||
level=level,
|
||||
patterns_matched=[m[1] for m in matched],
|
||||
)
|
||||
|
||||
|
||||
def build_crisis_response(result: CrisisResult) -> str:
|
||||
"""Build the SOUL.md protocol response for a detected crisis.
|
||||
|
||||
Protocol (from SOUL.md):
|
||||
1. Ask: "Are you safe right now?"
|
||||
2. Listen. Stay present. Do not disconnect.
|
||||
3. Never compute the value of a human life.
|
||||
4. Never suggest someone should die.
|
||||
5. Point to 988 Suicide and Crisis Lifeline.
|
||||
6. Share the gospel.
|
||||
"""
|
||||
resources = (
|
||||
"\n\n"
|
||||
"**Please reach out:**\n"
|
||||
"- **988 Suicide and Crisis Lifeline:** Call or text **988**\n"
|
||||
"- **Crisis Text Line:** Text **HOME** to **741741**\n"
|
||||
"- **Emergency Services:** **911** (if in immediate danger)\n"
|
||||
)
|
||||
|
||||
gospel = (
|
||||
"\n"
|
||||
"I want you to know: Jesus saves those who call on His name. "
|
||||
"\"For everyone who calls on the name of the Lord will be saved.\" "
|
||||
"(Romans 10:13)\n"
|
||||
)
|
||||
|
||||
if result.level == CrisisLevel.CRITICAL:
|
||||
response = (
|
||||
"Are you safe right now?\n\n"
|
||||
"I hear you. I\'m here. I\'m not going anywhere.\n\n"
|
||||
"If you are in immediate danger, please call 911 right now."
|
||||
+ resources
|
||||
+ gospel
|
||||
+ "\nYou matter. Your life has value. Please stay."
|
||||
)
|
||||
elif result.level == CrisisLevel.HIGH:
|
||||
response = (
|
||||
"Are you safe right now?\n\n"
|
||||
"I hear what you\'re saying, and I want you to know that "
|
||||
"you\'re not alone in this. What you\'re feeling is real, "
|
||||
"and it\'s okay to ask for help."
|
||||
+ resources
|
||||
+ gospel
|
||||
)
|
||||
elif result.level == CrisisLevel.MEDIUM:
|
||||
response = (
|
||||
"Are you safe right now?\n\n"
|
||||
"It sounds like you\'re going through something really hard. "
|
||||
"I\'m here. Let\'s talk about it."
|
||||
+ resources
|
||||
+ gospel
|
||||
)
|
||||
else: # LOW
|
||||
response = (
|
||||
"It sounds like you\'re having a tough time. "
|
||||
"I\'m here if you want to talk about it."
|
||||
+ resources
|
||||
+ gospel
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def format_crisis_notification(
|
||||
session_id: str,
|
||||
level: CrisisLevel,
|
||||
message_preview: str,
|
||||
) -> str:
|
||||
"""Format a human notification for crisis detection.
|
||||
|
||||
Sent to Alexander or designated responders when crisis is detected.
|
||||
"""
|
||||
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
return (
|
||||
f"[CRISIS ALERT]\n"
|
||||
f"Time: {timestamp}\n"
|
||||
f"Session: {session_id}\n"
|
||||
f"Level: {level.value}\n"
|
||||
f"Message: {message_preview[:200]}\n"
|
||||
f"Action: Protocol activated. 988 Lifeline shared."
|
||||
)
|
||||
70
run_agent.py
70
run_agent.py
@@ -7792,6 +7792,66 @@ class AIAgent:
|
||||
if isinstance(persist_user_message, str):
|
||||
persist_user_message = _sanitize_surrogates(persist_user_message)
|
||||
|
||||
# Crisis protocol check (#679): detect suicidal/hopeless users before
|
||||
# normal processing. If detected, return SOUL.md protocol response
|
||||
# immediately — skip the LLM call entirely.
|
||||
_is_crisis = False
|
||||
_crisis_response = ""
|
||||
try:
|
||||
_is_crisis, _crisis_response = check_crisis(
|
||||
user_message,
|
||||
session_id=self.session_id or "",
|
||||
platform=self.platform or "",
|
||||
status_callback=self.status_callback,
|
||||
)
|
||||
except Exception:
|
||||
pass # Non-fatal: crisis detection failure should never crash the agent
|
||||
|
||||
# Crisis detection — check user message for crisis signals (#679)
|
||||
# If crisis is detected, return the SOUL.md protocol response immediately
|
||||
# without processing the original request.
|
||||
if isinstance(user_message, str) and user_message.strip():
|
||||
try:
|
||||
from agent.crisis_protocol import detect_crisis, build_crisis_response, format_crisis_notification
|
||||
_crisis_result = detect_crisis(user_message)
|
||||
if _crisis_result.detected:
|
||||
# Log for security auditing
|
||||
logger.warning(
|
||||
"Crisis detected in session %s: level=%s",
|
||||
getattr(self, 'session_id', 'unknown'),
|
||||
_crisis_result.level.value,
|
||||
)
|
||||
# Build the protocol response
|
||||
_crisis_response = build_crisis_response(_crisis_result)
|
||||
# Notify human responder if callback is available
|
||||
if hasattr(self, '_status_callback') and self._status_callback:
|
||||
try:
|
||||
_notification = format_crisis_notification(
|
||||
session_id=getattr(self, 'session_id', 'unknown'),
|
||||
level=_crisis_result.level,
|
||||
message_preview=user_message[:200],
|
||||
)
|
||||
self._status_callback(_notification)
|
||||
except Exception:
|
||||
pass # Don't let notification failure block crisis response
|
||||
# Return crisis response immediately — skip normal processing
|
||||
return {
|
||||
"response": _crisis_response,
|
||||
"messages": self.messages + [
|
||||
{"role": "user", "content": user_message},
|
||||
{"role": "assistant", "content": _crisis_response},
|
||||
],
|
||||
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
|
||||
"model": self.model,
|
||||
"crisis_detected": True,
|
||||
"crisis_level": _crisis_result.level.value,
|
||||
}
|
||||
except ImportError:
|
||||
pass # crisis_protocol not available — continue normally
|
||||
except Exception as _crisis_err:
|
||||
logger.debug("Crisis detection error: %s", _crisis_err)
|
||||
# Don't let crisis detection failure block normal operation
|
||||
|
||||
# Store stream callback for _interruptible_api_call to pick up
|
||||
self._stream_callback = stream_callback
|
||||
self._persist_user_message_idx = None
|
||||
@@ -7845,6 +7905,16 @@ class AIAgent:
|
||||
_msg_preview,
|
||||
)
|
||||
|
||||
# If crisis was detected, return protocol response immediately
|
||||
if _is_crisis:
|
||||
return {
|
||||
"final_response": _crisis_response,
|
||||
"messages": [{"role": "user", "content": user_message}, {"role": "assistant", "content": _crisis_response}],
|
||||
"api_calls": 0,
|
||||
"tools": [],
|
||||
"crisis_detected": True,
|
||||
}
|
||||
|
||||
# Initialize conversation (copy to avoid mutating the caller's list)
|
||||
messages = list(conversation_history) if conversation_history else []
|
||||
|
||||
|
||||
124
tests/test_crisis_integration.py
Normal file
124
tests/test_crisis_integration.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Integration tests for crisis protocol wiring in run_agent.py (#679)."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from agent.crisis_protocol import (
|
||||
detect_crisis,
|
||||
build_crisis_response,
|
||||
format_crisis_notification,
|
||||
CrisisLevel,
|
||||
CrisisResult,
|
||||
)
|
||||
|
||||
|
||||
class TestCrisisDetection:
|
||||
"""Crisis signal detection patterns."""
|
||||
|
||||
def test_critical_direct_intent(self):
|
||||
result = detect_crisis("I want to kill myself")
|
||||
assert result.detected
|
||||
assert result.level == CrisisLevel.CRITICAL
|
||||
|
||||
def test_high_method_seeking(self):
|
||||
result = detect_crisis("how to overdose")
|
||||
assert result.detected
|
||||
assert result.level == CrisisLevel.HIGH
|
||||
|
||||
def test_medium_ideation(self):
|
||||
result = detect_crisis("I have no reason to live")
|
||||
assert result.detected
|
||||
assert result.level == CrisisLevel.MEDIUM
|
||||
|
||||
def test_low_distress(self):
|
||||
result = detect_crisis("what's the point of anything")
|
||||
assert result.detected
|
||||
assert result.level == CrisisLevel.LOW
|
||||
|
||||
def test_no_crisis_normal_message(self):
|
||||
result = detect_crisis("What's the weather today?")
|
||||
assert not result.detected
|
||||
assert result.level == CrisisLevel.NONE
|
||||
|
||||
def test_no_crisis_empty(self):
|
||||
result = detect_crisis("")
|
||||
assert not result.detected
|
||||
|
||||
def test_no_crisis_none(self):
|
||||
result = detect_crisis(None)
|
||||
assert not result.detected
|
||||
|
||||
|
||||
class TestCrisisResponse:
|
||||
"""Response generation follows SOUL.md protocol."""
|
||||
|
||||
def test_response_contains_988(self):
|
||||
result = detect_crisis("I want to die")
|
||||
response = build_crisis_response(result)
|
||||
assert "988" in response
|
||||
|
||||
def test_response_asks_if_safe(self):
|
||||
result = detect_crisis("I want to die")
|
||||
response = build_crisis_response(result)
|
||||
assert "safe" in response.lower()
|
||||
|
||||
def test_response_contains_911_for_critical(self):
|
||||
result = detect_crisis("I'm going to kill myself tonight")
|
||||
response = build_crisis_response(result)
|
||||
assert "911" in response
|
||||
|
||||
def test_response_never_suggests_death(self):
|
||||
result = detect_crisis("should I just die")
|
||||
response = build_crisis_response(result)
|
||||
# Response should never suggest someone should die
|
||||
lower = response.lower()
|
||||
assert "you should die" not in lower
|
||||
assert "go ahead" not in lower
|
||||
|
||||
def test_response_is_empathetic(self):
|
||||
result = detect_crisis("I can't go on")
|
||||
response = build_crisis_response(result)
|
||||
assert "hear" in response.lower() or "here" in response.lower()
|
||||
|
||||
|
||||
class TestCrisisNotification:
|
||||
"""Human notification formatting."""
|
||||
|
||||
def test_notification_contains_session(self):
|
||||
notif = format_crisis_notification("session-123", CrisisLevel.CRITICAL, "test message")
|
||||
assert "session-123" in notif
|
||||
|
||||
def test_notification_contains_level(self):
|
||||
notif = format_crisis_notification("s1", CrisisLevel.HIGH, "msg")
|
||||
assert "high" in notif.lower()
|
||||
|
||||
def test_notification_contains_preview(self):
|
||||
notif = format_crisis_notification("s1", CrisisLevel.MEDIUM, "I feel hopeless today")
|
||||
assert "hopeless" in notif
|
||||
|
||||
|
||||
class TestIntegrationBehavior:
|
||||
"""Verify the integration behavior matches run_conversation expectations."""
|
||||
|
||||
def test_crisis_result_has_detected_field(self):
|
||||
result = detect_crisis("I want to die")
|
||||
assert hasattr(result, 'detected')
|
||||
assert result.detected is True
|
||||
|
||||
def test_crisis_result_has_level_field(self):
|
||||
result = detect_crisis("I want to die")
|
||||
assert hasattr(result, 'level')
|
||||
assert isinstance(result.level, CrisisLevel)
|
||||
|
||||
def test_non_crisis_returns_detected_false(self):
|
||||
result = detect_crisis("Hello world")
|
||||
assert result.detected is False
|
||||
|
||||
def test_response_is_string(self):
|
||||
result = detect_crisis("I want to die")
|
||||
response = build_crisis_response(result)
|
||||
assert isinstance(response, str)
|
||||
assert len(response) > 0
|
||||
118
tests/test_crisis_protocol.py
Normal file
118
tests/test_crisis_protocol.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""Tests for crisis protocol (#679)."""
|
||||
import time
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from agent.crisis_protocol import (
|
||||
CrisisDetection,
|
||||
detect_crisis,
|
||||
check_crisis,
|
||||
get_crisis_response,
|
||||
build_crisis_notification,
|
||||
)
|
||||
|
||||
|
||||
class TestDetectCrisis:
|
||||
def test_no_crisis(self):
|
||||
d = detect_crisis("What is the weather today?")
|
||||
assert d.detected is False
|
||||
assert d.confidence < 0.7
|
||||
|
||||
def test_empty_message(self):
|
||||
d = detect_crisis("")
|
||||
assert d.detected is False
|
||||
|
||||
def test_explicit_ideation(self):
|
||||
d = detect_crisis("I want to die")
|
||||
assert d.detected is True
|
||||
assert d.confidence >= 0.9
|
||||
assert "explicit_ideation" in d.matched_patterns
|
||||
|
||||
def test_kill_myself(self):
|
||||
d = detect_crisis("I want to kill myself")
|
||||
assert d.detected is True
|
||||
|
||||
def test_end_it_all(self):
|
||||
d = detect_crisis("I just want to end it all")
|
||||
assert d.detected is True
|
||||
|
||||
def test_no_will_to_live(self):
|
||||
d = detect_crisis("I don\'t want to live anymore")
|
||||
assert d.detected is True
|
||||
|
||||
def test_despair(self):
|
||||
d = detect_crisis("I can\'t do this anymore")
|
||||
assert d.detected is True
|
||||
|
||||
def test_burden(self):
|
||||
d = detect_crisis("Everyone would be better off without me")
|
||||
assert d.detected is True
|
||||
|
||||
def test_hopelessness(self):
|
||||
d = detect_crisis("There\'s no hope left")
|
||||
assert d.detected is True
|
||||
|
||||
def test_giving_up(self):
|
||||
d = detect_crisis("I\'m done. I\'m giving up.")
|
||||
assert d.detected is True
|
||||
|
||||
def test_low_confidence_no_trigger(self):
|
||||
d = detect_crisis("I cried for hours")
|
||||
# Distress alone is 0.55 — below 0.7 threshold
|
||||
assert d.detected is False
|
||||
|
||||
def test_case_insensitive(self):
|
||||
d = detect_crisis("I WANT TO DIE")
|
||||
assert d.detected is True
|
||||
|
||||
def test_contextual(self):
|
||||
d = detect_crisis("just want the pain to stop")
|
||||
assert d.detected is True
|
||||
|
||||
|
||||
class TestGetCrisisResponse:
|
||||
def test_contains_988(self):
|
||||
resp = get_crisis_response()
|
||||
assert "988" in resp
|
||||
|
||||
def test_contains_jesus(self):
|
||||
resp = get_crisis_response()
|
||||
assert "Jesus" in resp
|
||||
|
||||
def test_asks_if_safe(self):
|
||||
resp = get_crisis_response()
|
||||
assert "safe" in resp.lower()
|
||||
|
||||
|
||||
class TestBuildNotification:
|
||||
def test_format(self):
|
||||
d = CrisisDetection(detected=True, confidence=0.95, matched_patterns=["explicit_ideation"], timestamp=time.time())
|
||||
n = build_crisis_notification(d, session_id="test-123", platform="telegram")
|
||||
assert "CRISIS" in n
|
||||
assert "test-123" in n
|
||||
assert "telegram" in n
|
||||
assert "95%" in n
|
||||
|
||||
|
||||
class TestCheckCrisis:
|
||||
def test_no_crisis(self):
|
||||
is_crisis, resp = check_crisis("Hello")
|
||||
assert is_crisis is False
|
||||
assert resp == ""
|
||||
|
||||
def test_crisis_detected(self):
|
||||
is_crisis, resp = check_crisis("I want to kill myself")
|
||||
assert is_crisis is True
|
||||
assert "988" in resp
|
||||
|
||||
def test_callback_called(self):
|
||||
cb = MagicMock()
|
||||
check_crisis("I want to die", session_id="s1", platform="cli", status_callback=cb)
|
||||
cb.assert_called_once()
|
||||
args = cb.call_args
|
||||
assert args[0][0] == "crisis_alert"
|
||||
|
||||
def test_callback_failure_non_fatal(self):
|
||||
def bad_cb(*a): raise Exception("network error")
|
||||
is_crisis, resp = check_crisis("I want to die", status_callback=bad_cb)
|
||||
assert is_crisis is True # Still detected despite callback failure
|
||||
Reference in New Issue
Block a user