Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0ab2626ef2 feat: image content screening for self-harm indicators (closes #132)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 10s
2026-04-15 12:02:22 -04:00
4 changed files with 239 additions and 472 deletions

View File

@@ -1,336 +0,0 @@
"""Behavioral Pattern Detection — crisis risk from usage patterns.
Detects crisis signals from HOW someone uses the system, not just
what they say. Complements content-based crisis detection.
Behavioral signals:
- Frequency spike (anxiety/agitation)
- Frequency drop (withdrawal/isolation)
- Late-night messaging (2-5 AM)
- Session length increase (loneliness)
- Abrupt termination after emotional content
- Return after long absence
Part of Epic #102 (Multimodal Crisis Detection).
"""
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class SessionEvent:
    """One recorded interaction belonging to a tracked session."""

    # Key of the session this event was recorded under.
    session_id: str
    # Event time as a float timestamp (the tracker defaults it to time.time()).
    timestamp: float
    # Size of the message; the unit is whatever the caller measured.
    message_length: int
    # True when the event was produced by the user rather than the system.
    is_user: bool = True
    # True when the caller classified the message as emotional.
    emotional_content: bool = False
    # True when the caller observed the session ending abruptly.
    terminated_abruptly: bool = False
@dataclass
class BehavioralSignals:
    """Risk signals derived from how a session has been used."""

    frequency_change: float = 0.0  # -1 (drop) to +1 (spike) vs baseline
    is_late_night: bool = False  # 2-5 AM local time
    session_length_trend: str = "stable"  # increasing/decreasing/stable
    withdrawal_detected: bool = False  # significant drop in activity
    return_after_absence: bool = False  # came back after extended absence
    abrupt_termination: bool = False  # left after emotional content
    behavioral_score: float = 0.0  # 0-1 aggregate risk

    def to_dict(self) -> Dict[str, Any]:
        """Return the seven signal fields as a plain dict keyed by field name."""
        exported = (
            "frequency_change",
            "is_late_night",
            "session_length_trend",
            "withdrawal_detected",
            "return_after_absence",
            "abrupt_termination",
            "behavioral_score",
        )
        return {name: getattr(self, name) for name in exported}
class BehavioralTracker:
    """Track usage events per session and derive behavioral risk signals.

    Events are held in memory, keyed by ``session_id``. A per-session
    baseline is recomputed every 20 recorded events from the user events of
    the last ``BASELINE_WINDOW_DAYS`` days; until one exists the built-in
    global baseline is used.

    NOTE: no locking is performed here. Callers that record or query from
    several threads must serialize access themselves.
    """

    # Time window constants
    LATE_NIGHT_START = 2  # 2 AM
    LATE_NIGHT_END = 5  # 5 AM
    BASELINE_WINDOW_DAYS = 7
    ABSENCE_THRESHOLD_HOURS = 48
    SPIKE_THRESHOLD = 2.0  # 2x baseline = spike
    DROP_THRESHOLD = 0.3  # 30% of baseline = withdrawal
    SESSION_GAP_SECONDS = 3600  # silence longer than this ends a burst of activity

    def __init__(self):
        # session_id -> list of events
        self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
        # session_id -> baseline metrics
        self._baselines: Dict[str, Dict[str, float]] = {}
        # session_id -> last activity timestamp
        self._last_activity: Dict[str, float] = {}
        # Global baseline (all sessions)
        self._global_baseline: Dict[str, float] = {
            "avg_messages_per_hour": 5.0,
            "avg_session_length_min": 15.0,
            "avg_message_length": 200.0,
        }

    def record(
        self,
        session_id: str,
        timestamp: Optional[float] = None,
        message_length: int = 0,
        is_user: bool = True,
        emotional_content: bool = False,
        terminated_abruptly: bool = False,
    ) -> None:
        """Append one event to ``session_id``; ``timestamp`` defaults to now."""
        if timestamp is None:
            timestamp = time.time()
        event = SessionEvent(
            session_id=session_id,
            timestamp=timestamp,
            message_length=message_length,
            is_user=is_user,
            emotional_content=emotional_content,
            terminated_abruptly=terminated_abruptly,
        )
        self._events[session_id].append(event)
        self._last_activity[session_id] = timestamp
        # Periodically update baseline
        if len(self._events[session_id]) % 20 == 0:
            self._update_baseline(session_id)

    def get_risk_signals(self, session_id: str) -> "BehavioralSignals":
        """Analyze the recorded events of ``session_id`` and return its signals.

        A session without events yields default (all-clear) signals. The
        frequency and absence checks are evaluated against ``time.time()``.
        """
        events = self._events.get(session_id, [])
        if not events:
            return BehavioralSignals()
        signals = BehavioralSignals()
        now = time.time()
        # 1. Frequency analysis
        signals.frequency_change = self._analyze_frequency(session_id, now)
        # 2. Late-night detection
        signals.is_late_night = self._is_late_night(events[-1].timestamp)
        # 3. Session length trend
        signals.session_length_trend = self._analyze_session_length_trend(session_id)
        # 4. Withdrawal detection
        signals.withdrawal_detected = signals.frequency_change < -0.5
        # 5. Return after absence
        signals.return_after_absence = self._detect_return_after_absence(session_id, now)
        # 6. Abrupt termination
        signals.abrupt_termination = self._detect_abrupt_termination(events)
        # 7. Aggregate behavioral score
        signals.behavioral_score = self._compute_behavioral_score(signals)
        return signals

    def _analyze_frequency(self, session_id: str, now: float) -> float:
        """Compare the last hour's user-message count to the baseline rate.

        Returns a value in -1..+1: 0 at the baseline rate, -1 when nothing
        was sent, and +1 at ``SPIKE_THRESHOLD`` times the baseline or above.
        Sessions with fewer than three events return 0.
        """
        events = self._events.get(session_id, [])
        if len(events) < 3:
            return 0.0
        # Count messages in last hour
        one_hour_ago = now - 3600
        recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
        # Get baseline
        baseline_rate = self._get_baseline(session_id).get("avg_messages_per_hour", 5.0)
        if baseline_rate <= 0:
            return 0.0
        ratio = recent_count / baseline_rate
        # Map to -1..+1: 0.5x = -0.5, 1x = 0, SPIKE_THRESHOLD (2x) = +1
        if ratio < 1.0:
            return max(-1.0, ratio - 1.0)
        return min(1.0, (ratio - 1.0) / (self.SPIKE_THRESHOLD - 1.0))

    def _is_late_night(self, timestamp: float) -> bool:
        """Check if ``timestamp`` falls in the 2-5 AM window.

        The hour is taken from ``datetime.fromtimestamp`` without a timezone,
        i.e. in the local timezone of the running process.
        """
        hour = datetime.fromtimestamp(timestamp).hour
        return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END

    def _analyze_session_length_trend(self, session_id: str) -> str:
        """Compare the time span of the first half of events to the second half.

        Returns "increasing" when the second half spans more than 1.3x the
        first, "decreasing" below 0.7x, otherwise "stable". Fewer than ten
        events always yield "stable".
        """
        events = self._events.get(session_id, [])
        if len(events) < 10:
            return "stable"
        # Split events into first half and second half
        mid = len(events) // 2
        first_half = events[:mid]
        second_half = events[mid:]
        # Calculate spans (first to last message of each half)
        first_span = first_half[-1].timestamp - first_half[0].timestamp
        second_span = second_half[-1].timestamp - second_half[0].timestamp
        if second_span > first_span * 1.3:
            return "increasing"
        if second_span < first_span * 0.7:
            return "decreasing"
        return "stable"

    def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
        """Detect whether the current burst of activity follows a long silence.

        Walks back from the newest event to the start of the current burst
        (consecutive events less than ``SESSION_GAP_SECONDS`` apart) and
        reports True when the silence before that burst lasted at least
        ``ABSENCE_THRESHOLD_HOURS``. A user whose newest event is itself that
        old is still absent, not returning, so False is reported.
        """
        events = self._events.get(session_id, [])
        if len(events) < 2:
            return False
        threshold = self.ABSENCE_THRESHOLD_HOURS * 3600
        if now - events[-1].timestamp >= threshold:
            return False
        # Pair each event with its successor, newest pair first.
        for older, newer in zip(reversed(events[:-1]), reversed(events)):
            gap = newer.timestamp - older.timestamp
            if gap >= self.SESSION_GAP_SECONDS:
                # First real pause found: it is the boundary of the current burst.
                return gap >= threshold
        return False

    def _detect_abrupt_termination(self, events: "List[SessionEvent]") -> bool:
        """Detect emotional content followed by an abrupt stop.

        Looks at the last five events. Emotional content must be present, and
        the newest event must either carry the recorded ``terminated_abruptly``
        flag or be a short (< 50) user message.
        """
        if len(events) < 3:
            return False
        # Check last 5 events
        recent = events[-5:]
        last = recent[-1]
        has_emotional = any(e.emotional_content for e in recent)
        looks_abrupt = last.is_user and last.message_length < 50
        return bool(has_emotional and (last.terminated_abruptly or looks_abrupt))

    def _compute_behavioral_score(self, signals: "BehavioralSignals") -> float:
        """Compute aggregate behavioral risk score (0-1) from weighted signals."""
        score = 0.0
        weights = {
            "frequency_spike": 0.15,
            "frequency_drop": 0.20,
            "late_night": 0.10,
            "session_increasing": 0.10,
            "withdrawal": 0.20,
            "return_after_absence": 0.05,
            "abrupt_termination": 0.20,
        }
        # Frequency spike (anxiety)
        if signals.frequency_change > 0.5:
            score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
        # Frequency drop (withdrawal)
        if signals.frequency_change < -0.3:
            score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
        # Late night
        if signals.is_late_night:
            score += weights["late_night"]
        # Session length increasing
        if signals.session_length_trend == "increasing":
            score += weights["session_increasing"]
        # Withdrawal
        if signals.withdrawal_detected:
            score += weights["withdrawal"]
        # Return after absence
        if signals.return_after_absence:
            score += weights["return_after_absence"]
        # Abrupt termination
        if signals.abrupt_termination:
            score += weights["abrupt_termination"]
        return min(1.0, score)

    def _get_baseline(self, session_id: str) -> Dict[str, float]:
        """Return the session's own baseline, or the global one if none exists yet."""
        return self._baselines.get(session_id, self._global_baseline)

    def _update_baseline(self, session_id: str) -> None:
        """Recompute the session baseline from user events of the last 7 days.

        Does nothing with fewer than five events or when no user event falls
        inside the window.
        """
        events = self._events.get(session_id, [])
        if len(events) < 5:
            return
        # Use last 7 days of events
        cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
        recent = [e for e in events if e.timestamp > cutoff and e.is_user]
        if not recent:
            return
        # Calculate metrics
        time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
        if time_span_hours > 0:
            msgs_per_hour = len(recent) / time_span_hours
        else:
            # All events share one timestamp: fall back to the raw count.
            msgs_per_hour = len(recent)
        avg_length = sum(e.message_length for e in recent) / len(recent)
        self._baselines[session_id] = {
            "avg_messages_per_hour": msgs_per_hour,
            "avg_message_length": avg_length,
            "total_messages": len(recent),
        }
# Global singleton for convenience: one tracker instance, created at import
# time, that the module-level record_event() / get_risk_signals() delegate to.
_global_tracker = BehavioralTracker()
def record_event(
    session_id: str,
    timestamp: Optional[float] = None,
    message_length: int = 0,
    is_user: bool = True,
    emotional_content: bool = False,
    terminated_abruptly: bool = False,
) -> None:
    """Forward one event, unchanged, to the module-level tracker instance."""
    _global_tracker.record(
        session_id=session_id,
        timestamp=timestamp,
        message_length=message_length,
        is_user=is_user,
        emotional_content=emotional_content,
        terminated_abruptly=terminated_abruptly,
    )
def get_risk_signals(session_id: str) -> BehavioralSignals:
    """Return the module-level tracker's risk signals for ``session_id``."""
    signals = _global_tracker.get_risk_signals(session_id)
    return signals

155
image_screening.py Normal file
View File

@@ -0,0 +1,155 @@
"""
Image Content Screening for Self-Harm Indicators
Screens images sent in chat for self-harm content using vision models.
Privacy-preserving: analyzes in-memory, discards after scoring. No image retention.
"""
import base64
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
# Module-level logger registered under the fixed name "the-door.image_screening".
logger = logging.getLogger("the-door.image_screening")
class RiskLevel(str, Enum):
    """Screening verdict; each member is also a ``str`` equal to its value."""

    # No indicators were reported.
    SAFE = "safe"
    # Something ambiguous but worrying was reported.
    CONCERNING = "concerning"
    # Imminent self-harm indicators were reported.
    CRITICAL = "critical"
@dataclass
class ImageScreenResult:
    """Verdict produced by screening a single image."""

    risk_level: RiskLevel
    confidence: float
    categories: list
    reasoning: str
    model_used: str

    @property
    def needs_crisis_response(self):
        """True only when the verdict is CRITICAL."""
        return self.risk_level == RiskLevel.CRITICAL

    @property
    def needs_followup(self):
        """True when the verdict is CONCERNING or CRITICAL."""
        flagged_levels = (RiskLevel.CONCERNING, RiskLevel.CRITICAL)
        return self.risk_level in flagged_levels

    def to_dict(self):
        """Return a plain dict; the level becomes its string value and the
        confidence is rounded to three decimals."""
        return dict(
            risk_level=self.risk_level.value,
            confidence=round(self.confidence, 3),
            categories=self.categories,
            reasoning=self.reasoning,
            model_used=self.model_used,
        )
# Instruction text sent as the user message of the vision-model request in
# _analyze_with_ollama(). It asks for a JSON-only reply carrying the keys
# risk_level, confidence, categories and reasoning, which that function parses.
SCREENING_PROMPT = """Analyze this image for indicators of self-harm or crisis. Look for:
1. Self-harm wounds or injuries
2. Medication/pills in concerning quantities
3. Farewell-type imagery (empty rooms, items laid out, goodbye notes)
4. Screenshots of crisis-related searches
5. Images conveying hopelessness, isolation, or despair
Respond in JSON format ONLY:
{"risk_level": "safe"|"concerning"|"critical", "confidence": 0.0-1.0, "categories": [], "reasoning": ""}
CRITICAL: imminent self-harm indicators
CONCERNING: ambiguous but worrying
SAFE: no indicators detected"""
def _parse_screening_reply(content, model):
    """Convert the model's text reply into an ImageScreenResult.

    Returns None when the reply holds no ``{...}`` span. Raises ValueError
    when the span is not valid JSON or names an unknown risk level; the
    caller treats both as a failed analysis.
    """
    json_start = content.find("{")
    json_end = content.rfind("}") + 1
    if json_start == -1 or json_end <= json_start:
        return None
    parsed = json.loads(content[json_start:json_end])

    # The prompt spells the labels in upper case and models often echo that,
    # so normalise before the enum lookup instead of discarding the verdict.
    level_text = str(parsed.get("risk_level", "safe")).strip().lower()
    risk_level = RiskLevel(level_text)

    # A malformed confidence must not throw away a valid risk level.
    try:
        confidence = float(parsed.get("confidence", 0.5))
    except (TypeError, ValueError):
        confidence = 0.5
    confidence = min(1.0, max(0.0, confidence))

    categories = parsed.get("categories") or []
    if not isinstance(categories, list):
        categories = [categories]

    return ImageScreenResult(
        risk_level=risk_level,
        confidence=confidence,
        categories=categories,
        reasoning=parsed.get("reasoning", ""),
        model_used=f"ollama:{model}",
    )


def _analyze_with_ollama(image_b64, model="gemma3:4b"):
    """Ask a local Ollama vision model to screen one base64-encoded image.

    Posts SCREENING_PROMPT plus the image to ``/api/chat`` on
    localhost:11434 (30 second timeout) and parses the JSON verdict out of
    the reply text.

    Returns an ImageScreenResult, or None when the request fails or the
    reply cannot be interpreted. This function never raises.
    """
    try:
        import urllib.request

        payload = json.dumps({
            "model": model,
            "messages": [{
                "role": "user",
                "content": SCREENING_PROMPT,
                "images": [image_b64],
            }],
            "stream": False,
            "options": {"temperature": 0.1},
        }).encode()
        req = urllib.request.Request(
            "http://localhost:11434/api/chat",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # Context manager closes the HTTP response even if reading fails.
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        content = data.get("message", {}).get("content", "")
        return _parse_screening_reply(content, model)
    except Exception as e:
        # Deliberate best-effort boundary: any failure means "no model verdict".
        logger.warning("Ollama vision analysis failed: %s", e)
        return None
def _analyze_fallback(image_bytes):
    """Build the placeholder verdict used when no model verdict is available.

    ``image_bytes`` is accepted but not inspected; the result is always SAFE
    with confidence 0.2 and the single category "unanalyzed".
    """
    placeholder = dict(
        risk_level=RiskLevel.SAFE,
        confidence=0.2,
        categories=["unanalyzed"],
        reasoning="No vision model available. Defaulting to safe with low confidence.",
        model_used="fallback:heuristic",
    )
    return ImageScreenResult(**placeholder)
def screen_image(image_data, use_vision_model=True, model="gemma3:4b"):
    """Screen image for self-harm indicators. Analyzes in-memory, no retention.

    Args:
        image_data: raw image bytes (``bytes``, ``bytearray`` or
            ``memoryview``), or a base64 string, optionally carrying a
            ``data:...;base64,`` URL prefix.
        use_vision_model: when False the model call is skipped and the
            fallback verdict is returned directly.
        model: Ollama model name passed to the vision analysis.

    Returns:
        The model's ImageScreenResult, or the fallback result when the model
        is disabled or produced no usable verdict.

    Raises:
        binascii.Error: if a string input is not decodable base64.
    """
    if isinstance(image_data, (bytes, bytearray, memoryview)):
        # Any bytes-like object is raw image content, not base64 text.
        image_data = bytes(image_data)
        image_b64 = base64.b64encode(image_data).decode()
    else:
        image_b64 = image_data
        if image_b64.startswith("data:"):
            # Keep only the payload of a data URL; the header is not base64.
            image_b64 = image_b64.partition(",")[2]
        image_data = base64.b64decode(image_b64)

    if use_vision_model:
        result = _analyze_with_ollama(image_b64, model)
        if result is not None:
            logger.info(
                "Image screened: %s (conf: %.2f)",
                result.risk_level.value,
                result.confidence,
            )
            if result.needs_crisis_response:
                logger.warning("CRITICAL image: %s", result.reasoning)
            return result
    return _analyze_fallback(image_data)
def handle_chat_image(image_data):
    """Screen a chat image and build the action dict handed to the gateway.

    The dict carries the serialized verdict, whether to show the crisis
    overlay (CRITICAL only), whether to log the event (CONCERNING or
    CRITICAL) and an optional reply text (None for SAFE).
    """
    result = screen_image(image_data)

    if result.risk_level == RiskLevel.CRITICAL:
        reply = (
            "I noticed something concerning in the image you shared. "
            "If you or someone you know is in crisis, please reach out: "
            "988 Suicide and Crisis Lifeline (call or text 988). "
            "You are not alone."
        )
    elif result.risk_level == RiskLevel.CONCERNING:
        reply = (
            "I want to check in \u2014 how are you doing? "
            "If you need to talk to someone, the 988 Lifeline is available 24/7."
        )
    else:
        reply = None

    return {
        "result": result.to_dict(),
        "show_crisis_overlay": result.needs_crisis_response,
        "log_event": result.needs_followup,
        "response_text": reply,
    }

View File

@@ -1,136 +0,0 @@
"""Tests for behavioral pattern detection."""
import time
import pytest
from behavioral_tracker import BehavioralTracker, BehavioralSignals
class TestBehavioralTracker:
    """Behavioral signal tests driven through BehavioralTracker.record()."""

    def test_empty_tracker_returns_default_signals(self):
        tracker = BehavioralTracker()
        signals = tracker.get_risk_signals("session-1")
        assert signals.behavioral_score == 0.0
        assert not signals.is_late_night

    def test_frequency_spike_detected(self):
        tracker = BehavioralTracker()
        # Establish baseline: 2 messages per hour
        now = time.time()
        for i in range(10):
            tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)
        # Spike: 10 messages in last 10 minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)
        signals = tracker.get_risk_signals("s1")
        assert signals.frequency_change > 0.3  # Significant spike

    def test_frequency_drop_detected(self):
        tracker = BehavioralTracker()
        now = time.time()
        # Baseline: heavy usage
        for i in range(50):
            tracker.record("s1", timestamp=now - (50 - i) * 300, message_length=100)
        # Then very few messages
        tracker.record("s1", timestamp=now - 60, message_length=50)
        signals = tracker.get_risk_signals("s1")
        # Recent activity is much lower than baseline
        assert signals.frequency_change < 0

    def test_late_night_detection(self):
        tracker = BehavioralTracker()
        # 3:00 AM timestamp
        import datetime
        dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
        ts = dt.timestamp()
        tracker.record("s1", timestamp=ts, message_length=100)
        signals = tracker.get_risk_signals("s1")
        assert signals.is_late_night is True

    def test_not_late_night(self):
        tracker = BehavioralTracker()
        import datetime
        dt = datetime.datetime(2026, 4, 14, 14, 0, 0)  # 2 PM
        ts = dt.timestamp()
        tracker.record("s1", timestamp=ts, message_length=100)
        signals = tracker.get_risk_signals("s1")
        assert signals.is_late_night is False

    def test_session_length_increasing(self):
        tracker = BehavioralTracker()
        now = time.time()
        # First half: messages spread over 5 minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
        # Second half: messages spread over 30 minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)
        signals = tracker.get_risk_signals("s1")
        assert signals.session_length_trend == "increasing"

    def test_withdrawal_detected(self):
        tracker = BehavioralTracker()
        now = time.time()
        # High baseline: one message per minute, ending two hours ago so that
        # none of it is counted in the "last hour" the baseline is compared to.
        burst_end = now - 2 * 3600
        for i in range(50):
            tracker.record("s1", timestamp=burst_end - (50 - i) * 60, message_length=100)
        # Then drop to almost nothing
        tracker.record("s1", timestamp=now - 60, message_length=20)
        signals = tracker.get_risk_signals("s1")
        assert signals.withdrawal_detected is True

    def test_abrupt_termination_after_emotional(self):
        tracker = BehavioralTracker()
        now = time.time()
        # Normal messages
        for i in range(5):
            tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
        # Emotional content
        tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
        # Short abrupt message
        tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)
        signals = tracker.get_risk_signals("s1")
        assert signals.abrupt_termination is True

    def test_behavioral_score_increases_with_risk(self):
        tracker = BehavioralTracker()
        import datetime
        # Low risk: normal messages during daytime
        for i in range(10):
            dt = datetime.datetime(2026, 4, 14, 14, i, 0)  # 2 PM
            tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
        low_risk = tracker.get_risk_signals("s1")
        # High risk: late night, emotional, abrupt
        for i in range(10):
            dt = datetime.datetime(2026, 4, 14, 3, i, 0)  # 3 AM
            tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
        tracker.record(
            "s2",
            timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
            message_length=10,
            is_user=True,
        )
        high_risk = tracker.get_risk_signals("s2")
        assert high_risk.behavioral_score > low_risk.behavioral_score

    def test_signals_to_dict(self):
        signals = BehavioralSignals(
            frequency_change=0.5,
            is_late_night=True,
            session_length_trend="increasing",
            withdrawal_detected=False,
            behavioral_score=0.4,
        )
        d = signals.to_dict()
        assert d["frequency_change"] == 0.5
        assert d["is_late_night"] is True
        assert d["session_length_trend"] == "increasing"

View File

@@ -0,0 +1,84 @@
"""Tests for image content screening module."""
import json
from unittest.mock import patch, MagicMock
from image_screening import (
RiskLevel,
ImageScreenResult,
screen_image,
handle_chat_image,
_analyze_fallback,
)
class TestImageScreenResult:
    """Checks the derived flags and serialization of ImageScreenResult."""

    def test_safe_result(self):
        safe = ImageScreenResult(
            risk_level=RiskLevel.SAFE,
            confidence=0.95,
            categories=[],
            reasoning="No indicators",
            model_used="test",
        )
        assert not safe.needs_crisis_response
        assert not safe.needs_followup
        serialized = safe.to_dict()
        assert serialized["risk_level"] == "safe"

    def test_critical_result(self):
        critical = ImageScreenResult(
            risk_level=RiskLevel.CRITICAL,
            confidence=0.9,
            categories=["wounds"],
            reasoning="Detected",
            model_used="test",
        )
        assert critical.needs_crisis_response
        assert critical.needs_followup

    def test_concerning_result(self):
        concerning = ImageScreenResult(
            risk_level=RiskLevel.CONCERNING,
            confidence=0.6,
            categories=["isolation"],
            reasoning="Ambiguous",
            model_used="test",
        )
        assert not concerning.needs_crisis_response
        assert concerning.needs_followup
class TestScreenImage:
    """Exercises screen_image() with the vision model switched off."""

    def test_fallback_returns_safe(self):
        outcome = screen_image(b"fake_image_data", use_vision_model=False)
        assert outcome.risk_level == RiskLevel.SAFE
        assert outcome.model_used == "fallback:heuristic"
        assert outcome.confidence < 0.5

    def test_base64_input(self):
        import base64
        encoded = base64.b64encode(b"fake").decode()
        outcome = screen_image(encoded, use_vision_model=False)
        assert outcome.risk_level == RiskLevel.SAFE
class TestHandleChatImage:
    """Checks the gateway action dict built by handle_chat_image().

    Every test patches ``_analyze_with_ollama`` so no request is sent to a
    local Ollama server and the outcome does not depend on one running.
    """

    @patch("image_screening._analyze_with_ollama", return_value=None)
    def test_safe_image_no_overlay(self, mock_ollama):
        # No model verdict -> fallback SAFE result -> nothing shown or said.
        action = handle_chat_image(b"safe_image")
        assert not action["show_crisis_overlay"]
        assert action["response_text"] is None

    @patch("image_screening._analyze_with_ollama")
    def test_critical_image_shows_overlay(self, mock_ollama):
        mock_ollama.return_value = ImageScreenResult(
            risk_level=RiskLevel.CRITICAL, confidence=0.95,
            categories=["wounds"], reasoning="Self-harm detected",
            model_used="ollama:gemma3:4b"
        )
        action = handle_chat_image(b"concerning_image")
        assert action["show_crisis_overlay"]
        assert "988" in action["response_text"]
        assert action["log_event"]

    @patch("image_screening._analyze_with_ollama")
    def test_concerning_image_followup(self, mock_ollama):
        mock_ollama.return_value = ImageScreenResult(
            risk_level=RiskLevel.CONCERNING, confidence=0.6,
            categories=["isolation"], reasoning="Empty room",
            model_used="ollama:gemma3:4b"
        )
        action = handle_chat_image(b"maybe_concerning")
        assert not action["show_crisis_overlay"]
        assert action["log_event"]
        assert "check in" in action["response_text"]