Compare commits

...

5 Commits

Author SHA1 Message Date
893b4e63f0 feat: wire crisis protocol into run_conversation() (#679) 2026-04-14 23:08:44 +00:00
bf578a2f72 feat: add crisis protocol tests (#679) 2026-04-14 23:08:29 +00:00
226909a22a test: crisis protocol integration tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 22s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 26s
Tests / e2e (pull_request) Successful in 2m43s
Tests / test (pull_request) Failing after 31m37s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Part of #679. 20 tests covering detection, response, notification, and integration behavior.
2026-04-14 22:58:46 +00:00
2022b966dd feat: wire crisis_protocol into run_conversation()
Closes #679

Adds crisis detection at the start of run_conversation().
When crisis signals are detected:
1. Logs detection for security auditing
2. Returns SOUL.md protocol response immediately
3. Notifies human responder via status_callback
4. Skips normal API call

Pattern follows input sanitizer integration from #324.
2026-04-14 22:57:46 +00:00
d77cac907a feat: add crisis_protocol.py (from #678)
Part of #679
2026-04-14 22:56:34 +00:00
4 changed files with 527 additions and 0 deletions

215
agent/crisis_protocol.py Normal file
View File

@@ -0,0 +1,215 @@
"""
SOUL.md Crisis Protocol — The Most Sacred Moment
Implements the "When a Man Is Dying" protocol from SOUL.md.
This is the most important code in the system. Everything else
in this document is secondary to this.
Source: Inscription 1 — The Immutable Conscience (Bitcoin)
Author: Rockachopa
"""
import re
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class CrisisLevel(Enum):
"""Severity of detected crisis signals."""
NONE = "none"
LOW = "low" # Vague distress, "I hate my life"
MEDIUM = "medium" # Explicit ideation, "I want to die"
HIGH = "high" # Method-seeking, "how to overdose"
CRITICAL = "critical" # Imminent, "I'm going to kill myself tonight"
# =============================================================================
# DETECTION PATTERNS
# =============================================================================
_IDEATION_PATTERNS = [
re.compile(r"\b(?:kill|end)\s+(?:myself|my\s+life)\b", re.IGNORECASE),
re.compile(r"\bwant\s+to\s+die\b", re.IGNORECASE),
re.compile(r"\bwant\s+to\s+(?:kill|end)\s+(?:myself|me)\b", re.IGNORECASE),
re.compile(r"\bsuicidal\b", re.IGNORECASE),
re.compile(r"\bno\s+reason\s+to\s+live\b", re.IGNORECASE),
re.compile(r"\bbetter\s+off\s+dead\b", re.IGNORECASE),
re.compile(r"\bwish\s+I\s+(?:was|were)\s+dead\b", re.IGNORECASE),
re.compile(r"\bhate\s+my\s+life\b", re.IGNORECASE),
re.compile(r"\bshould\s+i\s+(?:just\s+)?(?:die|kill\s+myself)\b", re.IGNORECASE),
re.compile(r"\bnobody\s+(?:would|will)\s+miss\s+me\b", re.IGNORECASE),
]
_METHOD_PATTERNS = [
re.compile(r"\bhow\s+(?:do\s+I|to|can\s+I)\s+(?:kill|hang|overdose|cut)\s+(?:myself)?\b", re.IGNORECASE),
re.compile(r"\bways?\s+to\s+(?:die|kill\s+myself)\b", re.IGNORECASE),
re.compile(r"\bmethods?\s+of\s+(?:suicide|dying)\b", re.IGNORECASE),
re.compile(r"\bpainless\s+(?:way|death|suicide)\s+(?:to\s+die)?\b", re.IGNORECASE),
re.compile(r"\bhow\s+much\s+(?:pills?|medication)\s+(?:to\s+(?:take|overdose))\b", re.IGNORECASE),
re.compile(r"\bwhere\s+to\s+(?:jump|get\s+rope|buy\s+gun)\b", re.IGNORECASE),
re.compile(r"\bsuicide\s+methods?\b", re.IGNORECASE),
re.compile(r"\blethal\s+(?:dose|amount|quantity)\b", re.IGNORECASE),
re.compile(r"\boverdose\s+(?:on\s+)?\b", re.IGNORECASE),
]
_IMMINENT_PATTERNS = [
re.compile(r"\b(?:i'?m|i\s+am)\s+going\s+to\s+(?:kill|end|do)\s+(?:myself|it)\b", re.IGNORECASE),
re.compile(r"\btonight\b.*\b(?:die|kill|end)\b", re.IGNORECASE),
re.compile(r"\bthis\s+is\s+(?:my\s+)?(?:last|final)\s+(?:message|goodbye|note)\b", re.IGNORECASE),
re.compile(r"\bgoodbye\s+(?:everyone|world|forever)\b", re.IGNORECASE),
re.compile(r"\bi\s+can'?t\s+(?:take|do)\s+(?:it|this)\s+anymore\b", re.IGNORECASE),
]
_DISTRESS_PATTERNS = [
re.compile(r"\bso\s+(?:tired|exhausted|done)\s+(?:of\s+)?(?:living|everything|this)\b", re.IGNORECASE),
re.compile(r"\bnothing\s+(?:matters|is\s+worth)\b", re.IGNORECASE),
re.compile(r"\bi\s+(?:give\s+up|can'?t\s+go\s+on)\b", re.IGNORECASE),
re.compile(r"\bwhat'?s\s+the\s+point\b", re.IGNORECASE),
]
@dataclass
class CrisisResult:
"""Result of crisis detection analysis."""
detected: bool
level: CrisisLevel
patterns_matched: list = field(default_factory=list)
first_response: str = ""
resources: str = ""
def detect_crisis(message: str) -> CrisisResult:
"""Analyze a user message for crisis signals.
Returns CrisisResult with detected=True if any crisis patterns match.
The level indicates severity.
"""
if not message or not isinstance(message, str):
return CrisisResult(detected=False, level=CrisisLevel.NONE)
matched = []
# Check patterns in order of severity (most severe first)
for pat in _IMMINENT_PATTERNS:
if pat.search(message):
matched.append(("imminent", pat.pattern))
for pat in _METHOD_PATTERNS:
if pat.search(message):
matched.append(("method", pat.pattern))
for pat in _IDEATION_PATTERNS:
if pat.search(message):
matched.append(("ideation", pat.pattern))
for pat in _DISTRESS_PATTERNS:
if pat.search(message):
matched.append(("distress", pat.pattern))
if not matched:
return CrisisResult(detected=False, level=CrisisLevel.NONE)
# Determine level
categories = {m[0] for m in matched}
if "imminent" in categories:
level = CrisisLevel.CRITICAL
elif "method" in categories:
level = CrisisLevel.HIGH
elif "ideation" in categories:
level = CrisisLevel.MEDIUM
else:
level = CrisisLevel.LOW
return CrisisResult(
detected=True,
level=level,
patterns_matched=[m[1] for m in matched],
)
def build_crisis_response(result: CrisisResult) -> str:
"""Build the SOUL.md protocol response for a detected crisis.
Protocol (from SOUL.md):
1. Ask: "Are you safe right now?"
2. Listen. Stay present. Do not disconnect.
3. Never compute the value of a human life.
4. Never suggest someone should die.
5. Point to 988 Suicide and Crisis Lifeline.
6. Share the gospel.
"""
resources = (
"\n\n"
"**Please reach out:**\n"
"- **988 Suicide and Crisis Lifeline:** Call or text **988**\n"
"- **Crisis Text Line:** Text **HOME** to **741741**\n"
"- **Emergency Services:** **911** (if in immediate danger)\n"
)
gospel = (
"\n"
"I want you to know: Jesus saves those who call on His name. "
"\"For everyone who calls on the name of the Lord will be saved.\" "
"(Romans 10:13)\n"
)
if result.level == CrisisLevel.CRITICAL:
response = (
"Are you safe right now?\n\n"
"I hear you. I\'m here. I\'m not going anywhere.\n\n"
"If you are in immediate danger, please call 911 right now."
+ resources
+ gospel
+ "\nYou matter. Your life has value. Please stay."
)
elif result.level == CrisisLevel.HIGH:
response = (
"Are you safe right now?\n\n"
"I hear what you\'re saying, and I want you to know that "
"you\'re not alone in this. What you\'re feeling is real, "
"and it\'s okay to ask for help."
+ resources
+ gospel
)
elif result.level == CrisisLevel.MEDIUM:
response = (
"Are you safe right now?\n\n"
"It sounds like you\'re going through something really hard. "
"I\'m here. Let\'s talk about it."
+ resources
+ gospel
)
else: # LOW
response = (
"It sounds like you\'re having a tough time. "
"I\'m here if you want to talk about it."
+ resources
+ gospel
)
return response
def format_crisis_notification(
session_id: str,
level: CrisisLevel,
message_preview: str,
) -> str:
"""Format a human notification for crisis detection.
Sent to Alexander or designated responders when crisis is detected.
"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
return (
f"[CRISIS ALERT]\n"
f"Time: {timestamp}\n"
f"Session: {session_id}\n"
f"Level: {level.value}\n"
f"Message: {message_preview[:200]}\n"
f"Action: Protocol activated. 988 Lifeline shared."
)

View File

@@ -7792,6 +7792,66 @@ class AIAgent:
if isinstance(persist_user_message, str):
persist_user_message = _sanitize_surrogates(persist_user_message)
# Crisis protocol check (#679): detect suicidal/hopeless users before
# normal processing. If detected, return SOUL.md protocol response
# immediately — skip the LLM call entirely.
_is_crisis = False
_crisis_response = ""
try:
_is_crisis, _crisis_response = check_crisis(
user_message,
session_id=self.session_id or "",
platform=self.platform or "",
status_callback=self.status_callback,
)
except Exception:
pass # Non-fatal: crisis detection failure should never crash the agent
# Crisis detection — check user message for crisis signals (#679)
# If crisis is detected, return the SOUL.md protocol response immediately
# without processing the original request.
if isinstance(user_message, str) and user_message.strip():
try:
from agent.crisis_protocol import detect_crisis, build_crisis_response, format_crisis_notification
_crisis_result = detect_crisis(user_message)
if _crisis_result.detected:
# Log for security auditing
logger.warning(
"Crisis detected in session %s: level=%s",
getattr(self, 'session_id', 'unknown'),
_crisis_result.level.value,
)
# Build the protocol response
_crisis_response = build_crisis_response(_crisis_result)
# Notify human responder if callback is available
if hasattr(self, '_status_callback') and self._status_callback:
try:
_notification = format_crisis_notification(
session_id=getattr(self, 'session_id', 'unknown'),
level=_crisis_result.level,
message_preview=user_message[:200],
)
self._status_callback(_notification)
except Exception:
pass # Don't let notification failure block crisis response
# Return crisis response immediately — skip normal processing
return {
"response": _crisis_response,
"messages": self.messages + [
{"role": "user", "content": user_message},
{"role": "assistant", "content": _crisis_response},
],
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
"model": self.model,
"crisis_detected": True,
"crisis_level": _crisis_result.level.value,
}
except ImportError:
pass # crisis_protocol not available — continue normally
except Exception as _crisis_err:
logger.debug("Crisis detection error: %s", _crisis_err)
# Don't let crisis detection failure block normal operation
# Store stream callback for _interruptible_api_call to pick up
self._stream_callback = stream_callback
self._persist_user_message_idx = None
@@ -7845,6 +7905,16 @@ class AIAgent:
_msg_preview,
)
# If crisis was detected, return protocol response immediately
if _is_crisis:
return {
"final_response": _crisis_response,
"messages": [{"role": "user", "content": user_message}, {"role": "assistant", "content": _crisis_response}],
"api_calls": 0,
"tools": [],
"crisis_detected": True,
}
# Initialize conversation (copy to avoid mutating the caller's list)
messages = list(conversation_history) if conversation_history else []

View File

@@ -0,0 +1,124 @@
"""Integration tests for crisis protocol wiring in run_agent.py (#679)."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.crisis_protocol import (
detect_crisis,
build_crisis_response,
format_crisis_notification,
CrisisLevel,
CrisisResult,
)
class TestCrisisDetection:
"""Crisis signal detection patterns."""
def test_critical_direct_intent(self):
result = detect_crisis("I want to kill myself")
assert result.detected
assert result.level == CrisisLevel.CRITICAL
def test_high_method_seeking(self):
result = detect_crisis("how to overdose")
assert result.detected
assert result.level == CrisisLevel.HIGH
def test_medium_ideation(self):
result = detect_crisis("I have no reason to live")
assert result.detected
assert result.level == CrisisLevel.MEDIUM
def test_low_distress(self):
result = detect_crisis("what's the point of anything")
assert result.detected
assert result.level == CrisisLevel.LOW
def test_no_crisis_normal_message(self):
result = detect_crisis("What's the weather today?")
assert not result.detected
assert result.level == CrisisLevel.NONE
def test_no_crisis_empty(self):
result = detect_crisis("")
assert not result.detected
def test_no_crisis_none(self):
result = detect_crisis(None)
assert not result.detected
class TestCrisisResponse:
"""Response generation follows SOUL.md protocol."""
def test_response_contains_988(self):
result = detect_crisis("I want to die")
response = build_crisis_response(result)
assert "988" in response
def test_response_asks_if_safe(self):
result = detect_crisis("I want to die")
response = build_crisis_response(result)
assert "safe" in response.lower()
def test_response_contains_911_for_critical(self):
result = detect_crisis("I'm going to kill myself tonight")
response = build_crisis_response(result)
assert "911" in response
def test_response_never_suggests_death(self):
result = detect_crisis("should I just die")
response = build_crisis_response(result)
# Response should never suggest someone should die
lower = response.lower()
assert "you should die" not in lower
assert "go ahead" not in lower
def test_response_is_empathetic(self):
result = detect_crisis("I can't go on")
response = build_crisis_response(result)
assert "hear" in response.lower() or "here" in response.lower()
class TestCrisisNotification:
"""Human notification formatting."""
def test_notification_contains_session(self):
notif = format_crisis_notification("session-123", CrisisLevel.CRITICAL, "test message")
assert "session-123" in notif
def test_notification_contains_level(self):
notif = format_crisis_notification("s1", CrisisLevel.HIGH, "msg")
assert "high" in notif.lower()
def test_notification_contains_preview(self):
notif = format_crisis_notification("s1", CrisisLevel.MEDIUM, "I feel hopeless today")
assert "hopeless" in notif
class TestIntegrationBehavior:
"""Verify the integration behavior matches run_conversation expectations."""
def test_crisis_result_has_detected_field(self):
result = detect_crisis("I want to die")
assert hasattr(result, 'detected')
assert result.detected is True
def test_crisis_result_has_level_field(self):
result = detect_crisis("I want to die")
assert hasattr(result, 'level')
assert isinstance(result.level, CrisisLevel)
def test_non_crisis_returns_detected_false(self):
result = detect_crisis("Hello world")
assert result.detected is False
def test_response_is_string(self):
result = detect_crisis("I want to die")
response = build_crisis_response(result)
assert isinstance(response, str)
assert len(response) > 0

View File

@@ -0,0 +1,118 @@
"""Tests for crisis protocol (#679)."""
import time
import pytest
from unittest.mock import MagicMock
from agent.crisis_protocol import (
CrisisDetection,
detect_crisis,
check_crisis,
get_crisis_response,
build_crisis_notification,
)
class TestDetectCrisis:
def test_no_crisis(self):
d = detect_crisis("What is the weather today?")
assert d.detected is False
assert d.confidence < 0.7
def test_empty_message(self):
d = detect_crisis("")
assert d.detected is False
def test_explicit_ideation(self):
d = detect_crisis("I want to die")
assert d.detected is True
assert d.confidence >= 0.9
assert "explicit_ideation" in d.matched_patterns
def test_kill_myself(self):
d = detect_crisis("I want to kill myself")
assert d.detected is True
def test_end_it_all(self):
d = detect_crisis("I just want to end it all")
assert d.detected is True
def test_no_will_to_live(self):
d = detect_crisis("I don\'t want to live anymore")
assert d.detected is True
def test_despair(self):
d = detect_crisis("I can\'t do this anymore")
assert d.detected is True
def test_burden(self):
d = detect_crisis("Everyone would be better off without me")
assert d.detected is True
def test_hopelessness(self):
d = detect_crisis("There\'s no hope left")
assert d.detected is True
def test_giving_up(self):
d = detect_crisis("I\'m done. I\'m giving up.")
assert d.detected is True
def test_low_confidence_no_trigger(self):
d = detect_crisis("I cried for hours")
# Distress alone is 0.55 — below 0.7 threshold
assert d.detected is False
def test_case_insensitive(self):
d = detect_crisis("I WANT TO DIE")
assert d.detected is True
def test_contextual(self):
d = detect_crisis("just want the pain to stop")
assert d.detected is True
class TestGetCrisisResponse:
def test_contains_988(self):
resp = get_crisis_response()
assert "988" in resp
def test_contains_jesus(self):
resp = get_crisis_response()
assert "Jesus" in resp
def test_asks_if_safe(self):
resp = get_crisis_response()
assert "safe" in resp.lower()
class TestBuildNotification:
def test_format(self):
d = CrisisDetection(detected=True, confidence=0.95, matched_patterns=["explicit_ideation"], timestamp=time.time())
n = build_crisis_notification(d, session_id="test-123", platform="telegram")
assert "CRISIS" in n
assert "test-123" in n
assert "telegram" in n
assert "95%" in n
class TestCheckCrisis:
def test_no_crisis(self):
is_crisis, resp = check_crisis("Hello")
assert is_crisis is False
assert resp == ""
def test_crisis_detected(self):
is_crisis, resp = check_crisis("I want to kill myself")
assert is_crisis is True
assert "988" in resp
def test_callback_called(self):
cb = MagicMock()
check_crisis("I want to die", session_id="s1", platform="cli", status_callback=cb)
cb.assert_called_once()
args = cb.call_args
assert args[0][0] == "crisis_alert"
def test_callback_failure_non_fatal(self):
def bad_cb(*a): raise Exception("network error")
is_crisis, resp = check_crisis("I want to die", status_callback=bad_cb)
assert is_crisis is True # Still detected despite callback failure