@dataclass
class SessionEvent:
    """A single session interaction.

    One instance is built per ``BehavioralTracker.record`` call and appended
    to the event list kept for ``session_id``.
    """
    # Key under which the tracker groups events.
    session_id: str
    # Seconds since the epoch; ``record`` fills in ``time.time()`` when the
    # caller passes no timestamp.
    timestamp: float
    # Length of the message as reported by the caller.
    message_length: int
    # Only events with ``is_user=True`` are counted for message frequency and
    # for the per-session baseline.
    is_user: bool = True
    # Caller-supplied flag; consulted by abrupt-termination detection.
    emotional_content: bool = False
    # Caller-supplied flag, stored as given.
    terminated_abruptly: bool = False
class BehavioralTracker:
    """Track behavioral patterns per session/user and detect risk signals.

    Events are appended per ``session_id`` through :meth:`record`;
    :meth:`get_risk_signals` compares the most recent window of user activity
    with a per-session baseline, or with built-in global defaults until a
    baseline exists. Uses a 7-day rolling window for baseline calculation.

    NOTE: the tracker does no locking of its own. Code that shares one
    instance between threads must serialize calls itself.
    NOTE: recorded events are kept for the lifetime of the tracker; nothing
    in this class evicts them.
    """

    # Time window constants
    LATE_NIGHT_START = 2  # 2 AM (inclusive)
    LATE_NIGHT_END = 5  # 5 AM (exclusive)
    BASELINE_WINDOW_DAYS = 7
    ABSENCE_THRESHOLD_HOURS = 48
    # NOTE: these two ratios are not referenced by the scoring code in this
    # class; spike/drop cut-offs are applied on the normalized -1..+1 scale.
    SPIKE_THRESHOLD = 2.0  # 2x baseline = spike
    DROP_THRESHOLD = 0.3  # 30% of baseline = withdrawal
    # Length of the "current activity" window used by the detectors.
    RECENT_WINDOW_SECONDS = 3600
    # The per-session baseline is recomputed every N recorded events.
    BASELINE_REFRESH_EVERY = 20
    # Shortest span a baseline rate is averaged over, so that a brief burst
    # is not extrapolated into an enormous hourly rate.
    MIN_BASELINE_SPAN_HOURS = 1.0
    # A final user message shorter than this counts as "short".
    SHORT_MESSAGE_LENGTH = 50

    def __init__(self):
        # session_id -> list of events
        self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
        # session_id -> baseline metrics
        self._baselines: Dict[str, Dict[str, float]] = {}
        # session_id -> last activity timestamp
        self._last_activity: Dict[str, float] = {}
        # Global baseline (all sessions)
        self._global_baseline: Dict[str, float] = {
            "avg_messages_per_hour": 5.0,
            "avg_session_length_min": 15.0,
            "avg_message_length": 200.0,
        }

    def record(
        self,
        session_id: str,
        timestamp: Optional[float] = None,
        message_length: int = 0,
        is_user: bool = True,
        emotional_content: bool = False,
        terminated_abruptly: bool = False,
    ) -> None:
        """Record a session event.

        Args:
            session_id: Session the event belongs to.
            timestamp: Epoch seconds; defaults to the current time.
            message_length: Length of the message.
            is_user: Whether the user (rather than the system) sent it.
            emotional_content: Whether the message was flagged as emotional.
            terminated_abruptly: Whether the caller saw the session end
                abruptly at this event.
        """
        if timestamp is None:
            timestamp = time.time()

        event = SessionEvent(
            session_id=session_id,
            timestamp=timestamp,
            message_length=message_length,
            is_user=is_user,
            emotional_content=emotional_content,
            terminated_abruptly=terminated_abruptly,
        )

        session_events = self._events[session_id]
        session_events.append(event)
        self._last_activity[session_id] = timestamp

        # Periodically update baseline
        if len(session_events) % self.BASELINE_REFRESH_EVERY == 0:
            self._update_baseline(session_id)

    def get_risk_signals(
        self, session_id: str, now: Optional[float] = None
    ) -> "BehavioralSignals":
        """Analyze behavioral patterns and return risk signals.

        Args:
            session_id: Session to analyze.
            now: Reference time in epoch seconds for the "recent activity"
                windows. Defaults to the current wall-clock time; pass an
                explicit value to analyze replayed or historical events.

        Returns:
            The populated signals, or default (all-clear) signals when the
            session has no recorded events.
        """
        events = self._events.get(session_id, [])
        if not events:
            return BehavioralSignals()

        if now is None:
            now = time.time()

        signals = BehavioralSignals()

        # 1. Frequency analysis
        signals.frequency_change = self._analyze_frequency(session_id, now)

        # 2. Late-night detection (based on the most recently recorded event)
        signals.is_late_night = self._is_late_night(events[-1].timestamp)

        # 3. Session length trend
        signals.session_length_trend = self._analyze_session_length_trend(session_id)

        # 4. Withdrawal detection (recent rate below half of baseline)
        signals.withdrawal_detected = signals.frequency_change < -0.5

        # 5. Return after absence
        signals.return_after_absence = self._detect_return_after_absence(session_id, now)

        # 6. Abrupt termination
        signals.abrupt_termination = self._detect_abrupt_termination(events)

        # 7. Aggregate behavioral score
        signals.behavioral_score = self._compute_behavioral_score(signals)

        return signals

    def _analyze_frequency(self, session_id: str, now: float) -> float:
        """Compare recent frequency to baseline. Returns -1 to +1.

        The ratio ``recent rate / baseline rate`` is mapped as follows:
        below baseline it becomes ``ratio - 1`` (0x -> -1, 0.5x -> -0.5);
        at or above baseline it becomes ``(ratio - 1) / 2`` capped at +1
        (1x -> 0, 2x -> +0.5, 3x or more -> +1). Sessions with fewer than
        three events yield 0.0.
        """
        events = self._events.get(session_id, [])
        if len(events) < 3:
            return 0.0

        # Count user messages in the recent window
        window_start = now - self.RECENT_WINDOW_SECONDS
        recent_count = sum(1 for e in events if e.timestamp > window_start and e.is_user)
        # Express the count per hour so it is comparable with the baseline.
        recent_rate = recent_count / (self.RECENT_WINDOW_SECONDS / 3600)

        baseline_rate = self._get_baseline(session_id).get("avg_messages_per_hour", 5.0)
        if baseline_rate <= 0:
            return 0.0

        ratio = recent_rate / baseline_rate
        if ratio < 1.0:
            return max(-1.0, ratio - 1.0)
        return min(1.0, (ratio - 1.0) / 2.0)

    def _is_late_night(self, timestamp: float) -> bool:
        """Check if timestamp falls in the 2-5 AM window.

        NOTE: the hour comes from ``datetime.fromtimestamp`` without a
        timezone, i.e. the local time of the machine running this code,
        which is not necessarily the user's local time.
        """
        hour = datetime.fromtimestamp(timestamp).hour
        return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END

    def _analyze_session_length_trend(self, session_id: str) -> str:
        """Determine if session lengths are increasing, decreasing, or stable.

        Splits the recorded events into an older and a newer half and
        compares the time each half spans (first to last event). Needs at
        least ten events; otherwise returns "stable".
        """
        events = self._events.get(session_id, [])
        if len(events) < 10:
            return "stable"

        mid = len(events) // 2
        first_half = events[:mid]
        second_half = events[mid:]

        first_span = first_half[-1].timestamp - first_half[0].timestamp
        second_span = second_half[-1].timestamp - second_half[0].timestamp

        if second_span > first_span * 1.3:
            return "increasing"
        if second_span < first_span * 0.7:
            return "decreasing"
        return "stable"

    def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
        """Detect if the current activity started after a long absence.

        Events inside the recent window (ending at ``now``) form the current
        activity; everything older is previous activity. The result is True
        only when both exist and the gap between the newest previous event
        and the oldest current event is at least ``ABSENCE_THRESHOLD_HOURS``.
        A user who is merely idle (no current activity) is therefore not
        reported as having returned.
        """
        events = self._events.get(session_id, [])
        if len(events) < 2:
            return False

        window_start = now - self.RECENT_WINDOW_SECONDS
        current = [e.timestamp for e in events if e.timestamp >= window_start]
        previous = [e.timestamp for e in events if e.timestamp < window_start]
        if not current or not previous:
            return False

        gap_hours = (min(current) - max(previous)) / 3600
        return gap_hours >= self.ABSENCE_THRESHOLD_HOURS

    def _detect_abrupt_termination(self, events: "List[SessionEvent]") -> bool:
        """Detect if the last few messages had emotional content and then stopped.

        Looks at the last five events. Emotional content must be present
        among them; the ending counts as abrupt when the final event carries
        the explicit ``terminated_abruptly`` flag, or - with at least three
        events recorded - when the final event is a short user message.
        """
        if not events:
            return False

        recent = events[-5:]
        if not any(e.emotional_content for e in recent):
            return False

        last = recent[-1]
        # An explicit flag from the caller outranks the length heuristic.
        if last.terminated_abruptly:
            return True

        if len(events) < 3:
            return False
        return bool(last.is_user and last.message_length < self.SHORT_MESSAGE_LENGTH)

    def _compute_behavioral_score(self, signals: "BehavioralSignals") -> float:
        """Compute aggregate behavioral risk score (0-1).

        Adds a fixed weight per active signal; the two frequency terms are
        additionally scaled by the magnitude of ``frequency_change``.
        """
        weights = {
            "frequency_spike": 0.15,
            "frequency_drop": 0.20,
            "late_night": 0.10,
            "session_increasing": 0.10,
            "withdrawal": 0.20,
            "return_after_absence": 0.05,
            "abrupt_termination": 0.20,
        }
        score = 0.0

        # Frequency spike (anxiety)
        if signals.frequency_change > 0.5:
            score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)

        # Frequency drop (withdrawal)
        if signals.frequency_change < -0.3:
            score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)

        if signals.is_late_night:
            score += weights["late_night"]

        if signals.session_length_trend == "increasing":
            score += weights["session_increasing"]

        if signals.withdrawal_detected:
            score += weights["withdrawal"]

        if signals.return_after_absence:
            score += weights["return_after_absence"]

        if signals.abrupt_termination:
            score += weights["abrupt_termination"]

        return min(1.0, score)

    def _get_baseline(self, session_id: str) -> Dict[str, float]:
        """Get baseline metrics for a session (global defaults if none yet)."""
        return self._baselines.get(session_id, self._global_baseline)

    def _update_baseline(self, session_id: str) -> None:
        """Update rolling baseline from recent events.

        Uses the session's user events from the last
        ``BASELINE_WINDOW_DAYS`` days (relative to the wall clock). Does
        nothing when fewer than five events are recorded or none fall in
        the window.
        """
        events = self._events.get(session_id, [])
        if len(events) < 5:
            return

        cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
        recent = [e for e in events if e.timestamp > cutoff and e.is_user]
        if not recent:
            return

        # min/max rather than first/last: events may arrive out of order.
        stamps = [e.timestamp for e in recent]
        span_hours = (max(stamps) - min(stamps)) / 3600
        # Average over at least MIN_BASELINE_SPAN_HOURS so a short burst does
        # not turn into an inflated hourly rate.
        msgs_per_hour = len(recent) / max(span_hours, self.MIN_BASELINE_SPAN_HOURS)

        avg_length = sum(e.message_length for e in recent) / len(recent)

        self._baselines[session_id] = {
            "avg_messages_per_hour": msgs_per_hour,
            "avg_message_length": avg_length,
            "total_messages": len(recent),
        }
class TestBehavioralTracker:
    """Behavioral signal tests for ``BehavioralTracker``.

    The tracker measures "recent" activity against the wall clock (the last
    hour before the call), so scenarios that need a quiet recent window put
    their baseline traffic more than one hour in the past.
    """

    def test_empty_tracker_returns_default_signals(self):
        tracker = BehavioralTracker()
        signals = tracker.get_risk_signals("session-1")
        assert signals.behavioral_score == 0.0
        assert not signals.is_late_night

    def test_frequency_spike_detected(self):
        tracker = BehavioralTracker()
        # Establish baseline: 2 messages per hour
        now = time.time()
        for i in range(10):
            tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)

        # Spike: 10 messages in last 10 minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)

        signals = tracker.get_risk_signals("s1")
        assert signals.frequency_change > 0.3  # Significant spike

    def test_frequency_drop_detected(self):
        tracker = BehavioralTracker()
        now = time.time()
        # Baseline: heavy usage (one message every 5 minutes) that ended two
        # hours ago, so none of it falls into the last-hour window.
        for i in range(50):
            tracker.record(
                "s1", timestamp=now - 7200 - (50 - i) * 300, message_length=100
            )

        # Then very few messages
        tracker.record("s1", timestamp=now - 60, message_length=50)

        signals = tracker.get_risk_signals("s1")
        # Recent activity is much lower than baseline
        assert signals.frequency_change < 0

    def test_late_night_detection(self):
        tracker = BehavioralTracker()
        # 3:00 AM timestamp
        import datetime
        dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
        ts = dt.timestamp()

        tracker.record("s1", timestamp=ts, message_length=100)
        signals = tracker.get_risk_signals("s1")
        assert signals.is_late_night is True

    def test_not_late_night(self):
        tracker = BehavioralTracker()
        import datetime
        dt = datetime.datetime(2026, 4, 14, 14, 0, 0)  # 2 PM
        ts = dt.timestamp()

        tracker.record("s1", timestamp=ts, message_length=100)
        signals = tracker.get_risk_signals("s1")
        assert signals.is_late_night is False

    def test_session_length_increasing(self):
        tracker = BehavioralTracker()
        now = time.time()
        # First half: messages spread over 5 minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
        # Second half: messages spread over 30 minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)

        signals = tracker.get_risk_signals("s1")
        assert signals.session_length_trend == "increasing"

    def test_withdrawal_detected(self):
        tracker = BehavioralTracker()
        now = time.time()
        # High baseline: one message per minute, ending two hours ago. With
        # the baseline inside the last hour the recent count would equal the
        # baseline traffic and no drop could ever be observed.
        for i in range(50):
            tracker.record(
                "s1", timestamp=now - 7200 - (50 - i) * 60, message_length=100
            )
        # Then drop to almost nothing
        tracker.record("s1", timestamp=now - 60, message_length=20)

        signals = tracker.get_risk_signals("s1")
        assert signals.withdrawal_detected is True

    def test_return_after_absence_detected(self):
        tracker = BehavioralTracker()
        now = time.time()
        three_days = 72 * 3600
        # Earlier activity three days ago
        for i in range(5):
            tracker.record(
                "s1", timestamp=now - three_days - (5 - i) * 60, message_length=100
            )
        # Current activity in the last ten minutes
        for i in range(10):
            tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=100)

        signals = tracker.get_risk_signals("s1")
        assert signals.return_after_absence is True

    def test_no_return_after_absence_for_continuous_activity(self):
        tracker = BehavioralTracker()
        now = time.time()
        for i in range(15):
            tracker.record("s1", timestamp=now - (15 - i) * 60, message_length=100)

        signals = tracker.get_risk_signals("s1")
        assert signals.return_after_absence is False

    def test_abrupt_termination_after_emotional(self):
        tracker = BehavioralTracker()
        now = time.time()
        # Normal messages
        for i in range(5):
            tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
        # Emotional content
        tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
        # Short abrupt message
        tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)

        signals = tracker.get_risk_signals("s1")
        assert signals.abrupt_termination is True

    def test_behavioral_score_increases_with_risk(self):
        tracker = BehavioralTracker()
        import datetime

        # Low risk: normal messages during daytime
        for i in range(10):
            dt = datetime.datetime(2026, 4, 14, 14, i, 0)  # 2 PM
            tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
        low_risk = tracker.get_risk_signals("s1")

        # High risk: late night, emotional, abrupt
        for i in range(10):
            dt = datetime.datetime(2026, 4, 14, 3, i, 0)  # 3 AM
            tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
        tracker.record("s2", timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
                       message_length=10, is_user=True)
        high_risk = tracker.get_risk_signals("s2")

        assert high_risk.behavioral_score > low_risk.behavioral_score

    def test_signals_to_dict(self):
        signals = BehavioralSignals(
            frequency_change=0.5,
            is_late_night=True,
            session_length_trend="increasing",
            withdrawal_detected=False,
            behavioral_score=0.4,
        )
        d = signals.to_dict()
        assert d["frequency_change"] == 0.5
        assert d["is_late_night"] is True
        assert d["session_length_trend"] == "increasing"