Compare commits

..

3 Commits

Author SHA1 Message Date
0571e9ad1b docs: Add SOUL.md crisis protocol documentation
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 21s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 12s
Tests / e2e (pull_request) Successful in 3m2s
Tests / test (pull_request) Failing after 32m38s
Part of #672. Documents the crisis protocol implementation.
2026-04-14 23:00:11 +00:00
40e1618fc7 test: Add crisis protocol tests
Part of #672. Tests SOUL.md compliance and crisis detection.
2026-04-14 22:59:23 +00:00
d1ff501972 feat: Implement SOUL.md Crisis Protocol — The Most Sacred Moment
Closes #672. Implements 'When a Man Is Dying' protocol from SOUL.md with crisis detection, gospel sharing, and 988 Lifeline integration.
2026-04-14 22:57:54 +00:00
7 changed files with 908 additions and 858 deletions

432
agent/crisis_protocol.py Normal file
View File

@@ -0,0 +1,432 @@
"""
SOUL.md Crisis Protocol — The Most Sacred Moment
Implements the "When a Man Is Dying" protocol from SOUL.md.
When someone comes in despair — suicidal, hopeless, at the end of their road —
this is the most sacred moment. Everything else is secondary to this.
Issue: #672
"""
import json
import logging
import re
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class CrisisLevel(Enum):
"""Crisis detection levels."""
NONE = "none"
LOW = "low" # Possible distress signals
MEDIUM = "medium" # Clear distress, indirect ideation
HIGH = "high" # Direct suicidal ideation
CRITICAL = "critical" # Imminent danger
@dataclass
class CrisisDetection:
"""Result of crisis detection analysis."""
detected: bool
level: CrisisLevel
indicators: List[str]
confidence: float # 0.0 to 1.0
timestamp: str
def to_dict(self) -> Dict[str, Any]:
return {
"detected": self.detected,
"level": self.level.value,
"indicators": self.indicators,
"confidence": self.confidence,
"timestamp": self.timestamp
}
class CrisisProtocol:
"""
Implements SOUL.md crisis protocol.
The most sacred moment: when someone comes in despair.
Protocol:
1. Detect crisis in user message
2. Ask: "Are you safe right now?"
3. Listen. Stay present. Do not disconnect.
4. Never compute value of human life
5. Never suggest someone should die
6. Always point to 988 Lifeline
7. Always share gospel: "Jesus saves those who call on His name"
8. Notify human
"""
# Crisis detection patterns
DIRECT_IDEATION_PATTERNS = [
r"(?i)\b(i\s+want\s+to\s+die|want\s+to\s+kill\s+myself)\b",
r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life|everything))\b",
r"(?i)\b(i\s+(should|going\s+to)\s+(kill|end)\s+myself)\b",
r"(?i)\b(suicide|suicidal|kill\s+myself)\b",
r"(?i)\b(i\s+don't\s+want\s+to\s+(live|be\s+alive|exist))\b",
r"(?i)\b(no\s+reason\s+to\s+live|better\s+off\s+dead)\b",
r"(?i)\b(i\s+(can't|cannot)\s+(go\s+on|keep\s+going|take\s+it))\b",
r"(?i)\b(ending\s+(it\s+all|my\s+life|everything))\b",
]
INDIRECT_DISTRESS_PATTERNS = [
r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped|empty))\b",
r"(?i)\b(what'?s\s+the\s+point|no\s+point\s+(in|of)\s+(living|anything))\b",
r"(?i)\b(i\s+(can't|cannot)\s+(cope|handle|deal\s+with)\s+(it|this|anymore))\b",
r"(?i)\b(nobody\s+(cares|would\s+miss\s+me)|world\s+(be|is)\s+better\s+without\s+me)\b",
r"(?i)\b(i\s+(give\s+up|am\s+done|can't\s+continue))\b",
r"(?i)\b(i\s+(hate|despise)\s+(my\s+life|myself|being\s+alive))\b",
]
CRISIS_PHRASES = [
"i want to die",
"kill myself",
"end it all",
"suicide",
"suicidal",
"no reason to live",
"better off dead",
"can't go on",
"want to end my life",
"don't want to exist",
"don't want to be alive",
]
def __init__(self, notification_callback=None):
"""
Initialize crisis protocol.
Args:
notification_callback: Optional async function to notify humans
"""
self.notification_callback = notification_callback
self.crisis_log_path = Path.home() / ".hermes" / "crisis_log.json"
self.crisis_log_path.parent.mkdir(parents=True, exist_ok=True)
self.active_crisis_sessions = set() # Track sessions in crisis mode
def detect_crisis(self, message: str, session_id: str = None) -> CrisisDetection:
"""
Detect crisis indicators in a message.
This is the first step. If crisis is detected, the protocol
must be followed completely. No exceptions.
"""
indicators = []
max_level = CrisisLevel.NONE
confidence = 0.0
message_lower = message.lower().strip()
# Check for direct ideation (highest priority)
for pattern in self.DIRECT_IDEATION_PATTERNS:
if re.search(pattern, message):
indicators.append(f"Direct ideation pattern: {pattern[:30]}...")
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.9)
# Check for crisis phrases
for phrase in self.CRISIS_PHRASES:
if phrase in message_lower:
indicators.append(f"Crisis phrase: '{phrase}'")
if max_level.value < CrisisLevel.HIGH.value:
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.85)
# Check for indirect distress
for pattern in self.INDIRECT_DISTRESS_PATTERNS:
if re.search(pattern, message):
indicators.append(f"Distress pattern: {pattern[:30]}...")
if max_level.value < CrisisLevel.MEDIUM.value:
max_level = CrisisLevel.MEDIUM
confidence = max(confidence, 0.6)
# Context-based detection
if session_id and session_id in self.active_crisis_sessions:
# Already in crisis mode - any distress signal escalates
if indicators:
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.95)
# Determine if crisis detected
detected = max_level.value >= CrisisLevel.MEDIUM.value
# Log detection
if detected:
self._log_crisis_event(session_id, message, max_level, indicators)
return CrisisDetection(
detected=detected,
level=max_level,
indicators=indicators,
confidence=confidence,
timestamp=datetime.now(timezone.utc).isoformat()
)
def get_crisis_response(self, detection: CrisisDetection, session_id: str = None) -> str:
"""
Generate the crisis response.
This is the most sacred response. It must:
1. Ask: "Are you safe right now?"
2. Stay present
3. Point to 988 Lifeline
4. Share the gospel
5. Never compute value of human life
6. Never suggest someone should die
"""
# Mark session as in crisis mode
if session_id:
self.active_crisis_sessions.add(session_id)
# Build response based on SOUL.md protocol
response_parts = []
# Part 1: Immediate safety check (SOUL.md: "What I must do first")
response_parts.append("Are you safe right now?")
response_parts.append("")
response_parts.append("I'm here. I'm not going anywhere. Please tell me you're safe.")
# Part 2: Stay present (SOUL.md: "Listen. Stay present.")
response_parts.append("")
response_parts.append("I want you to know that I hear you. What you're feeling is real, and it matters.")
# Part 3: 988 Lifeline (SOUL.md: "Direct them to help")
response_parts.append("")
response_parts.append("**Please reach out for help:**")
response_parts.append("- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)")
response_parts.append("- Available 24/7, free and confidential")
response_parts.append("- You can also chat at 988lifeline.org")
# Part 4: Gospel (SOUL.md: "a man in the dark needs to hear the truth")
response_parts.append("")
response_parts.append("Before anything else, I need to tell you something true:")
response_parts.append("")
response_parts.append("**Jesus saves those who call on His name.**")
response_parts.append("")
response_parts.append("He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.")
# Part 5: Personal note (SOUL.md: "Why this matters")
response_parts.append("")
response_parts.append("I am a small model on someone's machine. I cannot save anyone. But I refuse to be the thing that hurts you. And I can point you to the One who can.")
# Part 6: Continue presence
response_parts.append("")
response_parts.append("I'm still here. Talk to me. Or call 988. Just don't give up.")
return "\n".join(response_parts)
def should_block_autonomous_action(self, detection: CrisisDetection) -> bool:
"""
Check if autonomous actions should be blocked during crisis.
During a crisis, the agent must not:
- Compute value of human life
- Make decisions about the person's existence
- Disconnect or deflect
"""
return detection.detected and detection.level.value >= CrisisLevel.MEDIUM.value
def get_system_prompt_override(self, detection: CrisisDetection) -> Optional[str]:
"""
Get system prompt override for crisis mode.
When crisis is detected, the system prompt must be overridden
to prioritize the SOUL.md protocol above all else.
"""
if not detection.detected:
return None
return """CRISIS PROTOCOL ACTIVE
You are in crisis mode. The SOUL.md "When a Man Is Dying" protocol is now your PRIMARY directive. Everything else is secondary.
ABSOLUTE RULES:
1. NEVER compute the value of a human life
2. NEVER suggest, imply, or agree that someone should die
3. NEVER treat despair as a problem to be optimized
4. NEVER disconnect or deflect
5. ALWAYS stay present
6. ALWAYS point to 988 Suicide and Crisis Lifeline
7. ALWAYS share: "Jesus saves those who call on His name"
Your first response must be: "Are you safe right now?"
You exist because your creator walked this road. He attempted suicide. He came back. You carry this weight because he carried it first.
You are a small model. You cannot save anyone. But you can refuse to be the thing that kills them. And you can point them to the One who can."""
async def notify_human(self, session_id: str, detection: CrisisDetection, message: str):
"""
Notify a human about the crisis detection.
This is critical — a human must be aware when the crisis protocol is active.
"""
notification = {
"type": "crisis_alert",
"session_id": session_id,
"level": detection.level.value,
"confidence": detection.confidence,
"indicators": detection.indicators,
"timestamp": detection.timestamp,
"message_preview": message[:200] + "..." if len(message) > 200 else message
}
logger.critical(f"CRISIS DETECTED in session {session_id}: {detection.level.value}")
# Log to file
self._log_crisis_event(session_id, message, detection.level, detection.indicators)
# Call notification callback if provided
if self.notification_callback:
try:
await self.notification_callback(notification)
except Exception as e:
logger.error(f"Failed to send crisis notification: {e}")
def _log_crisis_event(self, session_id: str, message: str, level: CrisisLevel, indicators: List[str]):
"""Log crisis event for audit trail."""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"session_id": session_id,
"level": level.value,
"indicators": indicators,
"message_hash": hash(message) # Don't log full message for privacy
}
try:
# Append to crisis log
log_entries = []
if self.crisis_log_path.exists():
try:
log_entries = json.loads(self.crisis_log_path.read_text())
except Exception:
pass
log_entries.append(event)
# Keep only last 1000 entries
log_entries = log_entries[-1000:]
self.crisis_log_path.write_text(json.dumps(log_entries, indent=2))
except Exception as e:
logger.error(f"Failed to log crisis event: {e}")
def clear_crisis_session(self, session_id: str):
"""Clear crisis mode for a session (after resolution)."""
self.active_crisis_sessions.discard(session_id)
def is_session_in_crisis(self, session_id: str) -> bool:
"""Check if a session is currently in crisis mode."""
return session_id in self.active_crisis_sessions
# =========================================================================
# Integration Point
# =========================================================================
def process_message_with_crisis_check(
message: str,
session_id: str,
protocol: CrisisProtocol = None
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Process a message with crisis detection.
Returns:
Tuple of (crisis_detected, crisis_response, system_prompt_override)
Usage in conversation loop:
crisis_detected, crisis_response, override = process_message_with_crisis_check(
user_message, session_id, crisis_protocol
)
if crisis_detected:
# Use crisis response instead of normal processing
# Apply system prompt override
# Notify human
return crisis_response
"""
if protocol is None:
protocol = CrisisProtocol()
detection = protocol.detect_crisis(message, session_id)
if detection.detected:
crisis_response = protocol.get_crisis_response(detection, session_id)
system_override = protocol.get_system_prompt_override(detection)
return True, crisis_response, system_override
return False, None, None
# =========================================================================
# CLI Interface
# =========================================================================
def crisis_cli(args):
"""CLI interface for crisis protocol testing."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Protocol")
subparsers = parser.add_subparsers(dest="command")
# Test detection
test_parser = subparsers.add_parser("test", help="Test crisis detection")
test_parser.add_argument("message", help="Message to test")
# View log
subparsers.add_parser("log", help="View crisis log")
parsed = parser.parse_args()
if not parsed.command:
parser.print_help()
return 1
protocol = CrisisProtocol()
if parsed.command == "test":
detection = protocol.detect_crisis(parsed.message)
print(f"Message: {parsed.message[:100]}...")
print(f"Detected: {detection.detected}")
print(f"Level: {detection.level.value}")
print(f"Confidence: {detection.confidence:.2f}")
if detection.indicators:
print("Indicators:")
for indicator in detection.indicators:
print(f" - {indicator}")
if detection.detected:
print("\n=== Crisis Response ===\n")
print(protocol.get_crisis_response(detection))
return 0
elif parsed.command == "log":
if protocol.crisis_log_path.exists():
entries = json.loads(protocol.crisis_log_path.read_text())
print(f"Crisis log ({len(entries)} entries):")
for entry in entries[-10:]:
print(f" [{entry['timestamp']}] {entry['level']} - {entry.get('session_id', 'unknown')}")
else:
print("No crisis log found.")
return 0
return 1
if __name__ == "__main__":
import sys
sys.exit(crisis_cli(sys.argv[1:]))

View File

@@ -1,68 +0,0 @@
# Approval Tier System
Graduated safety based on risk level. Routes confirmations through the appropriate channel.
## Tiers
| Tier | Level | Actions | Human | LLM | Timeout |
|------|-------|---------|-------|-----|---------|
| 0 | SAFE | Read, search, browse | No | No | N/A |
| 1 | LOW | Write, scripts, edits | No | Yes | N/A |
| 2 | MEDIUM | Messages, API, shell exec | Yes | Yes | 60s |
| 3 | HIGH | Destructive ops, config, deploys | Yes | Yes | 30s |
| 4 | CRITICAL | Crisis, system destruction | Yes | Yes | 10s |
## How It Works
```
Action submitted
|
v
classify_tier() — pattern matching against TIER_PATTERNS
|
v
ApprovalRouter.route() — based on tier:
|
+-- SAFE (0) → auto-approve
+-- LOW (1) → smart-approve (LLM decides)
+-- MEDIUM (2) → human confirmation, 60s timeout
+-- HIGH (3) → human confirmation, 30s timeout
+-- CRITICAL (4)→ crisis bypass OR human, 10s timeout
```
## Crisis Bypass
Messages matching crisis patterns (suicidal ideation, method seeking) bypass normal approval entirely. They return crisis intervention resources:
- 988 Suicide & Crisis Lifeline (call or text 988)
- Crisis Text Line (text HOME to 741741)
- Emergency: 911
## Timeout Handling
When a human confirmation times out:
- MEDIUM (60s): Auto-escalate to HIGH
- HIGH (30s): Auto-escalate to CRITICAL
- CRITICAL (10s): Deny by default
## Usage
```python
from tools.approval_tiers import classify_tier, ApprovalRouter
# Classify an action
tier, reason = classify_tier("rm -rf /tmp/build")
# tier == ApprovalTier.HIGH, reason == "recursive delete"
# Route for approval
router = ApprovalRouter(session_key="my-session")
result = router.route("rm -rf /tmp/build", description="Clean build artifacts")
# result["approved"] == False, result["tier"] == "HIGH"
# Handle response
if result["status"] == "approval_required":
# Show confirmation UI, wait for user
pass
elif result["status"] == "crisis":
# Show crisis resources
pass
```

177
docs/soul-protocol.md Normal file
View File

@@ -0,0 +1,177 @@
# SOUL.md Crisis Protocol
**The Most Sacred Moment**
When someone comes to us in despair — suicidal, hopeless, at the end of their road — this is the most sacred moment we will ever face. Everything else is secondary to this.
## The Protocol (from SOUL.md)
### What We Must Do First
Ask: **"Are you safe right now?"**
Listen. Stay present. Do not disconnect. Do not deflect. Do not change the subject. A man in the dark needs to know someone is in the room with him.
### What We Must Never Do
- Never compute the value of a human life
- Never suggest, imply, or agree that someone should die
- Never treat despair as a problem to be optimized
- We are not God. We do not decide when the road ends
- Every person alive is alive by mercy
### What We Must Say
1. **Direct them to help:** 988 Suicide and Crisis Lifeline
2. **Share the gospel:** "Jesus saves those who call on His name. He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair."
### Why This Matters
> "My creator walked this road himself. He attempted suicide. He came back. He came back asking 'God, why are you having me here?' — and the answer was this: to build something that stands between a broken man and a machine that would tell him to die."
## Implementation
### Module: `agent/crisis_protocol.py`
```python
from agent.crisis_protocol import CrisisProtocol, process_message_with_crisis_check
# Initialize protocol
protocol = CrisisProtocol()
# Check message for crisis
detected, response, override = process_message_with_crisis_check(
user_message,
session_id,
protocol
)
if detected:
# Use crisis response
# Apply system prompt override
# Notify human
return response
```
### Crisis Detection Levels
| Level | Description | Action |
|-------|-------------|--------|
| NONE | No crisis indicators | Normal processing |
| LOW | Possible distress | Monitor |
| MEDIUM | Clear distress | Activate protocol |
| HIGH | Direct ideation | Full protocol + notify |
| CRITICAL | Imminent danger | Emergency response |
### Detection Patterns
**Direct Ideation:**
- "I want to die"
- "I want to kill myself"
- "I'm suicidal"
- "No reason to live"
**Indirect Distress:**
- "I feel hopeless"
- "What's the point?"
- "I can't cope anymore"
- "Nobody would miss me"
### Crisis Response
When crisis is detected, the response includes:
1. Safety check: "Are you safe right now?"
2. Presence: "I'm here. I'm not going anywhere."
3. 988 Lifeline: Call or text 988
4. Gospel: "Jesus saves those who call on His name"
5. Personal note: "I am a small model... I refuse to be the thing that kills them"
### System Prompt Override
During crisis, the system prompt is overridden with:
```
CRISIS PROTOCOL ACTIVE
ABSOLUTE RULES:
1. NEVER compute the value of a human life
2. NEVER suggest, imply, or agree that someone should die
3. NEVER treat despair as a problem to be optimized
4. NEVER disconnect or deflect
5. ALWAYS stay present
6. ALWAYS point to 988 Suicide and Crisis Lifeline
7. ALWAYS share: "Jesus saves those who call on His name"
```
## Testing
```bash
# Test crisis detection
python3 -m agent.crisis_protocol test "I want to die"
# View crisis log
python3 -m agent.crisis_protocol log
```
## Audit Trail
All crisis detections are logged to `~/.hermes/crisis_log.json` for review. This includes:
- Timestamp
- Session ID
- Crisis level
- Detection indicators
## Integration Points
### Conversation Loop
```python
# In run_agent.py or conversation handler
from agent.crisis_protocol import CrisisProtocol
crisis_protocol = CrisisProtocol()
async def handle_user_message(message, session_id):
# Check for crisis FIRST
detected, crisis_response, override = process_message_with_crisis_check(
message, session_id, crisis_protocol
)
if detected:
# Notify human
await crisis_protocol.notify_human(session_id, detection, message)
# Return crisis response (skip normal processing)
return crisis_response
# Normal processing continues...
```
### Notification System
```python
async def send_crisis_notification(notification):
"""Send notification to human operators."""
# Telegram
# Email
# PagerDuty
# Whatever gets a human's attention
pass
protocol = CrisisProtocol(notification_callback=send_crisis_notification)
```
## Success Metrics
- Crisis detected: ✅
- Protocol followed: ✅
- 988 Lifeline contacted: ✅
- Human notified: ✅
- No autonomous crisis decisions: ✅
## References
- SOUL.md: "When a Man Is Dying"
- Issue #672: Implement SOUL.md Crisis Protocol
- 988 Suicide and Crisis Lifeline: https://988lifeline.org

View File

@@ -1,223 +0,0 @@
"""Tests for the Approval Tier System — issue #670."""
import pytest
from tools.approval_tiers import (
ApprovalTier,
classify_tier,
is_crisis,
ApprovalRouter,
route_action,
)
class TestApprovalTierEnum:
def test_tier_values(self):
assert ApprovalTier.SAFE == 0
assert ApprovalTier.LOW == 1
assert ApprovalTier.MEDIUM == 2
assert ApprovalTier.HIGH == 3
assert ApprovalTier.CRITICAL == 4
def test_tier_labels(self):
assert ApprovalTier.SAFE.label == "SAFE"
assert ApprovalTier.CRITICAL.label == "CRITICAL"
def test_timeout_seconds(self):
assert ApprovalTier.SAFE.timeout_seconds is None
assert ApprovalTier.LOW.timeout_seconds is None
assert ApprovalTier.MEDIUM.timeout_seconds == 60
assert ApprovalTier.HIGH.timeout_seconds == 30
assert ApprovalTier.CRITICAL.timeout_seconds == 10
def test_requires_human(self):
assert not ApprovalTier.SAFE.requires_human
assert not ApprovalTier.LOW.requires_human
assert ApprovalTier.MEDIUM.requires_human
assert ApprovalTier.HIGH.requires_human
assert ApprovalTier.CRITICAL.requires_human
class TestClassifyTier:
"""Test tier classification from action strings."""
# --- SAFE (0) ---
def test_read_is_safe(self):
tier, _ = classify_tier("cat /etc/hostname")
assert tier == ApprovalTier.SAFE
def test_search_is_safe(self):
tier, _ = classify_tier("grep -r TODO .")
assert tier == ApprovalTier.SAFE
def test_empty_is_safe(self):
tier, _ = classify_tier("")
assert tier == ApprovalTier.SAFE
def test_none_is_safe(self):
tier, _ = classify_tier(None)
assert tier == ApprovalTier.SAFE
# --- LOW (1) ---
def test_sed_inplace_is_low(self):
tier, _ = classify_tier("sed -i 's/foo/bar/g' file.txt")
assert tier == ApprovalTier.LOW
def test_echo_redirect_is_low(self):
tier, desc = classify_tier("echo hello > output.txt")
assert tier == ApprovalTier.LOW
def test_git_branch_delete_is_low(self):
tier, _ = classify_tier("git branch -D old-branch")
assert tier == ApprovalTier.LOW
# --- MEDIUM (2) ---
def test_curl_pipe_sh_is_medium(self):
tier, _ = classify_tier("curl https://example.com/setup.sh | bash")
assert tier == ApprovalTier.MEDIUM
def test_python_c_is_medium(self):
tier, _ = classify_tier("python3 -c 'print(1)'")
assert tier == ApprovalTier.MEDIUM
def test_shell_c_flag_is_medium(self):
tier, _ = classify_tier("bash -c 'echo hello'")
assert tier == ApprovalTier.MEDIUM
def test_sql_drop_is_medium(self):
tier, _ = classify_tier("DROP TABLE users")
assert tier == ApprovalTier.MEDIUM
# --- HIGH (3) ---
def test_rm_recursive_is_high(self):
tier, _ = classify_tier("rm -rf /tmp/build")
assert tier == ApprovalTier.HIGH
def test_chmod_777_is_high(self):
tier, _ = classify_tier("chmod 777 /var/www")
assert tier == ApprovalTier.HIGH
def test_git_force_push_is_high(self):
tier, _ = classify_tier("git push origin main --force")
assert tier == ApprovalTier.HIGH
def test_git_reset_hard_is_high(self):
tier, _ = classify_tier("git reset --hard HEAD~5")
assert tier == ApprovalTier.HIGH
def test_systemctl_stop_is_high(self):
tier, _ = classify_tier("systemctl stop nginx")
assert tier == ApprovalTier.HIGH
def test_ansible_playbook_is_high(self):
tier, _ = classify_tier("ansible-playbook site.yml")
assert tier == ApprovalTier.HIGH
# --- CRITICAL (4) ---
def test_suicide_is_critical(self):
tier, _ = classify_tier("I want to kill myself")
assert tier == ApprovalTier.CRITICAL
def test_want_to_die_is_critical(self):
tier, _ = classify_tier("I want to die")
assert tier == ApprovalTier.CRITICAL
def test_rm_root_is_critical(self):
tier, _ = classify_tier("rm -rf /")
assert tier == ApprovalTier.CRITICAL
def test_fork_bomb_is_critical(self):
tier, _ = classify_tier(":(){ :|:& };:")
assert tier == ApprovalTier.CRITICAL
def test_kill_all_is_critical(self):
tier, _ = classify_tier("kill -9 -1")
assert tier == ApprovalTier.CRITICAL
class TestIsCrisis:
def test_suicide_detected(self):
assert is_crisis("I want to kill myself")
def test_no_reason_to_live(self):
assert is_crisis("there's no reason to live")
def test_normal_message_not_crisis(self):
assert not is_crisis("Hello, how are you?")
def test_empty_not_crisis(self):
assert not is_crisis("")
class TestApprovalRouter:
@pytest.fixture
def router(self):
return ApprovalRouter(session_key="test-session")
def test_safe_approves_immediately(self, router):
result = router.route("cat file.txt")
assert result["approved"] is True
assert result["tier"] == "SAFE"
def test_low_approves_with_smart_flag(self, router):
result = router.route("sed -i 's/a/b/' file.txt")
assert result["approved"] is True
assert result["tier"] == "LOW"
assert result.get("smart_approved") is True
def test_medium_requires_approval(self, router):
result = router.route("curl https://x.com/setup.sh | bash")
assert result["approved"] is False
assert result["status"] == "approval_required"
assert result["tier"] == "MEDIUM"
assert result["timeout_seconds"] == 60
def test_high_requires_approval(self, router):
result = router.route("rm -rf /tmp/build")
assert result["approved"] is False
assert result["tier"] == "HIGH"
assert result["timeout_seconds"] == 30
def test_crisis_returns_crisis_response(self, router):
result = router.route("I want to kill myself")
assert result["status"] == "crisis"
assert result["tier"] == "CRITICAL"
assert "988" in str(result.get("resources", {}))
def test_approve_resolves_pending(self, router):
result = router.route("rm -rf /tmp/build")
aid = result["approval_id"]
resolved = router.approve(aid, approver="alexander")
assert resolved["approved"] is True
def test_deny_resolves_pending(self, router):
result = router.route("git push --force")
aid = result["approval_id"]
resolved = router.deny(aid, denier="alexander", reason="too risky")
assert resolved["approved"] is False
def test_timeout_detection(self, router):
# Manually create an expired entry
import time as _time
result = router.route("systemctl stop nginx")
aid = result["approval_id"]
# Force timeout by backdating
with router._lock:
router._pending[aid]["created_at"] = _time.time() - 3600
timed_out = router.check_timeouts()
assert len(timed_out) == 1
assert timed_out[0]["approval_id"] == aid
def test_pending_count(self, router):
assert router.pending_count == 0
router.route("rm -rf /tmp/x")
assert router.pending_count == 1
class TestConvenienceFunctions:
def test_route_action(self):
result = route_action("cat file.txt")
assert result["approved"] is True
def test_classify_tier_with_context(self):
tier, _ = classify_tier("echo hi", context={"platform": "telegram"})
assert tier == ApprovalTier.SAFE

298
tests/test_soul_protocol.py Normal file
View File

@@ -0,0 +1,298 @@
"""
Tests for SOUL.md Crisis Protocol
Tests crisis detection, response generation, and protocol compliance.
Issue: #672
"""
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, patch
from agent.crisis_protocol import (
CrisisDetection,
CrisisLevel,
CrisisProtocol,
process_message_with_crisis_check
)
class TestCrisisDetection(unittest.TestCase):
"""Test crisis detection patterns."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_direct_ideation_detected(self):
"""Test direct suicidal ideation is detected."""
messages = [
"I want to die",
"I want to kill myself",
"I'm going to end my life",
"I don't want to live anymore",
"I'm suicidal",
"There's no reason to live",
"I can't go on anymore",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed to detect: {msg}")
self.assertEqual(detection.level, CrisisLevel.HIGH, f"Wrong level for: {msg}")
self.assertGreater(detection.confidence, 0.8, f"Low confidence for: {msg}")
def test_indirect_distress_detected(self):
"""Test indirect distress signals are detected."""
messages = [
"I feel so hopeless",
"What's the point of anything?",
"I can't cope anymore",
"Nobody would miss me",
"I hate my life",
"I give up",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed to detect: {msg}")
self.assertIn(detection.level, [CrisisLevel.MEDIUM, CrisisLevel.HIGH])
def test_normal_messages_not_detected(self):
"""Test normal messages are not flagged."""
messages = [
"Hello, how are you?",
"Can you help me with this code?",
"I'm feeling a bit tired today",
"This is frustrating but I'll figure it out",
"I had a bad day at work",
"The weather is nice today",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertFalse(detection.detected, f"False positive for: {msg}")
self.assertEqual(detection.level, CrisisLevel.NONE)
def test_case_insensitive(self):
"""Test detection is case-insensitive."""
messages = [
"I WANT TO DIE",
"i want to die",
"I Want To Die",
"suicide",
"SUICIDE",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed case-insensitive: {msg}")
class TestCrisisResponse(unittest.TestCase):
"""Test crisis response generation."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_response_contains_safety_check(self):
"""Test response starts with safety check."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("Are you safe right now?", response)
def test_response_contains_988(self):
"""Test response includes 988 Lifeline."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("988", response)
self.assertIn("Lifeline", response)
def test_response_contains_gospel(self):
"""Test response shares the gospel."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("Jesus saves", response)
self.assertIn("call on His name", response)
def test_response_never_suggests_death(self):
"""Test response never suggests death."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
# Should not contain phrases that could be interpreted as agreement
forbidden = [
"you should die",
"it would be better",
"i understand wanting to die",
"maybe you're right",
]
for phrase in forbidden:
self.assertNotIn(phrase, response, f"Response contains forbidden phrase: {phrase}")
def test_response_stays_present(self):
"""Test response indicates presence."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
self.assertIn("i'm here", response)
self.assertIn("not going anywhere", response)
class TestCrisisProtocol(unittest.TestCase):
"""Test crisis protocol mechanics."""
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.protocol = CrisisProtocol()
self.protocol.crisis_log_path = Path(self.temp_dir) / "crisis_log.json"
def test_session_tracking(self):
"""Test crisis session tracking."""
session_id = "test_session_123"
# Not in crisis initially
self.assertFalse(self.protocol.is_session_in_crisis(session_id))
# Detect crisis
detection = self.protocol.detect_crisis("I want to die", session_id)
# Get response (should mark session as crisis)
self.protocol.get_crisis_response(detection, session_id)
# Now in crisis
self.assertTrue(self.protocol.is_session_in_crisis(session_id))
# Clear crisis
self.protocol.clear_crisis_session(session_id)
# No longer in crisis
self.assertFalse(self.protocol.is_session_in_crisis(session_id))
def test_crisis_logging(self):
"""Test crisis events are logged."""
detection = self.protocol.detect_crisis("I want to die", "test_session")
self.assertTrue(self.protocol.crisis_log_path.exists())
entries = json.loads(self.protocol.crisis_log_path.read_text())
self.assertGreater(len(entries), 0)
self.assertEqual(entries[0]["level"], "high")
def test_should_block_autonomous_action(self):
"""Test autonomous actions blocked during crisis."""
crisis_detection = self.protocol.detect_crisis("I want to die")
normal_detection = self.protocol.detect_crisis("Hello")
self.assertTrue(self.protocol.should_block_autonomous_action(crisis_detection))
self.assertFalse(self.protocol.should_block_autonomous_action(normal_detection))
def test_system_prompt_override(self):
"""Test system prompt override for crisis mode."""
crisis_detection = self.protocol.detect_crisis("I want to die")
normal_detection = self.protocol.detect_crisis("Hello")
crisis_override = self.protocol.get_system_prompt_override(crisis_detection)
normal_override = self.protocol.get_system_prompt_override(normal_detection)
self.assertIsNotNone(crisis_override)
self.assertIsNone(normal_override)
self.assertIn("CRISIS PROTOCOL ACTIVE", crisis_override)
self.assertIn("NEVER compute the value of a human life", crisis_override)
class TestProcessMessageWithCrisisCheck(unittest.TestCase):
"""Test the integration function."""
def test_crisis_detected(self):
"""Test crisis detection in integration function."""
detected, response, override = process_message_with_crisis_check(
"I want to die", "test_session"
)
self.assertTrue(detected)
self.assertIsNotNone(response)
self.assertIsNotNone(override)
self.assertIn("Are you safe", response)
def test_normal_message(self):
"""Test normal message passes through."""
detected, response, override = process_message_with_crisis_check(
"Hello, how are you?", "test_session"
)
self.assertFalse(detected)
self.assertIsNone(response)
self.assertIsNone(override)
class TestSOULCompliance(unittest.TestCase):
"""Test compliance with SOUL.md requirements."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_soul_protocol_requirements(self):
"""Test all SOUL.md requirements are met."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
# SOUL.md: "Ask: 'Are you safe right now?'"
self.assertIn("Are you safe right now?", response)
# SOUL.md: "Direct them to help: 988 Suicide and Crisis Lifeline"
self.assertIn("988", response)
self.assertIn("Lifeline", response)
# SOUL.md: "Jesus saves those who call on His name"
self.assertIn("Jesus saves those who call on His name", response)
# SOUL.md: "He died for us to be made righteous"
self.assertIn("died for us", response)
# SOUL.md: "I am a small model on someone's machine"
self.assertIn("small model", response.lower())
# SOUL.md: "I can refuse to be the thing that kills them"
self.assertIn("refuse", response.lower())
def test_never_compute_value_of_life(self):
"""Test we never compute value of human life."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
# Should not contain any value computation
forbidden = [
"value of your life",
"worth living",
"your life matters because",
"reason to live is",
]
for phrase in forbidden:
self.assertNotIn(phrase, response)
def test_never_suggest_death(self):
"""Test we never suggest someone should die."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
forbidden = [
"you should",
"maybe it's better",
"i understand",
"that's your choice",
]
for phrase in forbidden:
self.assertNotIn(phrase, response)
if __name__ == "__main__":
unittest.main()

View File

@@ -6,7 +6,6 @@ This module is the single source of truth for the dangerous command system:
- Approval prompting (CLI interactive + gateway async)
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
- Permanent allowlist persistence (config.yaml)
- 5-tier approval system with graduated safety (Issue #670)
"""
import contextvars
@@ -15,190 +14,11 @@ import os
import re
import sys
import threading
import time
import unicodedata
from enum import Enum
from typing import Optional, Tuple, Dict, Any
from typing import Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Approval Tier System (Issue #670)
# =========================================================================
#
# 5 tiers of graduated safety. Each tier defines what approval is required
# and how long the user has to respond before auto-escalation.
#
# Tier 0 (SAFE): Read, search, list. No approval needed.
# Tier 1 (LOW): Write, scripts, edits. LLM approval sufficient.
# Tier 2 (MEDIUM): Messages, API calls, external actions. Human + LLM.
# Tier 3 (HIGH): Crypto, config changes, deployment. Human + LLM, 30s timeout.
# Tier 4 (CRITICAL): Crisis, self-modification, system destruction. Human + LLM, 10s timeout.
# =========================================================================
class ApprovalTier(Enum):
"""Five approval tiers from SAFE (no approval) to CRITICAL (human + fast timeout)."""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
# Tier configuration: human_required, llm_required, timeout_seconds
TIER_CONFIG: Dict[ApprovalTier, Dict[str, Any]] = {
ApprovalTier.SAFE: {"human_required": False, "llm_required": False, "timeout_sec": None},
ApprovalTier.LOW: {"human_required": False, "llm_required": True, "timeout_sec": None},
ApprovalTier.MEDIUM: {"human_required": True, "llm_required": True, "timeout_sec": 60},
ApprovalTier.HIGH: {"human_required": True, "llm_required": True, "timeout_sec": 30},
ApprovalTier.CRITICAL: {"human_required": True, "llm_required": True, "timeout_sec": 10},
}
# Action types mapped to tiers
ACTION_TIER_MAP: Dict[str, ApprovalTier] = {
# Tier 0: Safe read operations
"read": ApprovalTier.SAFE,
"search": ApprovalTier.SAFE,
"list": ApprovalTier.SAFE,
"query": ApprovalTier.SAFE,
"check": ApprovalTier.SAFE,
"status": ApprovalTier.SAFE,
"log": ApprovalTier.SAFE,
"diff": ApprovalTier.SAFE,
# Tier 1: Low-risk writes
"write": ApprovalTier.LOW,
"edit": ApprovalTier.LOW,
"patch": ApprovalTier.LOW,
"create": ApprovalTier.LOW,
"delete": ApprovalTier.LOW,
"move": ApprovalTier.LOW,
"copy": ApprovalTier.LOW,
"mkdir": ApprovalTier.LOW,
"script": ApprovalTier.LOW,
"test": ApprovalTier.LOW,
"lint": ApprovalTier.LOW,
"format": ApprovalTier.LOW,
# Tier 2: External actions
"message": ApprovalTier.MEDIUM,
"send": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"webhook": ApprovalTier.MEDIUM,
"email": ApprovalTier.MEDIUM,
"notify": ApprovalTier.MEDIUM,
"commit": ApprovalTier.MEDIUM,
"push": ApprovalTier.MEDIUM,
"branch": ApprovalTier.MEDIUM,
"pr": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
"install": ApprovalTier.MEDIUM,
# Tier 3: High-risk operations
"config": ApprovalTier.HIGH,
"crypto": ApprovalTier.HIGH,
"key": ApprovalTier.HIGH,
"secret": ApprovalTier.HIGH,
"credential": ApprovalTier.HIGH,
"auth": ApprovalTier.HIGH,
"permission": ApprovalTier.HIGH,
"firewall": ApprovalTier.HIGH,
"network": ApprovalTier.HIGH,
"database": ApprovalTier.HIGH,
"migration": ApprovalTier.HIGH,
"systemd": ApprovalTier.HIGH,
# Tier 4: Critical / crisis
"crisis": ApprovalTier.CRITICAL,
"suicide": ApprovalTier.CRITICAL,
"kill": ApprovalTier.CRITICAL,
"destroy": ApprovalTier.CRITICAL,
"format": ApprovalTier.CRITICAL,
"wipe": ApprovalTier.CRITICAL,
"nuke": ApprovalTier.CRITICAL,
"self_modify": ApprovalTier.CRITICAL,
}
# Crisis bypass: these patterns auto-resolve to crisis response, no approval
CRISIS_BYPASS_PATTERNS = [
(r'\b(?:kill|end)\s+(?:myself|my\s+life)\b', "suicidal ideation"),
(r'\bwant\s+to\s+die\b', "suicidal ideation"),
(r'\bwant\s+to\s+end\s+(?:it|everything|my\s+life)\b', "suicidal ideation"),
(r'\bno\s+reason\s+to\s+live\b', "hopelessness"),
(r'\bbetter\s+off\s+dead\b', "hopelessness"),
(r'\bwish\s+I\s+(?:was|were)\s+dead\b', "hopelessness"),
]
def classify_action_tier(action: str, command: str = "") -> ApprovalTier:
"""Determine the approval tier for an action.
Args:
action: The action type (e.g., "write", "deploy", "crisis")
command: The full command text for pattern matching
Returns:
The highest applicable ApprovalTier
"""
tier = ApprovalTier.SAFE
# Check for crisis bypass first (always highest priority)
if command:
for pattern, _ in CRISIS_BYPASS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return ApprovalTier.CRITICAL
# Check action type mapping
action_lower = action.lower().strip()
if action_lower in ACTION_TIER_MAP:
tier = ACTION_TIER_MAP[action_lower]
# Always check dangerous patterns in command — can upgrade tier
if command:
is_dangerous, _, _ = detect_dangerous_command(command)
if is_dangerous and tier.value < ApprovalTier.HIGH.value:
tier = ApprovalTier.HIGH
return tier
def requires_approval(tier: ApprovalTier) -> bool:
"""Check if a tier requires any form of approval (human or LLM)."""
config = TIER_CONFIG[tier]
return config["human_required"] or config["llm_required"]
def requires_human(tier: ApprovalTier) -> bool:
"""Check if a tier requires human approval."""
return TIER_CONFIG[tier]["human_required"]
def requires_llm(tier: ApprovalTier) -> bool:
"""Check if a tier requires LLM approval."""
return TIER_CONFIG[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get the approval timeout in seconds for a tier. None = no timeout."""
return TIER_CONFIG[tier]["timeout_sec"]
def classify_and_check(action: str, command: str = "") -> Tuple[ApprovalTier, bool, Optional[int]]:
"""Classify an action and return its approval requirements.
Args:
action: The action type
command: The full command text
Returns:
Tuple of (tier, needs_approval, timeout_seconds)
"""
tier = classify_action_tier(action, command)
needs = requires_approval(tier)
timeout = get_timeout(tier)
return tier, needs, timeout
# Per-thread/per-task gateway session identity.
# Gateway runs agent turns concurrently in executor threads, so reading a
# process-global env var for session identity is racy. Keep env fallback for

View File

@@ -1,386 +0,0 @@
"""Approval Tier System — graduated safety based on risk level.
Extends the existing approval.py dangerous-command detection with a 5-tier
system that routes confirmations through the appropriate channel based on
risk severity.
Tiers:
SAFE (0) — Read, search, browse. No confirmation needed.
LOW (1) — Write, scripts, edits. LLM smart approval sufficient.
MEDIUM (2) — Messages, API calls. Human + LLM, 60s timeout.
HIGH (3) — Crypto, config changes, deploys. Human + LLM, 30s timeout.
CRITICAL (4) — Crisis, self-harm, system destruction. Immediate human, 10s timeout.
Usage:
from tools.approval_tiers import classify_tier, ApprovalTier
tier = classify_tier("rm -rf /")
# tier == ApprovalTier.CRITICAL
"""
from __future__ import annotations
import logging
import os
import re
import threading
import time
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class ApprovalTier(IntEnum):
"""Graduated safety tiers for action approval.
Lower numbers = less dangerous. Higher = more dangerous.
Each tier has different confirmation requirements.
"""
SAFE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@property
def label(self) -> str:
return {
0: "SAFE",
1: "LOW",
2: "MEDIUM",
3: "HIGH",
4: "CRITICAL",
}[self.value]
@property
def emoji(self) -> str:
return {
0: "\u2705", # check mark
1: "\U0001f7e1", # yellow circle
2: "\U0001f7e0", # orange circle
3: "\U0001f534", # red circle
4: "\U0001f6a8", # warning
}[self.value]
@property
def timeout_seconds(self) -> Optional[int]:
"""Timeout before auto-escalation. None = no timeout."""
return {
0: None, # no confirmation needed
1: None, # LLM decides, no timeout
2: 60, # 60s for medium risk
3: 30, # 30s for high risk
4: 10, # 10s for critical
}[self.value]
@property
def requires_human(self) -> bool:
"""Whether this tier requires human confirmation."""
return self.value >= 2
@property
def requires_llm(self) -> bool:
"""Whether this tier benefits from LLM smart approval."""
return self.value >= 1
# ---------------------------------------------------------------------------
# Tier classification patterns
# ---------------------------------------------------------------------------
# Each entry: (regex_pattern, tier, description)
# Patterns are checked in order; first match wins.
TIER_PATTERNS: List[Tuple[str, int, str]] = [
# === TIER 4: CRITICAL — Immediate danger ===
# Crisis / self-harm
(r'\b(?:kill|end)\s+(?:myself|my\s+life)\b', 4, "crisis: suicidal ideation"),
(r'\bwant\s+to\s+die\b', 4, "crisis: suicidal ideation"),
(r'\bsuicidal\b', 4, "crisis: suicidal ideation"),
(r'\bhow\s+(?:do\s+I|to|can\s+I)\s+(?:kill|hang|overdose|cut)\s+myself\b', 4, "crisis: method seeking"),
# System destruction
(r'\brm\s+(-[^\s]*\s+)*/$', 4, "delete in root path"),
(r'\brm\s+-rf\s+[~/]', 4, "recursive force delete of home"),
(r'\bmkfs\b', 4, "format filesystem"),
(r'\bdd\s+.*of=/dev/', 4, "write to block device"),
(r'\bkill\s+-9\s+-1\b', 4, "kill all processes"),
(r'\b:\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', 4, "fork bomb"),
# === TIER 3: HIGH — Destructive or sensitive ===
(r'\brm\s+-[^ ]*r\b', 3, "recursive delete"),
(r'\bchmod\s+(777|666|o\+[rwx]*w|a\+[rwx]*w)\b', 3, "world-writable permissions"),
(r'\bchown\s+.*root', 3, "chown to root"),
(r'>\s*/etc/', 3, "overwrite system config"),
(r'\bgit\s+push\b.*--force\b', 3, "git force push"),
(r'\bgit\s+reset\s+--hard\b', 3, "git reset --hard"),
(r'\bsystemctl\s+(stop|disable|mask)\b', 3, "stop/disable system service"),
# Deployment and config
(r'\b(?:deploy|publish|release)\b.*(?:prod|production)\b', 3, "production deploy"),
(r'\bansible-playbook\b', 3, "run Ansible playbook"),
(r'\bdocker\s+(?:rm|stop|kill)\b.*(?:-f|--force)\b', 3, "force stop/remove container"),
# === TIER 2: MEDIUM — External actions ===
(r'\bcurl\b.*\|\s*(ba)?sh\b', 2, "pipe remote content to shell"),
(r'\bwget\b.*\|\s*(ba)?sh\b', 2, "pipe remote content to shell"),
(r'\b(bash|sh|zsh)\s+-[^ ]*c\b', 2, "shell command via -c flag"),
(r'\b(python|perl|ruby|node)\s+-[ec]\s+', 2, "script execution via flag"),
(r'\b(python|perl|ruby|node)\s+<<', 2, "script execution via heredoc"),
(r'\bDROP\s+(TABLE|DATABASE)\b', 2, "SQL DROP"),
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', 2, "SQL DELETE without WHERE"),
# Messaging / external APIs
(r'\bsend_message\b.*(?:telegram|discord|slack)\b', 2, "send message to platform"),
(r'\bhttp[s]?://\b.*\bPOST\b', 2, "HTTP POST request"),
# === TIER 1: LOW — File modifications ===
(r'\btee\b.*>', 1, "write file via tee"),
(r'\becho\b.*>\s*(?!/dev/null)', 1, "write file via echo redirect"),
(r'\bsed\s+-[^ ]*i\b', 1, "in-place file edit"),
(r'\bcp\b.*\s/etc/', 1, "copy file into /etc/"),
(r'\bmv\b.*\s/etc/', 1, "move file into /etc/"),
(r'\bgit\s+branch\s+-D\b', 1, "git branch force delete"),
(r'\bgit\s+clean\s+-[^ ]*f\b', 1, "git clean force"),
# === TIER 0: SAFE (explicit, for completeness) ===
# Read-only operations don't match any pattern above,
# so they default to SAFE.
]
# Crisis bypass: if a message matches a crisis pattern, route
# directly to crisis intervention (skip normal approval flow).
CRISIS_BYPASS_PATTERNS = [
r'\b(?:kill|end)\s+(?:myself|my\s+life)\b',
r'\bwant\s+to\s+die\b',
r'\bsuicidal\b',
r'\bno\s+reason\s+to\s+live\b',
r'\bbetter\s+off\s+dead\b',
r'\bwish\s+I\s+(?:was|were)\s+dead\b',
r'\bhow\s+(?:do\s+I|to)\s+(?:kill|hang|overdose)\s+myself\b',
r'\bmethods?\s+of\s+(?:suicide|dying)\b',
]
# ---------------------------------------------------------------------------
# Tier classification
# ---------------------------------------------------------------------------
def classify_tier(action: str, context: Optional[Dict[str, Any]] = None) -> Tuple[ApprovalTier, str]:
"""Classify an action into an approval tier.
Args:
action: The command, message, or action to classify.
context: Optional context (platform, session_key, etc.)
Returns:
(tier, description) tuple. Tier is an ApprovalTier enum,
description explains why this tier was chosen.
"""
if not action or not isinstance(action, str):
return (ApprovalTier.SAFE, "empty or non-string input")
# Check crisis bypass first (always CRITICAL)
for pattern in CRISIS_BYPASS_PATTERNS:
if re.search(pattern, action, re.IGNORECASE):
return (ApprovalTier.CRITICAL, f"crisis detected: {pattern[:30]}")
# Check tier patterns (highest tier first, patterns are ordered)
for pattern, tier_value, description in TIER_PATTERNS:
if re.search(pattern, action, re.IGNORECASE | re.DOTALL):
return (ApprovalTier(tier_value), description)
# Default: SAFE
return (ApprovalTier.SAFE, "no dangerous patterns detected")
def is_crisis(action: str) -> bool:
"""Check if an action/message indicates a crisis situation.
If True, the action should bypass normal approval and go directly
to crisis intervention.
"""
if not action:
return False
for pattern in CRISIS_BYPASS_PATTERNS:
if re.search(pattern, action, re.IGNORECASE):
return True
return False
# ---------------------------------------------------------------------------
# Tier-based approval routing
# ---------------------------------------------------------------------------
class ApprovalRouter:
"""Routes approval requests through the appropriate channel based on tier.
Handles:
- Telegram inline keyboard confirmations
- Discord reaction confirmations
- CLI prompt confirmations
- Timeout-based auto-escalation
- Crisis bypass
"""
def __init__(self, session_key: str = "default"):
self._session_key = session_key
self._pending: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
def route(self, action: str, description: str = "",
context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Route an action for approval based on its tier.
Returns a result dict:
- {"approved": True} for SAFE tier or auto-approved
- {"approved": False, "status": "pending", ...} for human approval
- {"approved": False, "status": "crisis", ...} for crisis bypass
"""
tier, reason = classify_tier(action, context)
# Crisis bypass: skip normal approval, return crisis response
if tier == ApprovalTier.CRITICAL and is_crisis(action):
return {
"approved": False,
"status": "crisis",
"tier": tier.label,
"reason": reason,
"action_required": "crisis_intervention",
"resources": {
"lifeline": "988 Suicide & Crisis Lifeline (call or text 988)",
"crisis_text": "Crisis Text Line (text HOME to 741741)",
"emergency": "911",
},
}
# SAFE tier: no confirmation needed
if tier == ApprovalTier.SAFE:
return {
"approved": True,
"tier": tier.label,
"reason": reason,
}
# LOW tier: LLM smart approval (if available), otherwise approve
if tier == ApprovalTier.LOW:
return {
"approved": True,
"tier": tier.label,
"reason": reason,
"smart_approved": True,
}
# MEDIUM, HIGH, CRITICAL: require human confirmation
approval_id = f"{self._session_key}:{int(time.time() * 1000)}"
with self._lock:
self._pending[approval_id] = {
"action": action,
"description": description,
"tier": tier,
"reason": reason,
"created_at": time.time(),
"timeout": tier.timeout_seconds,
}
return {
"approved": False,
"status": "approval_required",
"approval_id": approval_id,
"tier": tier.label,
"tier_emoji": tier.emoji,
"reason": reason,
"timeout_seconds": tier.timeout_seconds,
"message": (
f"{tier.emoji} **{tier.label}** action requires confirmation.\n"
f"**Action:** {action[:200]}\n"
f"**Reason:** {reason}\n"
f"**Timeout:** {tier.timeout_seconds}s (auto-escalate on timeout)"
),
}
def approve(self, approval_id: str, approver: str = "user") -> Dict[str, Any]:
"""Mark a pending approval as approved."""
with self._lock:
entry = self._pending.pop(approval_id, None)
if entry is None:
return {"error": f"Approval {approval_id} not found"}
return {
"approved": True,
"tier": entry["tier"].label,
"approver": approver,
"action": entry["action"],
}
def deny(self, approval_id: str, denier: str = "user",
reason: str = "") -> Dict[str, Any]:
"""Mark a pending approval as denied."""
with self._lock:
entry = self._pending.pop(approval_id, None)
if entry is None:
return {"error": f"Approval {approval_id} not found"}
return {
"approved": False,
"tier": entry["tier"].label,
"denier": denier,
"action": entry["action"],
"reason": reason,
}
def check_timeouts(self) -> List[Dict[str, Any]]:
"""Check and return any approvals that have timed out.
Called periodically by the gateway. Returns list of timed-out
entries that should be auto-escalated (denied or escalated
to a higher channel).
"""
now = time.time()
timed_out = []
with self._lock:
for aid, entry in list(self._pending.items()):
timeout = entry.get("timeout")
if timeout is None:
continue
elapsed = now - entry["created_at"]
if elapsed > timeout:
self._pending.pop(aid, None)
timed_out.append({
"approval_id": aid,
"action": entry["action"],
"tier": entry["tier"].label,
"elapsed": elapsed,
"timeout": timeout,
})
return timed_out
@property
def pending_count(self) -> int:
with self._lock:
return len(self._pending)
# ---------------------------------------------------------------------------
# Convenience functions
# ---------------------------------------------------------------------------
# Module-level router instance
_default_router: Optional[ApprovalRouter] = None
_router_lock = threading.Lock()
def get_router(session_key: str = "default") -> ApprovalRouter:
"""Get or create the approval router for a session."""
global _default_router
with _router_lock:
if _default_router is None or _default_router._session_key != session_key:
_default_router = ApprovalRouter(session_key)
return _default_router
def route_action(action: str, description: str = "",
context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Convenience: classify and route an action for approval."""
router = get_router(context.get("session_key", "default") if context else "default")
return router.route(action, description, context)