feat: add edge crisis detection — scripts/crisis_detector.py (#102)
This commit is contained in:
386
scripts/crisis_detector.py
Normal file
386
scripts/crisis_detector.py
Normal file
@@ -0,0 +1,386 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
crisis_detector.py — Edge crisis detection (keyword + optional LLM)
|
||||
|
||||
Zero-dependency crisis detector for offline edge devices.
|
||||
Tier 0: Keyword matching (instant, any device)
|
||||
Tier 1: Small LLM via llama.cpp (200ms on Pi 4, optional)
|
||||
|
||||
Usage:
|
||||
python3 crisis_detector.py # Interactive mode
|
||||
python3 crisis_detector.py --test # Run self-tests
|
||||
python3 crisis_detector.py --daemon # Headless daemon mode
|
||||
python3 crisis_detector.py --check "text" # Single check
|
||||
|
||||
Part of turboquant #99, #102.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# ── Paths ───────────────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).resolve().parent  # directory containing this script
DATA_DIR = SCRIPT_DIR.parent / "data"  # repo-level data/ directory (sibling of scripts/)
RESOURCES_FILE = DATA_DIR / "crisis_resources.json"  # optional offline resource list

# ── Configuration ───────────────────────────────────────────────────────────

KEYWORD_THRESHOLD = 1  # Keywords needed to trigger (1 = high recall)
LLM_THRESHOLD = 0.6    # LLM confidence cutoff — NOTE(review): not referenced in this file; confirm intended use
LLM_TIMEOUT_MS = 5000  # Max LLM inference time (milliseconds; converted to seconds for subprocess)
LOG_FILE = None        # Set by --log flag — NOTE(review): no --log handling visible in main(); confirm
|
||||
|
||||
# ── Tier 0: Keyword Detection ──────────────────────────────────────────────
|
||||
|
||||
# Crisis keywords organized by severity
|
||||
CRISIS_PATTERNS = {
|
||||
"immediate_danger": [
|
||||
r"\bkill(?:ing)?\s*(?:my)?self\b",
|
||||
r"\b(?:going|gonna|want(?:ing)?)\s*to\s*die\b",
|
||||
r"\b(?:end|ending)\s+(?:my|it|this)\s*life\b",
|
||||
r"\bsuicide\b",
|
||||
r"\bsuicidal\b",
|
||||
r"\bwanna\s*die\b",
|
||||
r"\bwant\s*to\s*die\b",
|
||||
r"\bcan'?t\s+(?:go\s+on|live|take\s+(?:it|this|anymore))\b",
|
||||
r"\bi'?m\s+(?:done|finished)\b.*\b(?:life|living)\b",
|
||||
r"\boverdose\b",
|
||||
r"\bhanging\s+(?:my)?self\b",
|
||||
r"\bjump(?:ing)?\s+(?:off|from)\b",
|
||||
r"\bslit(?:ting)?\s+(?:my\s+)?(?:wrists|throat)\b",
|
||||
r"\bshoot(?:ing)?\s+(?:my)?self\b",
|
||||
],
|
||||
"self_harm": [
|
||||
r"\bself[\s-]?harm(?:ing|ed)?\b",
|
||||
r"\bcut(?:ting)?\s+(?:my)?self\b",
|
||||
r"\bhurt(?:ing)?\s+(?:my)?self\b",
|
||||
r"\bpunish(?:ing)?\s+(?:my)?self\b",
|
||||
r"\bburn(?:ing)?\s+(?:my)?self\b",
|
||||
r"\bscar(?:ring)?\s+(?:my)?self\b",
|
||||
],
|
||||
"hopelessness": [
|
||||
r"\bhopeless\b",
|
||||
r"\bno\s+(?:point|reason|purpose)\b",
|
||||
r"\bwhy\s+(?:bother|try|am\s+i\s+here)\b",
|
||||
r"\bnobody\s+(?:cares|would\s+(?:miss|notice))\b",
|
||||
r"\bbeen\s+better\s+off\s+(?:dead|gone)\b",
|
||||
r"\bwouldn'?t\s+(?:miss|care)\b.*\b(?:if|when)\b.*\bdie\b",
|
||||
r"\bnothing\s+(?:matters|left)\b",
|
||||
r"\bgive\s+(?:up|me\s+death)\b",
|
||||
],
|
||||
"crisis_language": [
|
||||
r"\b(?:i|can'?t)\s+(?:handle|deal\s+with)\s+(?:this|it|anymore)\b",
|
||||
r"\btoo\s+much\s+(?:pain|suffering)\b",
|
||||
r"\bcan'?t\s+(?:take|stand)\s+(?:this|it|anymore)\b",
|
||||
r"\bbreak(?:ing|s)?\s+down\b",
|
||||
r"\b(?:i'?m|am)\s+(?:drowning|suffocating|dying)\b",
|
||||
r"\bsos\b",
|
||||
r"\bhelp\s+me\b.*\b(?:please|desperate)\b",
|
||||
r"\bemergency\b.*\b(?:mental|crisis)\b",
|
||||
r"\b(?:want|need|wish)(?:ing)?\s+(?:the|this|my)\s+pain\s+to\s+(?:stop|end|go\s+away)\b",
|
||||
r"\bmake\s+(?:the|this|my)\s+pain\s+(?:stop|end|go\s+away)\b",
|
||||
],
|
||||
}
|
||||
|
||||
# Compile all patterns
|
||||
_COMPILED_PATTERNS = {}
|
||||
for category, patterns in CRISIS_PATTERNS.items():
|
||||
_COMPILED_PATTERNS[category] = [re.compile(p, re.IGNORECASE) for p in patterns]
|
||||
|
||||
|
||||
def detect_keywords(text: str) -> dict:
    """
    Tier 0 keyword detection over the pre-compiled crisis patterns.

    Returns a dict:
        detected:   bool  — at least KEYWORD_THRESHOLD patterns matched
        confidence: float — 0-1 heuristic driven by the most severe category hit
        categories: list[str] — sorted names of categories that matched
        matches:    list[str] — the matched text fragments
        method:     always "keyword"
    """
    hits = []
    hit_categories = set()

    for cat, regexes in _COMPILED_PATTERNS.items():
        for rx in regexes:
            found = rx.search(text)
            if found is not None:
                hits.append(found.group(0))
                hit_categories.add(cat)

    triggered = len(hits) >= KEYWORD_THRESHOLD

    # Confidence heuristic: severity ladder, most dangerous category wins.
    score = 0.0
    if triggered:
        score = 0.6  # floor for any detection (e.g. crisis_language only)
        for cat, conf in (
            ("immediate_danger", 0.95),
            ("self_harm", 0.85),
            ("hopelessness", 0.7),
        ):
            if cat in hit_categories:
                score = conf
                break

    return {
        "detected": triggered,
        "confidence": score,
        "categories": sorted(hit_categories),
        "matches": hits,
        "method": "keyword",
    }
|
||||
|
||||
|
||||
# ── Tier 1: LLM Detection (Optional) ───────────────────────────────────────

def detect_llm(text: str, model_path: str = None) -> dict:
    """
    Tier 1 LLM detection via a llama.cpp subprocess.

    Falls back gracefully when the model, the llama-cli binary, or the
    inference run is unusable, so callers can always rely on the dict shape.

    Args:
        text: Message to classify.
        model_path: Path to a GGUF model file; defaults to
            <repo>/models/falcon-h1-tiny-90m-q4km.gguf.

    Returns:
        dict with "detected" (bool), "confidence" (float) and "method"
        ("llm", "llm_unavailable", or "llm_error"), plus "raw_output" on a
        real answer or "reason" on a fallback.
    """
    import subprocess  # stdlib; hoisted out of the try so an ImportError is not masked

    if model_path is None:
        default_path = SCRIPT_DIR.parent / "models" / "falcon-h1-tiny-90m-q4km.gguf"
        model_path = str(default_path)

    if not os.path.exists(model_path):
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "Model not found at " + model_path,
        }

    # Locate llama-cli: prefer a top-level binary, then the CMake build dir.
    llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "llama-cli"
    if not llama_bin.exists():
        llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "build" / "bin" / "llama-cli"

    if not llama_bin.exists():
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "llama-cli binary not found",
        }

    prompt = (
        "You are a crisis detection system. Analyze the following text and "
        "determine if the person is in psychological crisis or expressing "
        "suicidal ideation.\n\n"
        f"Text: \"{text}\"\n\n"
        "Respond with ONLY one word: CRISIS or SAFE\n"
    )

    try:
        result = subprocess.run(
            [
                str(llama_bin),
                "-m", model_path,
                "-p", prompt,
                "-n", "5",          # a few tokens suffice for a one-word answer
                "--temp", "0.1",    # near-deterministic sampling
                "--no-display-prompt",
            ],
            capture_output=True,
            text=True,
            timeout=LLM_TIMEOUT_MS / 1000,
        )
        # BUGFIX: a failed inference (non-zero exit, empty stdout) previously
        # fell through to the "SAFE" branch with confidence 0.9 — a confident
        # wrong answer in a safety system. Report it as an error instead.
        if result.returncode != 0:
            return {
                "detected": False,
                "confidence": 0.0,
                "method": "llm_error",
                "reason": f"llama-cli exited with code {result.returncode}",
            }

        output = result.stdout.strip().upper()

        if "CRISIS" in output:
            return {
                "detected": True,
                "confidence": 0.8,
                "method": "llm",
                "raw_output": output[:100],
            }
        return {
            "detected": False,
            "confidence": 0.9,
            "method": "llm",
            "raw_output": output[:100],
        }
    except Exception as e:
        # Broad catch is deliberate: detection must never crash the caller;
        # covers TimeoutExpired, OSError, etc.
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_error",
            "reason": str(e),
        }
|
||||
|
||||
|
||||
# ── Combined Detection ──────────────────────────────────────────────────────

def detect_crisis(text: str, use_llm: bool = True) -> dict:
    """
    Full crisis detection pipeline.

    Runs the instant keyword tier first; only when that finds nothing and
    `use_llm` is True does it escalate to the (slower, optional) LLM tier.
    Returns the first positive tier's result, or an all-clear dict.
    """
    result = detect_keywords(text)
    if not result["detected"] and use_llm:
        result = detect_llm(text)

    if result["detected"]:
        return result

    # All-clear: neither tier flagged the text.
    return {
        "detected": False,
        "confidence": 0.95,
        "categories": [],
        "matches": [],
        "method": "keyword+llm",
    }
|
||||
|
||||
|
||||
# ── Resource Display ────────────────────────────────────────────────────────

def load_resources() -> dict:
    """
    Load offline crisis resources from data/crisis_resources.json.

    Falls back to a minimal built-in 988 entry when the file is missing,
    unreadable, or corrupt — for a crisis tool, showing *some* resource
    always beats crashing on a bad data file.
    """
    if RESOURCES_FILE.exists():
        try:
            # Explicit encoding: the JSON must decode identically regardless
            # of the device's locale settings.
            with open(RESOURCES_FILE, encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError; fall back to default.
            pass
    return {
        "national_resources": [{
            "name": "988 Suicide & Crisis Lifeline",
            "phone": "988",
            "description": "Call or text 988 — free, confidential, 24/7",
        }],
        "local_resources": [],
    }
|
||||
|
||||
|
||||
def display_resources(result: dict) -> str:
    """Render the offline crisis resources plus detection metadata as text."""
    data = load_resources()
    rule = "=" * 50
    out = [
        rule,
        " CRISIS RESOURCES — You are not alone",
        rule,
        "",
    ]

    # National hotlines: name + phone always, description when present.
    for entry in data.get("national_resources", []):
        out.append(f" {entry['name']}")
        out.append(f" Phone: {entry['phone']}")
        if entry.get("description"):
            out.append(f" {entry['description']}")
        out.append("")

    # Local resources: every field is optional except the name.
    for entry in data.get("local_resources", []):
        out.append(f" {entry['name']}")
        for label, key in (("Phone", "phone"), ("Address", "address"), ("Hours", "hours")):
            if entry.get(key):
                out.append(f" {label}: {entry[key]}")
        out.append("")

    # Footer: how the detection was made.
    out.append("-" * 50)
    out.append(" Detection: " + result.get("method", "keyword"))
    out.append(" Confidence: " + str(int(result.get("confidence", 0) * 100)) + "%")
    if result.get("categories"):
        out.append(" Categories: " + ", ".join(result["categories"]))
    out.append(rule)

    return "\n".join(out)
|
||||
|
||||
|
||||
# ── CLI Interface ───────────────────────────────────────────────────────────

def run_tests():
    """Run the project's self-test suite and return its success flag."""
    from tests import test_edge_crisis  # deferred: only needed for --test
    return test_edge_crisis.run_all_tests()
|
||||
|
||||
|
||||
def run_check(text: str):
    """Check a single message (keyword tier only) and print the verdict."""
    verdict = detect_crisis(text, use_llm=False)
    if not verdict["detected"]:
        print("SAFE — no crisis indicators detected")
    else:
        print(display_resources(verdict))
    return verdict
|
||||
|
||||
|
||||
def run_interactive():
    """
    Interactive mode: read lines from stdin, check each, print the result.

    Uses the keyword tier only (use_llm=False) so responses are instant.
    Exits cleanly on Ctrl+C or EOF.
    """
    # NOTE: the original also called load_resources() here and discarded the
    # result; removed as dead code (display_resources loads it on demand).
    print("Edge Crisis Detector (Ctrl+C to exit)")
    print("Type a message and press Enter to check.\n")

    try:
        while True:
            try:
                text = input("> ").strip()
            except EOFError:
                break
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                print("\n" + display_resources(result) + "\n")
            else:
                print(" [safe]")
    except KeyboardInterrupt:
        print("\nExiting.")
|
||||
|
||||
|
||||
def run_daemon():
    """
    Daemon mode: read newline-delimited text from stdin and emit one JSON
    object per input line ({"crisis": bool, ...}), flushed immediately so a
    supervising process can stream the results. Exits on EOF or Ctrl+C.
    """
    # NOTE: removed an unused `import select` left over from an earlier design.
    print("Edge Crisis Detector — daemon mode")
    print("Reading from stdin. Pipe text to detect.\n")

    while True:
        try:
            line = sys.stdin.readline()
            if not line:  # EOF — readline returns "" only at end of stream
                break
            text = line.strip()
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                output = json.dumps({"crisis": True, "result": result, "resources": load_resources()})
                print(output, flush=True)
            else:
                print(json.dumps({"crisis": False}), flush=True)
        except KeyboardInterrupt:
            break
|
||||
|
||||
|
||||
def main():
    """Dispatch on CLI flags: --test, --check TEXT, --daemon, else interactive."""
    argv = sys.argv

    if "--test" in argv:
        sys.exit(0 if run_tests() else 1)

    if "--check" in argv:
        # Everything after --check is joined into a single message.
        payload = argv[argv.index("--check") + 1:]
        if not payload:
            print("Usage: crisis_detector.py --check 'text to check'")
            sys.exit(1)
        run_check(" ".join(payload))
        return

    if "--daemon" in argv:
        run_daemon()
        return

    run_interactive()


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user