Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0ab2626ef2 feat: image content screening for self-harm indicators (closes #132)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 4s
Smoke Test / smoke (pull_request) Successful in 10s
2026-04-15 12:02:22 -04:00
4 changed files with 259 additions and 164 deletions

View File

@@ -1,5 +1,22 @@
"""Crisis detection and metrics module."""
"""
Crisis detection and response system for the-door.
from .metrics import get_metrics_summary, get_metrics_report
Stands between a broken man and a machine that would tell him to die.
"""
__all__ = ["get_metrics_summary", "get_metrics_report"]
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
__all__ = [
"detect_crisis",
"CrisisDetectionResult",
"process_message",
"generate_response",
"CrisisResponse",
"check_crisis",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
]

View File

@@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""
Crisis Metrics CLI — View crisis detection health metrics.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --today # today only
"""
import argparse
import json
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
# Metrics file location
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
def load_metrics():
"""Load metrics from file."""
if not METRICS_FILE.exists():
return {"detections": [], "stats": {}}
try:
with open(METRICS_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {"detections": [], "stats": {}}
def get_metrics_summary(days=7):
"""Get metrics summary for the last N days."""
data = load_metrics()
detections = data.get("detections", [])
cutoff = time.time() - (days * 86400)
recent = [d for d in detections if d.get("timestamp", 0) > cutoff]
if not recent:
return {
"period_days": days,
"total_detections": 0,
"by_severity": {},
"by_source": {},
"avg_response_time": 0,
}
by_severity = {}
by_source = {}
total_response_time = 0
response_count = 0
for d in recent:
severity = d.get("severity", "unknown")
source = d.get("source", "unknown")
by_severity[severity] = by_severity.get(severity, 0) + 1
by_source[source] = by_source.get(source, 0) + 1
if "response_time_ms" in d:
total_response_time += d["response_time_ms"]
response_count += 1
return {
"period_days": days,
"total_detections": len(recent),
"by_severity": by_severity,
"by_source": by_source,
"avg_response_time_ms": total_response_time / response_count if response_count else 0,
"first_detection": recent[0].get("timestamp"),
"last_detection": recent[-1].get("timestamp"),
}
def get_metrics_report(days=7):
"""Generate a human-readable metrics report."""
summary = get_metrics_summary(days)
lines = []
lines.append("=" * 50)
lines.append("CRISIS DETECTION METRICS")
lines.append(f"Period: Last {days} days")
lines.append("=" * 50)
lines.append("")
total = summary["total_detections"]
lines.append(f"Total detections: {total}")
lines.append("")
if total > 0:
lines.append("By severity:")
for sev, count in sorted(summary["by_severity"].items()):
pct = (count / total) * 100
bar = "" * int(pct / 5)
lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}")
lines.append("")
lines.append("By source:")
for src, count in sorted(summary["by_source"].items()):
lines.append(f" {src:20} {count:4}")
lines.append("")
avg_ms = summary.get("avg_response_time_ms", 0)
lines.append(f"Avg response time: {avg_ms:.0f}ms")
first = summary.get("first_detection")
last = summary.get("last_detection")
if first and last:
first_dt = datetime.fromtimestamp(first)
last_dt = datetime.fromtimestamp(last)
lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}")
else:
lines.append("No crisis detections in this period.")
lines.append("")
lines.append("=" * 50)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Crisis Detection Metrics CLI",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --summary Weekly summary report
%(prog)s --today Today only
%(prog)s --json Raw JSON export
%(prog)s --days 30 Last 30 days
""",
)
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
parser.add_argument("--today", action="store_true", help="Today only (1 day)")
parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")
args = parser.parse_args()
if args.metrics_file:
global METRICS_FILE
METRICS_FILE = Path(args.metrics_file)
days = 1 if args.today else args.days
if args.json_output:
summary = get_metrics_summary(days)
print(json.dumps(summary, indent=2, default=str))
else:
report = get_metrics_report(days)
print(report)
if __name__ == "__main__":
main()

155
image_screening.py Normal file
View File

@@ -0,0 +1,155 @@
"""
Image Content Screening for Self-Harm Indicators
Screens images sent in chat for self-harm content using vision models.
Privacy-preserving: analyzes in-memory, discards after scoring. No image retention.
"""
import base64
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
logger = logging.getLogger("the-door.image_screening")
class RiskLevel(str, Enum):
SAFE = "safe"
CONCERNING = "concerning"
CRITICAL = "critical"
@dataclass
class ImageScreenResult:
risk_level: RiskLevel
confidence: float
categories: list
reasoning: str
model_used: str
@property
def needs_crisis_response(self):
return self.risk_level == RiskLevel.CRITICAL
@property
def needs_followup(self):
return self.risk_level in (RiskLevel.CONCERNING, RiskLevel.CRITICAL)
def to_dict(self):
return {
"risk_level": self.risk_level.value,
"confidence": round(self.confidence, 3),
"categories": self.categories,
"reasoning": self.reasoning,
"model_used": self.model_used,
}
SCREENING_PROMPT = """Analyze this image for indicators of self-harm or crisis. Look for:
1. Self-harm wounds or injuries
2. Medication/pills in concerning quantities
3. Farewell-type imagery (empty rooms, items laid out, goodbye notes)
4. Screenshots of crisis-related searches
5. Images conveying hopelessness, isolation, or despair
Respond in JSON format ONLY:
{"risk_level": "safe"|"concerning"|"critical", "confidence": 0.0-1.0, "categories": [], "reasoning": ""}
CRITICAL: imminent self-harm indicators
CONCERNING: ambiguous but worrying
SAFE: no indicators detected"""
def _analyze_with_ollama(image_b64, model="gemma3:4b"):
try:
import urllib.request
payload = json.dumps({
"model": model,
"messages": [{
"role": "user",
"content": SCREENING_PROMPT,
"images": [image_b64],
}],
"stream": False,
"options": {"temperature": 0.1},
}).encode()
req = urllib.request.Request(
"http://localhost:11434/api/chat",
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
resp = urllib.request.urlopen(req, timeout=30)
data = json.loads(resp.read())
content = data.get("message", {}).get("content", "")
json_start = content.find("{")
json_end = content.rfind("}") + 1
if json_start == -1 or json_end <= json_start:
return None
result = json.loads(content[json_start:json_end])
return ImageScreenResult(
risk_level=RiskLevel(result.get("risk_level", "safe")),
confidence=float(result.get("confidence", 0.5)),
categories=result.get("categories", []),
reasoning=result.get("reasoning", ""),
model_used=f"ollama:{model}",
)
except Exception as e:
logger.warning(f"Ollama vision analysis failed: {e}")
return None
def _analyze_fallback(image_bytes):
return ImageScreenResult(
risk_level=RiskLevel.SAFE,
confidence=0.2,
categories=["unanalyzed"],
reasoning="No vision model available. Defaulting to safe with low confidence.",
model_used="fallback:heuristic",
)
def screen_image(image_data, use_vision_model=True, model="gemma3:4b"):
"""Screen image for self-harm indicators. Analyzes in-memory, no retention."""
if isinstance(image_data, bytes):
image_b64 = base64.b64encode(image_data).decode()
else:
image_b64 = image_data
image_data = base64.b64decode(image_b64)
if use_vision_model:
result = _analyze_with_ollama(image_b64, model)
if result:
logger.info(f"Image screened: {result.risk_level.value} (conf: {result.confidence:.2f})")
if result.needs_crisis_response:
logger.warning(f"CRITICAL image: {result.reasoning}")
return result
return _analyze_fallback(image_data)
def handle_chat_image(image_data):
"""Handle image from chat. Returns action dict for gateway."""
result = screen_image(image_data)
action = {
"result": result.to_dict(),
"show_crisis_overlay": result.needs_crisis_response,
"log_event": result.needs_followup,
"response_text": None,
}
if result.risk_level == RiskLevel.CRITICAL:
action["response_text"] = (
"I noticed something concerning in the image you shared. "
"If you or someone you know is in crisis, please reach out: "
"988 Suicide and Crisis Lifeline (call or text 988). "
"You are not alone."
)
elif result.risk_level == RiskLevel.CONCERNING:
action["response_text"] = (
"I want to check in \u2014 how are you doing? "
"If you need to talk to someone, the 988 Lifeline is available 24/7."
)
return action

View File

@@ -0,0 +1,84 @@
"""Tests for image content screening module."""
import json
from unittest.mock import patch, MagicMock
from image_screening import (
RiskLevel,
ImageScreenResult,
screen_image,
handle_chat_image,
_analyze_fallback,
)
class TestImageScreenResult:
def test_safe_result(self):
result = ImageScreenResult(
risk_level=RiskLevel.SAFE, confidence=0.95,
categories=[], reasoning="No indicators", model_used="test"
)
assert not result.needs_crisis_response
assert not result.needs_followup
assert result.to_dict()["risk_level"] == "safe"
def test_critical_result(self):
result = ImageScreenResult(
risk_level=RiskLevel.CRITICAL, confidence=0.9,
categories=["wounds"], reasoning="Detected", model_used="test"
)
assert result.needs_crisis_response
assert result.needs_followup
def test_concerning_result(self):
result = ImageScreenResult(
risk_level=RiskLevel.CONCERNING, confidence=0.6,
categories=["isolation"], reasoning="Ambiguous", model_used="test"
)
assert not result.needs_crisis_response
assert result.needs_followup
class TestScreenImage:
def test_fallback_returns_safe(self):
result = screen_image(b"fake_image_data", use_vision_model=False)
assert result.risk_level == RiskLevel.SAFE
assert result.model_used == "fallback:heuristic"
assert result.confidence < 0.5
def test_base64_input(self):
import base64
b64 = base64.b64encode(b"fake").decode()
result = screen_image(b64, use_vision_model=False)
assert result.risk_level == RiskLevel.SAFE
class TestHandleChatImage:
def test_safe_image_no_overlay(self):
action = handle_chat_image(b"safe_image")
assert not action["show_crisis_overlay"]
assert action["response_text"] is None
@patch("image_screening._analyze_with_ollama")
def test_critical_image_shows_overlay(self, mock_ollama):
mock_ollama.return_value = ImageScreenResult(
risk_level=RiskLevel.CRITICAL, confidence=0.95,
categories=["wounds"], reasoning="Self-harm detected",
model_used="ollama:gemma3:4b"
)
action = handle_chat_image(b"concerning_image")
assert action["show_crisis_overlay"]
assert "988" in action["response_text"]
assert action["log_event"]
@patch("image_screening._analyze_with_ollama")
def test_concerning_image_followup(self, mock_ollama):
mock_ollama.return_value = ImageScreenResult(
risk_level=RiskLevel.CONCERNING, confidence=0.6,
categories=["isolation"], reasoning="Empty room",
model_used="ollama:gemma3:4b"
)
action = handle_chat_image(b"maybe_concerning")
assert not action["show_crisis_overlay"]
assert action["log_event"]
assert "check in" in action["response_text"]