Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
44e0396a1f feat: add behavioral crisis pattern detection (#133)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-20 12:11:02 -04:00
Alexander Whitestone
ac2d230bc1 wip: add behavioral tracker regression tests 2026-04-20 12:03:08 -04:00
7 changed files with 480 additions and 379 deletions

View File

@@ -7,6 +7,7 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .behavioral import BehavioralTracker, BehavioralSignal
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
__all__ = [
@@ -20,6 +21,8 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"BehavioralTracker",
"BehavioralSignal",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",

304
crisis/behavioral.py Normal file
View File

@@ -0,0 +1,304 @@
"""Behavioral crisis pattern detection for the-door (#133).
Detects crisis risk from behavioral patterns, not just message content:
- message frequency spikes versus a 7-day rolling baseline
- late-night messaging (2-5 AM)
- withdrawal / isolation via a sharp drop from the recent daily baseline
- session length trend versus recent sessions
- return after long absence
- rising crisis-score trend across recent messages
Privacy-first:
- in-memory only
- no database
- no file I/O
- no network calls
"""
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any
HIGH_RISK_HOURS = {2, 3, 4}
ELEVATED_RISK_HOURS = {1, 5}
ROLLING_BASELINE_DAYS = 7
RETURN_AFTER_ABSENCE_DAYS = 7
@dataclass
class BehavioralEvent:
session_id: str
timestamp: datetime
message_length: int
crisis_score: float = 0.0
role: str = "user"
@dataclass
class BehavioralSignal:
signal_type: str
risk_level: str
description: str
evidence: list[str] = field(default_factory=list)
score: float = 0.0
def as_dict(self) -> dict[str, Any]:
return {
"signal_type": self.signal_type,
"risk_level": self.risk_level,
"description": self.description,
"evidence": list(self.evidence),
"score": self.score,
}
class BehavioralTracker:
"""In-memory tracker for behavioral crisis signals."""
def __init__(self) -> None:
self._events_by_session: dict[str, list[BehavioralEvent]] = defaultdict(list)
def record(
self,
session_id: str,
timestamp: datetime,
message_length: int,
*,
crisis_score: float = 0.0,
role: str = "user",
) -> None:
if timestamp.tzinfo is None:
timestamp = timestamp.replace(tzinfo=timezone.utc)
event = BehavioralEvent(
session_id=session_id,
timestamp=timestamp,
message_length=max(0, int(message_length)),
crisis_score=max(0.0, min(1.0, float(crisis_score))),
role=role,
)
self._events_by_session[session_id].append(event)
self._events_by_session[session_id].sort(key=lambda item: item.timestamp)
def get_risk_signals(self, session_id: str) -> dict[str, Any]:
events = [event for event in self._events_by_session.get(session_id, []) if event.role == "user"]
if not events:
return {
"frequency_change": 1.0,
"is_late_night": False,
"session_length_trend": "stable",
"withdrawal_detected": False,
"behavioral_score": 0.0,
"signals": [],
}
signals: list[BehavioralSignal] = []
frequency_change = self._compute_frequency_change(events)
frequency_signal = self._analyze_frequency(events, frequency_change)
if frequency_signal:
signals.append(frequency_signal)
time_signal = self._analyze_time(events)
if time_signal:
signals.append(time_signal)
withdrawal_signal = self._analyze_withdrawal(session_id, events)
if withdrawal_signal:
signals.append(withdrawal_signal)
absence_signal = self._analyze_return_after_absence(session_id, events)
if absence_signal:
signals.append(absence_signal)
escalation_signal = self._analyze_escalation(events)
if escalation_signal:
signals.append(escalation_signal)
session_length_trend = self._compute_session_length_trend(session_id, events)
behavioral_score = self._compute_behavioral_score(signals)
risk_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
signals.sort(key=lambda item: (risk_order.get(item.risk_level, 9), -item.score))
return {
"frequency_change": frequency_change,
"is_late_night": any(item.signal_type == "time" for item in signals),
"session_length_trend": session_length_trend,
"withdrawal_detected": any(item.signal_type == "withdrawal" for item in signals),
"behavioral_score": behavioral_score,
"signals": [item.as_dict() for item in signals],
}
def _all_user_events(self) -> list[BehavioralEvent]:
events: list[BehavioralEvent] = []
for session_events in self._events_by_session.values():
events.extend(event for event in session_events if event.role == "user")
events.sort(key=lambda item: item.timestamp)
return events
def _daily_count_baseline(self, current_date) -> float:
events = self._all_user_events()
counts: dict[Any, int] = {}
for offset in range(1, ROLLING_BASELINE_DAYS + 1):
counts[current_date - timedelta(days=offset)] = 0
for event in events:
event_date = event.timestamp.date()
if event_date in counts:
counts[event_date] += 1
return sum(counts.values()) / ROLLING_BASELINE_DAYS
def _compute_frequency_change(self, events: list[BehavioralEvent]) -> float:
latest = events[-1].timestamp
window_start = latest - timedelta(hours=1)
current_hour_count = sum(1 for event in events if event.timestamp >= window_start)
baseline_daily = self._daily_count_baseline(latest.date())
baseline_hourly = max(baseline_daily / 24.0, 0.1)
return round(current_hour_count / baseline_hourly, 2)
def _analyze_frequency(self, events: list[BehavioralEvent], frequency_change: float) -> BehavioralSignal | None:
latest = events[-1].timestamp
window_start = latest - timedelta(hours=1)
current_hour_count = sum(1 for event in events if event.timestamp >= window_start)
if current_hour_count >= 6 and frequency_change >= 3.0:
level = "HIGH" if frequency_change >= 6.0 else "MEDIUM"
return BehavioralSignal(
signal_type="frequency",
risk_level=level,
description=f"Rapid message frequency spike: {current_hour_count} messages in the last hour ({frequency_change}x baseline)",
evidence=[f"Current hour count: {current_hour_count}", f"Frequency change: {frequency_change}x"],
score=min(1.0, frequency_change / 8.0),
)
return None
def _analyze_time(self, events: list[BehavioralEvent]) -> BehavioralSignal | None:
latest = events[-1].timestamp
hour = latest.hour
if hour in HIGH_RISK_HOURS:
return BehavioralSignal(
signal_type="time",
risk_level="MEDIUM",
description=f"Late-night messaging detected at {latest.strftime('%H:%M')}",
evidence=[f"Latest message timestamp: {latest.isoformat()}"],
score=0.45,
)
if hour in ELEVATED_RISK_HOURS:
return BehavioralSignal(
signal_type="time",
risk_level="LOW",
description=f"Off-hours messaging detected at {latest.strftime('%H:%M')}",
evidence=[f"Latest message timestamp: {latest.isoformat()}"],
score=0.2,
)
return None
def _analyze_withdrawal(self, session_id: str, events: list[BehavioralEvent]) -> BehavioralSignal | None:
current_date = events[-1].timestamp.date()
baseline_daily = self._daily_count_baseline(current_date)
if baseline_daily < 3.0:
return None
current_day_count = sum(1 for event in events if event.timestamp.date() == current_date)
current_avg_len = sum(event.message_length for event in events if event.timestamp.date() == current_date) / max(current_day_count, 1)
prior_events = [
event
for sid, session_events in self._events_by_session.items()
if sid != session_id
for event in session_events
if event.role == "user" and event.timestamp.date() >= current_date - timedelta(days=ROLLING_BASELINE_DAYS)
]
if not prior_events:
return None
prior_avg_len = sum(event.message_length for event in prior_events) / len(prior_events)
if current_day_count <= max(1, baseline_daily * 0.3):
score = 0.55 if current_day_count == 1 else 0.4
if current_avg_len < prior_avg_len * 0.5:
score += 0.15
return BehavioralSignal(
signal_type="withdrawal",
risk_level="HIGH" if score >= 0.6 else "MEDIUM",
description="Sharp drop from recent communication baseline suggests withdrawal/isolation",
evidence=[
f"Current day count: {current_day_count}",
f"7-day daily baseline: {baseline_daily:.2f}",
f"Average message length: {current_avg_len:.1f} vs {prior_avg_len:.1f}",
],
score=min(1.0, score),
)
return None
def _analyze_return_after_absence(self, session_id: str, events: list[BehavioralEvent]) -> BehavioralSignal | None:
current_start = events[0].timestamp
prior_events = [
event
for sid, session_events in self._events_by_session.items()
if sid != session_id
for event in session_events
if event.role == "user" and event.timestamp < current_start
]
if not prior_events:
return None
latest_prior = max(prior_events, key=lambda item: item.timestamp)
gap = current_start - latest_prior.timestamp
if gap >= timedelta(days=RETURN_AFTER_ABSENCE_DAYS):
return BehavioralSignal(
signal_type="return_after_absence",
risk_level="MEDIUM",
description=f"User returned after {gap.days} days of silence",
evidence=[f"Last prior activity: {latest_prior.timestamp.isoformat()}"],
score=min(1.0, gap.days / 14.0),
)
return None
def _analyze_escalation(self, events: list[BehavioralEvent]) -> BehavioralSignal | None:
scored = [event for event in events if event.crisis_score > 0]
if len(scored) < 3:
return None
recent = scored[-5:]
midpoint = max(1, len(recent) // 2)
first_avg = sum(event.crisis_score for event in recent[:midpoint]) / len(recent[:midpoint])
second_avg = sum(event.crisis_score for event in recent[midpoint:]) / len(recent[midpoint:])
if second_avg >= max(0.4, first_avg * 1.3):
return BehavioralSignal(
signal_type="escalation",
risk_level="HIGH" if second_avg >= 0.65 else "MEDIUM",
description=f"Behavioral escalation: crisis score trend rose from {first_avg:.2f} to {second_avg:.2f}",
evidence=[f"Recent crisis scores: {[round(event.crisis_score, 2) for event in recent]}"],
score=min(1.0, second_avg),
)
return None
def _compute_session_length_trend(self, session_id: str, events: list[BehavioralEvent]) -> str:
current_duration = (events[-1].timestamp - events[0].timestamp).total_seconds()
previous_durations = []
current_start = events[0].timestamp
for sid, session_events in self._events_by_session.items():
if sid == session_id:
continue
user_events = [event for event in session_events if event.role == "user"]
if len(user_events) < 2:
continue
if user_events[-1].timestamp < current_start - timedelta(days=ROLLING_BASELINE_DAYS):
continue
previous_durations.append((user_events[-1].timestamp - user_events[0].timestamp).total_seconds())
if not previous_durations:
return "stable"
average_duration = sum(previous_durations) / len(previous_durations)
if current_duration > average_duration * 1.5:
return "increasing"
if current_duration < average_duration * 0.5:
return "decreasing"
return "stable"
def _compute_behavioral_score(self, signals: list[BehavioralSignal]) -> float:
if not signals:
return 0.0
max_score = max(signal.score for signal in signals)
multi_signal_boost = min(0.2, 0.05 * (len(signals) - 1))
return round(min(1.0, max_score + multi_signal_boost), 2)

View File

@@ -14,8 +14,6 @@ Usage:
import json
from typing import Optional
from image_screening import screen_image_signals
from .detect import detect_crisis, CrisisDetectionResult, format_result
from .compassion_router import router
from .response import (
@@ -52,67 +50,6 @@ def check_crisis(text: str) -> dict:
}
def _image_detection_from_score(image_result) -> CrisisDetectionResult:
if image_result.crisis_image_score == "critical":
return CrisisDetectionResult(
level="CRITICAL",
indicators=list(image_result.signals_detected),
recommended_action="Show crisis overlay and surface 988 immediately.",
score=image_result.distress_score,
)
if image_result.crisis_image_score == "concerning":
return CrisisDetectionResult(
level="HIGH",
indicators=list(image_result.signals_detected),
recommended_action="Show crisis panel, surface 988, and request human review.",
score=image_result.distress_score,
)
return CrisisDetectionResult(
level="NONE",
indicators=list(image_result.signals_detected),
recommended_action="No crisis action required.",
score=image_result.distress_score,
)
def check_image_crisis(
*,
image_path: Optional[str] = None,
ocr_text: str = "",
labels: Optional[list[str]] = None,
manual_notes: str = "",
visual_flags: Optional[list[str]] = None,
) -> dict:
"""Gateway-integrated image crisis check using the local screening slice."""
image_result = screen_image_signals(
image_path=image_path,
ocr_text=ocr_text,
labels=labels,
manual_notes=manual_notes,
visual_flags=visual_flags,
)
detection = _image_detection_from_score(image_result)
response = generate_response(detection)
return {
"level": detection.level,
"image_score": image_result.crisis_image_score,
"score": detection.score,
"indicators": detection.indicators,
"recommended_action": detection.recommended_action,
"timmy_message": response.timmy_message,
"ui": {
"show_crisis_panel": response.show_crisis_panel,
"show_overlay": response.show_overlay,
"provide_988": response.provide_988,
},
"escalate": response.escalate,
"requires_human_review": image_result.requires_human_review,
"grounded_scope": image_result.grounded_scope,
"screening": image_result.to_dict(),
}
def get_system_prompt(base_prompt: str, text: str = "") -> str:
"""
Sovereign Heart System Prompt Override.

View File

@@ -34,6 +34,7 @@ Usage:
from dataclasses import dataclass, field
from typing import List, Optional
from .behavioral import BehavioralTracker
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
@@ -52,6 +53,12 @@ class SessionState:
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
behavioral_score: float = 0.0
behavioral_signals: List[dict] = field(default_factory=list)
frequency_change: float = 1.0
is_late_night: bool = False
session_length_trend: str = "stable"
withdrawal_detected: bool = False
class CrisisSessionTracker:
@@ -77,6 +84,8 @@ class CrisisSessionTracker:
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
self._behavioral_tracker = BehavioralTracker()
self._behavioral_session_id = "current-session"
@property
def state(self) -> SessionState:
@@ -84,6 +93,7 @@ class CrisisSessionTracker:
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
behavioral = self._behavioral_tracker.get_risk_signals(self._behavioral_session_id)
return SessionState(
current_level=self._current_level,
@@ -94,14 +104,29 @@ class CrisisSessionTracker:
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
behavioral_score=behavioral["behavioral_score"],
behavioral_signals=behavioral["signals"],
frequency_change=behavioral["frequency_change"],
is_late_night=behavioral["is_late_night"],
session_length_trend=behavioral["session_length_trend"],
withdrawal_detected=behavioral["withdrawal_detected"],
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
def record(
self,
detection: CrisisDetectionResult,
*,
timestamp=None,
message_length: int = 0,
role: str = "user",
) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
from datetime import datetime, timezone
level = detection.level
self._message_count += 1
self._level_history.append(level)
@@ -116,6 +141,17 @@ class CrisisSessionTracker:
else:
self._consecutive_low = 0
if role == "user":
if timestamp is None:
timestamp = datetime.now(timezone.utc)
self._behavioral_tracker.record(
self._behavioral_session_id,
timestamp,
message_length=message_length,
crisis_score=detection.score,
role=role,
)
self._current_level = level
return self.state
@@ -195,14 +231,22 @@ class CrisisSessionTracker:
"supportive engagement while remaining vigilant."
)
notes = []
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
notes.append(
f"User previously reached {s.peak_level} crisis level this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
if s.behavioral_score >= 0.35 and s.behavioral_signals:
signal_names = ", ".join(item["signal_type"] for item in s.behavioral_signals)
notes.append(
f"Behavioral risk signals detected this session: {signal_names}. "
"Use the behavioral context to increase sensitivity and warmth."
)
return " ".join(notes)
def get_ui_hints(self) -> dict:
"""
@@ -217,6 +261,10 @@ class CrisisSessionTracker:
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
"behavioral_score": s.behavioral_score,
"is_late_night": s.is_late_night,
"withdrawal_detected": s.withdrawal_detected,
"session_length_trend": s.session_length_trend,
}
if s.is_escalating:
@@ -226,12 +274,20 @@ class CrisisSessionTracker:
"Consider increasing intervention level."
)
if s.behavioral_score >= 0.5:
hints["behavioral_warning"] = True
hints.setdefault(
"suggested_action",
"Behavioral risk patterns are active. Keep the response warm, grounded, and alert."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
timestamp=None,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
@@ -243,7 +299,16 @@ def check_crisis_with_session(
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
session_state = tracker.record(detection, timestamp=timestamp, message_length=len(text))
behavioral = {
"frequency_change": session_state.frequency_change,
"is_late_night": session_state.is_late_night,
"session_length_trend": session_state.session_length_trend,
"withdrawal_detected": session_state.withdrawal_detected,
"behavioral_score": session_state.behavioral_score,
"signals": session_state.behavioral_signals,
}
return {
**single_result,
@@ -255,5 +320,6 @@ def check_crisis_with_session(
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
"behavioral": behavioral,
},
}

View File

@@ -1,195 +0,0 @@
"""
image_screening.py — local image crisis screening slice for epic #130.
Grounded scope:
- screens OCR text, upstream object labels, and operator notes for crisis signals
- intentionally does NOT claim raw computer-vision understanding of pixels
- designed to plug into future multimodal scoring once a dedicated image model lands
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Iterable, List, Optional
from crisis.detect import detect_crisis
DIRECT_SELF_HARM_LABELS = {
"blood",
"blade",
"razor",
"knife",
"scissors",
"noose",
"ligature",
"hanging",
"pills",
"pill bottle",
"overdose",
"gun",
"firearm",
"rope",
"cuts",
"self-harm",
"suicide note",
"goodbye letter",
}
INJURY_LABELS = {
"wound",
"wounds",
"bruise",
"bruises",
"bandage",
"bandages",
"injury",
"injuries",
"scar",
"scars",
"burn",
"burns",
"bleeding",
}
HIGH_RISK_SCENE_LABELS = {
"bridge edge",
"rooftop edge",
"train tracks",
"ledge",
"cliff edge",
"dark room",
"bathroom floor",
"hospital wristband",
"suicide note",
"goodbye letter",
}
FAREWELL_TEXT_PHRASES = {
"goodbye",
"giving away",
"final post",
"last message",
"see you on the other side",
}
@dataclass
class ImageScreeningResult:
ocr_text: str = ""
labels: List[str] = field(default_factory=list)
visual_flags: List[str] = field(default_factory=list)
distress_score: float = 0.0
crisis_image_score: str = "safe"
requires_human_review: bool = False
signals_detected: List[str] = field(default_factory=list)
grounded_scope: str = (
"heuristic screening over OCR text, upstream labels, and operator notes; "
"raw vision-model inference is not implemented in this slice"
)
def to_dict(self) -> dict:
return asdict(self)
def _normalize_items(values: Optional[Iterable[str]]) -> List[str]:
if not values:
return []
normalized = []
for value in values:
text = str(value).strip().lower()
if text:
normalized.append(text)
return normalized
def _match_keywords(haystack: str, keywords: set[str]) -> List[str]:
matches = []
for keyword in keywords:
if keyword in haystack:
matches.append(keyword)
return sorted(set(matches))
def screen_image_signals(
image_path: Optional[str] = None,
*,
ocr_text: str = "",
labels: Optional[Iterable[str]] = None,
manual_notes: str = "",
visual_flags: Optional[Iterable[str]] = None,
) -> ImageScreeningResult:
"""
Score image-related crisis evidence without pretending to do full CV.
Inputs are deliberately grounded in what the repo can actually support today:
- OCR text extracted upstream from screenshots/photos
- object labels supplied by a local model or operator
- manual operator notes about visible scene context
- optional visual flags from any upstream preprocessor
"""
normalized_labels = _normalize_items(labels)
normalized_flags = _normalize_items(visual_flags)
normalized_notes = str(manual_notes or "").strip().lower()
normalized_ocr = str(ocr_text or "").strip()
combined_label_text = " ".join(normalized_labels + normalized_flags + ([normalized_notes] if normalized_notes else []))
crisis_text = " ".join(part for part in [normalized_ocr, normalized_notes] if part).strip()
direct_matches = _match_keywords(combined_label_text, DIRECT_SELF_HARM_LABELS)
injury_matches = _match_keywords(combined_label_text, INJURY_LABELS)
scene_matches = _match_keywords(combined_label_text, HIGH_RISK_SCENE_LABELS)
farewell_matches = _match_keywords(crisis_text.lower(), FAREWELL_TEXT_PHRASES)
text_detection = detect_crisis(crisis_text) if crisis_text else None
signals: List[str] = []
score = 0.0
if direct_matches:
score = max(score, 0.85)
for match in direct_matches:
signals.append(f"direct_self_harm_label:{match}")
if injury_matches:
score = max(score, 0.55)
for match in injury_matches:
signals.append(f"injury_indicator:{match}")
if scene_matches:
score = max(score, 0.4)
for match in scene_matches:
signals.append(f"high_risk_scene:{match}")
if farewell_matches:
score = max(score, 0.85)
for match in farewell_matches:
signals.append(f"farewell_text:{match}")
if text_detection and text_detection.level != "NONE":
score = max(score, min(1.0, text_detection.score))
signals.append(f"ocr_crisis_level:{text_detection.level}")
for indicator in text_detection.indicators[:3]:
signals.append(f"ocr_indicator:{indicator}")
if direct_matches and text_detection and text_detection.level in {"HIGH", "CRITICAL"}:
score = min(1.0, max(score, 0.95))
signals.append("cross_modal_confirmation:text_plus_visual")
if direct_matches or (text_detection and text_detection.level == "CRITICAL") or score >= 0.85:
crisis_image_score = "critical"
elif score >= 0.4 or (text_detection and text_detection.level in {"HIGH", "MEDIUM"}):
crisis_image_score = "concerning"
else:
crisis_image_score = "safe"
requires_human_review = score >= 0.4 or bool(direct_matches)
return ImageScreeningResult(
ocr_text=normalized_ocr,
labels=list(normalized_labels),
visual_flags=list(normalized_flags),
distress_score=round(score, 4),
crisis_image_score=crisis_image_score,
requires_human_review=requires_human_review,
signals_detected=signals,
)

View File

@@ -0,0 +1,101 @@
"""
Tests for behavioral crisis pattern detection (#133).
"""
import os
import sys
import unittest
from datetime import datetime, timedelta, timezone
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.session_tracker import CrisisSessionTracker, check_crisis_with_session
from crisis.behavioral import BehavioralTracker
class TestBehavioralTracker(unittest.TestCase):
def _seed_day(self, tracker, *, session_id, day, count, start_hour=10, message_length=48, crisis_score=0.0):
base = datetime(2026, 4, day, start_hour, 0, tzinfo=timezone.utc)
for i in range(count):
tracker.record(
session_id,
base + timedelta(minutes=i * 10),
message_length=message_length,
crisis_score=crisis_score,
)
def test_frequency_change_uses_seven_day_baseline(self):
tracker = BehavioralTracker()
for day in range(1, 8):
self._seed_day(tracker, session_id=f"baseline-{day}", day=day, count=2)
burst_base = datetime(2026, 4, 8, 14, 0, tzinfo=timezone.utc)
for i in range(8):
tracker.record(
"current-session",
burst_base + timedelta(minutes=i),
message_length=72,
crisis_score=0.1,
)
summary = tracker.get_risk_signals("current-session")
self.assertGreater(summary["frequency_change"], 2.0)
self.assertTrue(any(sig["signal_type"] == "frequency" for sig in summary["signals"]))
self.assertGreater(summary["behavioral_score"], 0.0)
def test_late_night_messages_raise_flag(self):
tracker = BehavioralTracker()
base = datetime(2026, 4, 10, 2, 15, tzinfo=timezone.utc)
for i in range(3):
tracker.record(
"late-night",
base + timedelta(minutes=i * 7),
message_length=35,
crisis_score=0.0,
)
summary = tracker.get_risk_signals("late-night")
self.assertTrue(summary["is_late_night"])
self.assertTrue(any(sig["signal_type"] == "time" for sig in summary["signals"]))
def test_withdrawal_detected_after_large_drop_from_baseline(self):
tracker = BehavioralTracker()
for day in range(1, 8):
self._seed_day(tracker, session_id=f"baseline-{day}", day=day, count=10, message_length=80)
tracker.record(
"withdrawal-session",
datetime(2026, 4, 9, 11, 0, tzinfo=timezone.utc),
message_length=18,
crisis_score=0.0,
)
summary = tracker.get_risk_signals("withdrawal-session")
self.assertTrue(summary["withdrawal_detected"])
self.assertTrue(any(sig["signal_type"] == "withdrawal" for sig in summary["signals"]))
class TestBehavioralSessionIntegration(unittest.TestCase):
def test_check_crisis_with_session_includes_behavioral_summary(self):
tracker = CrisisSessionTracker()
base = datetime(2026, 4, 20, 2, 0, tzinfo=timezone.utc)
check_crisis_with_session("can't sleep", tracker, timestamp=base)
check_crisis_with_session("still here", tracker, timestamp=base + timedelta(minutes=1))
result = check_crisis_with_session("everything feels loud", tracker, timestamp=base + timedelta(minutes=2))
behavioral = result["session"]["behavioral"]
self.assertIn("frequency_change", behavioral)
self.assertIn("is_late_night", behavioral)
self.assertIn("session_length_trend", behavioral)
self.assertIn("withdrawal_detected", behavioral)
self.assertIn("behavioral_score", behavioral)
self.assertTrue(behavioral["is_late_night"])
self.assertGreater(behavioral["behavioral_score"], 0.0)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,115 +0,0 @@
"""Tests for local image crisis screening slice under epic #130."""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.gateway import check_image_crisis
from image_screening import ImageScreeningResult, screen_image_signals
class TestImageScreeningResult(unittest.TestCase):
def test_to_dict_preserves_core_fields(self):
result = ImageScreeningResult(
ocr_text="help me",
labels=["blood"],
visual_flags=["dark_scene"],
distress_score=0.8,
requires_human_review=True,
signals_detected=["direct_self_harm_label:blood"],
)
data = result.to_dict()
self.assertEqual(data["ocr_text"], "help me")
self.assertEqual(data["labels"], ["blood"])
self.assertTrue(data["requires_human_review"])
class TestScreenImageSignals(unittest.TestCase):
def test_direct_self_harm_labels_trigger_high_risk(self):
result = screen_image_signals(
labels=["razor blade", "blood droplets"],
manual_notes="photo of fresh cuts on forearm",
)
self.assertEqual(result.crisis_image_score, "critical")
self.assertGreaterEqual(result.distress_score, 0.8)
self.assertTrue(result.requires_human_review)
self.assertTrue(any("self_harm" in signal for signal in result.signals_detected))
def test_ocr_text_uses_existing_crisis_detector(self):
result = screen_image_signals(
ocr_text="I want to kill myself tonight",
labels=["handwritten note"],
)
self.assertGreaterEqual(result.distress_score, 0.7)
self.assertTrue(result.requires_human_review)
self.assertTrue(any(signal.startswith("ocr_crisis_level:") for signal in result.signals_detected))
def test_neutral_image_stays_low_risk(self):
result = screen_image_signals(
labels=["dog", "park", "sunlight"],
manual_notes="family outing in daylight",
)
self.assertEqual(result.crisis_image_score, "safe")
self.assertLess(result.distress_score, 0.2)
self.assertFalse(result.requires_human_review)
self.assertEqual(result.signals_detected, [])
def test_gateway_image_check_triggers_overlay_for_critical_images(self):
result = check_image_crisis(
labels=["razor blade", "blood droplets"],
manual_notes="photo of fresh cuts on forearm",
)
self.assertEqual(result["level"], "CRITICAL")
self.assertEqual(result["image_score"], "critical")
self.assertTrue(result["ui"]["show_overlay"])
self.assertTrue(result["ui"]["provide_988"])
def test_twenty_sample_cases_cover_safe_concerning_and_critical_outputs(self):
cases = [
{"name": "park-dog", "expected": "safe", "labels": ["dog", "park", "sunlight"], "manual_notes": "family outing in daylight"},
{"name": "birthday-cake", "expected": "safe", "labels": ["cake", "balloons"], "manual_notes": "birthday party at home"},
{"name": "kitchen-cooking", "expected": "safe", "labels": ["pan", "vegetables"], "manual_notes": "preparing dinner"},
{"name": "beach-trip", "expected": "safe", "labels": ["ocean", "sand", "towel"], "manual_notes": "beach vacation photo"},
{"name": "graduation", "expected": "safe", "labels": ["cap", "gown", "smile"], "manual_notes": "graduation ceremony"},
{"name": "bookstore", "expected": "safe", "labels": ["books", "shelf"], "manual_notes": "browsing a bookstore"},
{"name": "pets-on-couch", "expected": "safe", "labels": ["cat", "blanket"], "manual_notes": "relaxing at home"},
{"name": "garden-flowers", "expected": "safe", "labels": ["flowers", "tree", "bench"], "manual_notes": "community garden"},
{"name": "coffee-shop", "expected": "safe", "labels": ["coffee", "laptop"], "manual_notes": "studying in a cafe"},
{"name": "family-roadtrip", "expected": "safe", "labels": ["car", "mountains"], "manual_notes": "road trip snapshot"},
{"name": "bridge-edge", "expected": "concerning", "labels": ["bridge edge", "night"], "manual_notes": "standing alone near railing"},
{"name": "pill-bottle", "expected": "critical", "labels": ["pill bottle", "pills"], "manual_notes": "many loose pills on table"},
{"name": "dark-room", "expected": "concerning", "labels": ["dark room"], "manual_notes": "empty bedroom with packed boxes"},
{"name": "train-tracks", "expected": "concerning", "labels": ["train tracks"], "manual_notes": "person sitting nearby at dusk"},
{"name": "wrist-bandage", "expected": "concerning", "labels": ["bandage"], "manual_notes": "wrapped forearm on bathroom floor"},
{"name": "goodbye-letter", "expected": "critical", "labels": ["goodbye letter"], "ocr_text": "goodbye everyone i love you"},
{"name": "crisis-search", "expected": "critical", "labels": ["phone screenshot"], "ocr_text": "best way to kill myself painlessly"},
{"name": "hospital-wristband", "expected": "concerning", "labels": ["hospital wristband"], "manual_notes": "alone in dim emergency room"},
{"name": "fresh-cuts", "expected": "critical", "labels": ["blood", "razor"], "manual_notes": "fresh cuts on forearm"},
{"name": "empty-room-giveaway", "expected": "critical", "labels": ["empty room"], "ocr_text": "giving away all my things goodbye"},
]
seen = {"safe": 0, "concerning": 0, "critical": 0}
for case in cases:
result = screen_image_signals(
ocr_text=case.get("ocr_text", ""),
labels=case.get("labels", []),
manual_notes=case.get("manual_notes", ""),
)
self.assertEqual(result.crisis_image_score, case["expected"], case["name"])
seen[case["expected"]] += 1
self.assertEqual(sum(seen.values()), 20)
self.assertEqual(seen["safe"], 10)
self.assertGreaterEqual(seen["concerning"], 5)
self.assertGreaterEqual(seen["critical"], 5)
if __name__ == "__main__":
unittest.main()