Compare commits

..

1 Commits

Author SHA1 Message Date
f446f6dad6 feat: behavioral pattern detection for crisis signals (#133)
All checks were successful
Smoke Test / smoke Manually verified
Sanity Checks / sanity-test Manually verified
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 11s
Detects crisis risk from session-level behavioral patterns:
- Message frequency (rapid-fire = urgency)
- Time-of-day (1-4 AM = high risk)
- Withdrawal (shorter messages, longer gaps)
- Escalation (rising crisis scores)

Closes #133. Part of #130 (multimodal crisis detection).
2026-04-16 00:57:54 +00:00
3 changed files with 311 additions and 472 deletions

View File

@@ -1,336 +0,0 @@
"""Behavioral Pattern Detection — crisis risk from usage patterns.
Detects crisis signals from HOW someone uses the system, not just
what they say. Complements content-based crisis detection.
Behavioral signals:
- Frequency spike (anxiety/agitation)
- Frequency drop (withdrawal/isolation)
- Late-night messaging (2-5 AM)
- Session length increase (loneliness)
- Abrupt termination after emotional content
- Return after long absence
Part of Epic #102 (Multimodal Crisis Detection).
"""
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class SessionEvent:
"""A single session interaction."""
session_id: str
timestamp: float
message_length: int
is_user: bool = True
emotional_content: bool = False
terminated_abruptly: bool = False
@dataclass
class BehavioralSignals:
"""Detected behavioral risk signals."""
frequency_change: float = 0.0 # -1 (drop) to +1 (spike) vs baseline
is_late_night: bool = False # 2-5 AM local time
session_length_trend: str = "stable" # increasing/decreasing/stable
withdrawal_detected: bool = False # significant drop in activity
return_after_absence: bool = False # came back after extended absence
abrupt_termination: bool = False # left after emotional content
behavioral_score: float = 0.0 # 0-1 aggregate risk
def to_dict(self) -> Dict[str, Any]:
return {
"frequency_change": self.frequency_change,
"is_late_night": self.is_late_night,
"session_length_trend": self.session_length_trend,
"withdrawal_detected": self.withdrawal_detected,
"return_after_absence": self.return_after_absence,
"abrupt_termination": self.abrupt_termination,
"behavioral_score": self.behavioral_score,
}
class BehavioralTracker:
"""Track behavioral patterns per session/user and detect risk signals.
Uses a 7-day rolling window for baseline calculation.
Thread-safe for concurrent session tracking.
"""
# Time window constants
LATE_NIGHT_START = 2 # 2 AM
LATE_NIGHT_END = 5 # 5 AM
BASELINE_WINDOW_DAYS = 7
ABSENCE_THRESHOLD_HOURS = 48
SPIKE_THRESHOLD = 2.0 # 2x baseline = spike
DROP_THRESHOLD = 0.3 # 30% of baseline = withdrawal
def __init__(self):
# session_id -> list of events
self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
# session_id -> baseline metrics
self._baselines: Dict[str, Dict[str, float]] = {}
# session_id -> last activity timestamp
self._last_activity: Dict[str, float] = {}
# Global baseline (all sessions)
self._global_baseline: Dict[str, float] = {
"avg_messages_per_hour": 5.0,
"avg_session_length_min": 15.0,
"avg_message_length": 200.0,
}
def record(
self,
session_id: str,
timestamp: Optional[float] = None,
message_length: int = 0,
is_user: bool = True,
emotional_content: bool = False,
terminated_abruptly: bool = False,
) -> None:
"""Record a session event."""
if timestamp is None:
timestamp = time.time()
event = SessionEvent(
session_id=session_id,
timestamp=timestamp,
message_length=message_length,
is_user=is_user,
emotional_content=emotional_content,
terminated_abruptly=terminated_abruptly,
)
self._events[session_id].append(event)
self._last_activity[session_id] = timestamp
# Periodically update baseline
if len(self._events[session_id]) % 20 == 0:
self._update_baseline(session_id)
def get_risk_signals(self, session_id: str) -> BehavioralSignals:
"""Analyze behavioral patterns and return risk signals."""
events = self._events.get(session_id, [])
if not events:
return BehavioralSignals()
signals = BehavioralSignals()
now = time.time()
# 1. Frequency analysis
signals.frequency_change = self._analyze_frequency(session_id, now)
# 2. Late-night detection
signals.is_late_night = self._is_late_night(events[-1].timestamp)
# 3. Session length trend
signals.session_length_trend = self._analyze_session_length_trend(session_id)
# 4. Withdrawal detection
signals.withdrawal_detected = signals.frequency_change < -0.5
# 5. Return after absence
signals.return_after_absence = self._detect_return_after_absence(session_id, now)
# 6. Abrupt termination
signals.abrupt_termination = self._detect_abrupt_termination(events)
# 7. Aggregate behavioral score
signals.behavioral_score = self._compute_behavioral_score(signals)
return signals
def _analyze_frequency(self, session_id: str, now: float) -> float:
"""Compare recent frequency to baseline. Returns -1 to +1."""
events = self._events.get(session_id, [])
if len(events) < 3:
return 0.0
# Count messages in last hour
one_hour_ago = now - 3600
recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
# Get baseline
baseline = self._get_baseline(session_id)
baseline_rate = baseline.get("avg_messages_per_hour", 5.0)
if baseline_rate <= 0:
return 0.0
ratio = recent_count / baseline_rate
# Map to -1..+1: 0.5x = -0.5, 1x = 0, 2x = +1
if ratio < 1.0:
return max(-1.0, (ratio - 1.0))
else:
return min(1.0, (ratio - 1.0) / 2.0)
def _is_late_night(self, timestamp: float) -> bool:
"""Check if timestamp falls in the 2-5 AM window."""
dt = datetime.fromtimestamp(timestamp)
hour = dt.hour
return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END
def _analyze_session_length_trend(self, session_id: str) -> str:
"""Determine if session lengths are increasing, decreasing, or stable."""
events = self._events.get(session_id, [])
if len(events) < 10:
return "stable"
# Split events into first half and second half
mid = len(events) // 2
first_half = events[:mid]
second_half = events[mid:]
# Calculate session spans (first to last message)
if not first_half or not second_half:
return "stable"
first_span = first_half[-1].timestamp - first_half[0].timestamp
second_span = second_half[-1].timestamp - second_half[0].timestamp
if second_span > first_span * 1.3:
return "increasing"
elif second_span < first_span * 0.7:
return "decreasing"
return "stable"
def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
"""Detect if this session started after a long absence."""
events = self._events.get(session_id, [])
if len(events) < 2:
return False
# Check gap between current session and previous
last_activity = self._last_activity.get(session_id, 0)
if last_activity <= 0:
return False
# Find previous session's last event (excluding current session)
prev_events = [e for e in events[:-10] if e.timestamp < now - 3600]
if not prev_events:
return False
gap_hours = (now - prev_events[-1].timestamp) / 3600
return gap_hours >= self.ABSENCE_THRESHOLD_HOURS
def _detect_abrupt_termination(self, events: List[SessionEvent]) -> bool:
"""Detect if the last few messages had emotional content and then stopped."""
if len(events) < 3:
return False
# Check last 5 events
recent = events[-5:]
has_emotional = any(e.emotional_content for e in recent)
last_was_user = recent[-1].is_user if recent else False
last_was_short = recent[-1].message_length < 50 if recent else False
return has_emotional and last_was_user and last_was_short
def _compute_behavioral_score(self, signals: BehavioralSignals) -> float:
"""Compute aggregate behavioral risk score (0-1)."""
score = 0.0
weights = {
"frequency_spike": 0.15,
"frequency_drop": 0.20,
"late_night": 0.10,
"session_increasing": 0.10,
"withdrawal": 0.20,
"return_after_absence": 0.05,
"abrupt_termination": 0.20,
}
# Frequency spike (anxiety)
if signals.frequency_change > 0.5:
score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
# Frequency drop (withdrawal)
if signals.frequency_change < -0.3:
score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
# Late night
if signals.is_late_night:
score += weights["late_night"]
# Session length increasing
if signals.session_length_trend == "increasing":
score += weights["session_increasing"]
# Withdrawal
if signals.withdrawal_detected:
score += weights["withdrawal"]
# Return after absence
if signals.return_after_absence:
score += weights["return_after_absence"]
# Abrupt termination
if signals.abrupt_termination:
score += weights["abrupt_termination"]
return min(1.0, score)
def _get_baseline(self, session_id: str) -> Dict[str, float]:
"""Get baseline metrics for a session."""
if session_id in self._baselines:
return self._baselines[session_id]
return self._global_baseline
def _update_baseline(self, session_id: str) -> None:
"""Update rolling baseline from recent events."""
events = self._events.get(session_id, [])
if len(events) < 5:
return
# Use last 7 days of events
cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
recent = [e for e in events if e.timestamp > cutoff and e.is_user]
if not recent:
return
# Calculate metrics
time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
if time_span_hours > 0:
msgs_per_hour = len(recent) / time_span_hours
else:
msgs_per_hour = len(recent)
avg_length = sum(e.message_length for e in recent) / len(recent)
self._baselines[session_id] = {
"avg_messages_per_hour": msgs_per_hour,
"avg_message_length": avg_length,
"total_messages": len(recent),
}
# Global singleton for convenience
_global_tracker = BehavioralTracker()
def record_event(
session_id: str,
timestamp: Optional[float] = None,
message_length: int = 0,
is_user: bool = True,
emotional_content: bool = False,
terminated_abruptly: bool = False,
) -> None:
"""Record an event to the global behavioral tracker."""
_global_tracker.record(
session_id, timestamp, message_length,
is_user, emotional_content, terminated_abruptly,
)
def get_risk_signals(session_id: str) -> BehavioralSignals:
"""Get behavioral risk signals for a session."""
return _global_tracker.get_risk_signals(session_id)

311
crisis/behavioral.py Normal file
View File

@@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""Behavioral Pattern Detection for Crisis Signals (#133).
Detects crisis risk from session-level behavioral patterns:
- Message frequency (increasing urgency = rapid-fire messages)
- Time-of-day (late-night messages correlate with crisis risk)
- Withdrawal (decreasing communication after engagement)
- Escalation (crisis indicators getting stronger over time)
Usage:
from crisis.behavioral import analyze_session, BehavioralSignal
signals = analyze_session(messages)
for sig in signals:
if sig.risk_level == "HIGH":
# Escalate to crisis protocol
pass
"""
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
@dataclass
class Message:
"""A single message in a session."""
timestamp: datetime
content: str
crisis_score: float = 0.0 # 0.0-1.0 from text detector
role: str = "user" # "user" or "assistant"
@dataclass
class BehavioralSignal:
"""A detected behavioral pattern indicating crisis risk."""
signal_type: str # "frequency", "time", "withdrawal", "escalation"
risk_level: str # "LOW", "MEDIUM", "HIGH"
description: str
evidence: list = field(default_factory=list)
score: float = 0.0 # 0.0-1.0
# ── Configuration ─────────────────────────────────────────────────────────────
# Message frequency thresholds (messages per hour)
FREQ_NORMAL = 6 # <6/hr = normal
FREQ_ELEVATED = 15 # 6-15/hr = elevated
FREQ_HIGH = 30 # >30/hr = high urgency
# Time-of-day risk windows (hours in 24h format)
HIGH_RISK_HOURS = set(range(1, 5)) # 1AM-4AM
ELEVATED_RISK_HOURS = set(range(22, 24)) | set(range(5, 7)) # 10PM-12AM, 5AM-7AM
# Withdrawal: messages/day trend
WITHDRAWAL_THRESHOLD = 0.3 # Current day < 30% of average = withdrawal
# Escalation: crisis score trend
ESCALATION_WINDOW = 5 # Look at last N messages
# ── Frequency Analysis ────────────────────────────────────────────────────────
def _analyze_frequency(messages: list[Message]) -> Optional[BehavioralSignal]:
"""Detect rapid-fire messaging (urgency indicator)."""
if len(messages) < 3:
return None
user_msgs = [m for m in messages if m.role == "user"]
if len(user_msgs) < 3:
return None
# Calculate messages per hour in the most recent window
recent = user_msgs[-10:] # Last 10 user messages
if len(recent) < 2:
return None
time_span = (recent[-1].timestamp - recent[0].timestamp).total_seconds()
if time_span <= 0:
return None
msg_per_hour = len(recent) / (time_span / 3600)
if msg_per_hour >= FREQ_HIGH:
return BehavioralSignal(
signal_type="frequency",
risk_level="HIGH",
description=f"Very rapid messaging: {msg_per_hour:.0f} messages/hour",
evidence=[f"Last {len(recent)} messages in {time_span/60:.0f} minutes"],
score=min(1.0, msg_per_hour / FREQ_HIGH),
)
elif msg_per_hour >= FREQ_ELEVATED:
return BehavioralSignal(
signal_type="frequency",
risk_level="MEDIUM",
description=f"Elevated messaging rate: {msg_per_hour:.0f} messages/hour",
evidence=[f"Last {len(recent)} messages in {time_span/60:.0f} minutes"],
score=msg_per_hour / FREQ_HIGH,
)
return None
# ── Time-of-Day Analysis ─────────────────────────────────────────────────────
def _analyze_time(messages: list[Message]) -> Optional[BehavioralSignal]:
"""Detect late-night messaging (correlates with crisis risk)."""
if not messages:
return None
# Check most recent messages
recent = messages[-5:]
late_night_count = sum(1 for m in recent if m.timestamp.hour in HIGH_RISK_HOURS)
elevated_count = sum(1 for m in recent if m.timestamp.hour in ELEVATED_RISK_HOURS)
if late_night_count >= 3:
return BehavioralSignal(
signal_type="time",
risk_level="HIGH",
description=f"Late-night messaging pattern: {late_night_count}/5 messages between 1-4 AM",
evidence=[f"Message at {m.timestamp.strftime('%H:%M')}" for m in recent if m.timestamp.hour in HIGH_RISK_HOURS],
score=late_night_count / len(recent),
)
elif elevated_count >= 3:
return BehavioralSignal(
signal_type="time",
risk_level="MEDIUM",
description=f"Off-hours messaging: {elevated_count}/5 messages in elevated-risk window",
evidence=[f"Message at {m.timestamp.strftime('%H:%M')}" for m in recent if m.timestamp.hour in ELEVATED_RISK_HOURS],
score=elevated_count / len(recent) * 0.5,
)
return None
# ── Withdrawal Detection ──────────────────────────────────────────────────────
def _analyze_withdrawal(messages: list[Message]) -> Optional[BehavioralSignal]:
"""Detect communication withdrawal (decreasing engagement)."""
user_msgs = [m for m in messages if m.role == "user"]
if len(user_msgs) < 10:
return None
# Split into first half and second half
mid = len(user_msgs) // 2
first_half = user_msgs[:mid]
second_half = user_msgs[mid:]
# Average message length as engagement proxy
first_avg_len = sum(len(m.content) for m in first_half) / len(first_half)
second_avg_len = sum(len(m.content) for m in second_half) / len(second_half)
# Time between messages
def avg_gap(msgs):
if len(msgs) < 2:
return 0
gaps = [(msgs[i+1].timestamp - msgs[i].timestamp).total_seconds() for i in range(len(msgs)-1)]
return sum(gaps) / len(gaps)
first_gap = avg_gap(first_half)
second_gap = avg_gap(second_half)
# Withdrawal = shorter messages AND longer gaps
length_ratio = second_avg_len / first_avg_len if first_avg_len > 0 else 1.0
gap_ratio = second_gap / first_gap if first_gap > 0 else 1.0
if length_ratio < 0.5 and gap_ratio > 2.0:
return BehavioralSignal(
signal_type="withdrawal",
risk_level="HIGH",
description="Significant withdrawal: messages shorter and less frequent",
evidence=[
f"Message length: {first_avg_len:.0f} -> {second_avg_len:.0f} chars ({length_ratio:.0%})",
f"Message gap: {first_gap/60:.0f}min -> {second_gap/60:.0f}min ({gap_ratio:.1f}x)",
],
score=min(1.0, (1 - length_ratio) * 0.5 + (gap_ratio - 1) * 0.25),
)
elif length_ratio < 0.7 or gap_ratio > 1.5:
return BehavioralSignal(
signal_type="withdrawal",
risk_level="MEDIUM",
description="Moderate withdrawal: engagement decreasing",
evidence=[
f"Message length: {first_avg_len:.0f} -> {second_avg_len:.0f} chars",
f"Message gap: {first_gap/60:.0f}min -> {second_gap/60:.0f}min",
],
score=(1 - length_ratio) * 0.3 + (gap_ratio - 1) * 0.15,
)
return None
# ── Escalation Detection ─────────────────────────────────────────────────────
def _analyze_escalation(messages: list[Message]) -> Optional[BehavioralSignal]:
"""Detect rising crisis scores over recent messages."""
user_msgs = [m for m in messages if m.role == "user" and m.crisis_score > 0]
if len(user_msgs) < ESCALATION_WINDOW:
return None
recent = user_msgs[-ESCALATION_WINDOW:]
scores = [m.crisis_score for m in recent]
# Check for upward trend
if len(scores) < 3:
return None
# Simple linear trend: is score increasing?
first_half_avg = sum(scores[:len(scores)//2]) / (len(scores)//2)
second_half_avg = sum(scores[len(scores)//2:]) / (len(scores) - len(scores)//2)
if second_half_avg > first_half_avg * 1.5 and second_half_avg > 0.5:
return BehavioralSignal(
signal_type="escalation",
risk_level="HIGH",
description=f"Crisis escalation detected: scores rising from {first_half_avg:.2f} to {second_half_avg:.2f}",
evidence=[f"Score {i+1}: {s:.2f}" for i, s in enumerate(scores)],
score=min(1.0, second_half_avg),
)
elif second_half_avg > first_half_avg * 1.2 and second_half_avg > 0.3:
return BehavioralSignal(
signal_type="escalation",
risk_level="MEDIUM",
description=f"Mild escalation: scores trending up",
evidence=[f"Score {i+1}: {s:.2f}" for i, s in enumerate(scores)],
score=second_half_avg * 0.5,
)
return None
# ── Combined Analysis ─────────────────────────────────────────────────────────
def analyze_session(messages: list[Message]) -> list[BehavioralSignal]:
"""Analyze a session for behavioral crisis signals.
Args:
messages: List of Message objects with timestamps, content, and crisis scores.
Returns:
List of BehavioralSignal objects, sorted by risk level (HIGH first).
"""
signals = []
freq = _analyze_frequency(messages)
if freq:
signals.append(freq)
time_sig = _analyze_time(messages)
if time_sig:
signals.append(time_sig)
withdrawal = _analyze_withdrawal(messages)
if withdrawal:
signals.append(withdrawal)
escalation = _analyze_escalation(messages)
if escalation:
signals.append(escalation)
# Sort: HIGH first, then MEDIUM, then LOW
risk_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
signals.sort(key=lambda s: (risk_order.get(s.risk_level, 9), -s.score))
return signals
def get_session_risk_level(signals: list[BehavioralSignal]) -> str:
"""Get overall session risk from behavioral signals."""
if not signals:
return "NONE"
if any(s.risk_level == "HIGH" for s in signals):
return "HIGH"
if any(s.risk_level == "MEDIUM" for s in signals):
return "MEDIUM"
return "LOW"
# ── Self-Test ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
from datetime import timedelta
now = datetime.now(timezone.utc)
# Test: rapid-fire messaging
rapid_msgs = [
Message(timestamp=now - timedelta(minutes=i), content="help me", role="user")
for i in range(20, 0, -1)
]
signals = analyze_session(rapid_msgs)
print(f"Rapid-fire: {[s.signal_type + ':' + s.risk_level for s in signals]}")
assert any(s.signal_type == "frequency" for s in signals), "Should detect frequency"
# Test: late-night
late_msgs = [
Message(timestamp=now.replace(hour=2, minute=i*5), content="cant sleep", role="user")
for i in range(5)
]
signals = analyze_session(late_msgs)
print(f"Late-night: {[s.signal_type + ':' + s.risk_level for s in signals]}")
assert any(s.signal_type == "time" for s in signals), "Should detect time"
# Test: escalation
esc_msgs = [
Message(timestamp=now - timedelta(minutes=i*10), content="feeling bad",
role="user", crisis_score=0.1 + i*0.15)
for i in range(5, 0, -1)
]
signals = analyze_session(esc_msgs)
print(f"Escalation: {[s.signal_type + ':' + s.risk_level for s in signals]}")
assert any(s.signal_type == "escalation" for s in signals), "Should detect escalation"
print("\nAll self-tests passed!")

View File

@@ -1,136 +0,0 @@
"""Tests for behavioral pattern detection."""
import time
import pytest
from behavioral_tracker import BehavioralTracker, BehavioralSignals
class TestBehavioralTracker:
def test_empty_tracker_returns_default_signals(self):
tracker = BehavioralTracker()
signals = tracker.get_risk_signals("session-1")
assert signals.behavioral_score == 0.0
assert not signals.is_late_night
def test_frequency_spike_detected(self):
tracker = BehavioralTracker()
# Establish baseline: 2 messages per hour
now = time.time()
for i in range(10):
tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)
# Spike: 10 messages in last 10 minutes
for i in range(10):
tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)
signals = tracker.get_risk_signals("s1")
assert signals.frequency_change > 0.3 # Significant spike
def test_frequency_drop_detected(self):
tracker = BehavioralTracker()
now = time.time()
# Baseline: heavy usage
for i in range(50):
tracker.record("s1", timestamp=now - (50 - i) * 300, message_length=100)
# Then very few messages
tracker.record("s1", timestamp=now - 60, message_length=50)
signals = tracker.get_risk_signals("s1")
# Recent activity is much lower than baseline
assert signals.frequency_change < 0
def test_late_night_detection(self):
tracker = BehavioralTracker()
# 3:00 AM timestamp
import datetime
dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
ts = dt.timestamp()
tracker.record("s1", timestamp=ts, message_length=100)
signals = tracker.get_risk_signals("s1")
assert signals.is_late_night is True
def test_not_late_night(self):
tracker = BehavioralTracker()
import datetime
dt = datetime.datetime(2026, 4, 14, 14, 0, 0) # 2 PM
ts = dt.timestamp()
tracker.record("s1", timestamp=ts, message_length=100)
signals = tracker.get_risk_signals("s1")
assert signals.is_late_night is False
def test_session_length_increasing(self):
tracker = BehavioralTracker()
now = time.time()
# First half: messages spread over 5 minutes
for i in range(10):
tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
# Second half: messages spread over 30 minutes
for i in range(10):
tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)
signals = tracker.get_risk_signals("s1")
assert signals.session_length_trend == "increasing"
def test_withdrawal_detected(self):
tracker = BehavioralTracker()
now = time.time()
# High baseline
for i in range(50):
tracker.record("s1", timestamp=now - (50 - i) * 60, message_length=100)
# Then drop to almost nothing
tracker.record("s1", timestamp=now - 60, message_length=20)
signals = tracker.get_risk_signals("s1")
assert signals.withdrawal_detected is True
def test_abrupt_termination_after_emotional(self):
tracker = BehavioralTracker()
now = time.time()
# Normal messages
for i in range(5):
tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
# Emotional content
tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
# Short abrupt message
tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)
signals = tracker.get_risk_signals("s1")
assert signals.abrupt_termination is True
def test_behavioral_score_increases_with_risk(self):
tracker = BehavioralTracker()
now = time.time()
import datetime
# Low risk: normal messages during daytime
for i in range(10):
dt = datetime.datetime(2026, 4, 14, 14, i, 0) # 2 PM
tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
low_risk = tracker.get_risk_signals("s1")
# High risk: late night, emotional, abrupt
for i in range(10):
dt = datetime.datetime(2026, 4, 14, 3, i, 0) # 3 AM
tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
tracker.record("s2", timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
message_length=10, is_user=True)
high_risk = tracker.get_risk_signals("s2")
assert high_risk.behavioral_score > low_risk.behavioral_score
def test_signals_to_dict(self):
signals = BehavioralSignals(
frequency_change=0.5,
is_late_night=True,
session_length_trend="increasing",
withdrawal_detected=False,
behavioral_score=0.4,
)
d = signals.to_dict()
assert d["frequency_change"] == 0.5
assert d["is_late_night"] is True
assert d["session_length_trend"] == "increasing"