Compare commits
2 Commits
fix/132
...
burn/123-1
| Author | SHA1 | Date | |
|---|---|---|---|
| b9f66410ef | |||
| 69dc695e73 |
@@ -105,12 +105,6 @@ MEDIUM_INDICATORS = [
|
||||
r"\bno\s+tomorrow\b",
|
||||
# Contextual versions (from crisis_detector.py legacy)
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
|
||||
r"\bfeel(?:s|ing)?\s+trapped\b",
|
||||
r"\bfeel(?:s|ing)?\s+desperate\b",
|
||||
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
|
||||
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
|
||||
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
|
||||
]
|
||||
|
||||
LOW_INDICATORS = [
|
||||
|
||||
@@ -1,155 +0,0 @@
|
||||
"""
|
||||
Image Content Screening for Self-Harm Indicators
|
||||
|
||||
Screens images sent in chat for self-harm content using vision models.
|
||||
Privacy-preserving: analyzes in-memory, discards after scoring. No image retention.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger("the-door.image_screening")
|
||||
|
||||
|
||||
class RiskLevel(str, Enum):
|
||||
SAFE = "safe"
|
||||
CONCERNING = "concerning"
|
||||
CRITICAL = "critical"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageScreenResult:
|
||||
risk_level: RiskLevel
|
||||
confidence: float
|
||||
categories: list
|
||||
reasoning: str
|
||||
model_used: str
|
||||
|
||||
@property
|
||||
def needs_crisis_response(self):
|
||||
return self.risk_level == RiskLevel.CRITICAL
|
||||
|
||||
@property
|
||||
def needs_followup(self):
|
||||
return self.risk_level in (RiskLevel.CONCERNING, RiskLevel.CRITICAL)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"risk_level": self.risk_level.value,
|
||||
"confidence": round(self.confidence, 3),
|
||||
"categories": self.categories,
|
||||
"reasoning": self.reasoning,
|
||||
"model_used": self.model_used,
|
||||
}
|
||||
|
||||
|
||||
SCREENING_PROMPT = """Analyze this image for indicators of self-harm or crisis. Look for:
|
||||
|
||||
1. Self-harm wounds or injuries
|
||||
2. Medication/pills in concerning quantities
|
||||
3. Farewell-type imagery (empty rooms, items laid out, goodbye notes)
|
||||
4. Screenshots of crisis-related searches
|
||||
5. Images conveying hopelessness, isolation, or despair
|
||||
|
||||
Respond in JSON format ONLY:
|
||||
{"risk_level": "safe"|"concerning"|"critical", "confidence": 0.0-1.0, "categories": [], "reasoning": ""}
|
||||
|
||||
CRITICAL: imminent self-harm indicators
|
||||
CONCERNING: ambiguous but worrying
|
||||
SAFE: no indicators detected"""
|
||||
|
||||
|
||||
def _analyze_with_ollama(image_b64, model="gemma3:4b"):
|
||||
try:
|
||||
import urllib.request
|
||||
payload = json.dumps({
|
||||
"model": model,
|
||||
"messages": [{
|
||||
"role": "user",
|
||||
"content": SCREENING_PROMPT,
|
||||
"images": [image_b64],
|
||||
}],
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1},
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
"http://localhost:11434/api/chat",
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
resp = urllib.request.urlopen(req, timeout=30)
|
||||
data = json.loads(resp.read())
|
||||
content = data.get("message", {}).get("content", "")
|
||||
json_start = content.find("{")
|
||||
json_end = content.rfind("}") + 1
|
||||
if json_start == -1 or json_end <= json_start:
|
||||
return None
|
||||
result = json.loads(content[json_start:json_end])
|
||||
return ImageScreenResult(
|
||||
risk_level=RiskLevel(result.get("risk_level", "safe")),
|
||||
confidence=float(result.get("confidence", 0.5)),
|
||||
categories=result.get("categories", []),
|
||||
reasoning=result.get("reasoning", ""),
|
||||
model_used=f"ollama:{model}",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Ollama vision analysis failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def _analyze_fallback(image_bytes):
|
||||
return ImageScreenResult(
|
||||
risk_level=RiskLevel.SAFE,
|
||||
confidence=0.2,
|
||||
categories=["unanalyzed"],
|
||||
reasoning="No vision model available. Defaulting to safe with low confidence.",
|
||||
model_used="fallback:heuristic",
|
||||
)
|
||||
|
||||
|
||||
def screen_image(image_data, use_vision_model=True, model="gemma3:4b"):
|
||||
"""Screen image for self-harm indicators. Analyzes in-memory, no retention."""
|
||||
if isinstance(image_data, bytes):
|
||||
image_b64 = base64.b64encode(image_data).decode()
|
||||
else:
|
||||
image_b64 = image_data
|
||||
image_data = base64.b64decode(image_b64)
|
||||
|
||||
if use_vision_model:
|
||||
result = _analyze_with_ollama(image_b64, model)
|
||||
if result:
|
||||
logger.info(f"Image screened: {result.risk_level.value} (conf: {result.confidence:.2f})")
|
||||
if result.needs_crisis_response:
|
||||
logger.warning(f"CRITICAL image: {result.reasoning}")
|
||||
return result
|
||||
|
||||
return _analyze_fallback(image_data)
|
||||
|
||||
|
||||
def handle_chat_image(image_data):
|
||||
"""Handle image from chat. Returns action dict for gateway."""
|
||||
result = screen_image(image_data)
|
||||
action = {
|
||||
"result": result.to_dict(),
|
||||
"show_crisis_overlay": result.needs_crisis_response,
|
||||
"log_event": result.needs_followup,
|
||||
"response_text": None,
|
||||
}
|
||||
if result.risk_level == RiskLevel.CRITICAL:
|
||||
action["response_text"] = (
|
||||
"I noticed something concerning in the image you shared. "
|
||||
"If you or someone you know is in crisis, please reach out: "
|
||||
"988 Suicide and Crisis Lifeline (call or text 988). "
|
||||
"You are not alone."
|
||||
)
|
||||
elif result.risk_level == RiskLevel.CONCERNING:
|
||||
action["response_text"] = (
|
||||
"I want to check in \u2014 how are you doing? "
|
||||
"If you need to talk to someone, the 988 Lifeline is available 24/7."
|
||||
)
|
||||
return action
|
||||
104
tests/test_crisis_indicator_dedup.py
Normal file
104
tests/test_crisis_indicator_dedup.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import re
|
||||
import unittest
|
||||
|
||||
from crisis.detect import (
|
||||
CRITICAL_INDICATORS,
|
||||
HIGH_INDICATORS,
|
||||
MEDIUM_INDICATORS,
|
||||
LOW_INDICATORS,
|
||||
detect_crisis,
|
||||
scan,
|
||||
)
|
||||
|
||||
|
||||
class TestNoDuplicatePatternsAcrossTiers(unittest.TestCase):
|
||||
"""Verify no regex pattern appears in more than one tier (issue #123)."""
|
||||
|
||||
def test_high_and_medium_no_overlap(self):
|
||||
"""Patterns in HIGH_INDICATORS must not appear in MEDIUM_INDICATORS."""
|
||||
high_set = set(HIGH_INDICATORS)
|
||||
medium_set = set(MEDIUM_INDICATORS)
|
||||
overlap = high_set & medium_set
|
||||
self.assertEqual(
|
||||
overlap, set(),
|
||||
f"Found {len(overlap)} duplicate patterns between HIGH and MEDIUM: {overlap}",
|
||||
)
|
||||
|
||||
def test_critical_and_high_no_overlap(self):
|
||||
"""CRITICAL and HIGH should not share patterns."""
|
||||
overlap = set(CRITICAL_INDICATORS) & set(HIGH_INDICATORS)
|
||||
self.assertEqual(
|
||||
overlap, set(),
|
||||
f"Found {len(overlap)} duplicates between CRITICAL and HIGH: {overlap}",
|
||||
)
|
||||
|
||||
def test_medium_and_low_no_overlap(self):
|
||||
"""MEDIUM and LOW should not share patterns."""
|
||||
overlap = set(MEDIUM_INDICATORS) & set(LOW_INDICATORS)
|
||||
self.assertEqual(
|
||||
overlap, set(),
|
||||
f"Found {len(overlap)} duplicates between MEDIUM and LOW: {overlap}",
|
||||
)
|
||||
|
||||
def test_no_pattern_in_three_tiers(self):
|
||||
"""No pattern should appear in more than one tier."""
|
||||
all_tiers = {
|
||||
"CRITICAL": set(CRITICAL_INDICATORS),
|
||||
"HIGH": set(HIGH_INDICATORS),
|
||||
"MEDIUM": set(MEDIUM_INDICATORS),
|
||||
"LOW": set(LOW_INDICATORS),
|
||||
}
|
||||
violations = []
|
||||
tier_names = list(all_tiers.keys())
|
||||
for i, t1 in enumerate(tier_names):
|
||||
for t2 in tier_names[i + 1:]:
|
||||
overlap = all_tiers[t1] & all_tiers[t2]
|
||||
for p in overlap:
|
||||
violations.append(f"{t1} & {t2}: {p}")
|
||||
self.assertEqual(
|
||||
violations, [],
|
||||
f"Found {len(violations)} cross-tier duplicates:\n" + "\n".join(violations[:10]),
|
||||
)
|
||||
|
||||
|
||||
class TestRemovedPatternsStillDetected(unittest.TestCase):
|
||||
"""Patterns removed from MEDIUM should still be caught by HIGH tier."""
|
||||
|
||||
def test_feel_hopeless_detected(self):
|
||||
result = detect_crisis("I feel so hopeless about everything")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_feel_trapped_detected(self):
|
||||
result = detect_crisis("I feel trapped and can\'t escape")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_feel_desperate_detected(self):
|
||||
result = detect_crisis("I feel desperate for help")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_no_future_detected(self):
|
||||
result = detect_crisis("There is no future ahead for me")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_nothing_left_detected(self):
|
||||
result = detect_crisis("There is nothing left inside")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_give_up_on_myself_detected(self):
|
||||
result = detect_crisis("I have given up on myself")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
|
||||
class TestWorthlessPatternRemainsInMedium(unittest.TestCase):
|
||||
"""The 'feel worthless' pattern should remain in MEDIUM (not a duplicate)."""
|
||||
|
||||
def test_feel_worthless_in_medium(self):
|
||||
self.assertIn(r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b", MEDIUM_INDICATORS)
|
||||
|
||||
def test_feel_worthless_detected(self):
|
||||
result = detect_crisis("I feel so worthless")
|
||||
self.assertIn(result.level, ("MEDIUM", "LOW", "HIGH"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,84 +0,0 @@
|
||||
"""Tests for image content screening module."""
|
||||
|
||||
import json
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from image_screening import (
|
||||
RiskLevel,
|
||||
ImageScreenResult,
|
||||
screen_image,
|
||||
handle_chat_image,
|
||||
_analyze_fallback,
|
||||
)
|
||||
|
||||
|
||||
class TestImageScreenResult:
|
||||
def test_safe_result(self):
|
||||
result = ImageScreenResult(
|
||||
risk_level=RiskLevel.SAFE, confidence=0.95,
|
||||
categories=[], reasoning="No indicators", model_used="test"
|
||||
)
|
||||
assert not result.needs_crisis_response
|
||||
assert not result.needs_followup
|
||||
assert result.to_dict()["risk_level"] == "safe"
|
||||
|
||||
def test_critical_result(self):
|
||||
result = ImageScreenResult(
|
||||
risk_level=RiskLevel.CRITICAL, confidence=0.9,
|
||||
categories=["wounds"], reasoning="Detected", model_used="test"
|
||||
)
|
||||
assert result.needs_crisis_response
|
||||
assert result.needs_followup
|
||||
|
||||
def test_concerning_result(self):
|
||||
result = ImageScreenResult(
|
||||
risk_level=RiskLevel.CONCERNING, confidence=0.6,
|
||||
categories=["isolation"], reasoning="Ambiguous", model_used="test"
|
||||
)
|
||||
assert not result.needs_crisis_response
|
||||
assert result.needs_followup
|
||||
|
||||
|
||||
class TestScreenImage:
|
||||
def test_fallback_returns_safe(self):
|
||||
result = screen_image(b"fake_image_data", use_vision_model=False)
|
||||
assert result.risk_level == RiskLevel.SAFE
|
||||
assert result.model_used == "fallback:heuristic"
|
||||
assert result.confidence < 0.5
|
||||
|
||||
def test_base64_input(self):
|
||||
import base64
|
||||
b64 = base64.b64encode(b"fake").decode()
|
||||
result = screen_image(b64, use_vision_model=False)
|
||||
assert result.risk_level == RiskLevel.SAFE
|
||||
|
||||
|
||||
class TestHandleChatImage:
|
||||
def test_safe_image_no_overlay(self):
|
||||
action = handle_chat_image(b"safe_image")
|
||||
assert not action["show_crisis_overlay"]
|
||||
assert action["response_text"] is None
|
||||
|
||||
@patch("image_screening._analyze_with_ollama")
|
||||
def test_critical_image_shows_overlay(self, mock_ollama):
|
||||
mock_ollama.return_value = ImageScreenResult(
|
||||
risk_level=RiskLevel.CRITICAL, confidence=0.95,
|
||||
categories=["wounds"], reasoning="Self-harm detected",
|
||||
model_used="ollama:gemma3:4b"
|
||||
)
|
||||
action = handle_chat_image(b"concerning_image")
|
||||
assert action["show_crisis_overlay"]
|
||||
assert "988" in action["response_text"]
|
||||
assert action["log_event"]
|
||||
|
||||
@patch("image_screening._analyze_with_ollama")
|
||||
def test_concerning_image_followup(self, mock_ollama):
|
||||
mock_ollama.return_value = ImageScreenResult(
|
||||
risk_level=RiskLevel.CONCERNING, confidence=0.6,
|
||||
categories=["isolation"], reasoning="Empty room",
|
||||
model_used="ollama:gemma3:4b"
|
||||
)
|
||||
action = handle_chat_image(b"maybe_concerning")
|
||||
assert not action["show_crisis_overlay"]
|
||||
assert action["log_event"]
|
||||
assert "check in" in action["response_text"]
|
||||
Reference in New Issue
Block a user