Files
the-door/behavioral_tracker.py

337 lines
11 KiB
Python

"""Behavioral Pattern Detection — crisis risk from usage patterns.
Detects crisis signals from HOW someone uses the system, not just
what they say. Complements content-based crisis detection.
Behavioral signals:
- Frequency spike (anxiety/agitation)
- Frequency drop (withdrawal/isolation)
- Late-night messaging (2-5 AM)
- Session length increase (loneliness)
- Abrupt termination after emotional content
- Return after long absence
Part of Epic #102 (Multimodal Crisis Detection).
"""
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class SessionEvent:
"""A single session interaction."""
session_id: str
timestamp: float
message_length: int
is_user: bool = True
emotional_content: bool = False
terminated_abruptly: bool = False
@dataclass
class BehavioralSignals:
"""Detected behavioral risk signals."""
frequency_change: float = 0.0 # -1 (drop) to +1 (spike) vs baseline
is_late_night: bool = False # 2-5 AM local time
session_length_trend: str = "stable" # increasing/decreasing/stable
withdrawal_detected: bool = False # significant drop in activity
return_after_absence: bool = False # came back after extended absence
abrupt_termination: bool = False # left after emotional content
behavioral_score: float = 0.0 # 0-1 aggregate risk
def to_dict(self) -> Dict[str, Any]:
return {
"frequency_change": self.frequency_change,
"is_late_night": self.is_late_night,
"session_length_trend": self.session_length_trend,
"withdrawal_detected": self.withdrawal_detected,
"return_after_absence": self.return_after_absence,
"abrupt_termination": self.abrupt_termination,
"behavioral_score": self.behavioral_score,
}
class BehavioralTracker:
"""Track behavioral patterns per session/user and detect risk signals.
Uses a 7-day rolling window for baseline calculation.
Thread-safe for concurrent session tracking.
"""
# Time window constants
LATE_NIGHT_START = 2 # 2 AM
LATE_NIGHT_END = 5 # 5 AM
BASELINE_WINDOW_DAYS = 7
ABSENCE_THRESHOLD_HOURS = 48
SPIKE_THRESHOLD = 2.0 # 2x baseline = spike
DROP_THRESHOLD = 0.3 # 30% of baseline = withdrawal
def __init__(self):
# session_id -> list of events
self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
# session_id -> baseline metrics
self._baselines: Dict[str, Dict[str, float]] = {}
# session_id -> last activity timestamp
self._last_activity: Dict[str, float] = {}
# Global baseline (all sessions)
self._global_baseline: Dict[str, float] = {
"avg_messages_per_hour": 5.0,
"avg_session_length_min": 15.0,
"avg_message_length": 200.0,
}
def record(
self,
session_id: str,
timestamp: Optional[float] = None,
message_length: int = 0,
is_user: bool = True,
emotional_content: bool = False,
terminated_abruptly: bool = False,
) -> None:
"""Record a session event."""
if timestamp is None:
timestamp = time.time()
event = SessionEvent(
session_id=session_id,
timestamp=timestamp,
message_length=message_length,
is_user=is_user,
emotional_content=emotional_content,
terminated_abruptly=terminated_abruptly,
)
self._events[session_id].append(event)
self._last_activity[session_id] = timestamp
# Periodically update baseline
if len(self._events[session_id]) % 20 == 0:
self._update_baseline(session_id)
def get_risk_signals(self, session_id: str) -> BehavioralSignals:
"""Analyze behavioral patterns and return risk signals."""
events = self._events.get(session_id, [])
if not events:
return BehavioralSignals()
signals = BehavioralSignals()
now = time.time()
# 1. Frequency analysis
signals.frequency_change = self._analyze_frequency(session_id, now)
# 2. Late-night detection
signals.is_late_night = self._is_late_night(events[-1].timestamp)
# 3. Session length trend
signals.session_length_trend = self._analyze_session_length_trend(session_id)
# 4. Withdrawal detection
signals.withdrawal_detected = signals.frequency_change < -0.5
# 5. Return after absence
signals.return_after_absence = self._detect_return_after_absence(session_id, now)
# 6. Abrupt termination
signals.abrupt_termination = self._detect_abrupt_termination(events)
# 7. Aggregate behavioral score
signals.behavioral_score = self._compute_behavioral_score(signals)
return signals
def _analyze_frequency(self, session_id: str, now: float) -> float:
"""Compare recent frequency to baseline. Returns -1 to +1."""
events = self._events.get(session_id, [])
if len(events) < 3:
return 0.0
# Count messages in last hour
one_hour_ago = now - 3600
recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
# Get baseline
baseline = self._get_baseline(session_id)
baseline_rate = baseline.get("avg_messages_per_hour", 5.0)
if baseline_rate <= 0:
return 0.0
ratio = recent_count / baseline_rate
# Map to -1..+1: 0.5x = -0.5, 1x = 0, 2x = +1
if ratio < 1.0:
return max(-1.0, (ratio - 1.0))
else:
return min(1.0, (ratio - 1.0) / 2.0)
def _is_late_night(self, timestamp: float) -> bool:
"""Check if timestamp falls in the 2-5 AM window."""
dt = datetime.fromtimestamp(timestamp)
hour = dt.hour
return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END
def _analyze_session_length_trend(self, session_id: str) -> str:
"""Determine if session lengths are increasing, decreasing, or stable."""
events = self._events.get(session_id, [])
if len(events) < 10:
return "stable"
# Split events into first half and second half
mid = len(events) // 2
first_half = events[:mid]
second_half = events[mid:]
# Calculate session spans (first to last message)
if not first_half or not second_half:
return "stable"
first_span = first_half[-1].timestamp - first_half[0].timestamp
second_span = second_half[-1].timestamp - second_half[0].timestamp
if second_span > first_span * 1.3:
return "increasing"
elif second_span < first_span * 0.7:
return "decreasing"
return "stable"
def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
"""Detect if this session started after a long absence."""
events = self._events.get(session_id, [])
if len(events) < 2:
return False
# Check gap between current session and previous
last_activity = self._last_activity.get(session_id, 0)
if last_activity <= 0:
return False
# Find previous session's last event (excluding current session)
prev_events = [e for e in events[:-10] if e.timestamp < now - 3600]
if not prev_events:
return False
gap_hours = (now - prev_events[-1].timestamp) / 3600
return gap_hours >= self.ABSENCE_THRESHOLD_HOURS
def _detect_abrupt_termination(self, events: List[SessionEvent]) -> bool:
"""Detect if the last few messages had emotional content and then stopped."""
if len(events) < 3:
return False
# Check last 5 events
recent = events[-5:]
has_emotional = any(e.emotional_content for e in recent)
last_was_user = recent[-1].is_user if recent else False
last_was_short = recent[-1].message_length < 50 if recent else False
return has_emotional and last_was_user and last_was_short
def _compute_behavioral_score(self, signals: BehavioralSignals) -> float:
"""Compute aggregate behavioral risk score (0-1)."""
score = 0.0
weights = {
"frequency_spike": 0.15,
"frequency_drop": 0.20,
"late_night": 0.10,
"session_increasing": 0.10,
"withdrawal": 0.20,
"return_after_absence": 0.05,
"abrupt_termination": 0.20,
}
# Frequency spike (anxiety)
if signals.frequency_change > 0.5:
score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
# Frequency drop (withdrawal)
if signals.frequency_change < -0.3:
score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
# Late night
if signals.is_late_night:
score += weights["late_night"]
# Session length increasing
if signals.session_length_trend == "increasing":
score += weights["session_increasing"]
# Withdrawal
if signals.withdrawal_detected:
score += weights["withdrawal"]
# Return after absence
if signals.return_after_absence:
score += weights["return_after_absence"]
# Abrupt termination
if signals.abrupt_termination:
score += weights["abrupt_termination"]
return min(1.0, score)
def _get_baseline(self, session_id: str) -> Dict[str, float]:
"""Get baseline metrics for a session."""
if session_id in self._baselines:
return self._baselines[session_id]
return self._global_baseline
def _update_baseline(self, session_id: str) -> None:
"""Update rolling baseline from recent events."""
events = self._events.get(session_id, [])
if len(events) < 5:
return
# Use last 7 days of events
cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
recent = [e for e in events if e.timestamp > cutoff and e.is_user]
if not recent:
return
# Calculate metrics
time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
if time_span_hours > 0:
msgs_per_hour = len(recent) / time_span_hours
else:
msgs_per_hour = len(recent)
avg_length = sum(e.message_length for e in recent) / len(recent)
self._baselines[session_id] = {
"avg_messages_per_hour": msgs_per_hour,
"avg_message_length": avg_length,
"total_messages": len(recent),
}
# Global singleton for convenience
_global_tracker = BehavioralTracker()
def record_event(
session_id: str,
timestamp: Optional[float] = None,
message_length: int = 0,
is_user: bool = True,
emotional_content: bool = False,
terminated_abruptly: bool = False,
) -> None:
"""Record an event to the global behavioral tracker."""
_global_tracker.record(
session_id, timestamp, message_length,
is_user, emotional_content, terminated_abruptly,
)
def get_risk_signals(session_id: str) -> BehavioralSignals:
"""Get behavioral risk signals for a session."""
return _global_tracker.get_risk_signals(session_id)