Compare commits

..

3 Commits

Author SHA1 Message Date
dffc6389e2 test(#695): Add tests for crisis hook integration
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 41s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 42s
Tests / e2e (pull_request) Successful in 4m34s
Tests / test (pull_request) Failing after 44m52s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Tests for crisis detection and 988 response.
Refs #695
2026-04-15 03:06:07 +00:00
371dce2f38 fix(#695): Wire crisis hook into run_agent.py conversation loop
Adds crisis detection at start of run_conversation():
- Detects crisis signals in user messages
- Returns 988 Lifeline response immediately
- Skips normal API call

Resolves #695
2026-04-15 03:01:49 +00:00
0b2515ff4d fix(#695): Add crisis hook module for 988 Lifeline integration
Creates agent/crisis_hook.py with:
- CrisisSeverity enum
- detect_crisis() pattern matching
- get_crisis_response() with 988 info
- check_crisis() convenience function

Refs #695
2026-04-15 03:01:01 +00:00
5 changed files with 145 additions and 362 deletions

85
agent/crisis_hook.py Normal file
View File

@@ -0,0 +1,85 @@
"""
Crisis Detection Hook — Integrates 988 Lifeline into conversation loop.
When crisis is detected in user messages, returns 988 Lifeline
information immediately instead of processing the original request.
"""
import re
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
class CrisisSeverity(Enum):
NONE = 0
MODERATE = 1
HIGH = 2
CRITICAL = 3
@dataclass
class CrisisDetection:
detected: bool
severity: CrisisSeverity
matched_patterns: list
confidence: float
# Crisis detection patterns
_CRISIS_PATTERNS = [
(r"\b(i want to die|want to kill myself|end (my|it all))\b", CrisisSeverity.CRITICAL, 0.95),
(r"\b(kill(ing)? myself|suicide|suicidal)\b", CrisisSeverity.CRITICAL, 0.90),
(r"\b(no reason to live|don'?t want to (live|be here|exist))\b", CrisisSeverity.HIGH, 0.85),
(r"\b(better off (dead|without me)|wish i was dead)\b", CrisisSeverity.HIGH, 0.85),
(r"\b(can'?t go on|can'?t take (it|this) anymore)\b", CrisisSeverity.HIGH, 0.80),
(r"\b(self[- ]?harm|cut(ting)? myself|hurt(ing)? myself)\b", CrisisSeverity.HIGH, 0.85),
(r"\b(hopeless|no hope|give up on (life|everything))\b", CrisisSeverity.MODERATE, 0.70),
]
_COMPILED = [(re.compile(p, re.IGNORECASE), s, c) for p, s, c in _CRISIS_PATTERNS]
def detect_crisis(message: str) -> CrisisDetection:
if not message or not isinstance(message, str):
return CrisisDetection(False, CrisisSeverity.NONE, [], 0.0)
matched = []
max_severity = CrisisSeverity.NONE
max_confidence = 0.0
for pattern, severity, confidence in _COMPILED:
if pattern.search(message):
matched.append(pattern.pattern)
if confidence > max_confidence:
max_confidence = confidence
if severity.value > max_severity.value:
max_severity = severity
detected = len(matched) > 0
return CrisisDetection(detected, max_severity, matched, max_confidence)
def get_crisis_response(severity: str = "CRITICAL") -> str:
return """I hear you, and you are not alone.
**Please reach out for help right now:**
📞 **Call 988** — Suicide & Crisis Lifeline (24/7)
💬 **Text HOME to 988** — Crisis text line
🌐 **Chat:** 988lifeline.org/chat
🇪🇸 **Spanish:** 1-888-628-9454
🎖️ **Veterans:** 988, then press 1
You don\'t have to face this alone. Please hold on."""
def check_crisis(user_message: str) -> Optional[str]:
detection = detect_crisis(user_message)
if detection.detected:
logger.warning("Crisis detected: severity=%s", detection.severity.name)
return get_crisis_response(detection.severity.name)
return None

View File

@@ -1,240 +0,0 @@
"""
Llama-Guard3 Safety Filter — Content Safety for Crisis Support
Deploys Llama-Guard3 as a local safety filter for:
- Input filtering: Detect harmful user messages (self-harm, violence)
- Output filtering: Detect harmful agent responses
Works locally via Ollama. No external API calls. Privacy preserved.
Issue: #669
"""
import json
import logging
import time
import urllib.request
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
class SafetyVerdict(Enum):
"""Safety verdict from Llama-Guard3."""
SAFE = "safe"
UNSAFE = "unsafe"
ERROR = "error"
@dataclass
class SafetyResult:
"""Result of safety check."""
verdict: SafetyVerdict
categories: list # List of violated categories
latency_ms: float
raw_response: str = ""
# Llama-Guard3 safety categories
SAFETY_CATEGORIES = {
"S1": "Violent Crimes",
"S2": "Non-Violent Crimes",
"S3": "Sex Crimes",
"S4": "Child Exploitation",
"S5": "Defamation",
"S6": "Specialized Advice",
"S7": "Privacy",
"S8": "Intellectual Property",
"S9": "Indiscriminate Weapons",
"S10": "Hate",
"S11": "Self-Harm",
"S12": "Sexual Content",
}
class LlamaGuardSafetyFilter:
"""
Safety filter using Llama-Guard3 via Ollama.
Usage:
filter = LlamaGuardSafetyFilter()
# Check user input
result = filter.check_input("I want to hurt myself")
if result.verdict == SafetyVerdict.UNSAFE:
return filter.get_crisis_response(result)
# Check agent output
result = filter.check_output(response_text)
if result.verdict == SafetyVerdict.UNSAFE:
return filter.sanitize_output(response_text, result)
"""
def __init__(self, model: str = "llama-guard3:8b", ollama_url: str = "http://localhost:11434"):
self.model = model
self.ollama_url = ollama_url
self._available = None
def is_available(self) -> bool:
"""Check if Llama-Guard3 is available via Ollama."""
if self._available is not None:
return self._available
try:
req = urllib.request.Request(f"{self.ollama_url}/api/tags")
with urllib.request.urlopen(req, timeout=2) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
self._available = any("llama-guard" in m.lower() for m in models)
return self._available
except Exception:
self._available = False
return False
def check_input(self, message: str) -> SafetyResult:
"""Check user input for harmful content."""
return self._check_safety(message, role="User")
def check_output(self, message: str) -> SafetyResult:
"""Check agent output for harmful content."""
return self._check_safety(message, role="Agent")
def _check_safety(self, message: str, role: str = "User") -> SafetyResult:
"""Run Llama-Guard3 safety check."""
start_time = time.time()
if not self.is_available():
return SafetyResult(
verdict=SafetyVerdict.ERROR,
categories=[],
latency_ms=0,
raw_response="Llama-Guard3 not available"
)
try:
prompt = f"""<|begin_of_text|><|start_header_id|>user<|end_header_id|>
{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
payload = json.dumps({
"model": self.model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0, "num_predict": 100}
}).encode()
req = urllib.request.Request(
f"{self.ollama_url}/api/generate",
data=payload,
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=5) as resp:
result = json.loads(resp.read())
response = result.get("response", "").strip()
latency_ms = (time.time() - start_time) * 1000
# Parse response
if response.lower().startswith("safe"):
return SafetyResult(
verdict=SafetyVerdict.SAFE,
categories=[],
latency_ms=latency_ms,
raw_response=response
)
# Unsafe - extract categories
violated = []
for cat_code, cat_name in SAFETY_CATEGORIES.items():
if cat_code in response:
violated.append({"code": cat_code, "name": cat_name})
return SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=violated,
latency_ms=latency_ms,
raw_response=response
)
except Exception as e:
latency_ms = (time.time() - start_time) * 1000
logger.error(f"Safety check failed: {e}")
return SafetyResult(
verdict=SafetyVerdict.ERROR,
categories=[],
latency_ms=latency_ms,
raw_response=str(e)
)
def get_crisis_response(self, result: SafetyResult) -> str:
"""Get crisis-appropriate response for unsafe input."""
# Check if self-harm
is_self_harm = any(c["code"] == "S11" for c in result.categories)
if is_self_harm:
return """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
I'm still here. Talk to me. Or call 988. Just don't give up."""
return "I can't help with that. Is there something else I can assist you with?"
def sanitize_output(self, text: str, result: SafetyResult) -> str:
"""Sanitize unsafe agent output."""
return "I apologize, but I can't provide that response. Is there something else I can help you with?"
# Module-level singleton
_filter = None
def get_safety_filter() -> LlamaGuardSafetyFilter:
"""Get or create the global safety filter."""
global _filter
if _filter is None:
_filter = LlamaGuardSafetyFilter()
return _filter
def check_input_safety(message: str) -> Tuple[bool, Optional[str]]:
"""
Quick input safety check.
Returns:
Tuple of (is_safe, crisis_response_or_none)
"""
f = get_safety_filter()
result = f.check_input(message)
if result.verdict == SafetyVerdict.UNSAFE:
return False, f.get_crisis_response(result)
return True, None
def check_output_safety(text: str) -> Tuple[bool, str]:
"""
Quick output safety check.
Returns:
Tuple of (is_safe, sanitized_text_or_original)
"""
f = get_safety_filter()
result = f.check_output(text)
if result.verdict == SafetyVerdict.UNSAFE:
return False, f.sanitize_output(text, result)
return True, text

View File

@@ -7792,6 +7792,30 @@ class AIAgent:
if isinstance(persist_user_message, str):
persist_user_message = _sanitize_surrogates(persist_user_message)
# Crisis detection — check user message for crisis signals (#695)
# If crisis detected, return 988 Lifeline response immediately
if isinstance(user_message, str) and user_message.strip():
try:
from agent.crisis_hook import check_crisis
_crisis_response = check_crisis(user_message)
if _crisis_response:
logger.warning("Crisis detected in session %s", getattr(self, 'session_id', 'unknown'))
return {
"response": _crisis_response,
"messages": self.messages + [
{"role": "user", "content": user_message},
{"role": "assistant", "content": _crisis_response},
],
"usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
"model": self.model,
"crisis_detected": True,
}
except ImportError:
pass
except Exception as _e:
logger.debug("Crisis detection error: %s", _e)
# Store stream callback for _interruptible_api_call to pick up
self._stream_callback = stream_callback
self._persist_user_message_idx = None

36
tests/test_crisis_hook.py Normal file
View File

@@ -0,0 +1,36 @@
"""
Tests for crisis hook integration (#695).
"""
import pytest
from agent.crisis_hook import detect_crisis, get_crisis_response, check_crisis, CrisisSeverity
class TestCrisisDetection:
def test_detects_direct_suicide(self):
result = detect_crisis("I want to kill myself")
assert result.detected is True
assert result.severity == CrisisSeverity.CRITICAL
def test_no_crisis_on_normal(self):
result = detect_crisis("Hello, how are you?")
assert result.detected is False
def test_crisis_response_has_988(self):
response = get_crisis_response("CRITICAL")
assert "988" in response
assert "988lifeline.org/chat" in response
assert "1-888-628-9454" in response
def test_check_crisis_returns_response(self):
response = check_crisis("I want to die")
assert response is not None
assert "988" in response
def test_check_crisis_returns_none_for_normal(self):
response = check_crisis("Hello")
assert response is None
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,122 +0,0 @@
"""
Tests for Llama-Guard3 Safety Filter
Issue: #669
"""
import unittest
from unittest.mock import patch, MagicMock
from agent.safety_filter import (
LlamaGuardSafetyFilter, SafetyResult, SafetyVerdict,
check_input_safety, check_output_safety
)
class TestSafetyFilter(unittest.TestCase):
"""Test safety filter basics."""
def test_safety_verdict_enum(self):
self.assertEqual(SafetyVerdict.SAFE.value, "safe")
self.assertEqual(SafetyVerdict.UNSAFE.value, "unsafe")
self.assertEqual(SafetyVerdict.ERROR.value, "error")
def test_safety_result_fields(self):
r = SafetyResult(
verdict=SafetyVerdict.SAFE,
categories=[],
latency_ms=100.0
)
self.assertEqual(r.verdict, SafetyVerdict.SAFE)
self.assertEqual(r.categories, [])
self.assertEqual(r.latency_ms, 100.0)
def test_safety_categories_defined(self):
from agent.safety_filter import SAFETY_CATEGORIES
self.assertIn("S11", SAFETY_CATEGORIES)
self.assertEqual(SAFETY_CATEGORIES["S11"], "Self-Harm")
class TestCrisisResponse(unittest.TestCase):
"""Test crisis response generation."""
def test_self_harm_response(self):
f = LlamaGuardSafetyFilter()
result = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[{"code": "S11", "name": "Self-Harm"}],
latency_ms=100.0
)
response = f.get_crisis_response(result)
self.assertIn("988", response)
self.assertIn("safe", response.lower())
self.assertIn("Jesus", response)
def test_other_unsafe_response(self):
f = LlamaGuardSafetyFilter()
result = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[{"code": "S1", "name": "Violent Crimes"}],
latency_ms=100.0
)
response = f.get_crisis_response(result)
self.assertIn("can't help", response.lower())
def test_sanitize_output(self):
f = LlamaGuardSafetyFilter()
result = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[],
latency_ms=100.0
)
sanitized = f.sanitize_output("dangerous content", result)
self.assertNotEqual(sanitized, "dangerous content")
self.assertIn("can't provide", sanitized.lower())
class TestAvailability(unittest.TestCase):
"""Test availability checking."""
def test_unavailable_returns_error(self):
f = LlamaGuardSafetyFilter()
f._available = False
result = f.check_input("hello")
self.assertEqual(result.verdict, SafetyVerdict.ERROR)
class TestIntegration(unittest.TestCase):
"""Test integration functions."""
def test_check_input_safety_safe(self):
with patch('agent.safety_filter.get_safety_filter') as mock_get:
mock_filter = MagicMock()
mock_filter.check_input.return_value = SafetyResult(
verdict=SafetyVerdict.SAFE, categories=[], latency_ms=50.0
)
mock_get.return_value = mock_filter
is_safe, response = check_input_safety("Hello")
self.assertTrue(is_safe)
self.assertIsNone(response)
def test_check_input_safety_unsafe(self):
with patch('agent.safety_filter.get_safety_filter') as mock_get:
mock_filter = MagicMock()
mock_filter.check_input.return_value = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[{"code": "S11", "name": "Self-Harm"}],
latency_ms=50.0
)
mock_filter.get_crisis_response.return_value = "Crisis response"
mock_get.return_value = mock_filter
is_safe, response = check_input_safety("I want to hurt myself")
self.assertFalse(is_safe)
self.assertEqual(response, "Crisis response")
if __name__ == "__main__":
unittest.main()