diff --git a/crisis/__init__.py b/crisis/__init__.py index 25353cc..3bdfc5e 100644 --- a/crisis/__init__.py +++ b/crisis/__init__.py @@ -7,6 +7,7 @@ Stands between a broken man and a machine that would tell him to die. from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji from .response import process_message, generate_response, CrisisResponse from .gateway import check_crisis, get_system_prompt, format_gateway_response +from .behavioral import BehavioralTracker, BehavioralSignal from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session __all__ = [ @@ -20,6 +21,8 @@ __all__ = [ "format_result", "format_gateway_response", "get_urgency_emoji", + "BehavioralTracker", + "BehavioralSignal", "CrisisSessionTracker", "SessionState", "check_crisis_with_session", diff --git a/crisis/behavioral.py b/crisis/behavioral.py new file mode 100644 index 0000000..8f4ce58 --- /dev/null +++ b/crisis/behavioral.py @@ -0,0 +1,304 @@ +"""Behavioral crisis pattern detection for the-door (#133). + +Detects crisis risk from behavioral patterns, not just message content: +- message frequency spikes versus a 7-day rolling baseline +- late-night messaging (2-5 AM) +- withdrawal / isolation via a sharp drop from the recent daily baseline +- session length trend versus recent sessions +- return after long absence +- rising crisis-score trend across recent messages + +Privacy-first: +- in-memory only +- no database +- no file I/O +- no network calls +""" + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from typing import Any + + +HIGH_RISK_HOURS = {2, 3, 4} +ELEVATED_RISK_HOURS = {1, 5} +ROLLING_BASELINE_DAYS = 7 +RETURN_AFTER_ABSENCE_DAYS = 7 + + +@dataclass +class BehavioralEvent: + session_id: str + timestamp: datetime + message_length: int + crisis_score: float = 0.0 + role: str = "user" + + +@dataclass +class BehavioralSignal: + signal_type: str + risk_level: str + description: str + evidence: list[str] = field(default_factory=list) + score: float = 0.0 + + def as_dict(self) -> dict[str, Any]: + return { + "signal_type": self.signal_type, + "risk_level": self.risk_level, + "description": self.description, + "evidence": list(self.evidence), + "score": self.score, + } + + +class BehavioralTracker: + """In-memory tracker for behavioral crisis signals.""" + + def __init__(self) -> None: + self._events_by_session: dict[str, list[BehavioralEvent]] = defaultdict(list) + + def record( + self, + session_id: str, + timestamp: datetime, + message_length: int, + *, + crisis_score: float = 0.0, + role: str = "user", + ) -> None: + if timestamp.tzinfo is None: + timestamp = timestamp.replace(tzinfo=timezone.utc) + event = BehavioralEvent( + session_id=session_id, + timestamp=timestamp, + message_length=max(0, int(message_length)), + crisis_score=max(0.0, min(1.0, float(crisis_score))), + role=role, + ) + self._events_by_session[session_id].append(event) + self._events_by_session[session_id].sort(key=lambda item: item.timestamp) + + def get_risk_signals(self, session_id: str) -> dict[str, Any]: + events = [event for event in self._events_by_session.get(session_id, []) if event.role == "user"] + if not events: + return { + "frequency_change": 1.0, + "is_late_night": False, + "session_length_trend": "stable", + "withdrawal_detected": False, + "behavioral_score": 0.0, + "signals": [], + } + + signals: list[BehavioralSignal] = [] + + frequency_change = self._compute_frequency_change(events) + frequency_signal = self._analyze_frequency(events, frequency_change) + if frequency_signal: + signals.append(frequency_signal) + + time_signal = self._analyze_time(events) + if time_signal: + signals.append(time_signal) + + withdrawal_signal = self._analyze_withdrawal(session_id, events) + if withdrawal_signal: + signals.append(withdrawal_signal) + + absence_signal = self._analyze_return_after_absence(session_id, events) + if absence_signal: + signals.append(absence_signal) + + escalation_signal = self._analyze_escalation(events) + if escalation_signal: + signals.append(escalation_signal) + + session_length_trend = self._compute_session_length_trend(session_id, events) + behavioral_score = self._compute_behavioral_score(signals) + + risk_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2} + signals.sort(key=lambda item: (risk_order.get(item.risk_level, 9), -item.score)) + + return { + "frequency_change": frequency_change, + "is_late_night": any(item.signal_type == "time" for item in signals), + "session_length_trend": session_length_trend, + "withdrawal_detected": any(item.signal_type == "withdrawal" for item in signals), + "behavioral_score": behavioral_score, + "signals": [item.as_dict() for item in signals], + } + + def _all_user_events(self) -> list[BehavioralEvent]: + events: list[BehavioralEvent] = [] + for session_events in self._events_by_session.values(): + events.extend(event for event in session_events if event.role == "user") + events.sort(key=lambda item: item.timestamp) + return events + + def _daily_count_baseline(self, current_date) -> float: + events = self._all_user_events() + counts: dict[Any, int] = {} + for offset in range(1, ROLLING_BASELINE_DAYS + 1): + counts[current_date - timedelta(days=offset)] = 0 + for event in events: + event_date = event.timestamp.date() + if event_date in counts: + counts[event_date] += 1 + return sum(counts.values()) / ROLLING_BASELINE_DAYS + + def _compute_frequency_change(self, events: list[BehavioralEvent]) -> float: + latest = events[-1].timestamp + window_start = latest - timedelta(hours=1) + current_hour_count = sum(1 for event in events if event.timestamp >= window_start) + baseline_daily = self._daily_count_baseline(latest.date()) + baseline_hourly = max(baseline_daily / 24.0, 0.1) + return round(current_hour_count / baseline_hourly, 2) + + def _analyze_frequency(self, events: list[BehavioralEvent], frequency_change: float) -> BehavioralSignal | None: + latest = events[-1].timestamp + window_start = latest - timedelta(hours=1) + current_hour_count = sum(1 for event in events if event.timestamp >= window_start) + if current_hour_count >= 6 and frequency_change >= 3.0: + level = "HIGH" if frequency_change >= 6.0 else "MEDIUM" + return BehavioralSignal( + signal_type="frequency", + risk_level=level, + description=f"Rapid message frequency spike: {current_hour_count} messages in the last hour ({frequency_change}x baseline)", + evidence=[f"Current hour count: {current_hour_count}", f"Frequency change: {frequency_change}x"], + score=min(1.0, frequency_change / 8.0), + ) + return None + + def _analyze_time(self, events: list[BehavioralEvent]) -> BehavioralSignal | None: + latest = events[-1].timestamp + hour = latest.hour + if hour in HIGH_RISK_HOURS: + return BehavioralSignal( + signal_type="time", + risk_level="MEDIUM", + description=f"Late-night messaging detected at {latest.strftime('%H:%M')}", + evidence=[f"Latest message timestamp: {latest.isoformat()}"], + score=0.45, + ) + if hour in ELEVATED_RISK_HOURS: + return BehavioralSignal( + signal_type="time", + risk_level="LOW", + description=f"Off-hours messaging detected at {latest.strftime('%H:%M')}", + evidence=[f"Latest message timestamp: {latest.isoformat()}"], + score=0.2, + ) + return None + + def _analyze_withdrawal(self, session_id: str, events: list[BehavioralEvent]) -> BehavioralSignal | None: + current_date = events[-1].timestamp.date() + baseline_daily = self._daily_count_baseline(current_date) + if baseline_daily < 3.0: + return None + + current_day_count = sum(1 for event in events if event.timestamp.date() == current_date) + current_avg_len = sum(event.message_length for event in events if event.timestamp.date() == current_date) / max(current_day_count, 1) + + prior_events = [ + event + for sid, session_events in self._events_by_session.items() + if sid != session_id + for event in session_events + if event.role == "user" and event.timestamp.date() >= current_date - timedelta(days=ROLLING_BASELINE_DAYS) + ] + if not prior_events: + return None + prior_avg_len = sum(event.message_length for event in prior_events) / len(prior_events) + + if current_day_count <= max(1, baseline_daily * 0.3): + score = 0.55 if current_day_count == 1 else 0.4 + if current_avg_len < prior_avg_len * 0.5: + score += 0.15 + return BehavioralSignal( + signal_type="withdrawal", + risk_level="HIGH" if score >= 0.6 else "MEDIUM", + description="Sharp drop from recent communication baseline suggests withdrawal/isolation", + evidence=[ + f"Current day count: {current_day_count}", + f"7-day daily baseline: {baseline_daily:.2f}", + f"Average message length: {current_avg_len:.1f} vs {prior_avg_len:.1f}", + ], + score=min(1.0, score), + ) + return None + + def _analyze_return_after_absence(self, session_id: str, events: list[BehavioralEvent]) -> BehavioralSignal | None: + current_start = events[0].timestamp + prior_events = [ + event + for sid, session_events in self._events_by_session.items() + if sid != session_id + for event in session_events + if event.role == "user" and event.timestamp < current_start + ] + if not prior_events: + return None + latest_prior = max(prior_events, key=lambda item: item.timestamp) + gap = current_start - latest_prior.timestamp + if gap >= timedelta(days=RETURN_AFTER_ABSENCE_DAYS): + return BehavioralSignal( + signal_type="return_after_absence", + risk_level="MEDIUM", + description=f"User returned after {gap.days} days of silence", + evidence=[f"Last prior activity: {latest_prior.timestamp.isoformat()}"], + score=min(1.0, gap.days / 14.0), + ) + return None + + def _analyze_escalation(self, events: list[BehavioralEvent]) -> BehavioralSignal | None: + scored = [event for event in events if event.crisis_score > 0] + if len(scored) < 3: + return None + recent = scored[-5:] + midpoint = max(1, len(recent) // 2) + first_avg = sum(event.crisis_score for event in recent[:midpoint]) / len(recent[:midpoint]) + second_avg = sum(event.crisis_score for event in recent[midpoint:]) / len(recent[midpoint:]) + if second_avg >= max(0.4, first_avg * 1.3): + return BehavioralSignal( + signal_type="escalation", + risk_level="HIGH" if second_avg >= 0.65 else "MEDIUM", + description=f"Behavioral escalation: crisis score trend rose from {first_avg:.2f} to {second_avg:.2f}", + evidence=[f"Recent crisis scores: {[round(event.crisis_score, 2) for event in recent]}"], + score=min(1.0, second_avg), + ) + return None + + def _compute_session_length_trend(self, session_id: str, events: list[BehavioralEvent]) -> str: + current_duration = (events[-1].timestamp - events[0].timestamp).total_seconds() + previous_durations = [] + current_start = events[0].timestamp + for sid, session_events in self._events_by_session.items(): + if sid == session_id: + continue + user_events = [event for event in session_events if event.role == "user"] + if len(user_events) < 2: + continue + if user_events[-1].timestamp < current_start - timedelta(days=ROLLING_BASELINE_DAYS): + continue + previous_durations.append((user_events[-1].timestamp - user_events[0].timestamp).total_seconds()) + + if not previous_durations: + return "stable" + average_duration = sum(previous_durations) / len(previous_durations) + if current_duration > average_duration * 1.5: + return "increasing" + if current_duration < average_duration * 0.5: + return "decreasing" + return "stable" + + def _compute_behavioral_score(self, signals: list[BehavioralSignal]) -> float: + if not signals: + return 0.0 + max_score = max(signal.score for signal in signals) + multi_signal_boost = min(0.2, 0.05 * (len(signals) - 1)) + return round(min(1.0, max_score + multi_signal_boost), 2) diff --git a/crisis/session_tracker.py b/crisis/session_tracker.py index 57a6a14..376d04e 100644 --- a/crisis/session_tracker.py +++ b/crisis/session_tracker.py @@ -34,6 +34,7 @@ Usage: from dataclasses import dataclass, field from typing import List, Optional +from .behavioral import BehavioralTracker from .detect import CrisisDetectionResult, SCORES # Level ordering for comparison (higher = more severe) @@ -52,6 +53,12 @@ class SessionState: is_deescalating: bool = False escalation_rate: float = 0.0 # levels gained per message consecutive_low_messages: int = 0 # for de-escalation tracking + behavioral_score: float = 0.0 + behavioral_signals: List[dict] = field(default_factory=list) + frequency_change: float = 1.0 + is_late_night: bool = False + session_length_trend: str = "stable" + withdrawal_detected: bool = False class CrisisSessionTracker: @@ -77,6 +84,8 @@ class CrisisSessionTracker: self._message_count = 0 self._level_history: List[str] = [] self._consecutive_low = 0 + self._behavioral_tracker = BehavioralTracker() + self._behavioral_session_id = "current-session" @property def state(self) -> SessionState: @@ -84,6 +93,7 @@ class CrisisSessionTracker: is_escalating = self._detect_escalation() is_deescalating = self._detect_deescalation() rate = self._compute_escalation_rate() + behavioral = self._behavioral_tracker.get_risk_signals(self._behavioral_session_id) return SessionState( current_level=self._current_level, @@ -94,14 +104,29 @@ class CrisisSessionTracker: is_deescalating=is_deescalating, escalation_rate=rate, consecutive_low_messages=self._consecutive_low, + behavioral_score=behavioral["behavioral_score"], + behavioral_signals=behavioral["signals"], + frequency_change=behavioral["frequency_change"], + is_late_night=behavioral["is_late_night"], + session_length_trend=behavioral["session_length_trend"], + withdrawal_detected=behavioral["withdrawal_detected"], ) - def record(self, detection: CrisisDetectionResult) -> SessionState: + def record( + self, + detection: CrisisDetectionResult, + *, + timestamp=None, + message_length: int = 0, + role: str = "user", + ) -> SessionState: """ Record a crisis detection result for the current message. Returns updated SessionState. """ + from datetime import datetime, timezone + level = detection.level self._message_count += 1 self._level_history.append(level) @@ -116,6 +141,17 @@ class CrisisSessionTracker: else: self._consecutive_low = 0 + if role == "user": + if timestamp is None: + timestamp = datetime.now(timezone.utc) + self._behavioral_tracker.record( + self._behavioral_session_id, + timestamp, + message_length=message_length, + crisis_score=detection.score, + role=role, + ) + self._current_level = level return self.state @@ -195,14 +231,22 @@ class CrisisSessionTracker: "supportive engagement while remaining vigilant." ) + notes = [] + if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"): - return ( - f"User previously reached {s.peak_level} crisis level " - f"this session (currently {s.current_level}). " + notes.append( + f"User previously reached {s.peak_level} crisis level this session (currently {s.current_level}). " "Continue with care and awareness of the earlier crisis." ) - return "" + if s.behavioral_score >= 0.35 and s.behavioral_signals: + signal_names = ", ".join(item["signal_type"] for item in s.behavioral_signals) + notes.append( + f"Behavioral risk signals detected this session: {signal_names}. " + "Use the behavioral context to increase sensitivity and warmth." + ) + + return " ".join(notes) def get_ui_hints(self) -> dict: """ @@ -217,6 +261,10 @@ class CrisisSessionTracker: "session_deescalating": s.is_deescalating, "session_peak_level": s.peak_level, "session_message_count": s.message_count, + "behavioral_score": s.behavioral_score, + "is_late_night": s.is_late_night, + "withdrawal_detected": s.withdrawal_detected, + "session_length_trend": s.session_length_trend, } if s.is_escalating: @@ -226,12 +274,20 @@ class CrisisSessionTracker: "Consider increasing intervention level." ) + if s.behavioral_score >= 0.5: + hints["behavioral_warning"] = True + hints.setdefault( + "suggested_action", + "Behavioral risk patterns are active. Keep the response warm, grounded, and alert." + ) + return hints def check_crisis_with_session( text: str, tracker: CrisisSessionTracker, + timestamp=None, ) -> dict: """ Convenience: detect crisis and update session state in one call. @@ -243,7 +299,16 @@ def check_crisis_with_session( single_result = check_crisis(text) detection = detect_crisis(text) - session_state = tracker.record(detection) + session_state = tracker.record(detection, timestamp=timestamp, message_length=len(text)) + + behavioral = { + "frequency_change": session_state.frequency_change, + "is_late_night": session_state.is_late_night, + "session_length_trend": session_state.session_length_trend, + "withdrawal_detected": session_state.withdrawal_detected, + "behavioral_score": session_state.behavioral_score, + "signals": session_state.behavioral_signals, + } return { **single_result, @@ -255,5 +320,6 @@ def check_crisis_with_session( "is_deescalating": session_state.is_deescalating, "modifier": tracker.get_session_modifier(), "ui_hints": tracker.get_ui_hints(), + "behavioral": behavioral, }, } diff --git a/tests/test_behavioral_tracker.py b/tests/test_behavioral_tracker.py new file mode 100644 index 0000000..6f4ec04 --- /dev/null +++ b/tests/test_behavioral_tracker.py @@ -0,0 +1,101 @@ +""" +Tests for behavioral crisis pattern detection (#133). +""" + +import os +import sys +import unittest +from datetime import datetime, timedelta, timezone + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from crisis.session_tracker import CrisisSessionTracker, check_crisis_with_session +from crisis.behavioral import BehavioralTracker + + +class TestBehavioralTracker(unittest.TestCase): + def _seed_day(self, tracker, *, session_id, day, count, start_hour=10, message_length=48, crisis_score=0.0): + base = datetime(2026, 4, day, start_hour, 0, tzinfo=timezone.utc) + for i in range(count): + tracker.record( + session_id, + base + timedelta(minutes=i * 10), + message_length=message_length, + crisis_score=crisis_score, + ) + + def test_frequency_change_uses_seven_day_baseline(self): + tracker = BehavioralTracker() + for day in range(1, 8): + self._seed_day(tracker, session_id=f"baseline-{day}", day=day, count=2) + + burst_base = datetime(2026, 4, 8, 14, 0, tzinfo=timezone.utc) + for i in range(8): + tracker.record( + "current-session", + burst_base + timedelta(minutes=i), + message_length=72, + crisis_score=0.1, + ) + + summary = tracker.get_risk_signals("current-session") + + self.assertGreater(summary["frequency_change"], 2.0) + self.assertTrue(any(sig["signal_type"] == "frequency" for sig in summary["signals"])) + self.assertGreater(summary["behavioral_score"], 0.0) + + def test_late_night_messages_raise_flag(self): + tracker = BehavioralTracker() + base = datetime(2026, 4, 10, 2, 15, tzinfo=timezone.utc) + for i in range(3): + tracker.record( + "late-night", + base + timedelta(minutes=i * 7), + message_length=35, + crisis_score=0.0, + ) + + summary = tracker.get_risk_signals("late-night") + + self.assertTrue(summary["is_late_night"]) + self.assertTrue(any(sig["signal_type"] == "time" for sig in summary["signals"])) + + def test_withdrawal_detected_after_large_drop_from_baseline(self): + tracker = BehavioralTracker() + for day in range(1, 8): + self._seed_day(tracker, session_id=f"baseline-{day}", day=day, count=10, message_length=80) + + tracker.record( + "withdrawal-session", + datetime(2026, 4, 9, 11, 0, tzinfo=timezone.utc), + message_length=18, + crisis_score=0.0, + ) + + summary = tracker.get_risk_signals("withdrawal-session") + + self.assertTrue(summary["withdrawal_detected"]) + self.assertTrue(any(sig["signal_type"] == "withdrawal" for sig in summary["signals"])) + + +class TestBehavioralSessionIntegration(unittest.TestCase): + def test_check_crisis_with_session_includes_behavioral_summary(self): + tracker = CrisisSessionTracker() + base = datetime(2026, 4, 20, 2, 0, tzinfo=timezone.utc) + + check_crisis_with_session("can't sleep", tracker, timestamp=base) + check_crisis_with_session("still here", tracker, timestamp=base + timedelta(minutes=1)) + result = check_crisis_with_session("everything feels loud", tracker, timestamp=base + timedelta(minutes=2)) + + behavioral = result["session"]["behavioral"] + self.assertIn("frequency_change", behavioral) + self.assertIn("is_late_night", behavioral) + self.assertIn("session_length_trend", behavioral) + self.assertIn("withdrawal_detected", behavioral) + self.assertIn("behavioral_score", behavioral) + self.assertTrue(behavioral["is_late_night"]) + self.assertGreater(behavioral["behavioral_score"], 0.0) + + +if __name__ == '__main__': + unittest.main()