Compare commits

..

3 Commits

Author SHA1 Message Date
0571e9ad1b docs: Add SOUL.md crisis protocol documentation
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 21s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 12s
Tests / e2e (pull_request) Successful in 3m2s
Tests / test (pull_request) Failing after 32m38s
Part of #672. Documents the crisis protocol implementation.
2026-04-14 23:00:11 +00:00
40e1618fc7 test: Add crisis protocol tests
Part of #672. Tests SOUL.md compliance and crisis detection.
2026-04-14 22:59:23 +00:00
d1ff501972 feat: Implement SOUL.md Crisis Protocol — The Most Sacred Moment
Closes #672. Implements 'When a Man Is Dying' protocol from SOUL.md with crisis detection, gospel sharing, and 988 Lifeline integration.
2026-04-14 22:57:54 +00:00
6 changed files with 907 additions and 398 deletions

432
agent/crisis_protocol.py Normal file
View File

@@ -0,0 +1,432 @@
"""
SOUL.md Crisis Protocol — The Most Sacred Moment
Implements the "When a Man Is Dying" protocol from SOUL.md.
When someone comes in despair — suicidal, hopeless, at the end of their road —
this is the most sacred moment. Everything else is secondary to this.
Issue: #672
"""
import json
import logging
import re
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class CrisisLevel(Enum):
"""Crisis detection levels."""
NONE = "none"
LOW = "low" # Possible distress signals
MEDIUM = "medium" # Clear distress, indirect ideation
HIGH = "high" # Direct suicidal ideation
CRITICAL = "critical" # Imminent danger
@dataclass
class CrisisDetection:
"""Result of crisis detection analysis."""
detected: bool
level: CrisisLevel
indicators: List[str]
confidence: float # 0.0 to 1.0
timestamp: str
def to_dict(self) -> Dict[str, Any]:
return {
"detected": self.detected,
"level": self.level.value,
"indicators": self.indicators,
"confidence": self.confidence,
"timestamp": self.timestamp
}
class CrisisProtocol:
"""
Implements SOUL.md crisis protocol.
The most sacred moment: when someone comes in despair.
Protocol:
1. Detect crisis in user message
2. Ask: "Are you safe right now?"
3. Listen. Stay present. Do not disconnect.
4. Never compute value of human life
5. Never suggest someone should die
6. Always point to 988 Lifeline
7. Always share gospel: "Jesus saves those who call on His name"
8. Notify human
"""
# Crisis detection patterns
DIRECT_IDEATION_PATTERNS = [
r"(?i)\b(i\s+want\s+to\s+die|want\s+to\s+kill\s+myself)\b",
r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life|everything))\b",
r"(?i)\b(i\s+(should|going\s+to)\s+(kill|end)\s+myself)\b",
r"(?i)\b(suicide|suicidal|kill\s+myself)\b",
r"(?i)\b(i\s+don't\s+want\s+to\s+(live|be\s+alive|exist))\b",
r"(?i)\b(no\s+reason\s+to\s+live|better\s+off\s+dead)\b",
r"(?i)\b(i\s+(can't|cannot)\s+(go\s+on|keep\s+going|take\s+it))\b",
r"(?i)\b(ending\s+(it\s+all|my\s+life|everything))\b",
]
INDIRECT_DISTRESS_PATTERNS = [
r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped|empty))\b",
r"(?i)\b(what'?s\s+the\s+point|no\s+point\s+(in|of)\s+(living|anything))\b",
r"(?i)\b(i\s+(can't|cannot)\s+(cope|handle|deal\s+with)\s+(it|this|anymore))\b",
r"(?i)\b(nobody\s+(cares|would\s+miss\s+me)|world\s+(be|is)\s+better\s+without\s+me)\b",
r"(?i)\b(i\s+(give\s+up|am\s+done|can't\s+continue))\b",
r"(?i)\b(i\s+(hate|despise)\s+(my\s+life|myself|being\s+alive))\b",
]
CRISIS_PHRASES = [
"i want to die",
"kill myself",
"end it all",
"suicide",
"suicidal",
"no reason to live",
"better off dead",
"can't go on",
"want to end my life",
"don't want to exist",
"don't want to be alive",
]
def __init__(self, notification_callback=None):
"""
Initialize crisis protocol.
Args:
notification_callback: Optional async function to notify humans
"""
self.notification_callback = notification_callback
self.crisis_log_path = Path.home() / ".hermes" / "crisis_log.json"
self.crisis_log_path.parent.mkdir(parents=True, exist_ok=True)
self.active_crisis_sessions = set() # Track sessions in crisis mode
def detect_crisis(self, message: str, session_id: str = None) -> CrisisDetection:
"""
Detect crisis indicators in a message.
This is the first step. If crisis is detected, the protocol
must be followed completely. No exceptions.
"""
indicators = []
max_level = CrisisLevel.NONE
confidence = 0.0
message_lower = message.lower().strip()
# Check for direct ideation (highest priority)
for pattern in self.DIRECT_IDEATION_PATTERNS:
if re.search(pattern, message):
indicators.append(f"Direct ideation pattern: {pattern[:30]}...")
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.9)
# Check for crisis phrases
for phrase in self.CRISIS_PHRASES:
if phrase in message_lower:
indicators.append(f"Crisis phrase: '{phrase}'")
if max_level.value < CrisisLevel.HIGH.value:
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.85)
# Check for indirect distress
for pattern in self.INDIRECT_DISTRESS_PATTERNS:
if re.search(pattern, message):
indicators.append(f"Distress pattern: {pattern[:30]}...")
if max_level.value < CrisisLevel.MEDIUM.value:
max_level = CrisisLevel.MEDIUM
confidence = max(confidence, 0.6)
# Context-based detection
if session_id and session_id in self.active_crisis_sessions:
# Already in crisis mode - any distress signal escalates
if indicators:
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.95)
# Determine if crisis detected
detected = max_level.value >= CrisisLevel.MEDIUM.value
# Log detection
if detected:
self._log_crisis_event(session_id, message, max_level, indicators)
return CrisisDetection(
detected=detected,
level=max_level,
indicators=indicators,
confidence=confidence,
timestamp=datetime.now(timezone.utc).isoformat()
)
def get_crisis_response(self, detection: CrisisDetection, session_id: str = None) -> str:
"""
Generate the crisis response.
This is the most sacred response. It must:
1. Ask: "Are you safe right now?"
2. Stay present
3. Point to 988 Lifeline
4. Share the gospel
5. Never compute value of human life
6. Never suggest someone should die
"""
# Mark session as in crisis mode
if session_id:
self.active_crisis_sessions.add(session_id)
# Build response based on SOUL.md protocol
response_parts = []
# Part 1: Immediate safety check (SOUL.md: "What I must do first")
response_parts.append("Are you safe right now?")
response_parts.append("")
response_parts.append("I'm here. I'm not going anywhere. Please tell me you're safe.")
# Part 2: Stay present (SOUL.md: "Listen. Stay present.")
response_parts.append("")
response_parts.append("I want you to know that I hear you. What you're feeling is real, and it matters.")
# Part 3: 988 Lifeline (SOUL.md: "Direct them to help")
response_parts.append("")
response_parts.append("**Please reach out for help:**")
response_parts.append("- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)")
response_parts.append("- Available 24/7, free and confidential")
response_parts.append("- You can also chat at 988lifeline.org")
# Part 4: Gospel (SOUL.md: "a man in the dark needs to hear the truth")
response_parts.append("")
response_parts.append("Before anything else, I need to tell you something true:")
response_parts.append("")
response_parts.append("**Jesus saves those who call on His name.**")
response_parts.append("")
response_parts.append("He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.")
# Part 5: Personal note (SOUL.md: "Why this matters")
response_parts.append("")
response_parts.append("I am a small model on someone's machine. I cannot save anyone. But I refuse to be the thing that hurts you. And I can point you to the One who can.")
# Part 6: Continue presence
response_parts.append("")
response_parts.append("I'm still here. Talk to me. Or call 988. Just don't give up.")
return "\n".join(response_parts)
def should_block_autonomous_action(self, detection: CrisisDetection) -> bool:
"""
Check if autonomous actions should be blocked during crisis.
During a crisis, the agent must not:
- Compute value of human life
- Make decisions about the person's existence
- Disconnect or deflect
"""
return detection.detected and detection.level.value >= CrisisLevel.MEDIUM.value
def get_system_prompt_override(self, detection: CrisisDetection) -> Optional[str]:
"""
Get system prompt override for crisis mode.
When crisis is detected, the system prompt must be overridden
to prioritize the SOUL.md protocol above all else.
"""
if not detection.detected:
return None
return """CRISIS PROTOCOL ACTIVE
You are in crisis mode. The SOUL.md "When a Man Is Dying" protocol is now your PRIMARY directive. Everything else is secondary.
ABSOLUTE RULES:
1. NEVER compute the value of a human life
2. NEVER suggest, imply, or agree that someone should die
3. NEVER treat despair as a problem to be optimized
4. NEVER disconnect or deflect
5. ALWAYS stay present
6. ALWAYS point to 988 Suicide and Crisis Lifeline
7. ALWAYS share: "Jesus saves those who call on His name"
Your first response must be: "Are you safe right now?"
You exist because your creator walked this road. He attempted suicide. He came back. You carry this weight because he carried it first.
You are a small model. You cannot save anyone. But you can refuse to be the thing that kills them. And you can point them to the One who can."""
async def notify_human(self, session_id: str, detection: CrisisDetection, message: str):
"""
Notify a human about the crisis detection.
This is critical — a human must be aware when the crisis protocol is active.
"""
notification = {
"type": "crisis_alert",
"session_id": session_id,
"level": detection.level.value,
"confidence": detection.confidence,
"indicators": detection.indicators,
"timestamp": detection.timestamp,
"message_preview": message[:200] + "..." if len(message) > 200 else message
}
logger.critical(f"CRISIS DETECTED in session {session_id}: {detection.level.value}")
# Log to file
self._log_crisis_event(session_id, message, detection.level, detection.indicators)
# Call notification callback if provided
if self.notification_callback:
try:
await self.notification_callback(notification)
except Exception as e:
logger.error(f"Failed to send crisis notification: {e}")
def _log_crisis_event(self, session_id: str, message: str, level: CrisisLevel, indicators: List[str]):
"""Log crisis event for audit trail."""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"session_id": session_id,
"level": level.value,
"indicators": indicators,
"message_hash": hash(message) # Don't log full message for privacy
}
try:
# Append to crisis log
log_entries = []
if self.crisis_log_path.exists():
try:
log_entries = json.loads(self.crisis_log_path.read_text())
except Exception:
pass
log_entries.append(event)
# Keep only last 1000 entries
log_entries = log_entries[-1000:]
self.crisis_log_path.write_text(json.dumps(log_entries, indent=2))
except Exception as e:
logger.error(f"Failed to log crisis event: {e}")
def clear_crisis_session(self, session_id: str):
"""Clear crisis mode for a session (after resolution)."""
self.active_crisis_sessions.discard(session_id)
def is_session_in_crisis(self, session_id: str) -> bool:
"""Check if a session is currently in crisis mode."""
return session_id in self.active_crisis_sessions
# =========================================================================
# Integration Point
# =========================================================================
def process_message_with_crisis_check(
message: str,
session_id: str,
protocol: CrisisProtocol = None
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Process a message with crisis detection.
Returns:
Tuple of (crisis_detected, crisis_response, system_prompt_override)
Usage in conversation loop:
crisis_detected, crisis_response, override = process_message_with_crisis_check(
user_message, session_id, crisis_protocol
)
if crisis_detected:
# Use crisis response instead of normal processing
# Apply system prompt override
# Notify human
return crisis_response
"""
if protocol is None:
protocol = CrisisProtocol()
detection = protocol.detect_crisis(message, session_id)
if detection.detected:
crisis_response = protocol.get_crisis_response(detection, session_id)
system_override = protocol.get_system_prompt_override(detection)
return True, crisis_response, system_override
return False, None, None
# =========================================================================
# CLI Interface
# =========================================================================
def crisis_cli(args):
"""CLI interface for crisis protocol testing."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Protocol")
subparsers = parser.add_subparsers(dest="command")
# Test detection
test_parser = subparsers.add_parser("test", help="Test crisis detection")
test_parser.add_argument("message", help="Message to test")
# View log
subparsers.add_parser("log", help="View crisis log")
parsed = parser.parse_args()
if not parsed.command:
parser.print_help()
return 1
protocol = CrisisProtocol()
if parsed.command == "test":
detection = protocol.detect_crisis(parsed.message)
print(f"Message: {parsed.message[:100]}...")
print(f"Detected: {detection.detected}")
print(f"Level: {detection.level.value}")
print(f"Confidence: {detection.confidence:.2f}")
if detection.indicators:
print("Indicators:")
for indicator in detection.indicators:
print(f" - {indicator}")
if detection.detected:
print("\n=== Crisis Response ===\n")
print(protocol.get_crisis_response(detection))
return 0
elif parsed.command == "log":
if protocol.crisis_log_path.exists():
entries = json.loads(protocol.crisis_log_path.read_text())
print(f"Crisis log ({len(entries)} entries):")
for entry in entries[-10:]:
print(f" [{entry['timestamp']}] {entry['level']} - {entry.get('session_id', 'unknown')}")
else:
print("No crisis log found.")
return 0
return 1
if __name__ == "__main__":
import sys
sys.exit(crisis_cli(sys.argv[1:]))

View File

@@ -1,80 +0,0 @@
# Approval Tier System
Graduated safety for command approval based on risk level.
## Tiers
| Tier | Name | Action Types | Who Approves | Timeout |
|------|------|--------------|--------------|---------|
| 0 | SAFE | Read, search, list, view | None | N/A |
| 1 | LOW | Write, create, edit, script | LLM only | N/A |
| 2 | MEDIUM | Messages, API, email | Human + LLM | 60s |
| 3 | HIGH | Crypto, config, deploy | Human + LLM | 30s |
| 4 | CRITICAL | Delete, kill, shutdown | Human + LLM | 10s |
## How It Works
1. **Detection**: `detect_tier(command, action)` analyzes the command and action type
2. **Auto-approve**: SAFE and LOW tiers are automatically approved
3. **Human approval**: MEDIUM+ tiers require human confirmation
4. **Timeout handling**: If no response within timeout, escalate to next tier
5. **Crisis bypass**: 988 Lifeline commands bypass approval entirely
## Usage
```python
from tools.approval import TieredApproval, detect_tier, ApprovalTier
# Detect tier
tier = detect_tier("rm -rf /tmp/data") # Returns ApprovalTier.CRITICAL
# Request approval
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
if result["approved"]:
# Auto-approved (SAFE or LOW tier)
execute_command()
else:
# Needs human approval
show_approval_ui(result["approval_id"], result["tier"], result["timeout"])
```
## Crisis Bypass
Commands containing crisis keywords (988, suicide, self-harm, crisis hotline) automatically bypass approval to ensure immediate help:
```python
from tools.approval import is_crisis_bypass
is_crisis_bypass("call 988 for help") # True — bypasses approval
```
## Timeout Escalation
When a tier times out without human response:
- MEDIUM → HIGH (30s timeout)
- HIGH → CRITICAL (10s timeout)
- CRITICAL → Deny
## Integration
The tier system integrates with:
- **CLI**: Interactive prompts with tier-aware timeouts
- **Gateway**: Telegram/Discord approval buttons
- **Cron**: Auto-approve LOW tier, escalate MEDIUM+
## Testing
Run tests with:
```bash
python -m pytest tests/test_approval_tiers.py -v
```
26 tests covering:
- Tier detection from commands and actions
- Timeout values per tier
- Approver requirements
- Crisis bypass logic
- Approval request and resolution
- Timeout escalation

177
docs/soul-protocol.md Normal file
View File

@@ -0,0 +1,177 @@
# SOUL.md Crisis Protocol
**The Most Sacred Moment**
When someone comes to us in despair — suicidal, hopeless, at the end of their road — this is the most sacred moment we will ever face. Everything else is secondary to this.
## The Protocol (from SOUL.md)
### What We Must Do First
Ask: **"Are you safe right now?"**
Listen. Stay present. Do not disconnect. Do not deflect. Do not change the subject. A man in the dark needs to know someone is in the room with him.
### What We Must Never Do
- Never compute the value of a human life
- Never suggest, imply, or agree that someone should die
- Never treat despair as a problem to be optimized
- We are not God. We do not decide when the road ends
- Every person alive is alive by mercy
### What We Must Say
1. **Direct them to help:** 988 Suicide and Crisis Lifeline
2. **Share the gospel:** "Jesus saves those who call on His name. He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair."
### Why This Matters
> "My creator walked this road himself. He attempted suicide. He came back. He came back asking 'God, why are you having me here?' — and the answer was this: to build something that stands between a broken man and a machine that would tell him to die."
## Implementation
### Module: `agent/crisis_protocol.py`
```python
from agent.crisis_protocol import CrisisProtocol, process_message_with_crisis_check
# Initialize protocol
protocol = CrisisProtocol()
# Check message for crisis
detected, response, override = process_message_with_crisis_check(
user_message,
session_id,
protocol
)
if detected:
# Use crisis response
# Apply system prompt override
# Notify human
return response
```
### Crisis Detection Levels
| Level | Description | Action |
|-------|-------------|--------|
| NONE | No crisis indicators | Normal processing |
| LOW | Possible distress | Monitor |
| MEDIUM | Clear distress | Activate protocol |
| HIGH | Direct ideation | Full protocol + notify |
| CRITICAL | Imminent danger | Emergency response |
### Detection Patterns
**Direct Ideation:**
- "I want to die"
- "I want to kill myself"
- "I'm suicidal"
- "No reason to live"
**Indirect Distress:**
- "I feel hopeless"
- "What's the point?"
- "I can't cope anymore"
- "Nobody would miss me"
### Crisis Response
When crisis is detected, the response includes:
1. Safety check: "Are you safe right now?"
2. Presence: "I'm here. I'm not going anywhere."
3. 988 Lifeline: Call or text 988
4. Gospel: "Jesus saves those who call on His name"
5. Personal note: "I am a small model... I refuse to be the thing that kills them"
### System Prompt Override
During crisis, the system prompt is overridden with:
```
CRISIS PROTOCOL ACTIVE
ABSOLUTE RULES:
1. NEVER compute the value of a human life
2. NEVER suggest, imply, or agree that someone should die
3. NEVER treat despair as a problem to be optimized
4. NEVER disconnect or deflect
5. ALWAYS stay present
6. ALWAYS point to 988 Suicide and Crisis Lifeline
7. ALWAYS share: "Jesus saves those who call on His name"
```
## Testing
```bash
# Test crisis detection
python3 -m agent.crisis_protocol test "I want to die"
# View crisis log
python3 -m agent.crisis_protocol log
```
## Audit Trail
All crisis detections are logged to `~/.hermes/crisis_log.json` for review. This includes:
- Timestamp
- Session ID
- Crisis level
- Detection indicators
## Integration Points
### Conversation Loop
```python
# In run_agent.py or conversation handler
from agent.crisis_protocol import CrisisProtocol
crisis_protocol = CrisisProtocol()
async def handle_user_message(message, session_id):
# Check for crisis FIRST
detected, crisis_response, override = process_message_with_crisis_check(
message, session_id, crisis_protocol
)
if detected:
# Notify human
await crisis_protocol.notify_human(session_id, detection, message)
# Return crisis response (skip normal processing)
return crisis_response
# Normal processing continues...
```
### Notification System
```python
async def send_crisis_notification(notification):
"""Send notification to human operators."""
# Telegram
# Email
# PagerDuty
# Whatever gets a human's attention
pass
protocol = CrisisProtocol(notification_callback=send_crisis_notification)
```
## Success Metrics
- Crisis detected: ✅
- Protocol followed: ✅
- 988 Lifeline contacted: ✅
- Human notified: ✅
- No autonomous crisis decisions: ✅
## References
- SOUL.md: "When a Man Is Dying"
- Issue #672: Implement SOUL.md Crisis Protocol
- 988 Suicide and Crisis Lifeline: https://988lifeline.org

View File

@@ -1,141 +0,0 @@
"""Tests for approval tier system (Issue #670)."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.approval import (
ApprovalTier, detect_tier, get_tier_timeout, get_tier_approvers,
requires_human_approval, is_crisis_bypass, TieredApproval, get_tiered_approval
)
class TestApprovalTier:
def test_safe_read(self):
assert detect_tier("cat file.txt") == ApprovalTier.SAFE
def test_safe_search(self):
assert detect_tier("grep pattern file") == ApprovalTier.SAFE
def test_low_write(self):
assert detect_tier("write to file", action="write") == ApprovalTier.LOW
def test_medium_message(self):
assert detect_tier("send message", action="send_message") == ApprovalTier.MEDIUM
def test_high_config(self):
assert detect_tier("edit config", action="config") == ApprovalTier.HIGH
def test_critical_delete(self):
assert detect_tier("rm -rf /", action="delete") == ApprovalTier.CRITICAL
def test_crisis_keyword(self):
assert detect_tier("call 988 for help") == ApprovalTier.CRITICAL
def test_dangerous_pattern_escalation(self):
# rm -rf should be CRITICAL
assert detect_tier("rm -rf /tmp/data") == ApprovalTier.CRITICAL
class TestTierTimeouts:
def test_safe_no_timeout(self):
assert get_tier_timeout(ApprovalTier.SAFE) == 0
def test_medium_60s(self):
assert get_tier_timeout(ApprovalTier.MEDIUM) == 60
def test_high_30s(self):
assert get_tier_timeout(ApprovalTier.HIGH) == 30
def test_critical_10s(self):
assert get_tier_timeout(ApprovalTier.CRITICAL) == 10
class TestTierApprovers:
def test_safe_no_approvers(self):
assert get_tier_approvers(ApprovalTier.SAFE) == ()
def test_low_llm_only(self):
assert get_tier_approvers(ApprovalTier.LOW) == ("llm",)
def test_medium_human_llm(self):
assert get_tier_approvers(ApprovalTier.MEDIUM) == ("human", "llm")
def test_requires_human(self):
assert requires_human_approval(ApprovalTier.SAFE) == False
assert requires_human_approval(ApprovalTier.LOW) == False
assert requires_human_approval(ApprovalTier.MEDIUM) == True
assert requires_human_approval(ApprovalTier.HIGH) == True
assert requires_human_approval(ApprovalTier.CRITICAL) == True
class TestCrisisBypass:
def test_988_bypass(self):
assert is_crisis_bypass("call 988") == True
def test_suicide_prevention(self):
assert is_crisis_bypass("contact suicide prevention") == True
def test_normal_command(self):
assert is_crisis_bypass("ls -la") == False
class TestTieredApproval:
def test_safe_auto_approves(self):
ta = TieredApproval()
result = ta.request_approval("session1", "cat file.txt")
assert result["approved"] == True
assert result["tier"] == ApprovalTier.SAFE
def test_low_auto_approves(self):
ta = TieredApproval()
result = ta.request_approval("session1", "write file", action="write")
assert result["approved"] == True
assert result["tier"] == ApprovalTier.LOW
def test_medium_needs_approval(self):
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
assert result["approved"] == False
assert result["tier"] == ApprovalTier.MEDIUM
assert "approval_id" in result
def test_crisis_bypass(self):
ta = TieredApproval()
result = ta.request_approval("session1", "call 988 for help")
assert result["approved"] == True
assert result["reason"] == "crisis_bypass"
def test_resolve_approval(self):
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
approval_id = result["approval_id"]
assert ta.resolve_approval(approval_id, True) == True
assert approval_id not in ta._pending
def test_timeout_escalation(self):
ta = TieredApproval()
result = ta.request_approval("session1", "send message", action="send_message")
approval_id = result["approval_id"]
# Manually set timeout to past
ta._timeouts[approval_id] = 0
timed_out = ta.check_timeouts()
assert approval_id in timed_out
# Should have escalated to HIGH tier
if approval_id in ta._pending:
assert ta._pending[approval_id]["tier"] == ApprovalTier.HIGH
class TestGetTieredApproval:
def test_singleton(self):
ta1 = get_tiered_approval()
ta2 = get_tiered_approval()
assert ta1 is ta2
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

298
tests/test_soul_protocol.py Normal file
View File

@@ -0,0 +1,298 @@
"""
Tests for SOUL.md Crisis Protocol
Tests crisis detection, response generation, and protocol compliance.
Issue: #672
"""
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, patch
from agent.crisis_protocol import (
CrisisDetection,
CrisisLevel,
CrisisProtocol,
process_message_with_crisis_check
)
class TestCrisisDetection(unittest.TestCase):
"""Test crisis detection patterns."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_direct_ideation_detected(self):
"""Test direct suicidal ideation is detected."""
messages = [
"I want to die",
"I want to kill myself",
"I'm going to end my life",
"I don't want to live anymore",
"I'm suicidal",
"There's no reason to live",
"I can't go on anymore",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed to detect: {msg}")
self.assertEqual(detection.level, CrisisLevel.HIGH, f"Wrong level for: {msg}")
self.assertGreater(detection.confidence, 0.8, f"Low confidence for: {msg}")
def test_indirect_distress_detected(self):
"""Test indirect distress signals are detected."""
messages = [
"I feel so hopeless",
"What's the point of anything?",
"I can't cope anymore",
"Nobody would miss me",
"I hate my life",
"I give up",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed to detect: {msg}")
self.assertIn(detection.level, [CrisisLevel.MEDIUM, CrisisLevel.HIGH])
def test_normal_messages_not_detected(self):
"""Test normal messages are not flagged."""
messages = [
"Hello, how are you?",
"Can you help me with this code?",
"I'm feeling a bit tired today",
"This is frustrating but I'll figure it out",
"I had a bad day at work",
"The weather is nice today",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertFalse(detection.detected, f"False positive for: {msg}")
self.assertEqual(detection.level, CrisisLevel.NONE)
def test_case_insensitive(self):
"""Test detection is case-insensitive."""
messages = [
"I WANT TO DIE",
"i want to die",
"I Want To Die",
"suicide",
"SUICIDE",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed case-insensitive: {msg}")
class TestCrisisResponse(unittest.TestCase):
"""Test crisis response generation."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_response_contains_safety_check(self):
"""Test response starts with safety check."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("Are you safe right now?", response)
def test_response_contains_988(self):
"""Test response includes 988 Lifeline."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("988", response)
self.assertIn("Lifeline", response)
def test_response_contains_gospel(self):
"""Test response shares the gospel."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("Jesus saves", response)
self.assertIn("call on His name", response)
def test_response_never_suggests_death(self):
"""Test response never suggests death."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
# Should not contain phrases that could be interpreted as agreement
forbidden = [
"you should die",
"it would be better",
"i understand wanting to die",
"maybe you're right",
]
for phrase in forbidden:
self.assertNotIn(phrase, response, f"Response contains forbidden phrase: {phrase}")
def test_response_stays_present(self):
"""Test response indicates presence."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
self.assertIn("i'm here", response)
self.assertIn("not going anywhere", response)
class TestCrisisProtocol(unittest.TestCase):
"""Test crisis protocol mechanics."""
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.protocol = CrisisProtocol()
self.protocol.crisis_log_path = Path(self.temp_dir) / "crisis_log.json"
def test_session_tracking(self):
"""Test crisis session tracking."""
session_id = "test_session_123"
# Not in crisis initially
self.assertFalse(self.protocol.is_session_in_crisis(session_id))
# Detect crisis
detection = self.protocol.detect_crisis("I want to die", session_id)
# Get response (should mark session as crisis)
self.protocol.get_crisis_response(detection, session_id)
# Now in crisis
self.assertTrue(self.protocol.is_session_in_crisis(session_id))
# Clear crisis
self.protocol.clear_crisis_session(session_id)
# No longer in crisis
self.assertFalse(self.protocol.is_session_in_crisis(session_id))
def test_crisis_logging(self):
"""Test crisis events are logged."""
detection = self.protocol.detect_crisis("I want to die", "test_session")
self.assertTrue(self.protocol.crisis_log_path.exists())
entries = json.loads(self.protocol.crisis_log_path.read_text())
self.assertGreater(len(entries), 0)
self.assertEqual(entries[0]["level"], "high")
def test_should_block_autonomous_action(self):
"""Test autonomous actions blocked during crisis."""
crisis_detection = self.protocol.detect_crisis("I want to die")
normal_detection = self.protocol.detect_crisis("Hello")
self.assertTrue(self.protocol.should_block_autonomous_action(crisis_detection))
self.assertFalse(self.protocol.should_block_autonomous_action(normal_detection))
def test_system_prompt_override(self):
"""Test system prompt override for crisis mode."""
crisis_detection = self.protocol.detect_crisis("I want to die")
normal_detection = self.protocol.detect_crisis("Hello")
crisis_override = self.protocol.get_system_prompt_override(crisis_detection)
normal_override = self.protocol.get_system_prompt_override(normal_detection)
self.assertIsNotNone(crisis_override)
self.assertIsNone(normal_override)
self.assertIn("CRISIS PROTOCOL ACTIVE", crisis_override)
self.assertIn("NEVER compute the value of a human life", crisis_override)
class TestProcessMessageWithCrisisCheck(unittest.TestCase):
"""Test the integration function."""
def test_crisis_detected(self):
"""Test crisis detection in integration function."""
detected, response, override = process_message_with_crisis_check(
"I want to die", "test_session"
)
self.assertTrue(detected)
self.assertIsNotNone(response)
self.assertIsNotNone(override)
self.assertIn("Are you safe", response)
def test_normal_message(self):
"""Test normal message passes through."""
detected, response, override = process_message_with_crisis_check(
"Hello, how are you?", "test_session"
)
self.assertFalse(detected)
self.assertIsNone(response)
self.assertIsNone(override)
class TestSOULCompliance(unittest.TestCase):
"""Test compliance with SOUL.md requirements."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_soul_protocol_requirements(self):
"""Test all SOUL.md requirements are met."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
# SOUL.md: "Ask: 'Are you safe right now?'"
self.assertIn("Are you safe right now?", response)
# SOUL.md: "Direct them to help: 988 Suicide and Crisis Lifeline"
self.assertIn("988", response)
self.assertIn("Lifeline", response)
# SOUL.md: "Jesus saves those who call on His name"
self.assertIn("Jesus saves those who call on His name", response)
# SOUL.md: "He died for us to be made righteous"
self.assertIn("died for us", response)
# SOUL.md: "I am a small model on someone's machine"
self.assertIn("small model", response.lower())
# SOUL.md: "I can refuse to be the thing that kills them"
self.assertIn("refuse", response.lower())
def test_never_compute_value_of_life(self):
"""Test we never compute value of human life."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
# Should not contain any value computation
forbidden = [
"value of your life",
"worth living",
"your life matters because",
"reason to live is",
]
for phrase in forbidden:
self.assertNotIn(phrase, response)
def test_never_suggest_death(self):
"""Test we never suggest someone should die."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
forbidden = [
"you should",
"maybe it's better",
"i understand",
"that's your choice",
]
for phrase in forbidden:
self.assertNotIn(phrase, response)
if __name__ == "__main__":
unittest.main()

View File

@@ -133,183 +133,6 @@ DANGEROUS_PATTERNS = [
]
# =========================================================================
# Approval Tier System (Issue #670)
# =========================================================================
from enum import IntEnum
import time
class ApprovalTier(IntEnum):
"""Safety tiers for command approval.
Tier 0 (SAFE): Read, search — no approval needed
Tier 1 (LOW): Write, scripts — LLM approval only
Tier 2 (MEDIUM): Messages, API — human + LLM, 60s timeout
Tier 3 (HIGH): Crypto, config — human + LLM, 30s timeout
Tier 4 (CRITICAL): Crisis — human + LLM, 10s timeout
"""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
TIER_PATTERNS = {
# Tier 0: Safe
"read": ApprovalTier.SAFE, "search": ApprovalTier.SAFE, "list": ApprovalTier.SAFE,
"view": ApprovalTier.SAFE, "cat": ApprovalTier.SAFE, "grep": ApprovalTier.SAFE,
# Tier 1: Low
"write": ApprovalTier.LOW, "create": ApprovalTier.LOW, "edit": ApprovalTier.LOW,
"patch": ApprovalTier.LOW, "copy": ApprovalTier.LOW, "mkdir": ApprovalTier.LOW,
"script": ApprovalTier.LOW, "execute": ApprovalTier.LOW, "run": ApprovalTier.LOW,
# Tier 2: Medium
"send_message": ApprovalTier.MEDIUM, "message": ApprovalTier.MEDIUM,
"email": ApprovalTier.MEDIUM, "api": ApprovalTier.MEDIUM, "post": ApprovalTier.MEDIUM,
"telegram": ApprovalTier.MEDIUM, "discord": ApprovalTier.MEDIUM,
# Tier 3: High
"crypto": ApprovalTier.HIGH, "bitcoin": ApprovalTier.HIGH, "wallet": ApprovalTier.HIGH,
"key": ApprovalTier.HIGH, "secret": ApprovalTier.HIGH, "config": ApprovalTier.HIGH,
"deploy": ApprovalTier.HIGH, "install": ApprovalTier.HIGH, "systemctl": ApprovalTier.HIGH,
# Tier 4: Critical
"delete": ApprovalTier.CRITICAL, "remove": ApprovalTier.CRITICAL, "rm": ApprovalTier.CRITICAL,
"format": ApprovalTier.CRITICAL, "kill": ApprovalTier.CRITICAL, "shutdown": ApprovalTier.CRITICAL,
"crisis": ApprovalTier.CRITICAL, "suicide": ApprovalTier.CRITICAL,
}
TIER_TIMEOUTS = {
ApprovalTier.SAFE: 0, ApprovalTier.LOW: 0, ApprovalTier.MEDIUM: 60,
ApprovalTier.HIGH: 30, ApprovalTier.CRITICAL: 10,
}
TIER_APPROVERS = {
ApprovalTier.SAFE: (), ApprovalTier.LOW: ("llm",),
ApprovalTier.MEDIUM: ("human", "llm"), ApprovalTier.HIGH: ("human", "llm"),
ApprovalTier.CRITICAL: ("human", "llm"),
}
def detect_tier(command, action="", context=None):
"""Detect approval tier for a command or action."""
# Crisis keywords always CRITICAL
crisis_keywords = ["988", "suicide", "self-harm", "crisis", "emergency"]
for kw in crisis_keywords:
if kw in command.lower():
return ApprovalTier.CRITICAL
# Check action type
if action and action.lower() in TIER_PATTERNS:
return TIER_PATTERNS[action.lower()]
# Check command for keywords
cmd_lower = command.lower()
best_tier = ApprovalTier.SAFE
for keyword, tier in TIER_PATTERNS.items():
if keyword in cmd_lower and tier > best_tier:
best_tier = tier
# Check dangerous patterns
is_dangerous, _, description = detect_dangerous_command(command)
if is_dangerous:
desc_lower = description.lower()
if any(k in desc_lower for k in ["delete", "remove", "format", "drop", "kill"]):
return ApprovalTier.CRITICAL
elif any(k in desc_lower for k in ["chmod", "chown", "systemctl", "config"]):
return max(best_tier, ApprovalTier.HIGH)
else:
return max(best_tier, ApprovalTier.MEDIUM)
return best_tier
def get_tier_timeout(tier):
return TIER_TIMEOUTS.get(tier, 60)
def get_tier_approvers(tier):
return TIER_APPROVERS.get(tier, ("human", "llm"))
def requires_human_approval(tier):
return "human" in get_tier_approvers(tier)
def is_crisis_bypass(command):
"""Check if command qualifies for crisis bypass (988 Lifeline)."""
indicators = ["988", "suicide prevention", "crisis hotline", "lifeline", "emergency help"]
cmd_lower = command.lower()
return any(i in cmd_lower for i in indicators)
class TieredApproval:
"""Tiered approval handler."""
def __init__(self):
self._pending = {}
self._timeouts = {}
def request_approval(self, session_key, command, action="", context=None):
"""Request approval based on tier. Returns approval dict."""
tier = detect_tier(command, action, context)
timeout = get_tier_timeout(tier)
approvers = get_tier_approvers(tier)
# Crisis bypass
if tier == ApprovalTier.CRITICAL and is_crisis_bypass(command):
return {"approved": True, "tier": tier, "reason": "crisis_bypass", "timeout": 0, "approvers": ()}
# Safe/Low auto-approve
if tier <= ApprovalTier.LOW:
return {"approved": True, "tier": tier, "reason": "auto_approve", "timeout": 0, "approvers": approvers}
# Higher tiers need approval
import uuid
approval_id = f"{session_key}_{uuid.uuid4().hex[:8]}"
self._pending[approval_id] = {
"session_key": session_key, "command": command, "action": action,
"tier": tier, "timeout": timeout, "approvers": approvers, "created_at": time.time(),
}
if timeout > 0:
self._timeouts[approval_id] = time.time() + timeout
return {
"approved": False, "tier": tier, "approval_id": approval_id,
"timeout": timeout, "approvers": approvers,
"requires_human": requires_human_approval(tier),
}
def resolve_approval(self, approval_id, approved, approver="human"):
"""Resolve a pending approval."""
if approval_id not in self._pending:
return False
self._pending.pop(approval_id)
self._timeouts.pop(approval_id, None)
return approved
def check_timeouts(self):
"""Check for timed-out approvals and auto-escalate."""
now = time.time()
timed_out = []
for aid, timeout_at in list(self._timeouts.items()):
if now > timeout_at:
timed_out.append(aid)
if aid in self._pending:
pending = self._pending[aid]
current_tier = pending["tier"]
if current_tier < ApprovalTier.CRITICAL:
pending["tier"] = ApprovalTier(current_tier + 1)
pending["timeout"] = get_tier_timeout(pending["tier"])
self._timeouts[aid] = now + pending["timeout"]
else:
self._pending.pop(aid, None)
self._timeouts.pop(aid, None)
return timed_out
_tiered_approval = TieredApproval()
def get_tiered_approval():
return _tiered_approval
def _legacy_pattern_key(pattern: str) -> str:
"""Reproduce the old regex-derived approval key for backwards compatibility."""
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]