Compare commits
1 Commits
feat/133-b
...
fix/130-be
| Author | SHA1 | Date | |
|---|---|---|---|
| f446f6dad6 |
@@ -1,336 +0,0 @@
|
||||
"""Behavioral Pattern Detection — crisis risk from usage patterns.
|
||||
|
||||
Detects crisis signals from HOW someone uses the system, not just
|
||||
what they say. Complements content-based crisis detection.
|
||||
|
||||
Behavioral signals:
|
||||
- Frequency spike (anxiety/agitation)
|
||||
- Frequency drop (withdrawal/isolation)
|
||||
- Late-night messaging (2-5 AM)
|
||||
- Session length increase (loneliness)
|
||||
- Abrupt termination after emotional content
|
||||
- Return after long absence
|
||||
|
||||
Part of Epic #102 (Multimodal Crisis Detection).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionEvent:
|
||||
"""A single session interaction."""
|
||||
session_id: str
|
||||
timestamp: float
|
||||
message_length: int
|
||||
is_user: bool = True
|
||||
emotional_content: bool = False
|
||||
terminated_abruptly: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class BehavioralSignals:
|
||||
"""Detected behavioral risk signals."""
|
||||
frequency_change: float = 0.0 # -1 (drop) to +1 (spike) vs baseline
|
||||
is_late_night: bool = False # 2-5 AM local time
|
||||
session_length_trend: str = "stable" # increasing/decreasing/stable
|
||||
withdrawal_detected: bool = False # significant drop in activity
|
||||
return_after_absence: bool = False # came back after extended absence
|
||||
abrupt_termination: bool = False # left after emotional content
|
||||
behavioral_score: float = 0.0 # 0-1 aggregate risk
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"frequency_change": self.frequency_change,
|
||||
"is_late_night": self.is_late_night,
|
||||
"session_length_trend": self.session_length_trend,
|
||||
"withdrawal_detected": self.withdrawal_detected,
|
||||
"return_after_absence": self.return_after_absence,
|
||||
"abrupt_termination": self.abrupt_termination,
|
||||
"behavioral_score": self.behavioral_score,
|
||||
}
|
||||
|
||||
|
||||
class BehavioralTracker:
|
||||
"""Track behavioral patterns per session/user and detect risk signals.
|
||||
|
||||
Uses a 7-day rolling window for baseline calculation.
|
||||
Thread-safe for concurrent session tracking.
|
||||
"""
|
||||
|
||||
# Time window constants
|
||||
LATE_NIGHT_START = 2 # 2 AM
|
||||
LATE_NIGHT_END = 5 # 5 AM
|
||||
BASELINE_WINDOW_DAYS = 7
|
||||
ABSENCE_THRESHOLD_HOURS = 48
|
||||
SPIKE_THRESHOLD = 2.0 # 2x baseline = spike
|
||||
DROP_THRESHOLD = 0.3 # 30% of baseline = withdrawal
|
||||
|
||||
def __init__(self):
|
||||
# session_id -> list of events
|
||||
self._events: Dict[str, List[SessionEvent]] = defaultdict(list)
|
||||
# session_id -> baseline metrics
|
||||
self._baselines: Dict[str, Dict[str, float]] = {}
|
||||
# session_id -> last activity timestamp
|
||||
self._last_activity: Dict[str, float] = {}
|
||||
# Global baseline (all sessions)
|
||||
self._global_baseline: Dict[str, float] = {
|
||||
"avg_messages_per_hour": 5.0,
|
||||
"avg_session_length_min": 15.0,
|
||||
"avg_message_length": 200.0,
|
||||
}
|
||||
|
||||
def record(
|
||||
self,
|
||||
session_id: str,
|
||||
timestamp: Optional[float] = None,
|
||||
message_length: int = 0,
|
||||
is_user: bool = True,
|
||||
emotional_content: bool = False,
|
||||
terminated_abruptly: bool = False,
|
||||
) -> None:
|
||||
"""Record a session event."""
|
||||
if timestamp is None:
|
||||
timestamp = time.time()
|
||||
|
||||
event = SessionEvent(
|
||||
session_id=session_id,
|
||||
timestamp=timestamp,
|
||||
message_length=message_length,
|
||||
is_user=is_user,
|
||||
emotional_content=emotional_content,
|
||||
terminated_abruptly=terminated_abruptly,
|
||||
)
|
||||
|
||||
self._events[session_id].append(event)
|
||||
self._last_activity[session_id] = timestamp
|
||||
|
||||
# Periodically update baseline
|
||||
if len(self._events[session_id]) % 20 == 0:
|
||||
self._update_baseline(session_id)
|
||||
|
||||
def get_risk_signals(self, session_id: str) -> BehavioralSignals:
|
||||
"""Analyze behavioral patterns and return risk signals."""
|
||||
events = self._events.get(session_id, [])
|
||||
if not events:
|
||||
return BehavioralSignals()
|
||||
|
||||
signals = BehavioralSignals()
|
||||
now = time.time()
|
||||
|
||||
# 1. Frequency analysis
|
||||
signals.frequency_change = self._analyze_frequency(session_id, now)
|
||||
|
||||
# 2. Late-night detection
|
||||
signals.is_late_night = self._is_late_night(events[-1].timestamp)
|
||||
|
||||
# 3. Session length trend
|
||||
signals.session_length_trend = self._analyze_session_length_trend(session_id)
|
||||
|
||||
# 4. Withdrawal detection
|
||||
signals.withdrawal_detected = signals.frequency_change < -0.5
|
||||
|
||||
# 5. Return after absence
|
||||
signals.return_after_absence = self._detect_return_after_absence(session_id, now)
|
||||
|
||||
# 6. Abrupt termination
|
||||
signals.abrupt_termination = self._detect_abrupt_termination(events)
|
||||
|
||||
# 7. Aggregate behavioral score
|
||||
signals.behavioral_score = self._compute_behavioral_score(signals)
|
||||
|
||||
return signals
|
||||
|
||||
def _analyze_frequency(self, session_id: str, now: float) -> float:
|
||||
"""Compare recent frequency to baseline. Returns -1 to +1."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 3:
|
||||
return 0.0
|
||||
|
||||
# Count messages in last hour
|
||||
one_hour_ago = now - 3600
|
||||
recent_count = sum(1 for e in events if e.timestamp > one_hour_ago and e.is_user)
|
||||
|
||||
# Get baseline
|
||||
baseline = self._get_baseline(session_id)
|
||||
baseline_rate = baseline.get("avg_messages_per_hour", 5.0)
|
||||
|
||||
if baseline_rate <= 0:
|
||||
return 0.0
|
||||
|
||||
ratio = recent_count / baseline_rate
|
||||
# Map to -1..+1: 0.5x = -0.5, 1x = 0, 2x = +1
|
||||
if ratio < 1.0:
|
||||
return max(-1.0, (ratio - 1.0))
|
||||
else:
|
||||
return min(1.0, (ratio - 1.0) / 2.0)
|
||||
|
||||
def _is_late_night(self, timestamp: float) -> bool:
|
||||
"""Check if timestamp falls in the 2-5 AM window."""
|
||||
dt = datetime.fromtimestamp(timestamp)
|
||||
hour = dt.hour
|
||||
return self.LATE_NIGHT_START <= hour < self.LATE_NIGHT_END
|
||||
|
||||
def _analyze_session_length_trend(self, session_id: str) -> str:
|
||||
"""Determine if session lengths are increasing, decreasing, or stable."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 10:
|
||||
return "stable"
|
||||
|
||||
# Split events into first half and second half
|
||||
mid = len(events) // 2
|
||||
first_half = events[:mid]
|
||||
second_half = events[mid:]
|
||||
|
||||
# Calculate session spans (first to last message)
|
||||
if not first_half or not second_half:
|
||||
return "stable"
|
||||
|
||||
first_span = first_half[-1].timestamp - first_half[0].timestamp
|
||||
second_span = second_half[-1].timestamp - second_half[0].timestamp
|
||||
|
||||
if second_span > first_span * 1.3:
|
||||
return "increasing"
|
||||
elif second_span < first_span * 0.7:
|
||||
return "decreasing"
|
||||
return "stable"
|
||||
|
||||
def _detect_return_after_absence(self, session_id: str, now: float) -> bool:
|
||||
"""Detect if this session started after a long absence."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 2:
|
||||
return False
|
||||
|
||||
# Check gap between current session and previous
|
||||
last_activity = self._last_activity.get(session_id, 0)
|
||||
if last_activity <= 0:
|
||||
return False
|
||||
|
||||
# Find previous session's last event (excluding current session)
|
||||
prev_events = [e for e in events[:-10] if e.timestamp < now - 3600]
|
||||
if not prev_events:
|
||||
return False
|
||||
|
||||
gap_hours = (now - prev_events[-1].timestamp) / 3600
|
||||
return gap_hours >= self.ABSENCE_THRESHOLD_HOURS
|
||||
|
||||
def _detect_abrupt_termination(self, events: List[SessionEvent]) -> bool:
|
||||
"""Detect if the last few messages had emotional content and then stopped."""
|
||||
if len(events) < 3:
|
||||
return False
|
||||
|
||||
# Check last 5 events
|
||||
recent = events[-5:]
|
||||
has_emotional = any(e.emotional_content for e in recent)
|
||||
last_was_user = recent[-1].is_user if recent else False
|
||||
last_was_short = recent[-1].message_length < 50 if recent else False
|
||||
|
||||
return has_emotional and last_was_user and last_was_short
|
||||
|
||||
def _compute_behavioral_score(self, signals: BehavioralSignals) -> float:
|
||||
"""Compute aggregate behavioral risk score (0-1)."""
|
||||
score = 0.0
|
||||
weights = {
|
||||
"frequency_spike": 0.15,
|
||||
"frequency_drop": 0.20,
|
||||
"late_night": 0.10,
|
||||
"session_increasing": 0.10,
|
||||
"withdrawal": 0.20,
|
||||
"return_after_absence": 0.05,
|
||||
"abrupt_termination": 0.20,
|
||||
}
|
||||
|
||||
# Frequency spike (anxiety)
|
||||
if signals.frequency_change > 0.5:
|
||||
score += weights["frequency_spike"] * min(signals.frequency_change, 1.0)
|
||||
|
||||
# Frequency drop (withdrawal)
|
||||
if signals.frequency_change < -0.3:
|
||||
score += weights["frequency_drop"] * min(abs(signals.frequency_change), 1.0)
|
||||
|
||||
# Late night
|
||||
if signals.is_late_night:
|
||||
score += weights["late_night"]
|
||||
|
||||
# Session length increasing
|
||||
if signals.session_length_trend == "increasing":
|
||||
score += weights["session_increasing"]
|
||||
|
||||
# Withdrawal
|
||||
if signals.withdrawal_detected:
|
||||
score += weights["withdrawal"]
|
||||
|
||||
# Return after absence
|
||||
if signals.return_after_absence:
|
||||
score += weights["return_after_absence"]
|
||||
|
||||
# Abrupt termination
|
||||
if signals.abrupt_termination:
|
||||
score += weights["abrupt_termination"]
|
||||
|
||||
return min(1.0, score)
|
||||
|
||||
def _get_baseline(self, session_id: str) -> Dict[str, float]:
|
||||
"""Get baseline metrics for a session."""
|
||||
if session_id in self._baselines:
|
||||
return self._baselines[session_id]
|
||||
return self._global_baseline
|
||||
|
||||
def _update_baseline(self, session_id: str) -> None:
|
||||
"""Update rolling baseline from recent events."""
|
||||
events = self._events.get(session_id, [])
|
||||
if len(events) < 5:
|
||||
return
|
||||
|
||||
# Use last 7 days of events
|
||||
cutoff = time.time() - (self.BASELINE_WINDOW_DAYS * 86400)
|
||||
recent = [e for e in events if e.timestamp > cutoff and e.is_user]
|
||||
|
||||
if not recent:
|
||||
return
|
||||
|
||||
# Calculate metrics
|
||||
time_span_hours = (recent[-1].timestamp - recent[0].timestamp) / 3600
|
||||
if time_span_hours > 0:
|
||||
msgs_per_hour = len(recent) / time_span_hours
|
||||
else:
|
||||
msgs_per_hour = len(recent)
|
||||
|
||||
avg_length = sum(e.message_length for e in recent) / len(recent)
|
||||
|
||||
self._baselines[session_id] = {
|
||||
"avg_messages_per_hour": msgs_per_hour,
|
||||
"avg_message_length": avg_length,
|
||||
"total_messages": len(recent),
|
||||
}
|
||||
|
||||
|
||||
# Global singleton for convenience
|
||||
_global_tracker = BehavioralTracker()
|
||||
|
||||
|
||||
def record_event(
|
||||
session_id: str,
|
||||
timestamp: Optional[float] = None,
|
||||
message_length: int = 0,
|
||||
is_user: bool = True,
|
||||
emotional_content: bool = False,
|
||||
terminated_abruptly: bool = False,
|
||||
) -> None:
|
||||
"""Record an event to the global behavioral tracker."""
|
||||
_global_tracker.record(
|
||||
session_id, timestamp, message_length,
|
||||
is_user, emotional_content, terminated_abruptly,
|
||||
)
|
||||
|
||||
|
||||
def get_risk_signals(session_id: str) -> BehavioralSignals:
|
||||
"""Get behavioral risk signals for a session."""
|
||||
return _global_tracker.get_risk_signals(session_id)
|
||||
311
crisis/behavioral.py
Normal file
311
crisis/behavioral.py
Normal file
@@ -0,0 +1,311 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Behavioral Pattern Detection for Crisis Signals (#133).
|
||||
|
||||
Detects crisis risk from session-level behavioral patterns:
|
||||
- Message frequency (increasing urgency = rapid-fire messages)
|
||||
- Time-of-day (late-night messages correlate with crisis risk)
|
||||
- Withdrawal (decreasing communication after engagement)
|
||||
- Escalation (crisis indicators getting stronger over time)
|
||||
|
||||
Usage:
|
||||
from crisis.behavioral import analyze_session, BehavioralSignal
|
||||
|
||||
signals = analyze_session(messages)
|
||||
for sig in signals:
|
||||
if sig.risk_level == "HIGH":
|
||||
# Escalate to crisis protocol
|
||||
pass
|
||||
"""
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class Message:
|
||||
"""A single message in a session."""
|
||||
timestamp: datetime
|
||||
content: str
|
||||
crisis_score: float = 0.0 # 0.0-1.0 from text detector
|
||||
role: str = "user" # "user" or "assistant"
|
||||
|
||||
|
||||
@dataclass
|
||||
class BehavioralSignal:
|
||||
"""A detected behavioral pattern indicating crisis risk."""
|
||||
signal_type: str # "frequency", "time", "withdrawal", "escalation"
|
||||
risk_level: str # "LOW", "MEDIUM", "HIGH"
|
||||
description: str
|
||||
evidence: list = field(default_factory=list)
|
||||
score: float = 0.0 # 0.0-1.0
|
||||
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────────
|
||||
|
||||
# Message frequency thresholds (messages per hour)
|
||||
FREQ_NORMAL = 6 # <6/hr = normal
|
||||
FREQ_ELEVATED = 15 # 6-15/hr = elevated
|
||||
FREQ_HIGH = 30 # >30/hr = high urgency
|
||||
|
||||
# Time-of-day risk windows (hours in 24h format)
|
||||
HIGH_RISK_HOURS = set(range(1, 5)) # 1AM-4AM
|
||||
ELEVATED_RISK_HOURS = set(range(22, 24)) | set(range(5, 7)) # 10PM-12AM, 5AM-7AM
|
||||
|
||||
# Withdrawal: messages/day trend
|
||||
WITHDRAWAL_THRESHOLD = 0.3 # Current day < 30% of average = withdrawal
|
||||
|
||||
# Escalation: crisis score trend
|
||||
ESCALATION_WINDOW = 5 # Look at last N messages
|
||||
|
||||
|
||||
# ── Frequency Analysis ────────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_frequency(messages: list[Message]) -> Optional[BehavioralSignal]:
|
||||
"""Detect rapid-fire messaging (urgency indicator)."""
|
||||
if len(messages) < 3:
|
||||
return None
|
||||
|
||||
user_msgs = [m for m in messages if m.role == "user"]
|
||||
if len(user_msgs) < 3:
|
||||
return None
|
||||
|
||||
# Calculate messages per hour in the most recent window
|
||||
recent = user_msgs[-10:] # Last 10 user messages
|
||||
if len(recent) < 2:
|
||||
return None
|
||||
|
||||
time_span = (recent[-1].timestamp - recent[0].timestamp).total_seconds()
|
||||
if time_span <= 0:
|
||||
return None
|
||||
|
||||
msg_per_hour = len(recent) / (time_span / 3600)
|
||||
|
||||
if msg_per_hour >= FREQ_HIGH:
|
||||
return BehavioralSignal(
|
||||
signal_type="frequency",
|
||||
risk_level="HIGH",
|
||||
description=f"Very rapid messaging: {msg_per_hour:.0f} messages/hour",
|
||||
evidence=[f"Last {len(recent)} messages in {time_span/60:.0f} minutes"],
|
||||
score=min(1.0, msg_per_hour / FREQ_HIGH),
|
||||
)
|
||||
elif msg_per_hour >= FREQ_ELEVATED:
|
||||
return BehavioralSignal(
|
||||
signal_type="frequency",
|
||||
risk_level="MEDIUM",
|
||||
description=f"Elevated messaging rate: {msg_per_hour:.0f} messages/hour",
|
||||
evidence=[f"Last {len(recent)} messages in {time_span/60:.0f} minutes"],
|
||||
score=msg_per_hour / FREQ_HIGH,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# ── Time-of-Day Analysis ─────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_time(messages: list[Message]) -> Optional[BehavioralSignal]:
|
||||
"""Detect late-night messaging (correlates with crisis risk)."""
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
# Check most recent messages
|
||||
recent = messages[-5:]
|
||||
late_night_count = sum(1 for m in recent if m.timestamp.hour in HIGH_RISK_HOURS)
|
||||
elevated_count = sum(1 for m in recent if m.timestamp.hour in ELEVATED_RISK_HOURS)
|
||||
|
||||
if late_night_count >= 3:
|
||||
return BehavioralSignal(
|
||||
signal_type="time",
|
||||
risk_level="HIGH",
|
||||
description=f"Late-night messaging pattern: {late_night_count}/5 messages between 1-4 AM",
|
||||
evidence=[f"Message at {m.timestamp.strftime('%H:%M')}" for m in recent if m.timestamp.hour in HIGH_RISK_HOURS],
|
||||
score=late_night_count / len(recent),
|
||||
)
|
||||
elif elevated_count >= 3:
|
||||
return BehavioralSignal(
|
||||
signal_type="time",
|
||||
risk_level="MEDIUM",
|
||||
description=f"Off-hours messaging: {elevated_count}/5 messages in elevated-risk window",
|
||||
evidence=[f"Message at {m.timestamp.strftime('%H:%M')}" for m in recent if m.timestamp.hour in ELEVATED_RISK_HOURS],
|
||||
score=elevated_count / len(recent) * 0.5,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# ── Withdrawal Detection ──────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_withdrawal(messages: list[Message]) -> Optional[BehavioralSignal]:
|
||||
"""Detect communication withdrawal (decreasing engagement)."""
|
||||
user_msgs = [m for m in messages if m.role == "user"]
|
||||
if len(user_msgs) < 10:
|
||||
return None
|
||||
|
||||
# Split into first half and second half
|
||||
mid = len(user_msgs) // 2
|
||||
first_half = user_msgs[:mid]
|
||||
second_half = user_msgs[mid:]
|
||||
|
||||
# Average message length as engagement proxy
|
||||
first_avg_len = sum(len(m.content) for m in first_half) / len(first_half)
|
||||
second_avg_len = sum(len(m.content) for m in second_half) / len(second_half)
|
||||
|
||||
# Time between messages
|
||||
def avg_gap(msgs):
|
||||
if len(msgs) < 2:
|
||||
return 0
|
||||
gaps = [(msgs[i+1].timestamp - msgs[i].timestamp).total_seconds() for i in range(len(msgs)-1)]
|
||||
return sum(gaps) / len(gaps)
|
||||
|
||||
first_gap = avg_gap(first_half)
|
||||
second_gap = avg_gap(second_half)
|
||||
|
||||
# Withdrawal = shorter messages AND longer gaps
|
||||
length_ratio = second_avg_len / first_avg_len if first_avg_len > 0 else 1.0
|
||||
gap_ratio = second_gap / first_gap if first_gap > 0 else 1.0
|
||||
|
||||
if length_ratio < 0.5 and gap_ratio > 2.0:
|
||||
return BehavioralSignal(
|
||||
signal_type="withdrawal",
|
||||
risk_level="HIGH",
|
||||
description="Significant withdrawal: messages shorter and less frequent",
|
||||
evidence=[
|
||||
f"Message length: {first_avg_len:.0f} -> {second_avg_len:.0f} chars ({length_ratio:.0%})",
|
||||
f"Message gap: {first_gap/60:.0f}min -> {second_gap/60:.0f}min ({gap_ratio:.1f}x)",
|
||||
],
|
||||
score=min(1.0, (1 - length_ratio) * 0.5 + (gap_ratio - 1) * 0.25),
|
||||
)
|
||||
elif length_ratio < 0.7 or gap_ratio > 1.5:
|
||||
return BehavioralSignal(
|
||||
signal_type="withdrawal",
|
||||
risk_level="MEDIUM",
|
||||
description="Moderate withdrawal: engagement decreasing",
|
||||
evidence=[
|
||||
f"Message length: {first_avg_len:.0f} -> {second_avg_len:.0f} chars",
|
||||
f"Message gap: {first_gap/60:.0f}min -> {second_gap/60:.0f}min",
|
||||
],
|
||||
score=(1 - length_ratio) * 0.3 + (gap_ratio - 1) * 0.15,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# ── Escalation Detection ─────────────────────────────────────────────────────
|
||||
|
||||
def _analyze_escalation(messages: list[Message]) -> Optional[BehavioralSignal]:
|
||||
"""Detect rising crisis scores over recent messages."""
|
||||
user_msgs = [m for m in messages if m.role == "user" and m.crisis_score > 0]
|
||||
if len(user_msgs) < ESCALATION_WINDOW:
|
||||
return None
|
||||
|
||||
recent = user_msgs[-ESCALATION_WINDOW:]
|
||||
scores = [m.crisis_score for m in recent]
|
||||
|
||||
# Check for upward trend
|
||||
if len(scores) < 3:
|
||||
return None
|
||||
|
||||
# Simple linear trend: is score increasing?
|
||||
first_half_avg = sum(scores[:len(scores)//2]) / (len(scores)//2)
|
||||
second_half_avg = sum(scores[len(scores)//2:]) / (len(scores) - len(scores)//2)
|
||||
|
||||
if second_half_avg > first_half_avg * 1.5 and second_half_avg > 0.5:
|
||||
return BehavioralSignal(
|
||||
signal_type="escalation",
|
||||
risk_level="HIGH",
|
||||
description=f"Crisis escalation detected: scores rising from {first_half_avg:.2f} to {second_half_avg:.2f}",
|
||||
evidence=[f"Score {i+1}: {s:.2f}" for i, s in enumerate(scores)],
|
||||
score=min(1.0, second_half_avg),
|
||||
)
|
||||
elif second_half_avg > first_half_avg * 1.2 and second_half_avg > 0.3:
|
||||
return BehavioralSignal(
|
||||
signal_type="escalation",
|
||||
risk_level="MEDIUM",
|
||||
description=f"Mild escalation: scores trending up",
|
||||
evidence=[f"Score {i+1}: {s:.2f}" for i, s in enumerate(scores)],
|
||||
score=second_half_avg * 0.5,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# ── Combined Analysis ─────────────────────────────────────────────────────────
|
||||
|
||||
def analyze_session(messages: list[Message]) -> list[BehavioralSignal]:
|
||||
"""Analyze a session for behavioral crisis signals.
|
||||
|
||||
Args:
|
||||
messages: List of Message objects with timestamps, content, and crisis scores.
|
||||
|
||||
Returns:
|
||||
List of BehavioralSignal objects, sorted by risk level (HIGH first).
|
||||
"""
|
||||
signals = []
|
||||
|
||||
freq = _analyze_frequency(messages)
|
||||
if freq:
|
||||
signals.append(freq)
|
||||
|
||||
time_sig = _analyze_time(messages)
|
||||
if time_sig:
|
||||
signals.append(time_sig)
|
||||
|
||||
withdrawal = _analyze_withdrawal(messages)
|
||||
if withdrawal:
|
||||
signals.append(withdrawal)
|
||||
|
||||
escalation = _analyze_escalation(messages)
|
||||
if escalation:
|
||||
signals.append(escalation)
|
||||
|
||||
# Sort: HIGH first, then MEDIUM, then LOW
|
||||
risk_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
|
||||
signals.sort(key=lambda s: (risk_order.get(s.risk_level, 9), -s.score))
|
||||
|
||||
return signals
|
||||
|
||||
|
||||
def get_session_risk_level(signals: list[BehavioralSignal]) -> str:
|
||||
"""Get overall session risk from behavioral signals."""
|
||||
if not signals:
|
||||
return "NONE"
|
||||
if any(s.risk_level == "HIGH" for s in signals):
|
||||
return "HIGH"
|
||||
if any(s.risk_level == "MEDIUM" for s in signals):
|
||||
return "MEDIUM"
|
||||
return "LOW"
|
||||
|
||||
|
||||
# ── Self-Test ─────────────────────────────────────────────────────────────────
|
||||
|
||||
if __name__ == "__main__":
|
||||
from datetime import timedelta
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# Test: rapid-fire messaging
|
||||
rapid_msgs = [
|
||||
Message(timestamp=now - timedelta(minutes=i), content="help me", role="user")
|
||||
for i in range(20, 0, -1)
|
||||
]
|
||||
signals = analyze_session(rapid_msgs)
|
||||
print(f"Rapid-fire: {[s.signal_type + ':' + s.risk_level for s in signals]}")
|
||||
assert any(s.signal_type == "frequency" for s in signals), "Should detect frequency"
|
||||
|
||||
# Test: late-night
|
||||
late_msgs = [
|
||||
Message(timestamp=now.replace(hour=2, minute=i*5), content="cant sleep", role="user")
|
||||
for i in range(5)
|
||||
]
|
||||
signals = analyze_session(late_msgs)
|
||||
print(f"Late-night: {[s.signal_type + ':' + s.risk_level for s in signals]}")
|
||||
assert any(s.signal_type == "time" for s in signals), "Should detect time"
|
||||
|
||||
# Test: escalation
|
||||
esc_msgs = [
|
||||
Message(timestamp=now - timedelta(minutes=i*10), content="feeling bad",
|
||||
role="user", crisis_score=0.1 + i*0.15)
|
||||
for i in range(5, 0, -1)
|
||||
]
|
||||
signals = analyze_session(esc_msgs)
|
||||
print(f"Escalation: {[s.signal_type + ':' + s.risk_level for s in signals]}")
|
||||
assert any(s.signal_type == "escalation" for s in signals), "Should detect escalation"
|
||||
|
||||
print("\nAll self-tests passed!")
|
||||
@@ -1,136 +0,0 @@
|
||||
"""Tests for behavioral pattern detection."""
|
||||
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from behavioral_tracker import BehavioralTracker, BehavioralSignals
|
||||
|
||||
|
||||
class TestBehavioralTracker:
|
||||
def test_empty_tracker_returns_default_signals(self):
|
||||
tracker = BehavioralTracker()
|
||||
signals = tracker.get_risk_signals("session-1")
|
||||
assert signals.behavioral_score == 0.0
|
||||
assert not signals.is_late_night
|
||||
|
||||
def test_frequency_spike_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
# Establish baseline: 2 messages per hour
|
||||
now = time.time()
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - (10 - i) * 1800, message_length=100)
|
||||
|
||||
# Spike: 10 messages in last 10 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - (10 - i) * 60, message_length=50)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.frequency_change > 0.3 # Significant spike
|
||||
|
||||
def test_frequency_drop_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# Baseline: heavy usage
|
||||
for i in range(50):
|
||||
tracker.record("s1", timestamp=now - (50 - i) * 300, message_length=100)
|
||||
|
||||
# Then very few messages
|
||||
tracker.record("s1", timestamp=now - 60, message_length=50)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
# Recent activity is much lower than baseline
|
||||
assert signals.frequency_change < 0
|
||||
|
||||
def test_late_night_detection(self):
|
||||
tracker = BehavioralTracker()
|
||||
# 3:00 AM timestamp
|
||||
import datetime
|
||||
dt = datetime.datetime(2026, 4, 14, 3, 0, 0)
|
||||
ts = dt.timestamp()
|
||||
|
||||
tracker.record("s1", timestamp=ts, message_length=100)
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.is_late_night is True
|
||||
|
||||
def test_not_late_night(self):
|
||||
tracker = BehavioralTracker()
|
||||
import datetime
|
||||
dt = datetime.datetime(2026, 4, 14, 14, 0, 0) # 2 PM
|
||||
ts = dt.timestamp()
|
||||
|
||||
tracker.record("s1", timestamp=ts, message_length=100)
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.is_late_night is False
|
||||
|
||||
def test_session_length_increasing(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# First half: messages spread over 5 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - 600 + i * 30, message_length=100)
|
||||
# Second half: messages spread over 30 minutes
|
||||
for i in range(10):
|
||||
tracker.record("s1", timestamp=now - 300 + i * 180, message_length=100)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.session_length_trend == "increasing"
|
||||
|
||||
def test_withdrawal_detected(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# High baseline
|
||||
for i in range(50):
|
||||
tracker.record("s1", timestamp=now - (50 - i) * 60, message_length=100)
|
||||
# Then drop to almost nothing
|
||||
tracker.record("s1", timestamp=now - 60, message_length=20)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.withdrawal_detected is True
|
||||
|
||||
def test_abrupt_termination_after_emotional(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
# Normal messages
|
||||
for i in range(5):
|
||||
tracker.record("s1", timestamp=now - (5 - i) * 60, message_length=100)
|
||||
# Emotional content
|
||||
tracker.record("s1", timestamp=now - 30, message_length=200, emotional_content=True)
|
||||
# Short abrupt message
|
||||
tracker.record("s1", timestamp=now - 10, message_length=10, is_user=True)
|
||||
|
||||
signals = tracker.get_risk_signals("s1")
|
||||
assert signals.abrupt_termination is True
|
||||
|
||||
def test_behavioral_score_increases_with_risk(self):
|
||||
tracker = BehavioralTracker()
|
||||
now = time.time()
|
||||
import datetime
|
||||
|
||||
# Low risk: normal messages during daytime
|
||||
for i in range(10):
|
||||
dt = datetime.datetime(2026, 4, 14, 14, i, 0) # 2 PM
|
||||
tracker.record("s1", timestamp=dt.timestamp(), message_length=100)
|
||||
low_risk = tracker.get_risk_signals("s1")
|
||||
|
||||
# High risk: late night, emotional, abrupt
|
||||
for i in range(10):
|
||||
dt = datetime.datetime(2026, 4, 14, 3, i, 0) # 3 AM
|
||||
tracker.record("s2", timestamp=dt.timestamp(), message_length=100, emotional_content=True)
|
||||
tracker.record("s2", timestamp=datetime.datetime(2026, 4, 14, 3, 10, 0).timestamp(),
|
||||
message_length=10, is_user=True)
|
||||
high_risk = tracker.get_risk_signals("s2")
|
||||
|
||||
assert high_risk.behavioral_score > low_risk.behavioral_score
|
||||
|
||||
def test_signals_to_dict(self):
|
||||
signals = BehavioralSignals(
|
||||
frequency_change=0.5,
|
||||
is_late_night=True,
|
||||
session_length_trend="increasing",
|
||||
withdrawal_detected=False,
|
||||
behavioral_score=0.4,
|
||||
)
|
||||
d = signals.to_dict()
|
||||
assert d["frequency_change"] == 0.5
|
||||
assert d["is_late_night"] is True
|
||||
assert d["session_length_trend"] == "increasing"
|
||||
Reference in New Issue
Block a user