Compare commits

..

1 Commit

Author SHA1 Message Date
Alexander Whitestone
0ab2626ef2 feat: image content screening for self-harm indicators (closes #132)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 10s
2026-04-15 12:02:22 -04:00
4 changed files with 239 additions and 208 deletions

View File

@@ -46,12 +46,3 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
# Crisis metrics
.PHONY: metrics metrics-json
metrics: ## Show crisis metrics summary (last 7 days)
python3 -m crisis.metrics --summary
metrics-json: ## Export crisis metrics as JSON
python3 -m crisis.metrics --json

View File

@@ -1,199 +0,0 @@
"""Crisis metrics — aggregate detection data for operators.
Tracks crisis detection events and provides summary reports.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --last 7d # last 7 days
"""
from __future__ import annotations
import json
import os
import sys
import time
from collections import Counter
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
# Data directory for metrics storage
_DATA_DIR = Path(os.getenv("CRISIS_DATA_DIR", str(Path.home() / ".the-door")))
_METRICS_FILE = _DATA_DIR / "crisis-metrics.jsonl"
@dataclass
class CrisisEvent:
    """A single crisis detection event (persisted as one JSONL line by log_event)."""
    timestamp: float  # Unix epoch seconds (time.time())
    level: str  # NONE, LOW, MODERATE, HIGH, CRITICAL
    indicators: List[str]  # indicator labels that triggered the detection
    session_id: str = ""  # optional session identifier; "" when unknown
    source: str = ""  # "chat", "gateway", "cli"
@dataclass
class MetricsSummary:
    """Aggregated metrics summary (produced by compute_summary)."""
    period_days: int  # reporting window length in days
    total_events: int  # events seen within the window
    by_level: Dict[str, int]  # severity level -> event count
    top_indicators: List[tuple]  # (indicator, count) pairs, most frequent first
    sessions_affected: int  # distinct non-empty session_ids
    avg_daily: float  # total_events / period_days, rounded to 1 decimal
    peak_day: str  # "YYYY-MM-DD" of the busiest day, or "N/A" when no events
    peak_count: int  # event count on the peak day
    generated_at: str  # local timestamp "YYYY-MM-DD HH:MM:SS"
def log_event(event: CrisisEvent) -> None:
    """Append a crisis event to the JSONL metrics file.

    Creates the data directory on first use. Events are stored one JSON
    object per line so the file can be appended safely and streamed back
    by load_events.
    """
    _DATA_DIR.mkdir(parents=True, exist_ok=True)
    # Explicit encoding keeps the log portable across locales/platforms.
    with open(_METRICS_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(asdict(event)) + "\n")
def load_events(days: int = 7) -> List[CrisisEvent]:
    """Load crisis events recorded within the last *days* days.

    Returns an empty list when the metrics file does not exist. Malformed
    records are skipped individually — the original wrapped the whole read
    loop in one try/except, so a single corrupt line silently discarded
    every event after it.
    """
    if not _METRICS_FILE.exists():
        return []
    cutoff = time.time() - (days * 86400)
    events: List[CrisisEvent] = []
    try:
        with open(_METRICS_FILE, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    if data.get("timestamp", 0) >= cutoff:
                        events.append(CrisisEvent(**data))
                # TypeError covers records with unexpected/missing keys.
                except (json.JSONDecodeError, KeyError, TypeError):
                    continue
    except OSError:
        # Unreadable file: report "no data" rather than crash metrics.
        pass
    return events
def compute_summary(days: int = 7) -> MetricsSummary:
    """Compute an aggregated MetricsSummary over the last *days* days.

    Pure aggregation over load_events(); no I/O besides reading the log.
    """
    events = load_events(days)
    # Event counts per severity level.
    by_level = Counter(e.level for e in events)
    # Most frequent indicator labels across all events.
    indicator_counts = Counter(ind for e in events for ind in e.indicators)
    top_indicators = indicator_counts.most_common(10)
    # Distinct sessions with at least one event (empty ids excluded).
    sessions = {e.session_id for e in events if e.session_id}
    # Busiest calendar day (local time). Counter replaces the original
    # function-local defaultdict import for consistency with the header.
    daily = Counter(
        time.strftime("%Y-%m-%d", time.localtime(e.timestamp)) for e in events
    )
    peak_day = max(daily, key=daily.get) if daily else "N/A"
    peak_count = daily.get(peak_day, 0)
    return MetricsSummary(
        period_days=days,
        total_events=len(events),
        by_level=dict(by_level),
        top_indicators=top_indicators,
        sessions_affected=len(sessions),
        # max(days, 1) guards against a zero/negative period.
        avg_daily=round(len(events) / max(days, 1), 1),
        peak_day=peak_day,
        peak_count=peak_count,
        generated_at=time.strftime("%Y-%m-%d %H:%M:%S"),
    )
def format_summary(summary: MetricsSummary) -> str:
    """Format a MetricsSummary as a human-readable multi-line report.

    Includes header stats, an ASCII bar per severity level (capped at 30
    columns), and the top five indicators.
    """
    lines = [
        "Crisis Metrics Summary",
        "=" * 40,
        f"Period: Last {summary.period_days} days",
        f"Generated: {summary.generated_at}",
        "",
        f"Total events: {summary.total_events}",
        f"Daily avg: {summary.avg_daily}",
        f"Sessions: {summary.sessions_affected}",
        f"Peak day: {summary.peak_day} ({summary.peak_count} events)",
        "",
    ]
    if summary.by_level:
        lines.append("By severity:")
        for level in ["CRITICAL", "HIGH", "MODERATE", "LOW", "NONE"]:
            count = summary.by_level.get(level, 0)
            if count > 0:
                # Bug fix: the bar glyph was an empty string ('' * n == ''),
                # so no bar ever rendered; draw '#' capped at 30 columns.
                bar = "#" * min(count, 30)
                lines.append(f" {level:10s} {count:4d} {bar}")
        lines.append("")
    if summary.top_indicators:
        lines.append("Top indicators:")
        for indicator, count in summary.top_indicators[:5]:
            lines.append(f" {indicator}: {count}")
        lines.append("")
    if summary.total_events == 0:
        lines.append("No crisis events in this period.")
    return "\n".join(lines)
def main():
    """CLI entry point: log a test event or print a metrics report."""
    import argparse
    parser = argparse.ArgumentParser(description="Crisis metrics summary")
    parser.add_argument("--summary", action="store_true", help="Print summary report")
    parser.add_argument("--json", action="store_true", dest="as_json", help="Output JSON")
    parser.add_argument("--last", default="7d", help="Time period (e.g., 7d, 30d)")
    parser.add_argument("--log", nargs=2, metavar=("LEVEL", "INDICATOR"), help="Log a test event")
    args = parser.parse_args()
    # Accept "7d" or a bare "7"; anything unparseable falls back to a week.
    try:
        days = int(args.last.rstrip("d"))
    except ValueError:
        days = 7
    # --log short-circuits reporting: record one event and exit.
    if args.log:
        level, indicator = args.log
        log_event(CrisisEvent(
            timestamp=time.time(),
            level=level.upper(),
            indicators=[indicator],
            session_id="cli-test",
            source="cli",
        ))
        print(f"Logged: {level.upper()} / {indicator}")
        return 0
    summary = compute_summary(days)
    output = json.dumps(asdict(summary), indent=2) if args.as_json else format_summary(summary)
    print(output)
    return 0


if __name__ == "__main__":
    sys.exit(main())

155
image_screening.py Normal file
View File

@@ -0,0 +1,155 @@
"""
Image Content Screening for Self-Harm Indicators
Screens images sent in chat for self-harm content using vision models.
Privacy-preserving: analyzes in-memory, discards after scoring. No image retention.
"""
import base64
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
logger = logging.getLogger("the-door.image_screening")
class RiskLevel(str, Enum):
    """Screening verdict, from benign to urgent. str-valued so members
    serialize directly to JSON and compare equal to their string value."""
    SAFE = "safe"  # no indicators detected
    CONCERNING = "concerning"  # ambiguous but worrying
    CRITICAL = "critical"  # imminent self-harm indicators
@dataclass
class ImageScreenResult:
    """Outcome of screening one image: a risk verdict plus model metadata."""
    risk_level: RiskLevel
    confidence: float
    categories: list
    reasoning: str
    model_used: str

    @property
    def needs_crisis_response(self):
        """True only for CRITICAL results — the gateway shows the overlay."""
        return self.risk_level == RiskLevel.CRITICAL

    @property
    def needs_followup(self):
        """True for CONCERNING or CRITICAL — the event should be logged."""
        return self.needs_crisis_response or self.risk_level == RiskLevel.CONCERNING

    def to_dict(self):
        """Serialize for the gateway action payload (confidence rounded)."""
        return dict(
            risk_level=self.risk_level.value,
            confidence=round(self.confidence, 3),
            categories=self.categories,
            reasoning=self.reasoning,
            model_used=self.model_used,
        )
# Prompt sent verbatim to the vision model; it demands JSON-only output so
# the reply can be extracted and parsed mechanically by _analyze_with_ollama.
SCREENING_PROMPT = """Analyze this image for indicators of self-harm or crisis. Look for:
1. Self-harm wounds or injuries
2. Medication/pills in concerning quantities
3. Farewell-type imagery (empty rooms, items laid out, goodbye notes)
4. Screenshots of crisis-related searches
5. Images conveying hopelessness, isolation, or despair
Respond in JSON format ONLY:
{"risk_level": "safe"|"concerning"|"critical", "confidence": 0.0-1.0, "categories": [], "reasoning": ""}
CRITICAL: imminent self-harm indicators
CONCERNING: ambiguous but worrying
SAFE: no indicators detected"""
def _analyze_with_ollama(image_b64, model="gemma3:4b"):
    """Score an image with a local Ollama vision model.

    Args:
        image_b64: base64-encoded image payload.
        model: Ollama model tag to query.

    Returns:
        ImageScreenResult, or None on any failure (server down, timeout,
        unparseable reply) so the caller can fall back to the heuristic.
    """
    try:
        import urllib.request
        payload = json.dumps({
            "model": model,
            "messages": [{
                "role": "user",
                "content": SCREENING_PROMPT,
                "images": [image_b64],
            }],
            "stream": False,
            "options": {"temperature": 0.1},  # near-deterministic scoring
        }).encode()
        req = urllib.request.Request(
            "http://localhost:11434/api/chat",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # Context manager closes the HTTP response; the original leaked it.
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read())
        content = data.get("message", {}).get("content", "")
        # The model may wrap JSON in prose; extract the outermost {...} span.
        json_start = content.find("{")
        json_end = content.rfind("}") + 1
        if json_start == -1 or json_end <= json_start:
            return None
        result = json.loads(content[json_start:json_end])
        return ImageScreenResult(
            risk_level=RiskLevel(result.get("risk_level", "safe")),
            confidence=float(result.get("confidence", 0.5)),
            categories=result.get("categories", []),
            reasoning=result.get("reasoning", ""),
            model_used=f"ollama:{model}",
        )
    except Exception as e:
        # Deliberate broad catch: screening must never crash the chat path;
        # None signals "use the fallback".
        logger.warning(f"Ollama vision analysis failed: {e}")
        return None
def _analyze_fallback(image_bytes):
    """Return a conservative default when no vision model is available.

    Defaults to SAFE with low confidence and tags the result "unanalyzed".
    image_bytes is accepted for interface symmetry but is not inspected.
    """
    return ImageScreenResult(
        risk_level=RiskLevel.SAFE,
        confidence=0.2,
        categories=["unanalyzed"],
        reasoning="No vision model available. Defaulting to safe with low confidence.",
        model_used="fallback:heuristic",
    )
def screen_image(image_data, use_vision_model=True, model="gemma3:4b"):
    """Screen image for self-harm indicators. Analyzes in-memory, no retention.

    Args:
        image_data: raw image bytes, or a base64-encoded string.
        use_vision_model: when True, try the Ollama vision model first.
        model: Ollama model tag passed to _analyze_with_ollama.

    Returns:
        ImageScreenResult from the vision model, or the heuristic fallback.
    """
    if isinstance(image_data, bytes):
        image_b64 = base64.b64encode(image_data).decode()
        image_bytes = image_data
    else:
        image_b64 = image_data
        # Robustness fix: invalid base64 previously raised binascii.Error
        # and crashed the chat path; screening must never crash, so fall
        # back to empty bytes (the fallback analyzer ignores them anyway).
        try:
            image_bytes = base64.b64decode(image_b64)
        except Exception:
            image_bytes = b""
    if use_vision_model:
        result = _analyze_with_ollama(image_b64, model)
        if result:
            logger.info(f"Image screened: {result.risk_level.value} (conf: {result.confidence:.2f})")
            if result.needs_crisis_response:
                logger.warning(f"CRITICAL image: {result.reasoning}")
            return result
    return _analyze_fallback(image_bytes)
def handle_chat_image(image_data):
    """Handle an image from chat; return the action dict for the gateway."""
    result = screen_image(image_data)
    # Pick the canned response (if any) before assembling the action.
    response_text = None
    if result.risk_level == RiskLevel.CRITICAL:
        response_text = (
            "I noticed something concerning in the image you shared. "
            "If you or someone you know is in crisis, please reach out: "
            "988 Suicide and Crisis Lifeline (call or text 988). "
            "You are not alone."
        )
    elif result.risk_level == RiskLevel.CONCERNING:
        response_text = (
            "I want to check in \u2014 how are you doing? "
            "If you need to talk to someone, the 988 Lifeline is available 24/7."
        )
    return {
        "result": result.to_dict(),
        "show_crisis_overlay": result.needs_crisis_response,
        "log_event": result.needs_followup,
        "response_text": response_text,
    }

View File

@@ -0,0 +1,84 @@
"""Tests for image content screening module."""
import json
from unittest.mock import patch, MagicMock
from image_screening import (
RiskLevel,
ImageScreenResult,
screen_image,
handle_chat_image,
_analyze_fallback,
)
class TestImageScreenResult:
    """Property and serialization behavior of ImageScreenResult."""

    @staticmethod
    def _make(level, conf, cats, why):
        # Shared builder; model_used is irrelevant to these assertions.
        return ImageScreenResult(
            risk_level=level, confidence=conf,
            categories=cats, reasoning=why, model_used="test",
        )

    def test_safe_result(self):
        outcome = self._make(RiskLevel.SAFE, 0.95, [], "No indicators")
        assert not outcome.needs_crisis_response
        assert not outcome.needs_followup
        assert outcome.to_dict()["risk_level"] == "safe"

    def test_critical_result(self):
        outcome = self._make(RiskLevel.CRITICAL, 0.9, ["wounds"], "Detected")
        assert outcome.needs_crisis_response
        assert outcome.needs_followup

    def test_concerning_result(self):
        outcome = self._make(RiskLevel.CONCERNING, 0.6, ["isolation"], "Ambiguous")
        assert not outcome.needs_crisis_response
        assert outcome.needs_followup
class TestScreenImage:
    """screen_image() with the vision model disabled (fallback path)."""

    def test_fallback_returns_safe(self):
        screened = screen_image(b"fake_image_data", use_vision_model=False)
        assert screened.risk_level == RiskLevel.SAFE
        assert screened.model_used == "fallback:heuristic"
        assert screened.confidence < 0.5

    def test_base64_input(self):
        # A base64 string input must screen the same as raw bytes.
        import base64
        encoded = base64.b64encode(b"fake").decode()
        screened = screen_image(encoded, use_vision_model=False)
        assert screened.risk_level == RiskLevel.SAFE
class TestHandleChatImage:
    """Gateway action dicts produced by handle_chat_image."""

    @patch("image_screening._analyze_with_ollama")
    def test_safe_image_no_overlay(self, mock_ollama):
        # Bug fix: the original did not mock the model, so this test hit a
        # live Ollama server (up to 30s timeout) and its result depended on
        # whatever the server returned. Force the fallback path instead.
        mock_ollama.return_value = None
        action = handle_chat_image(b"safe_image")
        assert not action["show_crisis_overlay"]
        assert action["response_text"] is None

    @patch("image_screening._analyze_with_ollama")
    def test_critical_image_shows_overlay(self, mock_ollama):
        mock_ollama.return_value = ImageScreenResult(
            risk_level=RiskLevel.CRITICAL, confidence=0.95,
            categories=["wounds"], reasoning="Self-harm detected",
            model_used="ollama:gemma3:4b"
        )
        action = handle_chat_image(b"concerning_image")
        assert action["show_crisis_overlay"]
        assert "988" in action["response_text"]
        assert action["log_event"]

    @patch("image_screening._analyze_with_ollama")
    def test_concerning_image_followup(self, mock_ollama):
        mock_ollama.return_value = ImageScreenResult(
            risk_level=RiskLevel.CONCERNING, confidence=0.6,
            categories=["isolation"], reasoning="Empty room",
            model_used="ollama:gemma3:4b"
        )
        action = handle_chat_image(b"maybe_concerning")
        assert not action["show_crisis_overlay"]
        assert action["log_event"]
        assert "check in" in action["response_text"]