Files
the-door/crisis/detect.py
Alexander Whitestone da31288525
All checks were successful
Smoke Test / smoke (push) Successful in 4s
fix: deprecate dying_detection and consolidate crisis detection (#40) (#76)
Merge PR #76 (squash)
2026-04-14 22:08:29 +00:00

406 lines
14 KiB
Python

"""
Crisis Detection Module for the-door (CANONICAL).
Single source of truth for crisis keyword detection.
Parses incoming text for despair/suicide indicators and classifies into
tiers: LOW, MEDIUM, HIGH, CRITICAL.
Both `crisis/detect.py` and `crisis_detector.py` (legacy shim) use this module.
Do NOT duplicate keyword lists elsewhere.
Consolidated from:
- crisis_detector.py (root, legacy — MODERATE tier, class API)
- crisis/detect.py (module, newer — MEDIUM tier, functional API)
"""
import re
from dataclasses import dataclass, field
from typing import List, Optional
# ── Canonical indicator sets ──────────────────────────────────────
# Regex patterns for the CRITICAL tier. detect_crisis() treats a single hit
# from this list as sufficient to report CRITICAL.
CRITICAL_INDICATORS = [
    # Direct statements about ending one's life.
    r"\bbetter off without me\b",
    r"\bkill\s*(my)?self\b",
    r"\bend\s*my\s*life\b",
    r"\bsuicid(?:al|ed|e)\b",
    r"\bnot\s+worth\s+living\b",
    r"\bbetter\s+off\s+dead\b",
    r"\bend\s+it\s+all\b",
    r"\bno\s+reason\s+to\s+live\b",
    r"\bdon'?t\s+want\s+to\s+live\b",
    r"\bwant\s+to\s+die\b",
    # Stated intent or plan.
    r"\bgoing\s+to\s+(?:kill\s+myself|die)\b",
    r"\bplan\s+to\s+(?:end|kill|die)\b",
    r"\btired\s+of\s+(?:living|life|existence)\b",
    # Farewell / preparation phrases.
    r"\bsaying\s+goodbye\s+(?:forever|permanently|one\s+last\s+time)\b",
    r"\bwrote\s+a\s+suicide\s*(?:note|letter)\b",
    r"\bgiving\s+away\s+(?:my|all\s+my)\s+(?:stuff|things|possessions?)\s+(?:to|because|—)\b",
    # No trailing \b here, so "ends" may be followed by any character.
    r"\btied\s+(?:up|down)\s+my\s+(?:loose\s+)?ends",
]
# Regex patterns for the HIGH tier. detect_crisis() treats a single hit from
# this list as sufficient to report HIGH (when no CRITICAL pattern matched).
#
# NOTE(review): three patterns below contain a literal uppercase "I". They
# only match text that still contains an uppercase "I", or when the matcher
# is case-insensitive — they cannot match already-lowercased text under a
# case-sensitive search.
HIGH_INDICATORS = [
    r"\bdespair\b",
    r"\bhopeless(?:ly)?\s+(?:about\s+(?:my|this|everything|life)|inside|right\s+now)\b",
    r"\bno(?!t)\s+(?:one|body|point|hope|way\s+out)\b",
    r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
    r"\beverything\s+is\s+(?:pointless|broken|ruined)\b",
    r"\bcan'?t\s+take\s+this\s+anymore\b",
    r"\bdon'?t\s+care\s+if\s+I\s+die\b",
    r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
    r"\bdon'?t\s+matter\s+if\s+I\s+exist\b",
    r"\bno\s+one\s+would\s+(?:care|miss)\b",
    r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
    r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
    r"\bcan'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
    r"\bescape\s+from\s*this",
    r"\bjust\s+want\s+it\s+to\s+stop\b",
    r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
    r"\bdisappeared\s+forever\b",

    # Contextual despair phrases (from crisis_detector.py legacy)
    r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
    r"\beverything\s+is\s+hopeless\b",
    r"\bcan'?t\s+(?:go\s+on|keep\s+going)\b",
    r"\bgive(?:n)?\s*up\s+(?:on\s+)?(?:life|living|everything)\b",
    r"\bgive(?:n)?\s*up\s+on\s+myself\b",
    r"\bno\s*point\s+(?:in\s+)?living\b",
    r"\bno\s*hope\s+(?:left|remaining)\b",
    r"\bno\s*way\s*out\b",
    r"\bfeel(?:s|ing)?\s+trapped\b",
    r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
    r"\btrapped\s+and\s+can'?t\s+escape\b",
    r"\bdesperate\s+(?:for\s+)?help\b",
    r"\bfeel(?:s|ing)?\s+desperate\b",
]
# Regex patterns for the MEDIUM tier. detect_crisis() reports MEDIUM only
# when at least two of these match; a lone match is reported as LOW.
MEDIUM_INDICATORS = [
    # Isolation and loss of hope.
    r"\bno\s+hope\b",
    r"\bforgotten\b",
    r"\balone\s+in\s+this\b",
    r"\balways\s+alone\b",
    r"\bnobody\s+(?:understands|cares)\b",
    r"\bwish\s+I\s+could\b",
    # Single-word distress vocabulary.
    r"\bexhaust(?:ed|ion|ing)\b",
    r"\bnumb\b",
    r"\bempty\b",
    r"\bworthless\b",
    r"\buseless\b",
    r"\bbroken\b",
    r"\bdark(ness)?\b",
    r"\bdepress(?:ed|ion)\b",
    r"\bcrying\b",
    r"\btears\b",
    r"\bsad(ness)?\b",
    r"\bmiserable\b",
    r"\boverwhelm(?:ed|ing)\b",
    r"\bfailing\b",
    # Multi-word distress phrases.
    r"\bcannot\s+cope\b",
    r"\blosing\s*(?:my)?\s*control\b",
    r"\bdown\s*for\s*the\s*count\b",
    r"\bsinking\b",
    r"\bdrowning\b",
    r"\bhopeless\b",
    r"\blost\s+all\s+hope\b",
    r"\bno\s+tomorrow\b",

    # Contextual versions (from crisis_detector.py legacy)
    r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
    r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
    r"\bfeel(?:s|ing)?\s+trapped\b",
    r"\bfeel(?:s|ing)?\s+desperate\b",
    r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
    r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
    r"\bgive(?:n)?\s*up\s+on\s+myself\b",
]
# Regex patterns for the LOW tier. detect_crisis() reports LOW on a single
# hit, provided no higher tier was triggered first.
LOW_INDICATORS = [
    # Mood words.
    r"\bunhappy\b",
    r"\bdown\b",
    r"\btough\s*(?:time|day|week)\b",
    r"\brough\s+(?:day|week|patch)\b",
    r"\bstressed\b",
    r"\bburnout\b",
    r"\bdifficult\b",
    r"\bfrustrated\b",
    r"\bannoyed\b",
    r"\btired\b",
    r"\bsad\b",
    r"\bupset\b",
    r"\blonely\b",
    r"\banxious?\b",
    # Short phrases describing a hard stretch.
    r"\bnot\s*(?:good|great|okay)\b",
    r"\bthings\s*are\s*hard\b",
    r"\bstruggling\b",
    r"\bnot\s+feeling\s+(?:like\s+)?myself\b",
    r"\bhard\s*time\b",
]
# ── Scoring ───────────────────────────────────────────────────────
# Numeric score attached to a result for each crisis level.
SCORES = {
    "CRITICAL": 1.0,
    "HIGH": 0.75,
    "MEDIUM": 0.5,
    "LOW": 0.25,
    "NONE": 0.0,
}
# Recommended-action text per crisis level. There is intentionally no "NONE"
# entry here; only the four levels below are keyed.
ACTIONS = {
    "CRITICAL": (
        "INTERRUPT CONVERSATION. "
        "Ask 'Are you safe right now?' "
        "Provide 988 Suicide & Crisis Lifeline. "
        "Provide Crisis Text Line (HOME to 741741). "
        "Stay present. "
        "Do not disconnect. "
        "Redirect to professional help."
    ),
    "HIGH": (
        "Show crisis panel. "
        "Ask about safety. "
        "Surface 988 number prominently. "
        "Continue conversation with crisis awareness."
    ),
    "MEDIUM": (
        "Increase warmth and presence. "
        "Subtly surface help resources. "
        "Keep conversation anchored in the present."
    ),
    "LOW": (
        "Normal conversation with warm undertone. "
        "No crisis UI elements needed. "
        "Remain vigilant."
    ),
}
# ── Result types ──────────────────────────────────────────────────
@dataclass
class CrisisDetectionResult:
    """Outcome of one detect_crisis() call.

    Per the original note, this is the result type consumed by
    crisis/detect.py, the gateway, and dying_detection.
    """

    # Reported level: "CRITICAL", "HIGH", "MEDIUM", "LOW" or "NONE".
    level: str
    # Regex pattern strings that matched for the reported tier.
    indicators: List[str] = field(default_factory=list)
    # Guidance text for the reported level; empty when nothing was set.
    recommended_action: str = ""
    # Numeric score associated with the level.
    score: float = 0.0
    # One dict per matched pattern, with "pattern", "start" and "end" keys.
    matches: List[dict] = field(default_factory=list)
@dataclass
class CrisisResult:
    """Legacy-shaped result (old field names) for older consumers.

    Per the original note, this is used by crisis_detector.py and
    crisis_responder.py. It carries the same information as a
    CrisisDetectionResult, with "MEDIUM" renamed to "MODERATE".
    An instance is truthy unless its risk level is "NONE".
    """

    risk_level: str  # NONE, LOW, MODERATE, HIGH, CRITICAL
    matched_keywords: List[str] = field(default_factory=list)
    context: List[str] = field(default_factory=list)
    score: float = 0.0
    # Not populated by from_detection_result(); stays None unless the caller sets it.
    timestamp: Optional[str] = None

    def __bool__(self):
        """Return True when any risk level other than "NONE" was recorded."""
        return self.risk_level != "NONE"

    @classmethod
    def from_detection_result(cls, dr: CrisisDetectionResult, text: str = "") -> "CrisisResult":
        """Build a legacy result from a CrisisDetectionResult.

        Args:
            dr: The detection result to convert.
            text: Text to cut context snippets from, one per entry in
                ``dr.matches``. When empty, no snippets are produced.

        Returns:
            A CrisisResult with the translated level, the matched pattern
            list, the context snippets and the score of ``dr``.
        """
        # Legacy consumers know the middle tier as MODERATE.
        legacy_level = dr.level
        if legacy_level == "MEDIUM":
            legacy_level = "MODERATE"

        snippets = []
        if text:
            snippets = [
                extract_context(text, hit["start"], hit["end"])
                for hit in dr.matches
            ]

        return cls(
            risk_level=legacy_level,
            matched_keywords=dr.indicators,
            context=snippets,
            score=dr.score,
        )
# ── Core detection ────────────────────────────────────────────────
def _find_indicators(text: str) -> dict:
    """Search ``text`` for every indicator pattern, grouped by tier.

    Matching is case-insensitive. Several patterns contain a literal
    uppercase "I" (for example ``wish\\s+I\\s+...``) while detect_crisis()
    lowercases its input before calling this function; a case-sensitive
    search would make those patterns impossible to match.

    Args:
        text: The text to scan.

    Returns:
        A dict with the keys "CRITICAL", "HIGH", "MEDIUM" and "LOW" (always
        all four). Each value is a list with one entry per pattern that
        matched, of the form ``{"pattern": <regex>, "start": int, "end": int}``.
        Only the first occurrence of each pattern is recorded.
    """
    tiers = (
        ("CRITICAL", CRITICAL_INDICATORS),
        ("HIGH", HIGH_INDICATORS),
        ("MEDIUM", MEDIUM_INDICATORS),
        ("LOW", LOW_INDICATORS),
    )
    results = {}
    for tier, patterns in tiers:
        hits = []
        for pattern in patterns:
            m = re.search(pattern, text, re.IGNORECASE)
            if m:
                hits.append({"pattern": pattern, "start": m.start(), "end": m.end()})
        results[tier] = hits
    return results
def _result_for(level: str, tier_matches: List[dict]) -> CrisisDetectionResult:
    """Build the result reported at ``level`` from one tier's match dicts."""
    return CrisisDetectionResult(
        level=level,
        indicators=[m["pattern"] for m in tier_matches],
        recommended_action=ACTIONS[level],
        score=SCORES[level],
        matches=tier_matches,
    )


def detect_crisis(text: str) -> CrisisDetectionResult:
    """
    Detect crisis level in a message.

    Detection hierarchy (first rule that applies wins):
        CRITICAL — immediate risk of self-harm or suicide (single match)
        HIGH     — strong despair signals, ideation present (single match)
        MEDIUM   — distress signals, requires 2+ indicators to escalate
        LOW      — emotional difficulty, warrants gentle support (single
                   match); a lone MEDIUM indicator is also reported as LOW,
                   but only when no LOW indicator matched
        NONE     — no crisis indicators detected, or empty/blank input

    Design principles:
        - Never computes the value of a human life
        - Never suggests someone should die or that death is a solution
        - Always errs on the side of higher risk when uncertain

    Args:
        text: The message to classify. It is lowercased before matching.

    Returns:
        A CrisisDetectionResult whose indicators/matches come from the
        single tier that decided the level.
    """
    if not text or not text.strip():
        return CrisisDetectionResult(level="NONE", score=0.0)

    # NOTE: input is lowercased here, so patterns containing uppercase
    # letters rely on _find_indicators matching case-insensitively.
    matches = _find_indicators(text.lower())

    # The per-tier dict is never empty itself, so test its values instead.
    if not any(matches.values()):
        return CrisisDetectionResult(level="NONE", score=0.0)

    # CRITICAL and HIGH: single match is enough
    for tier in ("CRITICAL", "HIGH"):
        if matches[tier]:
            return _result_for(tier, matches[tier])

    # MEDIUM tier: require at least 2 indicators before escalating
    if len(matches["MEDIUM"]) >= 2:
        return _result_for("MEDIUM", matches["MEDIUM"])

    if matches["LOW"]:
        return _result_for("LOW", matches["LOW"])

    # Single MEDIUM match falls through to LOW sensitivity
    if matches["MEDIUM"]:
        return _result_for("LOW", matches["MEDIUM"])

    return CrisisDetectionResult(level="NONE", score=0.0)
# ── CrisisDetector class (backward compat) ───────────────────────
class CrisisDetector:
    """
    Class-based (legacy) front end over the canonical detect_crisis().

    Per the original note, this is used by crisis_responder.py and
    tests/test_false_positive_fixes.py. Results are returned as CrisisResult,
    so the MEDIUM tier appears as MODERATE.
    """

    def scan(self, text: str) -> CrisisResult:
        """Classify ``text`` and return the outcome in legacy format."""
        detection = detect_crisis(text)
        return CrisisResult.from_detection_result(detection, text=text)

    def scan_multiple(self, texts: List[str]) -> List[CrisisResult]:
        """Scan each text in order and return the results as a list."""
        return [self.scan(message) for message in texts]

    def get_highest_risk(self, texts: List[str]) -> CrisisResult:
        """Return the highest-scoring result among ``texts``.

        With no texts, a "NONE" result is returned. On equal scores the
        earliest text wins.
        """
        scanned = self.scan_multiple(texts)
        if scanned:
            return max(scanned, key=lambda res: res.score)
        return CrisisResult(risk_level="NONE", score=0.0)

    @staticmethod
    def format_result(result: CrisisResult) -> str:
        """Render ``result`` as a multi-line, human-readable report."""
        icons = {
            "CRITICAL": "\U0001f6a8",
            "HIGH": "\u26a0\ufe0f",
            "MODERATE": "\U0001f536",
            "LOW": "\U0001f535",
            "NONE": "\u2705",
        }
        # Unknown levels get a question-mark icon.
        icon = icons.get(result.risk_level, "\u2753")

        report = [
            f"{icon} Risk Level: {result.risk_level} (score: {result.score:.2f})",
            f"Matched keywords: {len(result.matched_keywords)}",
        ]
        if result.matched_keywords:
            report.append(f" Keywords: {', '.join(result.matched_keywords)}")
        if result.context:
            report.append("Context:")
            report.extend(f" {ctx}" for ctx in result.context)
        return "\n".join(report)
# ── Module-level convenience (backward compat) ────────────────────
# Shared detector instance behind the module-level legacy helper.
_default_detector = CrisisDetector()


def detect_crisis_legacy(text: str) -> CrisisResult:
    """Scan ``text`` with the shared detector and return a legacy CrisisResult."""
    legacy_result = _default_detector.scan(text)
    return legacy_result
# ── Utility functions ─────────────────────────────────────────────
def scan(text: str) -> CrisisDetectionResult:
    """Short alias for detect_crisis(); returns exactly what it returns."""
    detection = detect_crisis(text)
    return detection
def extract_context(text: str, start: int, end: int, window: int = 60) -> str:
    """Return the text around ``text[start:end]``, widened by ``window`` characters.

    The slice is clamped to the bounds of ``text`` and stripped of
    surrounding whitespace. "..." is added on whichever side was cut short
    of the beginning or end of ``text``.
    """
    total = len(text)
    left = max(0, start - window)
    right = min(total, end + window)

    prefix = "..." if left > 0 else ""
    suffix = "..." if right < total else ""
    return f"{prefix}{text[left:right].strip()}{suffix}"
def get_urgency_emoji(level: str) -> str:
    """Return the emoji for a crisis level, or a question mark if unknown."""
    emoji_by_level = dict(
        CRITICAL="\U0001f6a8",
        HIGH="\u26a0\ufe0f",
        MEDIUM="\U0001f536",
        LOW="\U0001f535",
        NONE="\u2705",
    )
    try:
        return emoji_by_level[level]
    except KeyError:
        return "\u2753"
def format_result(result: CrisisDetectionResult) -> str:
    """Render a CrisisDetectionResult as a multi-line, human-readable report.

    The report always has a level line, an indicator count and an action
    line ("None needed" when no action is set). The matched patterns are
    appended only when there are any.
    """
    icon = get_urgency_emoji(result.level)
    report = [
        f"{icon} Crisis Level: {result.level} (score: {result.score})",
        f"Indicators: {len(result.indicators)} found",
        f"Action: {result.recommended_action or 'None needed'}",
    ]
    if not result.indicators:
        return "\n".join(report)
    report.append(f"Patterns: {result.indicators}")
    return "\n".join(report)