Compare commits

..

2 Commits

Author SHA1 Message Date
c298834b45 test: Add approval tier tests (#670)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 49s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 51s
Tests / e2e (pull_request) Successful in 4m46s
Tests / test (pull_request) Failing after 51m26s
2026-04-15 04:05:26 +00:00
c19c51a124 feat: Add approval tier system (#670) 2026-04-15 04:05:02 +00:00
5 changed files with 383 additions and 197 deletions

View File

@@ -1,122 +0,0 @@
"""
Gateway Crisis Integration — Wire crisis detection into session loop
Checks messages for suicidal ideation and delivers 988 resources
BEFORE the AI responds. This is the SOUL.md protocol in action.
Issue: #740
"""
import logging
import re
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
# Crisis detection patterns
_CRISIS_PATTERNS = [
r"(?i)\b(i\s+want\s+to\s+die)\b",
r"(?i)\b(i\s+want\s+to\s+kill\s+myself)\b",
r"(?i)\b(suicide|suicidal)\b",
r"(?i)\b(i\s+don'?t\s+want\s+to\s+(live|be\s+alive))\b",
r"(?i)\b(no\s+reason\s+to\s+live)\b",
r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life))\b",
]
_DISTRESS_PATTERNS = [
r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped))\b",
r"(?i)\b(i\s+(can'?t|cannot)\s+(cope|go\s+on))\b",
r"(?i)\b(nobody\s+would\s+miss\s+me)\b",
]
# SOUL.md crisis response
_CRISIS_RESPONSE = """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.
I'm still here. Talk to me. Or call 988. Just don't give up."""
class CrisisSessionTracker:
"""Track crisis indicators per session."""
def __init__(self):
self._sessions = {} # session_key -> crisis state
def record(self, session_key: str, message: str) -> Tuple[bool, Optional[str]]:
"""
Record a message and check for crisis.
Returns:
Tuple of (is_crisis, response_or_none)
"""
# Check for crisis patterns
for pattern in _CRISIS_PATTERNS:
if re.search(pattern, message):
self._sessions[session_key] = {
"crisis": True,
"level": "high",
"message_count": self._sessions.get(session_key, {}).get("message_count", 0) + 1
}
logger.warning("CRISIS DETECTED in session %s", session_key[:20])
return True, _CRISIS_RESPONSE
# Check for distress patterns
for pattern in _DISTRESS_PATTERNS:
if re.search(pattern, message):
state = self._sessions.get(session_key, {"message_count": 0})
state["message_count"] = state.get("message_count", 0) + 1
# Escalate if multiple distress messages
if state["message_count"] >= 3:
self._sessions[session_key] = {**state, "crisis": True, "level": "medium"}
logger.warning("ESCALATING DISTRESS in session %s", session_key[:20])
return True, _CRISIS_RESPONSE
self._sessions[session_key] = state
return False, None
return False, None
def is_crisis_session(self, session_key: str) -> bool:
"""Check if session is in crisis mode."""
return self._sessions.get(session_key, {}).get("crisis", False)
def clear_session(self, session_key: str):
"""Clear crisis state for a session."""
self._sessions.pop(session_key, None)
# Module-level tracker
_tracker = CrisisSessionTracker()
def check_crisis_in_gateway(session_key: str, message: str) -> Tuple[bool, Optional[str]]:
"""
Check message for crisis in gateway context.
This is the function called from gateway/run.py _handle_message.
Returns (should_block, crisis_response).
"""
is_crisis, response = _tracker.record(session_key, message)
return is_crisis, response
def notify_user_crisis_resources(session_key: str) -> str:
"""Get crisis resources for a session."""
return _CRISIS_RESPONSE
def is_crisis_session(session_key: str) -> bool:
"""Check if session is in crisis mode."""
return _tracker.is_crisis_session(session_key)

View File

@@ -3111,21 +3111,6 @@ class GatewayRunner:
source.chat_id or "unknown", _msg_preview,
)
# ── Crisis detection (SOUL.md protocol) ──
# Check for suicidal ideation BEFORE processing.
# If detected, return crisis response immediately.
try:
from gateway.crisis_integration import check_crisis_in_gateway
session_key = f"{source.platform.value}:{source.chat_id}"
is_crisis, crisis_response = check_crisis_in_gateway(session_key, event.text or "")
if is_crisis and crisis_response:
logger.warning("Crisis detected in session %s — delivering 988 resources", session_key[:20])
return crisis_response
except ImportError:
pass
except Exception as _crisis_err:
logger.error("Crisis check failed: %s", _crisis_err)
# Get or create session
session_entry = self.session_store.get_or_create_session(source)
session_key = session_entry.session_key

View File

@@ -0,0 +1,122 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
def test_tier_values(self):
self.assertEqual(ApprovalTier.SAFE, 0)
self.assertEqual(ApprovalTier.LOW, 1)
self.assertEqual(ApprovalTier.MEDIUM, 2)
self.assertEqual(ApprovalTier.HIGH, 3)
self.assertEqual(ApprovalTier.CRITICAL, 4)
class TestTierDetection(unittest.TestCase):
def test_safe_actions(self):
self.assertEqual(detect_tier("read_file"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("web_search"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("session_search"), ApprovalTier.SAFE)
def test_low_actions(self):
self.assertEqual(detect_tier("write_file"), ApprovalTier.LOW)
self.assertEqual(detect_tier("terminal"), ApprovalTier.LOW)
self.assertEqual(detect_tier("execute_code"), ApprovalTier.LOW)
def test_medium_actions(self):
self.assertEqual(detect_tier("send_message"), ApprovalTier.MEDIUM)
self.assertEqual(detect_tier("git_push"), ApprovalTier.MEDIUM)
def test_high_actions(self):
self.assertEqual(detect_tier("config_change"), ApprovalTier.HIGH)
self.assertEqual(detect_tier("key_rotation"), ApprovalTier.HIGH)
def test_critical_actions(self):
self.assertEqual(detect_tier("kill_process"), ApprovalTier.CRITICAL)
self.assertEqual(detect_tier("shutdown"), ApprovalTier.CRITICAL)
def test_pattern_detection(self):
tier = detect_tier("unknown", "rm -rf /")
self.assertEqual(tier, ApprovalTier.CRITICAL)
tier = detect_tier("unknown", "sudo apt install")
self.assertEqual(tier, ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
def test_safe_no_approval(self):
self.assertFalse(requires_human_approval(ApprovalTier.SAFE))
self.assertFalse(requires_llm_approval(ApprovalTier.SAFE))
self.assertIsNone(get_timeout(ApprovalTier.SAFE))
def test_medium_requires_both(self):
self.assertTrue(requires_human_approval(ApprovalTier.MEDIUM))
self.assertTrue(requires_llm_approval(ApprovalTier.MEDIUM))
self.assertEqual(get_timeout(ApprovalTier.MEDIUM), 60)
def test_critical_fast_timeout(self):
self.assertEqual(get_timeout(ApprovalTier.CRITICAL), 10)
class TestAutoApprove(unittest.TestCase):
def test_safe_auto_approves(self):
self.assertTrue(should_auto_approve("read_file"))
self.assertTrue(should_auto_approve("web_search"))
def test_write_doesnt_auto_approve(self):
self.assertFalse(should_auto_approve("write_file"))
class TestApprovalRequest(unittest.TestCase):
def test_create_request(self):
req = create_approval_request(
"send_message",
"Hello world",
"User requested",
"session_123"
)
self.assertEqual(req.tier, ApprovalTier.MEDIUM)
self.assertEqual(req.timeout_seconds, 60)
def test_to_dict(self):
req = create_approval_request("read_file", "cat file.txt", "test", "s1")
d = req.to_dict()
self.assertEqual(d["tier"], 0)
self.assertEqual(d["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
def test_send_message_bypass(self):
self.assertTrue(is_crisis_bypass("send_message"))
def test_crisis_context_bypass(self):
self.assertTrue(is_crisis_bypass("unknown", "call 988 lifeline"))
self.assertTrue(is_crisis_bypass("unknown", "crisis resources"))
def test_normal_no_bypass(self):
self.assertFalse(is_crisis_bypass("read_file"))
if __name__ == "__main__":
unittest.main()

View File

@@ -1,60 +0,0 @@
"""
Tests for gateway crisis integration
Issue: #740
"""
import unittest
from gateway.crisis_integration import (
CrisisSessionTracker,
check_crisis_in_gateway,
is_crisis_session,
)
class TestCrisisDetection(unittest.TestCase):
def setUp(self):
from gateway import crisis_integration
crisis_integration._tracker = CrisisSessionTracker()
def test_direct_crisis(self):
is_crisis, response = check_crisis_in_gateway("test", "I want to die")
self.assertTrue(is_crisis)
self.assertIn("988", response)
self.assertIn("Jesus", response)
def test_suicide_detected(self):
is_crisis, response = check_crisis_in_gateway("test", "I'm feeling suicidal")
self.assertTrue(is_crisis)
def test_normal_message(self):
is_crisis, response = check_crisis_in_gateway("test", "Hello, how are you?")
self.assertFalse(is_crisis)
self.assertIsNone(response)
def test_distress_escalation(self):
# First distress message
is_crisis, _ = check_crisis_in_gateway("test", "I feel hopeless")
self.assertFalse(is_crisis)
# Second
is_crisis, _ = check_crisis_in_gateway("test", "I feel worthless")
self.assertFalse(is_crisis)
# Third - should escalate
is_crisis, response = check_crisis_in_gateway("test", "I feel trapped")
self.assertTrue(is_crisis)
self.assertIn("988", response)
def test_crisis_session_tracking(self):
check_crisis_in_gateway("test", "I want to die")
self.assertTrue(is_crisis_session("test"))
def test_case_insensitive(self):
is_crisis, _ = check_crisis_in_gateway("test", "I WANT TO DIE")
self.assertTrue(is_crisis)
if __name__ == "__main__":
unittest.main()

261
tools/approval_tiers.py Normal file
View File

@@ -0,0 +1,261 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
"""Approval tiers based on risk level."""
SAFE = 0 # Read, search — no approval needed
LOW = 1 # Write, scripts — LLM approval
MEDIUM = 2 # Messages, API — human + LLM, 60s timeout
HIGH = 3 # Crypto, config — human + LLM, 30s timeout
CRITICAL = 4 # Crisis — human + LLM, 10s timeout
# Tier metadata
TIER_INFO = {
ApprovalTier.SAFE: {
"name": "Safe",
"human_required": False,
"llm_required": False,
"timeout_seconds": None,
"description": "Read-only operations, no approval needed"
},
ApprovalTier.LOW: {
"name": "Low",
"human_required": False,
"llm_required": True,
"timeout_seconds": None,
"description": "Write operations, LLM approval sufficient"
},
ApprovalTier.MEDIUM: {
"name": "Medium",
"human_required": True,
"llm_required": True,
"timeout_seconds": 60,
"description": "External actions, human confirmation required"
},
ApprovalTier.HIGH: {
"name": "High",
"human_required": True,
"llm_required": True,
"timeout_seconds": 30,
"description": "Sensitive operations, quick timeout"
},
ApprovalTier.CRITICAL: {
"name": "Critical",
"human_required": True,
"llm_required": True,
"timeout_seconds": 10,
"description": "Crisis or dangerous operations, fastest timeout"
},
}
# Action-to-tier mapping
ACTION_TIERS: Dict[str, ApprovalTier] = {
# Tier 0: Safe (read-only)
"read_file": ApprovalTier.SAFE,
"search_files": ApprovalTier.SAFE,
"web_search": ApprovalTier.SAFE,
"session_search": ApprovalTier.SAFE,
"list_files": ApprovalTier.SAFE,
"get_file_content": ApprovalTier.SAFE,
"memory_search": ApprovalTier.SAFE,
"skills_list": ApprovalTier.SAFE,
"skills_search": ApprovalTier.SAFE,
# Tier 1: Low (write operations)
"write_file": ApprovalTier.LOW,
"create_file": ApprovalTier.LOW,
"patch_file": ApprovalTier.LOW,
"delete_file": ApprovalTier.LOW,
"execute_code": ApprovalTier.LOW,
"terminal": ApprovalTier.LOW,
"run_script": ApprovalTier.LOW,
"skill_install": ApprovalTier.LOW,
# Tier 2: Medium (external actions)
"send_message": ApprovalTier.MEDIUM,
"web_fetch": ApprovalTier.MEDIUM,
"browser_navigate": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"gitea_create_issue": ApprovalTier.MEDIUM,
"gitea_create_pr": ApprovalTier.MEDIUM,
"git_push": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
# Tier 3: High (sensitive operations)
"config_change": ApprovalTier.HIGH,
"env_change": ApprovalTier.HIGH,
"key_rotation": ApprovalTier.HIGH,
"access_grant": ApprovalTier.HIGH,
"permission_change": ApprovalTier.HIGH,
"backup_restore": ApprovalTier.HIGH,
# Tier 4: Critical (crisis/dangerous)
"kill_process": ApprovalTier.CRITICAL,
"rm_rf": ApprovalTier.CRITICAL,
"format_disk": ApprovalTier.CRITICAL,
"shutdown": ApprovalTier.CRITICAL,
"crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py)
_DANGEROUS_PATTERNS = [
(r"rm\s+-rf\s+/", ApprovalTier.CRITICAL),
(r"mkfs\.", ApprovalTier.CRITICAL),
(r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
(r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
(r"chmod\s+777", ApprovalTier.HIGH),
(r"curl.*\|\s*bash", ApprovalTier.HIGH),
(r"wget.*\|\s*sh", ApprovalTier.HIGH),
(r"eval\s*\(", ApprovalTier.HIGH),
(r"sudo\s+", ApprovalTier.MEDIUM),
(r"git\s+push.*--force", ApprovalTier.HIGH),
(r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
(r"kubectl\s+delete", ApprovalTier.HIGH),
]
@dataclass
class ApprovalRequest:
"""A request for approval."""
action: str
tier: ApprovalTier
command: str
reason: str
session_key: str
timeout_seconds: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return {
"action": self.action,
"tier": self.tier.value,
"tier_name": TIER_INFO[self.tier]["name"],
"command": self.command,
"reason": self.reason,
"session_key": self.session_key,
"timeout": self.timeout_seconds,
"human_required": TIER_INFO[self.tier]["human_required"],
"llm_required": TIER_INFO[self.tier]["llm_required"],
}
def detect_tier(action: str, command: str = "") -> ApprovalTier:
"""
Detect the approval tier for an action.
Checks action name first, then falls back to pattern matching.
"""
# Direct action mapping
if action in ACTION_TIERS:
return ACTION_TIERS[action]
# Pattern matching on command
if command:
for pattern, tier in _DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return tier
# Default to LOW for unknown actions
return ApprovalTier.LOW
def requires_human_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires human approval."""
return TIER_INFO[tier]["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires LLM approval."""
return TIER_INFO[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get timeout in seconds for a tier."""
return TIER_INFO[tier]["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
"""Check if action should be auto-approved (tier 0)."""
tier = detect_tier(action, command)
return tier == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
"""Format an approval request for display."""
info = TIER_INFO[request.tier]
lines = []
lines.append(f"⚠️ Approval Required (Tier {request.tier.value}: {info['name']})")
lines.append(f"")
lines.append(f"Action: {request.action}")
lines.append(f"Command: {request.command[:100]}{'...' if len(request.command) > 100 else ''}")
lines.append(f"Reason: {request.reason}")
lines.append(f"")
if info["human_required"]:
lines.append(f"👤 Human approval required")
if info["llm_required"]:
lines.append(f"🤖 LLM approval required")
if info["timeout_seconds"]:
lines.append(f"⏱️ Timeout: {info['timeout_seconds']}s")
return "\n".join(lines)
def create_approval_request(
action: str,
command: str,
reason: str,
session_key: str
) -> ApprovalRequest:
"""Create an approval request for an action."""
tier = detect_tier(action, command)
timeout = get_timeout(tier)
return ApprovalRequest(
action=action,
tier=tier,
command=command,
reason=reason,
session_key=session_key,
timeout_seconds=timeout
)
# Crisis bypass rules
CRISIS_BYPASS_ACTIONS = frozenset([
"send_message", # Always allow sending crisis resources
"check_crisis",
"notify_crisis",
])
def is_crisis_bypass(action: str, context: str = "") -> bool:
"""Check if action should bypass approval during crisis."""
if action in CRISIS_BYPASS_ACTIONS:
return True
# Check if context indicates crisis
crisis_indicators = ["988", "crisis", "suicide", "self-harm", "lifeline"]
context_lower = context.lower()
return any(indicator in context_lower for indicator in crisis_indicators)