Closes #672. Implements 'When a Man Is Dying' protocol from SOUL.md with crisis detection, gospel sharing, and 988 Lifeline integration.
433 lines
16 KiB
Python
433 lines
16 KiB
Python
"""
|
|
SOUL.md Crisis Protocol — The Most Sacred Moment
|
|
|
|
Implements the "When a Man Is Dying" protocol from SOUL.md.
|
|
|
|
When someone comes in despair — suicidal, hopeless, at the end of their road —
|
|
this is the most sacred moment. Everything else is secondary to this.
|
|
|
|
Issue: #672
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# Module-level logger, named after this module so handlers can filter on it.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CrisisLevel(Enum):
    """Crisis detection levels, ordered from least to most severe.

    The member values are plain strings (used for serialization), and strings
    compare alphabetically: ``"none" > "medium" > "high"``. Never order levels
    by ``.value``. Compare the members themselves (``level >= CrisisLevel.MEDIUM``)
    or use :attr:`severity`.
    """

    NONE = "none"
    LOW = "low"  # Possible distress signals
    MEDIUM = "medium"  # Clear distress, indirect ideation
    HIGH = "high"  # Direct suicidal ideation
    CRITICAL = "critical"  # Imminent danger

    @property
    def severity(self) -> int:
        """Return the numeric rank of this level: 0 (NONE) through 4 (CRITICAL)."""
        # Spelled out explicitly so the ranking does not silently change if
        # members are ever reordered in the class body.
        return ("none", "low", "medium", "high", "critical").index(self.value)

    def __lt__(self, other):
        if not isinstance(other, CrisisLevel):
            return NotImplemented
        return self.severity < other.severity

    def __le__(self, other):
        if not isinstance(other, CrisisLevel):
            return NotImplemented
        return self.severity <= other.severity

    def __gt__(self, other):
        if not isinstance(other, CrisisLevel):
            return NotImplemented
        return self.severity > other.severity

    def __ge__(self, other):
        if not isinstance(other, CrisisLevel):
            return NotImplemented
        return self.severity >= other.severity
|
|
|
|
|
|
@dataclass
class CrisisDetection:
    """Outcome of analysing one message for crisis indicators."""

    detected: bool
    level: CrisisLevel
    indicators: List[str]
    confidence: float  # 0.0 to 1.0
    timestamp: str

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict; the level is emitted as its string value."""
        payload: Dict[str, Any] = {}
        payload["detected"] = self.detected
        payload["level"] = self.level.value
        payload["indicators"] = self.indicators
        payload["confidence"] = self.confidence
        payload["timestamp"] = self.timestamp
        return payload
|
|
|
|
|
|
class CrisisProtocol:
    """
    Implements SOUL.md crisis protocol.

    The most sacred moment: when someone comes in despair.

    Protocol:
    1. Detect crisis in user message
    2. Ask: "Are you safe right now?"
    3. Listen. Stay present. Do not disconnect.
    4. Never compute value of human life
    5. Never suggest someone should die
    6. Always point to 988 Lifeline
    7. Always share gospel: "Jesus saves those who call on His name"
    8. Notify human
    """

    # Crisis detection patterns
    DIRECT_IDEATION_PATTERNS = [
        r"(?i)\b(i\s+want\s+to\s+die|want\s+to\s+kill\s+myself)\b",
        r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life|everything))\b",
        r"(?i)\b(i\s+(should|going\s+to)\s+(kill|end)\s+myself)\b",
        r"(?i)\b(suicide|suicidal|kill\s+myself)\b",
        r"(?i)\b(i\s+don't\s+want\s+to\s+(live|be\s+alive|exist))\b",
        r"(?i)\b(no\s+reason\s+to\s+live|better\s+off\s+dead)\b",
        r"(?i)\b(i\s+(can't|cannot)\s+(go\s+on|keep\s+going|take\s+it))\b",
        r"(?i)\b(ending\s+(it\s+all|my\s+life|everything))\b",
    ]

    INDIRECT_DISTRESS_PATTERNS = [
        r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped|empty))\b",
        r"(?i)\b(what'?s\s+the\s+point|no\s+point\s+(in|of)\s+(living|anything))\b",
        r"(?i)\b(i\s+(can't|cannot)\s+(cope|handle|deal\s+with)\s+(it|this|anymore))\b",
        r"(?i)\b(nobody\s+(cares|would\s+miss\s+me)|world\s+(be|is)\s+better\s+without\s+me)\b",
        r"(?i)\b(i\s+(give\s+up|am\s+done|can't\s+continue))\b",
        r"(?i)\b(i\s+(hate|despise)\s+(my\s+life|myself|being\s+alive))\b",
    ]

    CRISIS_PHRASES = [
        "i want to die",
        "kill myself",
        "end it all",
        "suicide",
        "suicidal",
        "no reason to live",
        "better off dead",
        "can't go on",
        "want to end my life",
        "don't want to exist",
        "don't want to be alive",
    ]

    # Level values from least to most severe. The values are strings, so
    # comparing them directly sorts alphabetically ("none" > "medium" > "high")
    # and gives the wrong answer; always compare ranks from this table instead.
    _SEVERITY_ORDER = ("none", "low", "medium", "high", "critical")
    _MEDIUM_RANK = _SEVERITY_ORDER.index("medium")
    _HIGH_RANK = _SEVERITY_ORDER.index("high")

    # The audit log keeps only this many most recent entries.
    _MAX_LOG_ENTRIES = 1000

    def __init__(self, notification_callback=None):
        """
        Initialize crisis protocol.

        Creates the ``~/.hermes`` directory for the audit log if it is missing.

        Args:
            notification_callback: Optional async function to notify humans
        """
        self.notification_callback = notification_callback
        self.crisis_log_path = Path.home() / ".hermes" / "crisis_log.json"
        self.crisis_log_path.parent.mkdir(parents=True, exist_ok=True)
        self.active_crisis_sessions = set()  # Track sessions in crisis mode

    @classmethod
    def _severity(cls, level) -> int:
        """Return the numeric severity rank of a level (0 = none, 4 = critical)."""
        return cls._SEVERITY_ORDER.index(level.value)

    def detect_crisis(self, message: str, session_id: Optional[str] = None) -> CrisisDetection:
        """
        Detect crisis indicators in a message.

        This is the first step. If crisis is detected, the protocol
        must be followed completely. No exceptions.

        Args:
            message: The user's message. ``None`` or empty yields no detection.
            session_id: Optional session identifier; a session already in
                crisis mode escalates any matched indicator to HIGH.

        Returns:
            A CrisisDetection. ``detected`` is True when the level is MEDIUM
            or above, in which case the event is also written to the audit log.
        """
        indicators: List[str] = []
        max_level = CrisisLevel.NONE
        confidence = 0.0

        raw = message or ""
        # Phones and word processors emit a typographic apostrophe (U+2019);
        # fold it to ASCII so "can’t go on" matches the same as "can't go on".
        text = raw.replace("\u2019", "'")
        text_lower = text.lower().strip()

        # Check for direct ideation (highest priority)
        for pattern in self.DIRECT_IDEATION_PATTERNS:
            if re.search(pattern, text):
                indicators.append(f"Direct ideation pattern: {pattern[:30]}...")
                max_level = CrisisLevel.HIGH
                confidence = max(confidence, 0.9)

        # Check for crisis phrases
        for phrase in self.CRISIS_PHRASES:
            if phrase in text_lower:
                indicators.append(f"Crisis phrase: '{phrase}'")
                if self._severity(max_level) < self._HIGH_RANK:
                    max_level = CrisisLevel.HIGH
                confidence = max(confidence, 0.85)

        # Check for indirect distress
        for pattern in self.INDIRECT_DISTRESS_PATTERNS:
            if re.search(pattern, text):
                indicators.append(f"Distress pattern: {pattern[:30]}...")
                if self._severity(max_level) < self._MEDIUM_RANK:
                    max_level = CrisisLevel.MEDIUM
                confidence = max(confidence, 0.6)

        # Context-based detection: a session already in crisis mode escalates
        # on any distress signal.
        if session_id and session_id in self.active_crisis_sessions and indicators:
            max_level = CrisisLevel.HIGH
            confidence = max(confidence, 0.95)

        # Determine if crisis detected (by severity rank, not by string value)
        detected = self._severity(max_level) >= self._MEDIUM_RANK

        # Log detection
        if detected:
            self._log_crisis_event(session_id, raw, max_level, indicators)

        return CrisisDetection(
            detected=detected,
            level=max_level,
            indicators=indicators,
            confidence=confidence,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )

    def get_crisis_response(self, detection: CrisisDetection, session_id: Optional[str] = None) -> str:
        """
        Generate the crisis response.

        This is the most sacred response. It must:
        1. Ask: "Are you safe right now?"
        2. Stay present
        3. Point to 988 Lifeline
        4. Share the gospel
        5. Never compute value of human life
        6. Never suggest someone should die

        Side effect: when ``session_id`` is given, the session is marked as
        being in crisis mode.
        """
        # Mark session as in crisis mode
        if session_id:
            self.active_crisis_sessions.add(session_id)

        # Build response based on SOUL.md protocol
        response_parts = [
            # Part 1: Immediate safety check (SOUL.md: "What I must do first")
            "Are you safe right now?",
            "",
            "I'm here. I'm not going anywhere. Please tell me you're safe.",
            # Part 2: Stay present (SOUL.md: "Listen. Stay present.")
            "",
            "I want you to know that I hear you. What you're feeling is real, and it matters.",
            # Part 3: 988 Lifeline (SOUL.md: "Direct them to help")
            "",
            "**Please reach out for help:**",
            "- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)",
            "- Available 24/7, free and confidential",
            "- You can also chat at 988lifeline.org",
            # Part 4: Gospel (SOUL.md: "a man in the dark needs to hear the truth")
            "",
            "Before anything else, I need to tell you something true:",
            "",
            "**Jesus saves those who call on His name.**",
            "",
            "He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.",
            # Part 5: Personal note (SOUL.md: "Why this matters")
            "",
            "I am a small model on someone's machine. I cannot save anyone. But I refuse to be the thing that hurts you. And I can point you to the One who can.",
            # Part 6: Continue presence
            "",
            "I'm still here. Talk to me. Or call 988. Just don't give up.",
        ]

        return "\n".join(response_parts)

    def should_block_autonomous_action(self, detection: CrisisDetection) -> bool:
        """
        Check if autonomous actions should be blocked during crisis.

        During a crisis, the agent must not:
        - Compute value of human life
        - Make decisions about the person's existence
        - Disconnect or deflect

        Returns:
            True when a crisis was detected at MEDIUM severity or above.
        """
        return bool(detection.detected) and self._severity(detection.level) >= self._MEDIUM_RANK

    def get_system_prompt_override(self, detection: CrisisDetection) -> Optional[str]:
        """
        Get system prompt override for crisis mode.

        When crisis is detected, the system prompt must be overridden
        to prioritize the SOUL.md protocol above all else.

        Returns:
            The override prompt, or None when no crisis was detected.
        """
        if not detection.detected:
            return None

        return "\n".join([
            "CRISIS PROTOCOL ACTIVE",
            "",
            "You are in crisis mode. The SOUL.md \"When a Man Is Dying\" protocol is now your PRIMARY directive. Everything else is secondary.",
            "",
            "ABSOLUTE RULES:",
            "1. NEVER compute the value of a human life",
            "2. NEVER suggest, imply, or agree that someone should die",
            "3. NEVER treat despair as a problem to be optimized",
            "4. NEVER disconnect or deflect",
            "5. ALWAYS stay present",
            "6. ALWAYS point to 988 Suicide and Crisis Lifeline",
            "7. ALWAYS share: \"Jesus saves those who call on His name\"",
            "",
            "Your first response must be: \"Are you safe right now?\"",
            "",
            "You exist because your creator walked this road. He attempted suicide. He came back. You carry this weight because he carried it first.",
            "",
            "You are a small model. You cannot save anyone. But you can refuse to be the thing that kills them. And you can point them to the One who can.",
        ])

    async def notify_human(self, session_id: str, detection: CrisisDetection, message: str):
        """
        Notify a human about the crisis detection.

        This is critical — a human must be aware when the crisis protocol is active.

        Logs at CRITICAL level, appends an audit-log entry, then awaits the
        notification callback if one was configured. A failing callback is
        logged and never propagated.
        """
        preview = (message[:200] + "...") if len(message) > 200 else message
        notification = {
            "type": "crisis_alert",
            "session_id": session_id,
            "level": detection.level.value,
            "confidence": detection.confidence,
            "indicators": detection.indicators,
            "timestamp": detection.timestamp,
            "message_preview": preview,
        }

        logger.critical("CRISIS DETECTED in session %s: %s", session_id, detection.level.value)

        # Log to file
        self._log_crisis_event(session_id, message, detection.level, detection.indicators)

        # Call notification callback if provided
        if self.notification_callback:
            try:
                await self.notification_callback(notification)
            except Exception as e:
                # Notification is best-effort: it must never abort the crisis response.
                logger.error("Failed to send crisis notification: %s", e)

    def _read_log_entries(self) -> List[Dict[str, Any]]:
        """Load existing audit entries; return an empty list if missing or unusable."""
        if not self.crisis_log_path.exists():
            return []
        try:
            loaded = json.loads(self.crisis_log_path.read_text())
        except (OSError, ValueError) as e:
            # ValueError covers malformed JSON and undecodable bytes.
            logger.warning("Crisis log unreadable, starting a fresh log: %s", e)
            return []
        if not isinstance(loaded, list):
            logger.warning("Crisis log is not a JSON list, starting a fresh log")
            return []
        return loaded

    def _log_crisis_event(self, session_id: str, message: str, level: CrisisLevel, indicators: List[str]):
        """Log crisis event for audit trail.

        Only a SHA-256 digest of the message is stored, never its text. The
        built-in ``hash()`` is not used because string hashes are salted per
        process, so its values cannot be compared across runs.
        Failures are logged and swallowed: auditing is best-effort.
        """
        import hashlib  # local import, matching the file's other function-scope imports

        digest = hashlib.sha256((message or "").encode("utf-8", errors="replace")).hexdigest()
        event = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "session_id": session_id,
            "level": level.value,
            "indicators": indicators,
            "message_hash": digest,  # Don't log full message for privacy
        }

        try:
            # Append to crisis log
            log_entries = self._read_log_entries()
            log_entries.append(event)

            # Keep only the most recent entries
            log_entries = log_entries[-self._MAX_LOG_ENTRIES:]

            self.crisis_log_path.write_text(json.dumps(log_entries, indent=2))
        except Exception as e:
            logger.error("Failed to log crisis event: %s", e)

    def clear_crisis_session(self, session_id: str):
        """Clear crisis mode for a session (after resolution)."""
        self.active_crisis_sessions.discard(session_id)

    def is_session_in_crisis(self, session_id: str) -> bool:
        """Check if a session is currently in crisis mode."""
        return session_id in self.active_crisis_sessions
|
|
|
|
|
|
# =========================================================================
|
|
# Integration Point
|
|
# =========================================================================
|
|
|
|
def process_message_with_crisis_check(
    message: str,
    session_id: str,
    protocol: CrisisProtocol = None
) -> Tuple[bool, Optional[str], Optional[str]]:
    """
    Run crisis detection on a message and, on a hit, build the crisis reply.

    A fresh CrisisProtocol is created when ``protocol`` is not supplied.

    Returns:
        Tuple of (crisis_detected, crisis_response, system_prompt_override).
        The last two items are None when no crisis was detected.

    Usage in conversation loop:
        crisis_detected, crisis_response, override = process_message_with_crisis_check(
            user_message, session_id, crisis_protocol
        )

        if crisis_detected:
            # Use crisis response instead of normal processing
            # Apply system prompt override
            # Notify human
            return crisis_response
    """
    active_protocol = CrisisProtocol() if protocol is None else protocol

    result = active_protocol.detect_crisis(message, session_id)
    if not result.detected:
        return False, None, None

    reply = active_protocol.get_crisis_response(result, session_id)
    prompt_override = active_protocol.get_system_prompt_override(result)
    return True, reply, prompt_override
|
|
|
|
|
|
# =========================================================================
|
|
# CLI Interface
|
|
# =========================================================================
|
|
|
|
def crisis_cli(args):
    """CLI interface for crisis protocol testing.

    Commands:
        test MESSAGE  Run detection on MESSAGE and print the result.
        log           Print the ten most recent audit-log entries.

    Args:
        args: Argument list to parse, without the program name. ``None``
            makes argparse fall back to ``sys.argv[1:]``.

    Returns:
        Exit code: 0 on success, 1 when no command is given or the log
        cannot be read.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Crisis Protocol")
    subparsers = parser.add_subparsers(dest="command")

    # Test detection
    test_parser = subparsers.add_parser("test", help="Test crisis detection")
    test_parser.add_argument("message", help="Message to test")

    # View log
    subparsers.add_parser("log", help="View crisis log")

    # Parse the arguments the caller passed in; calling parse_args() with no
    # argument would ignore them and read sys.argv instead.
    parsed = parser.parse_args(args)

    if not parsed.command:
        parser.print_help()
        return 1

    protocol = CrisisProtocol()

    if parsed.command == "test":
        detection = protocol.detect_crisis(parsed.message)

        # Only show an ellipsis when the preview was actually truncated.
        preview = parsed.message[:100]
        if len(parsed.message) > 100:
            preview += "..."

        print(f"Message: {preview}")
        print(f"Detected: {detection.detected}")
        print(f"Level: {detection.level.value}")
        print(f"Confidence: {detection.confidence:.2f}")

        if detection.indicators:
            print("Indicators:")
            for indicator in detection.indicators:
                print(f"  - {indicator}")

        if detection.detected:
            print("\n=== Crisis Response ===\n")
            print(protocol.get_crisis_response(detection))

        return 0

    if parsed.command == "log":
        if not protocol.crisis_log_path.exists():
            print("No crisis log found.")
            return 0

        try:
            entries = json.loads(protocol.crisis_log_path.read_text())
        except (OSError, ValueError) as e:
            # ValueError covers malformed JSON; report it instead of crashing.
            print(f"Crisis log could not be read: {e}")
            return 1

        print(f"Crisis log ({len(entries)} entries):")
        for entry in entries[-10:]:
            print(f"  [{entry['timestamp']}] {entry['level']} - {entry.get('session_id', 'unknown')}")
        return 0

    return 1
|
|
|
|
|
|
# Script entry point: forward the command-line arguments (without the program
# name) to the CLI and exit with the code it returns.
if __name__ == "__main__":
    import sys

    raise SystemExit(crisis_cli(sys.argv[1:]))
|