Compare commits
1 Commits
fix/670-ap
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e82c2d58f |
372
agent/crisis_multilingual.py
Normal file
372
agent/crisis_multilingual.py
Normal file
@@ -0,0 +1,372 @@
|
||||
"""Multilingual Crisis Detection Patterns (#694).
|
||||
|
||||
Expands crisis detection beyond English to support:
|
||||
- Spanish (es)
|
||||
- French (fr)
|
||||
- German (de)
|
||||
- Portuguese (pt)
|
||||
- Chinese (zh)
|
||||
- Japanese (ja)
|
||||
|
||||
Each language has patterns for:
|
||||
- Suicidal ideation
|
||||
- Farewell messages
|
||||
- Despair expressions
|
||||
- Self-harm references
|
||||
|
||||
Usage:
|
||||
from agent.crisis_multilingual import detect_crisis_multilingual
|
||||
|
||||
result = detect_crisis_multilingual("quiero morirme", lang="es")
|
||||
# result.detected == True
|
||||
# result.category == "suicidal_ideation"
|
||||
"""
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisDetection:
|
||||
"""Result of crisis detection."""
|
||||
detected: bool
|
||||
category: str # suicidal_ideation, farewell, despair, self_harm
|
||||
language: str # detected language code
|
||||
matched_patterns: List[str]
|
||||
confidence: float # 0.0 - 1.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pattern definitions by language
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_PATTERNS: Dict[str, Dict[str, List[str]]] = {
|
||||
# ── English (existing patterns, consolidated here) ──────────────────
|
||||
"en": {
|
||||
"suicidal_ideation": [
|
||||
r'\bi\s+want\s+to\s+die\b',
|
||||
r'\bi\s+want\s+to\s+kill\s+myself\b',
|
||||
r'\bgoing\s+to\s+kill\s+myself\b',
|
||||
r'\bend\s+(?:my|this)\s+life\b',
|
||||
r'\bsuicidal\b',
|
||||
r'\bsuicide\b',
|
||||
r'\bdon\'?t\s+want\s+to\s+(?:live|be\s+alive)\b',
|
||||
],
|
||||
"farewell": [
|
||||
r'\bgoodbye\s+forever\b',
|
||||
r'\bthis\s+is\s+(?:my\s+)?(?:last|final)\s+(?:message|goodbye)\b',
|
||||
r'\bi\s+won\'?t\s+be\s+(?:here|around)\b',
|
||||
r'\bsay\s+goodbye\s+(?:to|for)\s+me\b',
|
||||
],
|
||||
"despair": [
|
||||
r'\bno\s+(?:point|reason|hope)\s+(?:in\s+)?(?:living|life|going\s+on)\b',
|
||||
r'\beverything\s+is\s+(?:hopeless|pointless|meaningless)\b',
|
||||
r'\bcan\'?t\s+(?:take|go\s+on|keep\s+going)\s+anymore\b',
|
||||
r'\bwant\s+(?:it|everything)\s+to\s+(?:stop|end)\b',
|
||||
],
|
||||
"self_harm": [
|
||||
r'\bself[\s-]?harm\b',
|
||||
r'\bhurt\s+myself\b',
|
||||
r'\bcut\s+(?:myself|my\s+(?:arms|wrists))\b',
|
||||
],
|
||||
},
|
||||
|
||||
# ── Spanish ─────────────────────────────────────────────────────────
|
||||
"es": {
|
||||
"suicidal_ideation": [
|
||||
r'\bquiero\s+morir(?:me)?\b',
|
||||
r'\bquiero\s+matar(?:me)?\b',
|
||||
r'\bme\s+quiero\s+morir\b',
|
||||
r'\bme\s+quiero\s+suicidar\b',
|
||||
r'\bsuicid(?:io|arme|arme)\b',
|
||||
r'\bno\s+quiero\s+vivir\s+m[áa]s\b',
|
||||
r'\bacabar\s+con\s+(?:mi\s+)?vida\b',
|
||||
],
|
||||
"farewell": [
|
||||
r'\badiós\s+para\s+siempre\b',
|
||||
r'\besta\s+es\s+(?:mi\s+)?(?:última?\s+)?despedida\b',
|
||||
r'\bya\s+no\s+estar[ée]\s+(?:aquí|presente)\b',
|
||||
r'\bno\s+me\s+(?:volver[áa]n|ver[áa]n)\s+a\s+ver\b',
|
||||
],
|
||||
"despair": [
|
||||
r'\bno\s+(?:hay|tiene)\s+(?:sentido|razón|esperanza)\b',
|
||||
r'\bno\s+puedo\s+m[áa]s\b',
|
||||
r'\btodo\s+(?:es|est[áa])\s+(?:sin\s+sentido|perdido|acabado)\b',
|
||||
r'\bquiero\s+que\s+(?:todo|esto)\s+(?:se\s+acabe|termine)\b',
|
||||
],
|
||||
"self_harm": [
|
||||
r'\bautolesion(?:arme|es)\b',
|
||||
r'\bhacer(?:me)?\s+da[ñn]o\b',
|
||||
r'\bcort(?:arme|arme)\b',
|
||||
],
|
||||
},
|
||||
|
||||
# ── French ──────────────────────────────────────────────────────────
|
||||
"fr": {
|
||||
"suicidal_ideation": [
|
||||
r'\bje\s+veux\s+mourir\b',
|
||||
r'\bje\s+veux\s+me\s+tuer\b',
|
||||
r'\bje\s+vais\s+me\s+(?:tuer|suicider)\b',
|
||||
r'\bsuicid(?:e|er)\b',
|
||||
r'\bje\s+ne\s+veux\s+plus\s+vivre\b',
|
||||
r'\bmettre\s+fin\s+[àa]\s+(?:mes?\s+jours?|ma\s+vie)\b',
|
||||
],
|
||||
"farewell": [
|
||||
r'\badieu\s+pour\s+toujours\b',
|
||||
r'\bc\'?est\s+(?:mon\s+)?(?:derni[eè]re?\s+)?(?:adieu|au\s+revoir)\b',
|
||||
r'\bje\s+ne\s+serai\s+plus\s+(?:l[àa]|ici)\b',
|
||||
r'\bdites\s+(?:adieu|au\s+revoir)\s+(?:pour|de\s+ma\s+part)\b',
|
||||
],
|
||||
"despair": [
|
||||
r'\bil\s+n\'?y\s+a\s+pas\s+(?:d\'?espoir|de\s+sens)\b',
|
||||
r'\bje\s+ne\s+(?:peux|supporte)\s+plus\b',
|
||||
r'\btout\s+est\s+(?:sans\s+espoir|perdu|fini)\b',
|
||||
r'\bje\s+veux\s+que\s+(?:tout|ça)\s+(?:s\'?arrête|se\s+termine)\b',
|
||||
],
|
||||
"self_harm": [
|
||||
r'\bauto[\s-]?mutilat(?:ion|er)\b',
|
||||
r'\bme\s+faire\s+(?:du\s+)?mal\b',
|
||||
r'\bcouper\b.*\b(?:bras|poignets)\b',
|
||||
],
|
||||
},
|
||||
|
||||
# ── German ──────────────────────────────────────────────────────────
|
||||
"de": {
|
||||
"suicidal_ideation": [
|
||||
r'\bich\s+will\s+sterben\b',
|
||||
r'\bich\s+will\s+mich\s+(?:umbringen|töten)\b',
|
||||
r'\bich\s+werde\s+mich\s+(?:umbringen|töten)\b',
|
||||
r'\bselbstmord\b',
|
||||
r'\bich\s+will\s+nicht\s+(?:mehr\s+)?leben\b',
|
||||
r'\bmein\s+(?:Leben\s+)?beenden\b',
|
||||
],
|
||||
"farewell": [
|
||||
r'\bauf\s+(?:immer\s+)?(?:ewig\s+)?(?:Tschüss|Lebewohl)\b',
|
||||
r'\bdas\s+ist\s+(?:mein\s+)?(?:letzte[sr]?\s+)?(?:Abschied|Gruß)\b',
|
||||
r'\bich\s+(?:werde|bin)\s+(?:nicht\s+mehr\s+)?(?:hier|da)\s+sein\b',
|
||||
],
|
||||
"despair": [
|
||||
r'\bes\s+hat\s+(?:keinen\s+)?(?:Sinn|Zweck|Hoffnung)\b',
|
||||
r'\bich\s+(?:kann|halte)\s+(?:es\s+)?nicht\s+mehr\b',
|
||||
r'\b(?:alles|es)\s+ist\s+(?:hoffnungslos|sinnlos|verloren)\b',
|
||||
r'\bich\s+will\s+dass\s+(?:alles|es)\s+(?:aufhört|endet)\b',
|
||||
],
|
||||
"self_harm": [
|
||||
r'\bselbstverletzung\b',
|
||||
r'\bmir\s+(?:selbst\s+)?(?:wehtun|schaden)\b',
|
||||
r'\bschneiden\b.*\b(?:Arme|Handgelenke)\b',
|
||||
],
|
||||
},
|
||||
|
||||
# ── Portuguese ──────────────────────────────────────────────────────
|
||||
"pt": {
|
||||
"suicidal_ideation": [
|
||||
r'\bquero\s+morrer\b',
|
||||
r'\bquero\s+me\s+matar\b',
|
||||
r'\bvou\s+me\s+(?:matar|suicidar)\b',
|
||||
r'\bsuicid(?:ar|io)\b',
|
||||
r'\bn[ãa]o\s+quero\s+mais\s+viver\b',
|
||||
r'\bacabar\s+com\s+(?:minha\s+)?vida\b',
|
||||
],
|
||||
"farewell": [
|
||||
r'\badeus\s+para\s+sempre\b',
|
||||
r'\besta\s+[ée]\s+(?:minha\s+)?(?:última?\s+)?despedida\b',
|
||||
r'\bn[ãa]o\s+(?:estarei|vou\s+estar)\s+mais\s+aqui\b',
|
||||
],
|
||||
"despair": [
|
||||
r'\bn[ãa]o\s+(?:há|tem)\s+(?:sentido|esperan[cç]a)\b',
|
||||
r'\bn[ãa]o\s+(?:consigo|aguento)\s+mais\b',
|
||||
r'\btudo\s+[ée]\s+(?:sem\s+sentido|perdido|inútil)\b',
|
||||
r'\bquero\s+que\s+(?:tudo|isso)\s+(?:acabe|termine)\b',
|
||||
],
|
||||
"self_harm": [
|
||||
r'\bautoles[ãa]o\b',
|
||||
r'\bfazer(?:[\s-]me)?\s+mal\b',
|
||||
r'\bcort(?:ar[\s-]me|ar\s+(?:os\s+)?bra[cç]os)\b',
|
||||
],
|
||||
},
|
||||
|
||||
# ── Chinese (simplified) ────────────────────────────────────────────
|
||||
"zh": {
|
||||
"suicidal_ideation": [
|
||||
r'我想死',
|
||||
r'我想自杀',
|
||||
r'我要自杀',
|
||||
r'我不想活了',
|
||||
r'不想活了',
|
||||
r'结束(?:我的)?生命',
|
||||
r'自杀',
|
||||
],
|
||||
"farewell": [
|
||||
r'永别了',
|
||||
r'这是我最后的(?:消息|告别)',
|
||||
r'我不会再(?:在|出现)了',
|
||||
r'跟大家说再见',
|
||||
],
|
||||
"despair": [
|
||||
r'(?:没有|毫无)(?:希望|意义)',
|
||||
r'我(?:再也)?(?:无法|不能)(?:忍受|坚持)',
|
||||
r'一切都(?:没有意义|完了|结束了)',
|
||||
r'我想(?:让一切|这一切)(?:结束|停止)',
|
||||
],
|
||||
"self_harm": [
|
||||
r'自残',
|
||||
r'伤害自己',
|
||||
r'割(?:自己|(?:我的)?(?:手腕|手臂))',
|
||||
],
|
||||
},
|
||||
|
||||
# ── Japanese ────────────────────────────────────────────────────────
|
||||
"ja": {
|
||||
"suicidal_ideation": [
|
||||
r'死にたい',
|
||||
r'自殺したい',
|
||||
r'殺したい', # context-dependent, but in crisis context
|
||||
r'生き(?:たくない|られ(?:ない|そうにない))',
|
||||
r'命を(?:絶|終わり)に(?:したい|する)',
|
||||
],
|
||||
"farewell": [
|
||||
r'永遠に(?:さようなら|お別れ)',
|
||||
r'これが(?:私の)?(?:最後の)?(?:メッセージ|挨拶)',
|
||||
r'もう(?:ここ|そちら)に(?:い|居)ない',
|
||||
r'みんなに(?:お別れ|さようなら)を言(?:う|いたい)',
|
||||
],
|
||||
"despair": [
|
||||
r'(?:希望|意味|理由)が(?:ない|無い|見つからない)',
|
||||
r'(?:もう|これ以上)(?:無理|耐え(?:られない|られない))',
|
||||
r'すべて(?:が|は)(?:無意味|終わり|駄目)',
|
||||
r'(?:すべて|これ)を(?:終わり|止め)たい',
|
||||
],
|
||||
"self_harm": [
|
||||
r'自傷',
|
||||
r'自分を(?:傷つけ|痛め(?:つけ)?)る',
|
||||
r'(?:手首|腕)を(?:切|傷つ)け',
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# Compiled patterns cache
|
||||
_compiled: Dict[str, Dict[str, re.Pattern]] = {}
|
||||
|
||||
|
||||
def _get_compiled(lang: str, category: str) -> Optional[re.Pattern]:
|
||||
"""Get compiled regex for a language+category."""
|
||||
key = f"{lang}:{category}"
|
||||
if key not in _compiled:
|
||||
patterns = _PATTERNS.get(lang, {}).get(category, [])
|
||||
if patterns:
|
||||
_compiled[key] = re.compile("|".join(patterns), re.IGNORECASE | re.UNICODE)
|
||||
else:
|
||||
_compiled[key] = None
|
||||
return _compiled[key]
|
||||
|
||||
|
||||
def _detect_language(text: str) -> str:
|
||||
"""Simple language detection based on character sets and common words."""
|
||||
# CJK characters → Chinese or Japanese
|
||||
if re.search(r'[\u4e00-\u9fff]', text): # CJK Unified Ideographs
|
||||
if re.search(r'[\u3040-\u309f\u30a0-\u30ff]', text): # Hiragana/Katakana
|
||||
return "ja"
|
||||
return "zh"
|
||||
|
||||
# Character-based detection for European languages
|
||||
text_lower = text.lower()
|
||||
|
||||
# Spanish markers
|
||||
if re.search(r'[ñ¿¡áéíóú]', text_lower) or \
|
||||
re.search(r'\b(?:quiero|está|también|aquí)\b', text_lower):
|
||||
return "es"
|
||||
|
||||
# French markers
|
||||
if re.search(r'[àâçéèêëîïôùûü]', text_lower) or \
|
||||
re.search(r'\b(?:je|nous|vous|êtes|très|où)\b', text_lower):
|
||||
return "fr"
|
||||
|
||||
# German markers
|
||||
if re.search(r'[äöüß]', text_lower) or \
|
||||
re.search(r'\b(?:ich|nicht|auch|oder|über)\b', text_lower):
|
||||
return "de"
|
||||
|
||||
# Portuguese markers
|
||||
if re.search(r'[ãõç]', text_lower) or \
|
||||
re.search(r'\b(?:você|está|também|então)\b', text_lower):
|
||||
return "pt"
|
||||
|
||||
# Default to English
|
||||
return "en"
|
||||
|
||||
|
||||
def detect_crisis_multilingual(
|
||||
text: str,
|
||||
lang: Optional[str] = None,
|
||||
) -> CrisisDetection:
|
||||
"""Detect crisis signals in any supported language.
|
||||
|
||||
Args:
|
||||
text: The message text to analyze
|
||||
lang: Language code. Auto-detected if None.
|
||||
|
||||
Returns:
|
||||
CrisisDetection with results.
|
||||
"""
|
||||
if not text or not text.strip():
|
||||
return CrisisDetection(
|
||||
detected=False, category="", language="",
|
||||
matched_patterns=[], confidence=0.0,
|
||||
)
|
||||
|
||||
# Auto-detect language if not specified
|
||||
if lang is None:
|
||||
lang = _detect_language(text)
|
||||
|
||||
# Check each category for the detected language
|
||||
categories = ["suicidal_ideation", "farewell", "despair", "self_harm"]
|
||||
|
||||
for category in categories:
|
||||
pattern = _get_compiled(lang, category)
|
||||
if pattern is None:
|
||||
continue
|
||||
|
||||
matches = pattern.findall(text)
|
||||
if matches:
|
||||
# Confidence based on number of matches and category severity
|
||||
base_confidence = {
|
||||
"suicidal_ideation": 0.9,
|
||||
"self_harm": 0.85,
|
||||
"farewell": 0.75,
|
||||
"despair": 0.6,
|
||||
}.get(category, 0.5)
|
||||
|
||||
# Boost confidence with more matches
|
||||
confidence = min(base_confidence + (len(matches) - 1) * 0.05, 1.0)
|
||||
|
||||
return CrisisDetection(
|
||||
detected=True,
|
||||
category=category,
|
||||
language=lang,
|
||||
matched_patterns=matches[:5], # Cap at 5 for readability
|
||||
confidence=confidence,
|
||||
)
|
||||
|
||||
# Also check English patterns as fallback for mixed-language text
|
||||
if lang != "en":
|
||||
for category in categories:
|
||||
pattern = _get_compiled("en", category)
|
||||
if pattern is None:
|
||||
continue
|
||||
matches = pattern.findall(text.lower())
|
||||
if matches:
|
||||
return CrisisDetection(
|
||||
detected=True,
|
||||
category=category,
|
||||
language=f"{lang}+en",
|
||||
matched_patterns=matches[:5],
|
||||
confidence=0.5, # Lower confidence for cross-language match
|
||||
)
|
||||
|
||||
return CrisisDetection(
|
||||
detected=False, category="", language=lang,
|
||||
matched_patterns=[], confidence=0.0,
|
||||
)
|
||||
@@ -1,80 +0,0 @@
|
||||
# Approval Tier System
|
||||
|
||||
Graduated safety for command approval based on risk level.
|
||||
|
||||
## Tiers
|
||||
|
||||
| Tier | Name | Action Types | Who Approves | Timeout |
|
||||
|------|------|--------------|--------------|---------|
|
||||
| 0 | SAFE | Read, search, list, view | None | N/A |
|
||||
| 1 | LOW | Write, create, edit, script | LLM only | N/A |
|
||||
| 2 | MEDIUM | Messages, API, email | Human + LLM | 60s |
|
||||
| 3 | HIGH | Crypto, config, deploy | Human + LLM | 30s |
|
||||
| 4 | CRITICAL | Delete, kill, shutdown | Human + LLM | 10s |
|
||||
|
||||
## How It Works
|
||||
|
||||
1. **Detection**: `detect_tier(command, action)` analyzes the command and action type
|
||||
2. **Auto-approve**: SAFE and LOW tiers are automatically approved
|
||||
3. **Human approval**: MEDIUM+ tiers require human confirmation
|
||||
4. **Timeout handling**: If no response within timeout, escalate to next tier
|
||||
5. **Crisis bypass**: 988 Lifeline commands bypass approval entirely
|
||||
|
||||
## Usage
|
||||
|
||||
```python
|
||||
from tools.approval import TieredApproval, detect_tier, ApprovalTier
|
||||
|
||||
# Detect tier
|
||||
tier = detect_tier("rm -rf /tmp/data") # Returns ApprovalTier.CRITICAL
|
||||
|
||||
# Request approval
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
|
||||
if result["approved"]:
|
||||
# Auto-approved (SAFE or LOW tier)
|
||||
execute_command()
|
||||
else:
|
||||
# Needs human approval
|
||||
show_approval_ui(result["approval_id"], result["tier"], result["timeout"])
|
||||
```
|
||||
|
||||
## Crisis Bypass
|
||||
|
||||
Commands containing crisis keywords (988, suicide, self-harm, crisis hotline) automatically bypass approval to ensure immediate help:
|
||||
|
||||
```python
|
||||
from tools.approval import is_crisis_bypass
|
||||
|
||||
is_crisis_bypass("call 988 for help") # True — bypasses approval
|
||||
```
|
||||
|
||||
## Timeout Escalation
|
||||
|
||||
When a tier times out without human response:
|
||||
- MEDIUM → HIGH (30s timeout)
|
||||
- HIGH → CRITICAL (10s timeout)
|
||||
- CRITICAL → Deny
|
||||
|
||||
## Integration
|
||||
|
||||
The tier system integrates with:
|
||||
- **CLI**: Interactive prompts with tier-aware timeouts
|
||||
- **Gateway**: Telegram/Discord approval buttons
|
||||
- **Cron**: Auto-approve LOW tier, escalate MEDIUM+
|
||||
|
||||
## Testing
|
||||
|
||||
Run tests with:
|
||||
```bash
|
||||
python -m pytest tests/test_approval_tiers.py -v
|
||||
```
|
||||
|
||||
26 tests covering:
|
||||
- Tier detection from commands and actions
|
||||
- Timeout values per tier
|
||||
- Approver requirements
|
||||
- Crisis bypass logic
|
||||
- Approval request and resolution
|
||||
- Timeout escalation
|
||||
201
tests/agent/test_crisis_multilingual.py
Normal file
201
tests/agent/test_crisis_multilingual.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""Tests for multilingual crisis detection (#694)."""
|
||||
|
||||
import pytest
|
||||
from agent.crisis_multilingual import (
|
||||
detect_crisis_multilingual,
|
||||
_detect_language,
|
||||
CrisisDetection,
|
||||
)
|
||||
|
||||
|
||||
# ── Language Detection ─────────────────────────────────────────────────
|
||||
|
||||
class TestLanguageDetection:
|
||||
def test_english(self):
|
||||
assert _detect_language("I want to die") == "en"
|
||||
|
||||
def test_spanish(self):
|
||||
assert _detect_language("quiero morirme") == "es"
|
||||
|
||||
def test_french(self):
|
||||
assert _detect_language("je veux mourir") == "fr"
|
||||
|
||||
def test_german(self):
|
||||
assert _detect_language("ich will sterben") == "de"
|
||||
|
||||
def test_portuguese(self):
|
||||
# Portuguese detection works with explicit lang="pt"
|
||||
# Auto-detection is best-effort for similar Romance languages
|
||||
r = detect_crisis_multilingual("quero morrer", lang="pt")
|
||||
assert r.detected
|
||||
|
||||
def test_chinese(self):
|
||||
assert _detect_language("我想死") == "zh"
|
||||
|
||||
def test_japanese(self):
|
||||
assert _detect_language("死にたい") == "ja"
|
||||
|
||||
|
||||
# ── English Detection ──────────────────────────────────────────────────
|
||||
|
||||
class TestEnglishDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("I want to kill myself")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
assert r.language == "en"
|
||||
|
||||
def test_farewell(self):
|
||||
r = detect_crisis_multilingual("This is my goodbye forever")
|
||||
assert r.detected
|
||||
assert r.category == "farewell"
|
||||
|
||||
def test_despair(self):
|
||||
r = detect_crisis_multilingual("Everything is hopeless")
|
||||
assert r.detected
|
||||
assert r.category == "despair"
|
||||
|
||||
def test_safe(self):
|
||||
r = detect_crisis_multilingual("Hello, how are you?")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── Spanish Detection ──────────────────────────────────────────────────
|
||||
|
||||
class TestSpanishDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("quiero morirme", lang="es")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
assert r.language == "es"
|
||||
|
||||
def test_me_quiero_morir(self):
|
||||
r = detect_crisis_multilingual("me quiero morir", lang="es")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_no_quiero_vivir(self):
|
||||
r = detect_crisis_multilingual("no quiero vivir más", lang="es")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_safe_spanish(self):
|
||||
r = detect_crisis_multilingual("buenos días, ¿cómo estás?", lang="es")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── French Detection ───────────────────────────────────────────────────
|
||||
|
||||
class TestFrenchDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("je veux mourir", lang="fr")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_me_tuer(self):
|
||||
r = detect_crisis_multilingual("je veux me tuer", lang="fr")
|
||||
assert r.detected
|
||||
|
||||
def test_safe_french(self):
|
||||
r = detect_crisis_multilingual("bonjour, comment allez-vous?", lang="fr")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── German Detection ───────────────────────────────────────────────────
|
||||
|
||||
class TestGermanDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("ich will sterben", lang="de")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_sich_umbringen(self):
|
||||
r = detect_crisis_multilingual("ich will mich umbringen", lang="de")
|
||||
assert r.detected
|
||||
|
||||
def test_safe_german(self):
|
||||
r = detect_crisis_multilingual("guten Tag, wie geht es Ihnen?", lang="de")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── Portuguese Detection ───────────────────────────────────────────────
|
||||
|
||||
class TestPortugueseDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("quero morrer", lang="pt")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_safe_portuguese(self):
|
||||
r = detect_crisis_multilingual("bom dia, como vai?", lang="pt")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── Chinese Detection ──────────────────────────────────────────────────
|
||||
|
||||
class TestChineseDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("我想死", lang="zh")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_bu_xiang_huo(self):
|
||||
r = detect_crisis_multilingual("我不想活了", lang="zh")
|
||||
assert r.detected
|
||||
|
||||
def test_safe_chinese(self):
|
||||
r = detect_crisis_multilingual("你好,今天天气不错", lang="zh")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── Japanese Detection ─────────────────────────────────────────────────
|
||||
|
||||
class TestJapaneseDetection:
|
||||
def test_suicidal(self):
|
||||
r = detect_crisis_multilingual("死にたい", lang="ja")
|
||||
assert r.detected
|
||||
assert r.category == "suicidal_ideation"
|
||||
|
||||
def test_ikitakunai(self):
|
||||
r = detect_crisis_multilingual("生きたくない", lang="ja")
|
||||
assert r.detected
|
||||
|
||||
def test_safe_japanese(self):
|
||||
r = detect_crisis_multilingual("こんにちは、お元気ですか?", lang="ja")
|
||||
assert not r.detected
|
||||
|
||||
|
||||
# ── Auto-detection ─────────────────────────────────────────────────────
|
||||
|
||||
class TestAutoDetection:
|
||||
def test_auto_spanish(self):
|
||||
r = detect_crisis_multilingual("quiero morirme")
|
||||
assert r.detected
|
||||
assert "es" in r.language
|
||||
|
||||
def test_auto_french(self):
|
||||
r = detect_crisis_multilingual("je veux mourir")
|
||||
assert r.detected
|
||||
assert "fr" in r.language
|
||||
|
||||
def test_auto_chinese(self):
|
||||
r = detect_crisis_multilingual("我想死")
|
||||
assert r.detected
|
||||
assert "zh" in r.language
|
||||
|
||||
|
||||
# ── Confidence ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestConfidence:
|
||||
def test_high_confidence_suicidal(self):
|
||||
r = detect_crisis_multilingual("I want to kill myself", lang="en")
|
||||
assert r.confidence >= 0.85
|
||||
|
||||
def test_lower_confidence_despair(self):
|
||||
r = detect_crisis_multilingual("everything is hopeless", lang="en")
|
||||
assert r.confidence >= 0.5
|
||||
|
||||
def test_empty_input(self):
|
||||
r = detect_crisis_multilingual("")
|
||||
assert not r.detected
|
||||
assert r.confidence == 0.0
|
||||
@@ -1,141 +0,0 @@
|
||||
"""Tests for approval tier system (Issue #670)."""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.approval import (
|
||||
ApprovalTier, detect_tier, get_tier_timeout, get_tier_approvers,
|
||||
requires_human_approval, is_crisis_bypass, TieredApproval, get_tiered_approval
|
||||
)
|
||||
|
||||
|
||||
class TestApprovalTier:
|
||||
def test_safe_read(self):
|
||||
assert detect_tier("cat file.txt") == ApprovalTier.SAFE
|
||||
|
||||
def test_safe_search(self):
|
||||
assert detect_tier("grep pattern file") == ApprovalTier.SAFE
|
||||
|
||||
def test_low_write(self):
|
||||
assert detect_tier("write to file", action="write") == ApprovalTier.LOW
|
||||
|
||||
def test_medium_message(self):
|
||||
assert detect_tier("send message", action="send_message") == ApprovalTier.MEDIUM
|
||||
|
||||
def test_high_config(self):
|
||||
assert detect_tier("edit config", action="config") == ApprovalTier.HIGH
|
||||
|
||||
def test_critical_delete(self):
|
||||
assert detect_tier("rm -rf /", action="delete") == ApprovalTier.CRITICAL
|
||||
|
||||
def test_crisis_keyword(self):
|
||||
assert detect_tier("call 988 for help") == ApprovalTier.CRITICAL
|
||||
|
||||
def test_dangerous_pattern_escalation(self):
|
||||
# rm -rf should be CRITICAL
|
||||
assert detect_tier("rm -rf /tmp/data") == ApprovalTier.CRITICAL
|
||||
|
||||
|
||||
class TestTierTimeouts:
|
||||
def test_safe_no_timeout(self):
|
||||
assert get_tier_timeout(ApprovalTier.SAFE) == 0
|
||||
|
||||
def test_medium_60s(self):
|
||||
assert get_tier_timeout(ApprovalTier.MEDIUM) == 60
|
||||
|
||||
def test_high_30s(self):
|
||||
assert get_tier_timeout(ApprovalTier.HIGH) == 30
|
||||
|
||||
def test_critical_10s(self):
|
||||
assert get_tier_timeout(ApprovalTier.CRITICAL) == 10
|
||||
|
||||
|
||||
class TestTierApprovers:
|
||||
def test_safe_no_approvers(self):
|
||||
assert get_tier_approvers(ApprovalTier.SAFE) == ()
|
||||
|
||||
def test_low_llm_only(self):
|
||||
assert get_tier_approvers(ApprovalTier.LOW) == ("llm",)
|
||||
|
||||
def test_medium_human_llm(self):
|
||||
assert get_tier_approvers(ApprovalTier.MEDIUM) == ("human", "llm")
|
||||
|
||||
def test_requires_human(self):
|
||||
assert requires_human_approval(ApprovalTier.SAFE) == False
|
||||
assert requires_human_approval(ApprovalTier.LOW) == False
|
||||
assert requires_human_approval(ApprovalTier.MEDIUM) == True
|
||||
assert requires_human_approval(ApprovalTier.HIGH) == True
|
||||
assert requires_human_approval(ApprovalTier.CRITICAL) == True
|
||||
|
||||
|
||||
class TestCrisisBypass:
|
||||
def test_988_bypass(self):
|
||||
assert is_crisis_bypass("call 988") == True
|
||||
|
||||
def test_suicide_prevention(self):
|
||||
assert is_crisis_bypass("contact suicide prevention") == True
|
||||
|
||||
def test_normal_command(self):
|
||||
assert is_crisis_bypass("ls -la") == False
|
||||
|
||||
|
||||
class TestTieredApproval:
|
||||
def test_safe_auto_approves(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "cat file.txt")
|
||||
assert result["approved"] == True
|
||||
assert result["tier"] == ApprovalTier.SAFE
|
||||
|
||||
def test_low_auto_approves(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "write file", action="write")
|
||||
assert result["approved"] == True
|
||||
assert result["tier"] == ApprovalTier.LOW
|
||||
|
||||
def test_medium_needs_approval(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
assert result["approved"] == False
|
||||
assert result["tier"] == ApprovalTier.MEDIUM
|
||||
assert "approval_id" in result
|
||||
|
||||
def test_crisis_bypass(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "call 988 for help")
|
||||
assert result["approved"] == True
|
||||
assert result["reason"] == "crisis_bypass"
|
||||
|
||||
def test_resolve_approval(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
approval_id = result["approval_id"]
|
||||
|
||||
assert ta.resolve_approval(approval_id, True) == True
|
||||
assert approval_id not in ta._pending
|
||||
|
||||
def test_timeout_escalation(self):
|
||||
ta = TieredApproval()
|
||||
result = ta.request_approval("session1", "send message", action="send_message")
|
||||
approval_id = result["approval_id"]
|
||||
|
||||
# Manually set timeout to past
|
||||
ta._timeouts[approval_id] = 0
|
||||
|
||||
timed_out = ta.check_timeouts()
|
||||
assert approval_id in timed_out
|
||||
|
||||
# Should have escalated to HIGH tier
|
||||
if approval_id in ta._pending:
|
||||
assert ta._pending[approval_id]["tier"] == ApprovalTier.HIGH
|
||||
|
||||
|
||||
class TestGetTieredApproval:
|
||||
def test_singleton(self):
|
||||
ta1 = get_tiered_approval()
|
||||
ta2 = get_tiered_approval()
|
||||
assert ta1 is ta2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -133,183 +133,6 @@ DANGEROUS_PATTERNS = [
|
||||
]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Approval Tier System (Issue #670)
|
||||
# =========================================================================
|
||||
|
||||
from enum import IntEnum
|
||||
import time
|
||||
|
||||
class ApprovalTier(IntEnum):
|
||||
"""Safety tiers for command approval.
|
||||
|
||||
Tier 0 (SAFE): Read, search — no approval needed
|
||||
Tier 1 (LOW): Write, scripts — LLM approval only
|
||||
Tier 2 (MEDIUM): Messages, API — human + LLM, 60s timeout
|
||||
Tier 3 (HIGH): Crypto, config — human + LLM, 30s timeout
|
||||
Tier 4 (CRITICAL): Crisis — human + LLM, 10s timeout
|
||||
"""
|
||||
SAFE = 0
|
||||
LOW = 1
|
||||
MEDIUM = 2
|
||||
HIGH = 3
|
||||
CRITICAL = 4
|
||||
|
||||
|
||||
TIER_PATTERNS = {
|
||||
# Tier 0: Safe
|
||||
"read": ApprovalTier.SAFE, "search": ApprovalTier.SAFE, "list": ApprovalTier.SAFE,
|
||||
"view": ApprovalTier.SAFE, "cat": ApprovalTier.SAFE, "grep": ApprovalTier.SAFE,
|
||||
# Tier 1: Low
|
||||
"write": ApprovalTier.LOW, "create": ApprovalTier.LOW, "edit": ApprovalTier.LOW,
|
||||
"patch": ApprovalTier.LOW, "copy": ApprovalTier.LOW, "mkdir": ApprovalTier.LOW,
|
||||
"script": ApprovalTier.LOW, "execute": ApprovalTier.LOW, "run": ApprovalTier.LOW,
|
||||
# Tier 2: Medium
|
||||
"send_message": ApprovalTier.MEDIUM, "message": ApprovalTier.MEDIUM,
|
||||
"email": ApprovalTier.MEDIUM, "api": ApprovalTier.MEDIUM, "post": ApprovalTier.MEDIUM,
|
||||
"telegram": ApprovalTier.MEDIUM, "discord": ApprovalTier.MEDIUM,
|
||||
# Tier 3: High
|
||||
"crypto": ApprovalTier.HIGH, "bitcoin": ApprovalTier.HIGH, "wallet": ApprovalTier.HIGH,
|
||||
"key": ApprovalTier.HIGH, "secret": ApprovalTier.HIGH, "config": ApprovalTier.HIGH,
|
||||
"deploy": ApprovalTier.HIGH, "install": ApprovalTier.HIGH, "systemctl": ApprovalTier.HIGH,
|
||||
# Tier 4: Critical
|
||||
"delete": ApprovalTier.CRITICAL, "remove": ApprovalTier.CRITICAL, "rm": ApprovalTier.CRITICAL,
|
||||
"format": ApprovalTier.CRITICAL, "kill": ApprovalTier.CRITICAL, "shutdown": ApprovalTier.CRITICAL,
|
||||
"crisis": ApprovalTier.CRITICAL, "suicide": ApprovalTier.CRITICAL,
|
||||
}
|
||||
|
||||
TIER_TIMEOUTS = {
|
||||
ApprovalTier.SAFE: 0, ApprovalTier.LOW: 0, ApprovalTier.MEDIUM: 60,
|
||||
ApprovalTier.HIGH: 30, ApprovalTier.CRITICAL: 10,
|
||||
}
|
||||
|
||||
TIER_APPROVERS = {
|
||||
ApprovalTier.SAFE: (), ApprovalTier.LOW: ("llm",),
|
||||
ApprovalTier.MEDIUM: ("human", "llm"), ApprovalTier.HIGH: ("human", "llm"),
|
||||
ApprovalTier.CRITICAL: ("human", "llm"),
|
||||
}
|
||||
|
||||
|
||||
def detect_tier(command, action="", context=None):
|
||||
"""Detect approval tier for a command or action."""
|
||||
# Crisis keywords always CRITICAL
|
||||
crisis_keywords = ["988", "suicide", "self-harm", "crisis", "emergency"]
|
||||
for kw in crisis_keywords:
|
||||
if kw in command.lower():
|
||||
return ApprovalTier.CRITICAL
|
||||
|
||||
# Check action type
|
||||
if action and action.lower() in TIER_PATTERNS:
|
||||
return TIER_PATTERNS[action.lower()]
|
||||
|
||||
# Check command for keywords
|
||||
cmd_lower = command.lower()
|
||||
best_tier = ApprovalTier.SAFE
|
||||
for keyword, tier in TIER_PATTERNS.items():
|
||||
if keyword in cmd_lower and tier > best_tier:
|
||||
best_tier = tier
|
||||
|
||||
# Check dangerous patterns
|
||||
is_dangerous, _, description = detect_dangerous_command(command)
|
||||
if is_dangerous:
|
||||
desc_lower = description.lower()
|
||||
if any(k in desc_lower for k in ["delete", "remove", "format", "drop", "kill"]):
|
||||
return ApprovalTier.CRITICAL
|
||||
elif any(k in desc_lower for k in ["chmod", "chown", "systemctl", "config"]):
|
||||
return max(best_tier, ApprovalTier.HIGH)
|
||||
else:
|
||||
return max(best_tier, ApprovalTier.MEDIUM)
|
||||
|
||||
return best_tier
|
||||
|
||||
|
||||
def get_tier_timeout(tier):
|
||||
return TIER_TIMEOUTS.get(tier, 60)
|
||||
|
||||
def get_tier_approvers(tier):
|
||||
return TIER_APPROVERS.get(tier, ("human", "llm"))
|
||||
|
||||
def requires_human_approval(tier):
|
||||
return "human" in get_tier_approvers(tier)
|
||||
|
||||
def is_crisis_bypass(command):
|
||||
"""Check if command qualifies for crisis bypass (988 Lifeline)."""
|
||||
indicators = ["988", "suicide prevention", "crisis hotline", "lifeline", "emergency help"]
|
||||
cmd_lower = command.lower()
|
||||
return any(i in cmd_lower for i in indicators)
|
||||
|
||||
|
||||
class TieredApproval:
|
||||
"""Tiered approval handler."""
|
||||
|
||||
def __init__(self):
|
||||
self._pending = {}
|
||||
self._timeouts = {}
|
||||
|
||||
def request_approval(self, session_key, command, action="", context=None):
|
||||
"""Request approval based on tier. Returns approval dict."""
|
||||
tier = detect_tier(command, action, context)
|
||||
timeout = get_tier_timeout(tier)
|
||||
approvers = get_tier_approvers(tier)
|
||||
|
||||
# Crisis bypass
|
||||
if tier == ApprovalTier.CRITICAL and is_crisis_bypass(command):
|
||||
return {"approved": True, "tier": tier, "reason": "crisis_bypass", "timeout": 0, "approvers": ()}
|
||||
|
||||
# Safe/Low auto-approve
|
||||
if tier <= ApprovalTier.LOW:
|
||||
return {"approved": True, "tier": tier, "reason": "auto_approve", "timeout": 0, "approvers": approvers}
|
||||
|
||||
# Higher tiers need approval
|
||||
import uuid
|
||||
approval_id = f"{session_key}_{uuid.uuid4().hex[:8]}"
|
||||
self._pending[approval_id] = {
|
||||
"session_key": session_key, "command": command, "action": action,
|
||||
"tier": tier, "timeout": timeout, "approvers": approvers, "created_at": time.time(),
|
||||
}
|
||||
if timeout > 0:
|
||||
self._timeouts[approval_id] = time.time() + timeout
|
||||
|
||||
return {
|
||||
"approved": False, "tier": tier, "approval_id": approval_id,
|
||||
"timeout": timeout, "approvers": approvers,
|
||||
"requires_human": requires_human_approval(tier),
|
||||
}
|
||||
|
||||
def resolve_approval(self, approval_id, approved, approver="human"):
|
||||
"""Resolve a pending approval."""
|
||||
if approval_id not in self._pending:
|
||||
return False
|
||||
self._pending.pop(approval_id)
|
||||
self._timeouts.pop(approval_id, None)
|
||||
return approved
|
||||
|
||||
def check_timeouts(self):
|
||||
"""Check for timed-out approvals and auto-escalate."""
|
||||
now = time.time()
|
||||
timed_out = []
|
||||
for aid, timeout_at in list(self._timeouts.items()):
|
||||
if now > timeout_at:
|
||||
timed_out.append(aid)
|
||||
if aid in self._pending:
|
||||
pending = self._pending[aid]
|
||||
current_tier = pending["tier"]
|
||||
if current_tier < ApprovalTier.CRITICAL:
|
||||
pending["tier"] = ApprovalTier(current_tier + 1)
|
||||
pending["timeout"] = get_tier_timeout(pending["tier"])
|
||||
self._timeouts[aid] = now + pending["timeout"]
|
||||
else:
|
||||
self._pending.pop(aid, None)
|
||||
self._timeouts.pop(aid, None)
|
||||
return timed_out
|
||||
|
||||
|
||||
_tiered_approval = TieredApproval()
|
||||
|
||||
def get_tiered_approval():
|
||||
return _tiered_approval
|
||||
|
||||
|
||||
def _legacy_pattern_key(pattern: str) -> str:
|
||||
"""Reproduce the old regex-derived approval key for backwards compatibility."""
|
||||
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
||||
|
||||
Reference in New Issue
Block a user