Compare commits
2 Commits
burn/123-1
...
feat/133-b
| Author | SHA1 | Date | |
|---|---|---|---|
| ba02b63a13 | |||
| 161a9eee5d |
336
behavioral_tracker.py
Normal file
336
behavioral_tracker.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""Behavioral Pattern Detection — crisis risk from usage patterns.
|
||||
|
||||
Detects crisis signals from HOW someone uses the system, not just
|
||||
what they say. Complements content-based crisis detection.
|
||||
|
||||
Behavioral signals:
|
||||
- Frequency spike (anxiety/agitation)
|
||||
- Frequency drop (withdrawal/isolation)
|
||||
- Late-night messaging (2-5 AM)
|
||||
- Session length increase (loneliness)
|
||||
- Abrupt termination after emotional content
|
||||
- Return after long absence
|
||||
|
||||
Part of Epic #102 (Multimodal Crisis Detection).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionEvent:
|
||||
"""A single session interaction."""
|
||||
session_id: str
|
||||
timestamp: float
|
||||
message_length: int
|
||||
is_user: bool = True
|
||||
emotional_content: bool = False
|
||||
terminated_abruptly: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class BehavioralSignals:
|
||||
"""Detected behavioral risk signals."""
|
||||
frequency_change: float = 0.0 # -1 (drop) to +1 (spike) vs baseline
|
||||
is_late_night: bool = False # 2-5 AM local time
|
||||
session_length_trend: str = "stable" # increasing/decreasing/stable
|
||||
withdrawal_detected: bool = False # significant drop in activity
|
||||
return_after_absence: bool = False # came back after extended absence
|
||||
abrupt_termination: bool = False # left after emotional content
|
||||
behavioral_score: float = 0.0 # 0-1 aggregate risk
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"frequency_change": self.frequency_change,
|
||||
"is_late_night": self.is_late_night,
|
||||
"session_length_trend": self.session_length_trend,
|
||||
"withdrawal_detected": self.withdrawal_detected,
|
||||
"return_after_absence": self.return_after_absence,
|
||||
"abrupt_termination": self.abrupt_termination,
|
||||
"behavioral_score": self.behavioral_score,
|
||||
}
|
||||
|
||||
|
||||
class BehavioralTracker:
|
||||
"""Track behavioral patterns per session/user and detect risk signals.
|
||||
|
||||
Uses a 7-day rolling window for baseline calculation.
|
||||
Thread-safe for concurrent session tracking.
|
||||
"""
|
||||
|
||||
# Time window constants
|
||||
LATE_NIGHT_START = 2 # 2 AM
|
||||
LATE_NIGHT_END = 5 # 5 AM
|
||||
BASELINE_WINDOW_DAYS = 7
|
||||
ABSENCE_THRESHOLD_HOURS = 48
|
||||
SPIKE_THRESHOLD = 2.0 # 2x baseline = spike
|
||||
DROP_THRESHOLD = 0.3 # 30% of baseline = withdrawal
|
||||
|
||||
def __init__(self):
|
||||
# session_id -> list of events
|
||||
self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
|
||||
# session_id -> baseline metrics
|
||||
self._baselines: Dict[str, Dict[str, float]] = {}
|
||||
# session_id -> last activity timestamp
|
||||
self._last_activity: Dict[str, float] = {}
|
||||
# Global baseline (all sessions)
|
||||
self._global_baseline: Dict[str, float] = {
|
||||
"avg_messages_per_hour": 5.0,
|
||||
"avg_session_length_min": 15.0,
|
||||
"avg_message_length": 200.0,
|
||||
}
|
||||
|
||||
def record(
|
||||
self,
|
||||
session_id: str,
|
||||
timestamp: Optional[float] = None,
|
||||
message_length: int = 0,
|
||||
is_user: bool = True,
|
||||
emotional_content: bool = False,
|
||||
terminated_abruptly: bool = False,
|
||||
) -> None:
|
||||
"""Record a session event."""
|
||||
if timestamp is None:
|
||||
timestamp = time.time()
|
||||
|
||||
event = SessionEvent(
|
||||
session_id=session_id,
|
||||
timestamp=timestamp,
|
||||
message_length=message_length,
|
||||
is_user=is_user,
|
||||
emotional_content=emotional_content,
|
||||
terminated_abruptly=terminated_abruptly,
|
||||
)
|
||||
|
||||
self._events[session_id].append(event)
|
||||
self._last_activity[session_id] = timestamp
|
||||
|
||||
# Periodically update baseline
|
||||
if len(self._events[session_id]) % 20 == 0:
|
||||
self._update_baseline(session_id)
|
||||
|
||||
def get_risk_signals(self, session_id: str) -> BehavioralSignals:
|
||||
"""Analyze behavioral patterns and return risk signals."""
|
||||
events = self._events.get(session_id, [])
|
||||
if not events:
|
||||
return BehavioralSignals()
|
||||
|
||||
signals = BehavioralSignals()
|
||||
now = time.time()
|
||||
|
||||
# 1. Frequency analysis
|
||||
signals.frequency_change = self._analyze_frequency(session_id, now)
|
||||
|
||||
# 2. Late-night detection
|
||||
signals.is_late_night = self._is_late_night(events[-1].timestamp)
|
||||
|
||||
# 3. Session length trend
|
||||
signals.session_length_trend = self._analyze_session_length_trend(session_id)
|
||||
|
||||
# 4. Withdrawal detection
|
||||
signals.withdrawal_detected = signals.frequency_change < -0.5
|
||||
|
||||
# 5. Return after absence
|
||||
signals.return_after_absence = self._detect_return_after_absence(session_id, now)
|
||||
|
||||
# 6. Abrupt termination
|
||||
signals.abrupt_termination = self._detect_abrupt_termination(events)
|
||||
|
||||
# 7. Aggregate behavioral score
|
||||
signals.behavioral_score = self._compute_behavioral_score(signals)
|
||||
|
||||
return signals
|
||||
|
||||
def _analyze_frequency(self, session_id: str, now: float) -> float:
|
||||
"""Compare recent frequency to baseline. Returns -1 to +1."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 3:
|
||||
return 0.0
|
||||
|
||||
# Count messages in last hour
|
||||
one_hour_ago = now - 3600
|
||||
recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
|
||||
|
||||
# Get baseline
|
||||
baseline = self._get_baseline(session_id)
|
||||
baseline_rate = baseline.get("avg_messages_per_hour", 5.0)
|
||||
|
||||
if baseline_rate <= 0:
|
||||
return 0.0
|
||||
|
||||
ratio = recent_count / baseline_rate
|
||||
# Map to -1..+1: 0.5x = -0.5, 1x = 0, 2x = +1
|
||||
if ratio < 1.0:
|
||||
return max(-1.0, (ratio - 1.0))
|
||||
else:
|
||||
return min(1.0, (ratio - 1.0) / 2.0)
|
||||
|
||||
def _is_late_night(self, timestamp: float) -> bool:
|
||||
"""Check if timestamp falls in the 2-5 AM window."""
|
||||
dt = datetime.fromtimestamp(timestamp)
|
||||
hour = dt.hour
|
||||
return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END
|
||||
|
||||
def _analyze_session_length_trend(self, session_id: str) -> str:
|
||||
"""Determine if session lengths are increasing, decreasing, or stable."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 10:
|
||||
return "stable"
|
||||
|
||||
# Split events into first half and second half
|
||||
mid = len(events) // 2
|
||||
first_half = events[:mid]
|
||||
second_half = events[mid:]
|
||||
|
||||
# Calculate session spans (first to last message)
|
||||
if not first_half or not second_half:
|
||||
return "stable"
|
||||
|
||||
first_span = first_half[-1].timestamp - first_half[0].timestamp
|
||||
second_span = second_half[-1].timestamp - second_half[0].timestamp
|
||||
|
||||
if second_span > first_span * 1.3:
|
||||
return "increasing"
|
||||
elif second_span < first_span * 0.7:
|
||||
return "decreasing"
|
||||
return "stable"
|
||||
|
||||
def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
|
||||
"""Detect if this session started after a long absence."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 2:
|
||||
return False
|
||||
|
||||
# Check gap between current session and previous
|
||||
last_activity = self._last_activity.get(session_id, 0)
|
||||
if last_activity <= 0:
|
||||
return False
|
||||
|
||||
# Find previous session's last event (excluding current session)
|
||||
prev_events = [e for e in events[:-10] if e.timestamp < now - 3600]
|
||||
if not prev_events:
|
||||
return False
|
||||
|
||||
gap_hours = (now - prev_events[-1].timestamp) / 3600
|
||||
return gap_hours >= self.ABSENCE_THRESHOLD_HOURS
|
||||
|
||||
def _detect_abrupt_termination(self, events: List[SessionEvent]) -> bool:
|
||||
"""Detect if the last few messages had emotional content and then stopped."""
|
||||
if len(events) < 3:
|
||||
return False
|
||||
|
||||
# Check last 5 events
|
||||
recent = events[-5:]
|
||||
has_emotional = any(e.emotional_content for e in recent)
|
||||
last_was_user = recent[-1].is_user if recent else False
|
||||
last_was_short = recent[-1].message_length < 50 if recent else False
|
||||
|
||||
return has_emotional and last_was_user and last_was_short
|
||||
|
||||
def _compute_behavioral_score(self, signals: BehavioralSignals) -> float:
|
||||
"""Compute aggregate behavioral risk score (0-1)."""
|
||||
score = 0.0
|
||||
weights = {
|
||||
"frequency_spike": 0.15,
|
||||
"frequency_drop": 0.20,
|
||||
"late_night": 0.10,
|
||||
"session_increasing": 0.10,
|
||||
"withdrawal": 0.20,
|
||||
"return_after_absence": 0.05,
|
||||
"abrupt_termination": 0.20,
|
||||
}
|
||||
|
||||
# Frequency spike (anxiety)
|
||||
if signals.frequency_change > 0.5:
|
||||
score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
|
||||
|
||||
# Frequency drop (withdrawal)
|
||||
if signals.frequency_change < -0.3:
|
||||
score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
|
||||
|
||||
# Late night
|
||||
if signals.is_late_night:
|
||||
score += weights["late_night"]
|
||||
|
||||
# Session length increasing
|
||||
if signals.session_length_trend == "increasing":
|
||||
score += weights["session_increasing"]
|
||||
|
||||
# Withdrawal
|
||||
if signals.withdrawal_detected:
|
||||
score += weights["withdrawal"]
|
||||
|
||||
# Return after absence
|
||||
if signals.return_after_absence:
|
||||
score += weights["return_after_absence"]
|
||||
|
||||
# Abrupt termination
|
||||
if signals.abrupt_termination:
|
||||
score += weights["abrupt_termination"]
|
||||
|
||||
return min(1.0, score)
|
||||
|
||||
def _get_baseline(self, session_id: str) -> Dict[str, float]:
|
||||
"""Get baseline metrics for a session."""
|
||||
if session_id in self._baselines:
|
||||
return self._baselines[session_id]
|
||||
return self._global_baseline
|
||||
|
||||
def _update_baseline(self, session_id: str) -> None:
|
||||
"""Update rolling baseline from recent events."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 5:
|
||||
return
|
||||
|
||||
# Use last 7 days of events
|
||||
cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
|
||||
recent = [e for e in events if e.timestamp > cutoff and e.is_user]
|
||||
|
||||
if not recent:
|
||||
return
|
||||
|
||||
# Calculate metrics
|
||||
time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
|
||||
if time_span_hours > 0:
|
||||
msgs_per_hour = len(recent) / time_span_hours
|
||||
else:
|
||||
msgs_per_hour = len(recent)
|
||||
|
||||
avg_length = sum(e.message_length for e in recent) / len(recent)
|
||||
|
||||
self._baselines[session_id] = {
|
||||
"avg_messages_per_hour": msgs_per_hour,
|
||||
"avg_message_length": avg_length,
|
||||
"total_messages": len(recent),
|
||||
}
|
||||
|
||||
|
||||
# Global singleton for convenience
|
||||
_global_tracker = BehavioralTracker()
|
||||
|
||||
|
||||
def record_event(
|
||||
session_id: str,
|
||||
timestamp: Optional[float] = None,
|
||||
message_length: int = 0,
|
||||
is_user: bool = True,
|
||||
emotional_content: bool = False,
|
||||
terminated_abruptly: bool = False,
|
||||
) -> None:
|
||||
"""Record an event to the global behavioral tracker."""
|
||||
_global_tracker.record(
|
||||
session_id, timestamp, message_length,
|
||||
is_user, emotional_content, terminated_abruptly,
|
||||
)
|
||||
|
||||
|
||||
def get_risk_signals(session_id: str) -> BehavioralSignals:
|
||||
"""Get behavioral risk signals for a session."""
|
||||
return _global_tracker.get_risk_signals(session_id)
|
||||
136
tests/test_behavioral_tracker.py
Normal file
136
tests/test_behavioral_tracker.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Tests for behavioral pattern detection."""
|
||||
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from behavioral_tracker import BehavioralTracker, BehavioralSignals
|
||||
|
||||
|
||||
class TestBehavioralTracker:
|
||||
def test_empty_tracker_returns_default_signals(self):
|
||||
tracker = BehavioralTracker()
|
||||
signals = tracker.get_risk_signals("session-1")
|
||||
assert signals.behavioral_score == 0.0
|
||||
assert not signals.is_late_night
|
||||
|
||||
def test_frequency_spike_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
# Establish baseline: 2 messages per hour
|
||||
now = time.time()
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)
|
||||
|
||||
# Spike: 10 messages in last 10 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.frequency_change > 0.3 # Significant spike
|
||||
|
||||
def test_frequency_drop_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# Baseline: heavy usage
|
||||
for i in range(50):
|
||||
tracker.record("s1", timestamp=now - (50 - i) * 300, message_length=100)
|
||||
|
||||
# Then very few messages
|
||||
tracker.record("s1", timestamp=now - 60, message_length=50)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
# Recent activity is much lower than baseline
|
||||
assert signals.frequency_change < 0
|
||||
|
||||
def test_late_night_detection(self):
|
||||
tracker = BehavioralTracker()
|
||||
# 3:00 AM timestamp
|
||||
import datetime
|
||||
dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
|
||||
ts = dt.timestamp()
|
||||
|
||||
tracker.record("s1", timestamp=ts, message_length=100)
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.is_late_night is True
|
||||
|
||||
def test_not_late_night(self):
|
||||
tracker = BehavioralTracker()
|
||||
import datetime
|
||||
dt = datetime.datetime(2026, 4, 14, 14, 0, 0) # 2 PM
|
||||
ts = dt.timestamp()
|
||||
|
||||
tracker.record("s1", timestamp=ts, message_length=100)
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.is_late_night is False
|
||||
|
||||
def test_session_length_increasing(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# First half: messages spread over 5 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
|
||||
# Second half: messages spread over 30 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.session_length_trend == "increasing"
|
||||
|
||||
def test_withdrawal_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# High baseline
|
||||
for i in range(50):
|
||||
tracker.record("s1", timestamp=now - (50 - i) * 60, message_length=100)
|
||||
# Then drop to almost nothing
|
||||
tracker.record("s1", timestamp=now - 60, message_length=20)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.withdrawal_detected is True
|
||||
|
||||
def test_abrupt_termination_after_emotional(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# Normal messages
|
||||
for i in range(5):
|
||||
tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
|
||||
# Emotional content
|
||||
tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
|
||||
# Short abrupt message
|
||||
tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.abrupt_termination is True
|
||||
|
||||
def test_behavioral_score_increases_with_risk(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
import datetime
|
||||
|
||||
# Low risk: normal messages during daytime
|
||||
for i in range(10):
|
||||
dt = datetime.datetime(2026, 4, 14, 14, i, 0) # 2 PM
|
||||
tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
|
||||
low_risk = tracker.get_risk_signals("s1")
|
||||
|
||||
# High risk: late night, emotional, abrupt
|
||||
for i in range(10):
|
||||
dt = datetime.datetime(2026, 4, 14, 3, i, 0) # 3 AM
|
||||
tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
|
||||
tracker.record("s2", timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
|
||||
message_length=10, is_user=True)
|
||||
high_risk = tracker.get_risk_signals("s2")
|
||||
|
||||
assert high_risk.behavioral_score > low_risk.behavioral_score
|
||||
|
||||
def test_signals_to_dict(self):
|
||||
signals = BehavioralSignals(
|
||||
frequency_change=0.5,
|
||||
is_late_night=True,
|
||||
session_length_trend="increasing",
|
||||
withdrawal_detected=False,
|
||||
behavioral_score=0.4,
|
||||
)
|
||||
d = signals.to_dict()
|
||||
assert d["frequency_change"] == 0.5
|
||||
assert d["is_late_night"] is True
|
||||
assert d["session_length_trend"] == "increasing"
|
||||
Reference in New Issue
Block a user