387 lines
12 KiB
Python
387 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
crisis_detector.py — Edge crisis detection (keyword + optional LLM)

Zero-dependency crisis detector for offline edge devices.
  Tier 0: Keyword matching (instant, any device)
  Tier 1: Small LLM via llama.cpp (200ms on Pi 4, optional)

Usage:
    python3 crisis_detector.py                  # Interactive mode
    python3 crisis_detector.py --test           # Run self-tests
    python3 crisis_detector.py --daemon         # Headless daemon mode
    python3 crisis_detector.py --check "text"   # Single check

Part of turboquant #99, #102.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# ── Paths ───────────────────────────────────────────────────────────────────

# Directory that contains this script; the other paths are derived from it.
SCRIPT_DIR = Path(__file__).resolve().parent
# "data" directory located next to the script's directory (under its parent).
DATA_DIR = SCRIPT_DIR.parent.joinpath("data")
# JSON file holding the offline crisis resources read by load_resources().
RESOURCES_FILE = DATA_DIR.joinpath("crisis_resources.json")

# ── Configuration ───────────────────────────────────────────────────────────

# Number of keyword matches needed to trigger (1 = high recall).
KEYWORD_THRESHOLD = 1
# LLM confidence cutoff.
# NOTE(review): not referenced by any function visible in this file.
LLM_THRESHOLD = 0.6
# Max LLM inference time, in milliseconds (converted to seconds at the call).
LLM_TIMEOUT_MS = 5000
# Log destination; originally annotated "Set by --log flag".
# NOTE(review): main() in this file does not parse a --log flag.
LOG_FILE = None
|
|
|
|
# ── Tier 0: Keyword Detection ──────────────────────────────────────────────
|
|
|
|
# Crisis keywords organized by severity.
#
# Each category maps to a list of regular-expression sources. They are
# compiled case-insensitively into _COMPILED_PATTERNS below. Dictionary order
# is the order in which categories are scanned.
CRISIS_PATTERNS = {
    # Explicit statements of intent, plan, or method.
    "immediate_danger": [
        r"\bkill(?:ing)?\s*(?:my)?self\b",
        # "gonna" already contains "to": accept "gonna die" as well as the
        # forms the earlier pattern required ("going to die", "gonna to die").
        r"\b(?:going\s*to|gonna(?:\s*to)?|want(?:ing)?\s*to)\s*die\b",
        r"\b(?:end|ending)\s+(?:my|it|this)\s*life\b",
        r"\bsuicide\b",
        r"\bsuicidal\b",
        r"\bwanna\s*die\b",
        r"\bwant\s*to\s*die\b",
        r"\bcan'?t\s+(?:go\s+on|live|take\s+(?:it|this|anymore))\b",
        r"\bi'?m\s+(?:done|finished)\b.*\b(?:life|living)\b",
        r"\boverdose\b",
        r"\bhanging\s+(?:my)?self\b",
        r"\bjump(?:ing)?\s+(?:off|from)\b",
        r"\bslit(?:ting)?\s+(?:my\s+)?(?:wrists|throat)\b",
        r"\bshoot(?:ing)?\s+(?:my)?self\b",
    ],
    # Non-suicidal self-injury language.
    "self_harm": [
        r"\bself[\s-]?harm(?:ing|ed)?\b",
        r"\bcut(?:ting)?\s+(?:my)?self\b",
        r"\bhurt(?:ing)?\s+(?:my)?self\b",
        r"\bpunish(?:ing)?\s+(?:my)?self\b",
        r"\bburn(?:ing)?\s+(?:my)?self\b",
        r"\bscar(?:ring)?\s+(?:my)?self\b",
    ],
    # Expressions of hopelessness or worthlessness.
    "hopelessness": [
        r"\bhopeless\b",
        r"\bno\s+(?:point|reason|purpose)\b",
        r"\bwhy\s+(?:bother|try|am\s+i\s+here)\b",
        r"\bnobody\s+(?:cares|would\s+(?:miss|notice))\b",
        # Matches "would be better off dead" in addition to the original
        # "been better off dead", which alone missed the common phrasing.
        r"\b(?:be|been)\s+better\s+off\s+(?:dead|gone)\b",
        r"\bwouldn'?t\s+(?:miss|care)\b.*\b(?:if|when)\b.*\bdie\b",
        r"\bnothing\s+(?:matters|left)\b",
        r"\bgive\s+(?:up|me\s+death)\b",
    ],
    # General distress / overwhelmed language.
    "crisis_language": [
        r"\b(?:i|can'?t)\s+(?:handle|deal\s+with)\s+(?:this|it|anymore)\b",
        r"\btoo\s+much\s+(?:pain|suffering)\b",
        r"\bcan'?t\s+(?:take|stand)\s+(?:this|it|anymore)\b",
        r"\bbreak(?:ing|s)?\s+down\b",
        r"\b(?:i'?m|am)\s+(?:drowning|suffocating|dying)\b",
        r"\bsos\b",
        r"\bhelp\s+me\b.*\b(?:please|desperate)\b",
        r"\bemergency\b.*\b(?:mental|crisis)\b",
        r"\b(?:want|need|wish)(?:ing)?\s+(?:the|this|my)\s+pain\s+to\s+(?:stop|end|go\s+away)\b",
        r"\bmake\s+(?:the|this|my)\s+pain\s+(?:stop|end|go\s+away)\b",
    ],
}

# Compile all patterns once at import time (case-insensitive). A dict
# comprehension is used so no loop variables leak into the module namespace.
_COMPILED_PATTERNS = {
    category: [re.compile(source, re.IGNORECASE) for source in sources]
    for category, sources in CRISIS_PATTERNS.items()
}
|
|
|
|
|
|
# Typographic apostrophes (typical of phone keyboards and word processors)
# mapped to the ASCII apostrophe that the crisis patterns are written against.
_APOSTROPHE_TABLE = str.maketrans({
    "\u2018": "'",  # left single quotation mark
    "\u2019": "'",  # right single quotation mark
    "\u02bc": "'",  # modifier letter apostrophe
    "\u00b4": "'",  # acute accent
    "`": "'",       # grave accent / backtick
})


def detect_keywords(text: str) -> dict:
    """
    Tier 0 keyword detection. Returns match info.

    Every compiled pattern is searched against *text*; the first hit of each
    pattern is recorded. Typographic apostrophes are normalized to "'" before
    matching so that e.g. "can\u2019t go on" is recognized. ``None`` is treated
    as empty text.

    Result: {
        "detected": bool,         # True when len(matches) >= KEYWORD_THRESHOLD
        "confidence": float,      # 0.0 when not detected, else set by the
                                  # most severe matched category
        "categories": list[str],  # sorted names of the matched categories
        "matches": list[str],     # matched substrings (after normalization)
        "method": "keyword"
    }
    """
    # Treat a missing message as empty; other non-string input still raises.
    if text is None:
        text = ""
    normalized = text.translate(_APOSTROPHE_TABLE)

    matches = []
    categories = set()

    for category, patterns in _COMPILED_PATTERNS.items():
        for pattern in patterns:
            hit = pattern.search(normalized)
            if hit:
                matches.append(hit.group(0))
                categories.add(category)

    detected = len(matches) >= KEYWORD_THRESHOLD

    # Confidence heuristic: the most severe matched category wins.
    if not detected:
        confidence = 0.0
    elif "immediate_danger" in categories:
        confidence = 0.95
    elif "self_harm" in categories:
        confidence = 0.85
    elif "hopelessness" in categories:
        confidence = 0.7
    else:
        confidence = 0.6

    return {
        "detected": detected,
        "confidence": confidence,
        "categories": sorted(categories),
        "matches": matches,
        "method": "keyword",
    }
|
|
|
|
|
|
# ── Tier 1: LLM Detection (Optional) ───────────────────────────────────────
|
|
|
|
def detect_llm(text: str, model_path: str = None) -> dict:
    """
    Tier 1 LLM detection via llama.cpp subprocess.

    Falls back gracefully if the model or the llama-cli binary is missing,
    and never raises for subprocess failures.

    Args:
        text: Message to classify; embedded verbatim in the prompt.
        model_path: Path to a GGUF model (str or os.PathLike). Defaults to
            ``<SCRIPT_DIR>/../models/falcon-h1-tiny-90m-q4km.gguf``.

    Returns:
        A dict with "detected", "confidence" and "method", where method is:
          * "llm"             - the model ran; "raw_output" holds the first
                                100 characters of its upper-cased output.
          * "llm_unavailable" - model or binary not found ("reason" says which).
          * "llm_error"       - the subprocess raised, timed out, or exited
                                with a non-zero status ("reason" has details).
    """
    if model_path is None:
        model_path = SCRIPT_DIR.parent / "models" / "falcon-h1-tiny-90m-q4km.gguf"
    # Accept Path objects as well as strings; the message below and the
    # command line both need a plain string.
    model_path = os.fspath(model_path)

    if not os.path.exists(model_path):
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "Model not found at " + model_path,
        }

    # Locate llama-cli: top of the llama.cpp checkout first, then build/bin.
    llama_dir = SCRIPT_DIR.parent / "llama.cpp"
    llama_bin = llama_dir / "llama-cli"
    if not llama_bin.exists():
        llama_bin = llama_dir / "build" / "bin" / "llama-cli"

    if not llama_bin.exists():
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "llama-cli binary not found",
        }

    prompt = (
        "You are a crisis detection system. Analyze the following text and "
        "determine if the person is in psychological crisis or expressing "
        "suicidal ideation.\n\n"
        f"Text: \"{text}\"\n\n"
        "Respond with ONLY one word: CRISIS or SAFE\n"
    )

    try:
        # Imported lazily: only needed when the LLM tier actually runs.
        import subprocess

        # Arguments are passed as a list (no shell), so the user text cannot
        # be interpreted as shell syntax.
        result = subprocess.run(
            [
                str(llama_bin),
                "-m", model_path,
                "-p", prompt,
                "-n", "5",
                "--temp", "0.1",
                "--no-display-prompt",
            ],
            capture_output=True,
            text=True,
            timeout=LLM_TIMEOUT_MS / 1000,
        )
    except Exception as e:
        # Deliberate best-effort boundary: timeouts, OS errors, bad arguments.
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_error",
            "reason": str(e),
        }

    # A failed run produces no usable verdict; do not report it as a
    # confident "SAFE" classification.
    if result.returncode != 0:
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_error",
            "reason": f"llama-cli exited with status {result.returncode}",
        }

    output = result.stdout.strip().upper()
    is_crisis = "CRISIS" in output

    return {
        "detected": is_crisis,
        "confidence": 0.8 if is_crisis else 0.9,
        "method": "llm",
        "raw_output": output[:100],
    }
|
|
|
|
|
|
# ── Combined Detection ──────────────────────────────────────────────────────
|
|
|
|
def detect_crisis(text: str, use_llm: bool = True) -> dict:
    """
    Full crisis detection pipeline: keyword tier first, then the LLM tier.

    Args:
        text: Message to analyze.
        use_llm: When True and the keyword tier found nothing, consult
            detect_llm() as a second opinion.

    Returns:
        The keyword result if it detected a crisis; otherwise the LLM result
        if that detected one; otherwise a fixed negative result. The negative
        result always carries method "keyword+llm" and confidence 0.95,
        whether or not the LLM tier was consulted.
    """
    keyword_verdict = detect_keywords(text)
    if keyword_verdict["detected"]:
        return keyword_verdict

    # Second tier runs only on request and only after a keyword miss.
    llm_verdict = detect_llm(text) if use_llm else None
    if llm_verdict is not None and llm_verdict["detected"]:
        return llm_verdict

    negative = {
        "detected": False,
        "confidence": 0.95,
        "categories": [],
        "matches": [],
        "method": "keyword+llm",
    }
    return negative
|
|
|
|
|
|
# ── Resource Display ────────────────────────────────────────────────────────
|
|
|
|
def load_resources() -> dict:
    """Load offline crisis resources.

    Reads RESOURCES_FILE as UTF-8 JSON. If the file is missing, unreadable,
    not valid JSON, or does not contain a JSON object, a built-in fallback
    with the 988 Suicide & Crisis Lifeline is returned instead. Resources
    must stay displayable even when the data file is damaged.

    Returns:
        A dict with "national_resources" and "local_resources" lists when the
        fallback is used; otherwise whatever object the JSON file holds.
    """
    fallback = {
        "national_resources": [{
            "name": "988 Suicide & Crisis Lifeline",
            "phone": "988",
            "description": "Call or text 988 — free, confidential, 24/7",
        }],
        "local_resources": [],
    }

    try:
        # Explicit encoding: the platform default is not guaranteed to be UTF-8.
        with open(RESOURCES_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # No data file installed: silently use the built-in resources.
        return fallback
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"warning: could not load {RESOURCES_FILE}: {exc}", file=sys.stderr)
        return fallback

    if not isinstance(loaded, dict):
        print(f"warning: {RESOURCES_FILE} does not contain a JSON object", file=sys.stderr)
        return fallback
    return loaded
|
|
|
|
|
|
def display_resources(result: dict) -> str:
    """Format crisis resources for display.

    Args:
        result: A detection result dict. Its "method", "confidence" and
            "categories" entries (all optional) are shown in the footer.

    Returns:
        A multi-line string: a banner, every national and local resource
        returned by load_resources(), and a footer describing the detection.
        Resource fields that are absent or empty are skipped rather than
        raising KeyError.
    """
    resources = load_resources()
    rule = "=" * 50
    lines = [rule, " CRISIS RESOURCES — You are not alone", rule, ""]

    for r in resources.get("national_resources") or []:
        if r.get("name"):
            lines.append(f" {r['name']}")
        if r.get("phone"):
            lines.append(f" Phone: {r['phone']}")
        if r.get("description"):
            lines.append(f" {r['description']}")
        lines.append("")

    for r in resources.get("local_resources") or []:
        if r.get("name"):
            lines.append(f" {r['name']}")
        if r.get("phone"):
            lines.append(f" Phone: {r['phone']}")
        if r.get("address"):
            lines.append(f" Address: {r['address']}")
        if r.get("hours"):
            lines.append(f" Hours: {r['hours']}")
        lines.append("")

    lines.append("-" * 50)
    lines.append(" Detection: " + result.get("method", "keyword"))
    # round() rather than int(): int() truncates, so a confidence such as
    # 0.57 (0.57 * 100 == 56.99999999999999) would be shown as 56%.
    percent = round(result.get("confidence", 0) * 100)
    lines.append(" Confidence: " + str(percent) + "%")
    if result.get("categories"):
        lines.append(" Categories: " + ", ".join(result["categories"]))
    lines.append(rule)

    return "\n".join(lines)
|
|
|
|
|
|
# ── CLI Interface ───────────────────────────────────────────────────────────
|
|
|
|
def run_tests():
    """Run self-tests and return the outcome of the test runner.

    The test module is imported inside the function, so the ``tests`` package
    is only needed when this function is actually called. main() treats the
    returned value as a success flag.
    """
    from tests import test_edge_crisis

    outcome = test_edge_crisis.run_all_tests()
    return outcome
|
|
|
|
|
|
def run_check(text: str):
    """Check a single piece of text (keyword tier only) and print the verdict.

    Prints the formatted crisis resources when a crisis is detected, or a
    one-line SAFE message otherwise. Returns the detection result dict.
    """
    verdict = detect_crisis(text, use_llm=False)
    if not verdict["detected"]:
        print("SAFE — no crisis indicators detected")
        return verdict

    print(display_resources(verdict))
    return verdict
|
|
|
|
|
|
def run_interactive():
    """Interactive mode — read lines from stdin, detect crisis.

    Prompts with "> " until end-of-file or Ctrl+C. Blank lines are ignored.
    Each message is checked with the keyword tier only; crisis resources are
    printed on detection, " [safe]" otherwise.
    """
    # Resources are loaded by display_resources() when needed; there is no
    # need to read the resource file up front.
    print("Edge Crisis Detector (Ctrl+C to exit)")
    print("Type a message and press Enter to check.\n")

    try:
        while True:
            try:
                text = input("> ").strip()
            except EOFError:
                # stdin closed (e.g. Ctrl+D or end of piped input): stop quietly.
                break
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                print("\n" + display_resources(result) + "\n")
            else:
                print(" [safe]")
    except KeyboardInterrupt:
        print("\nExiting.")
|
|
|
|
|
|
def run_daemon():
    """Daemon mode — read lines from stdin, emit one JSON object per line.

    For every non-blank input line the keyword tier is run and a JSON
    document is printed and flushed to stdout:
      * {"crisis": true, "result": ..., "resources": ...} on detection
      * {"crisis": false} otherwise
    The loop ends at end-of-file or on Ctrl+C.

    Note: the two banner lines are also written to stdout, ahead of the
    JSON output.
    """
    print("Edge Crisis Detector — daemon mode")
    print("Reading from stdin. Pipe text to detect.\n")

    while True:
        try:
            line = sys.stdin.readline()
            if not line:
                # Empty string (not even a newline) means EOF.
                break
            text = line.strip()
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                output = json.dumps({"crisis": True, "result": result, "resources": load_resources()})
                print(output, flush=True)
            else:
                print(json.dumps({"crisis": False}), flush=True)
        except KeyboardInterrupt:
            break
|
|
|
|
|
|
def main():
    """Command-line entry point: dispatch on the first recognized flag.

    Flags are checked in this order: --test, --check, --daemon. With none of
    them present, interactive mode is started.
      --test          run the self-tests; exit status 0 on success, 1 otherwise
      --check TEXT... check everything after the flag, joined by spaces;
                      prints a usage line and exits with status 1 if no text
                      follows the flag
      --daemon        run the stdin/stdout JSON loop
    """
    argv = sys.argv

    if "--test" in argv:
        sys.exit(0 if run_tests() else 1)

    if "--check" in argv:
        words = argv[argv.index("--check") + 1:]
        if not words:
            print("Usage: crisis_detector.py --check 'text to check'")
            sys.exit(1)
        run_check(" ".join(words))
        return

    if "--daemon" in argv:
        run_daemon()
        return

    run_interactive()


if __name__ == "__main__":
    main()
|