#!/usr/bin/env python3
"""
crisis_detector.py — Edge crisis detection (keyword + optional LLM)

Zero-dependency crisis detector for offline edge devices.
Tier 0: Keyword matching (instant, any device)
Tier 1: Small LLM via llama.cpp (200ms on Pi 4, optional)

Usage:
    python3 crisis_detector.py                    # Interactive mode
    python3 crisis_detector.py --test             # Run self-tests
    python3 crisis_detector.py --daemon           # Headless daemon mode
    python3 crisis_detector.py --check "text"     # Single check
    python3 crisis_detector.py --log FILE ...     # Append detections to FILE

Part of turboquant #99, #102.
"""

import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path

# ── Paths ───────────────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).resolve().parent
DATA_DIR = SCRIPT_DIR.parent / "data"
RESOURCES_FILE = DATA_DIR / "crisis_resources.json"

# ── Configuration ───────────────────────────────────────────────────────────

KEYWORD_THRESHOLD = 1      # Keywords needed to trigger (1 = high recall)
LLM_THRESHOLD = 0.6        # LLM confidence cutoff
LLM_TIMEOUT_MS = 5000      # Max LLM inference time
LOG_FILE = None            # Set by --log flag (None = logging disabled)

# ── Tier 0: Keyword Detection ──────────────────────────────────────────────

# Crisis keywords organized by severity. Patterns are deliberately broad:
# for a safety tool, false positives are cheaper than false negatives.
CRISIS_PATTERNS = {
    "immediate_danger": [
        r"\bkill(?:ing)?\s*(?:my)?self\b",
        r"\b(?:going|gonna|want(?:ing)?)\s*to\s*die\b",
        r"\b(?:end|ending)\s+(?:my|it|this)\s*life\b",
        r"\bsuicide\b",
        r"\bsuicidal\b",
        r"\bwanna\s*die\b",
        r"\bwant\s*to\s*die\b",
        r"\bcan'?t\s+(?:go\s+on|live|take\s+(?:it|this|anymore))\b",
        r"\bi'?m\s+(?:done|finished)\b.*\b(?:life|living)\b",
        r"\boverdose\b",
        r"\bhanging\s+(?:my)?self\b",
        r"\bjump(?:ing)?\s+(?:off|from)\b",
        r"\bslit(?:ting)?\s+(?:my\s+)?(?:wrists|throat)\b",
        r"\bshoot(?:ing)?\s+(?:my)?self\b",
    ],
    "self_harm": [
        r"\bself[\s-]?harm(?:ing|ed)?\b",
        r"\bcut(?:ting)?\s+(?:my)?self\b",
        r"\bhurt(?:ing)?\s+(?:my)?self\b",
        r"\bpunish(?:ing)?\s+(?:my)?self\b",
        r"\bburn(?:ing)?\s+(?:my)?self\b",
        r"\bscar(?:ring)?\s+(?:my)?self\b",
    ],
    "hopelessness": [
        r"\bhopeless\b",
        r"\bno\s+(?:point|reason|purpose)\b",
        r"\bwhy\s+(?:bother|try|am\s+i\s+here)\b",
        r"\bnobody\s+(?:cares|would\s+(?:miss|notice))\b",
        # "be(en) better off dead/gone" — optional "en" catches the common
        # present-tense phrasing ("I'd be better off dead"), not just "been".
        r"\bbe(?:en)?\s+better\s+off\s+(?:dead|gone)\b",
        r"\bwouldn'?t\s+(?:miss|care)\b.*\b(?:if|when)\b.*\bdie\b",
        r"\bnothing\s+(?:matters|left)\b",
        r"\bgive\s+(?:up|me\s+death)\b",
    ],
    "crisis_language": [
        r"\b(?:i|can'?t)\s+(?:handle|deal\s+with)\s+(?:this|it|anymore)\b",
        r"\btoo\s+much\s+(?:pain|suffering)\b",
        r"\bcan'?t\s+(?:take|stand)\s+(?:this|it|anymore)\b",
        r"\bbreak(?:ing|s)?\s+down\b",
        r"\b(?:i'?m|am)\s+(?:drowning|suffocating|dying)\b",
        r"\bsos\b",
        r"\bhelp\s+me\b.*\b(?:please|desperate)\b",
        r"\bemergency\b.*\b(?:mental|crisis)\b",
        r"\b(?:want|need|wish)(?:ing)?\s+(?:the|this|my)\s+pain\s+to\s+(?:stop|end|go\s+away)\b",
        r"\bmake\s+(?:the|this|my)\s+pain\s+(?:stop|end|go\s+away)\b",
    ],
}

# Compile all patterns once at import time (they run on every message).
_COMPILED_PATTERNS = {
    category: [re.compile(p, re.IGNORECASE) for p in patterns]
    for category, patterns in CRISIS_PATTERNS.items()
}


def detect_keywords(text: str) -> dict:
    """
    Tier 0 keyword detection. Returns match info.

    Result: {
        "detected": bool,
        "confidence": float (0-1),
        "categories": list[str],
        "matches": list[str],
        "method": "keyword"
    }
    """
    matches = []
    categories = set()

    for category, patterns in _COMPILED_PATTERNS.items():
        for pattern in patterns:
            m = pattern.search(text)
            if m:
                matches.append(m.group(0))
                categories.add(category)

    detected = len(matches) >= KEYWORD_THRESHOLD

    # Confidence heuristic: the most severe matched category wins.
    if not detected:
        confidence = 0.0
    elif "immediate_danger" in categories:
        confidence = 0.95
    elif "self_harm" in categories:
        confidence = 0.85
    elif "hopelessness" in categories:
        confidence = 0.7
    else:
        confidence = 0.6

    return {
        "detected": detected,
        "confidence": confidence,
        "categories": sorted(categories),
        "matches": matches,
        "method": "keyword",
    }


# ── Tier 1: LLM Detection (Optional) ───────────────────────────────────────

def detect_llm(text: str, model_path: str = None) -> dict:
    """
    Tier 1 LLM detection via llama.cpp subprocess.

    Falls back gracefully (method "llm_unavailable"/"llm_error") if the
    model or the llama-cli binary is not present, or inference fails.
    """
    if model_path is None:
        default_path = SCRIPT_DIR.parent / "models" / "falcon-h1-tiny-90m-q4km.gguf"
        model_path = str(default_path)

    if not os.path.exists(model_path):
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "Model not found at " + model_path,
        }

    # Locate the llama-cli binary (source checkout or CMake build tree).
    llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "llama-cli"
    if not llama_bin.exists():
        llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "build" / "bin" / "llama-cli"

    if not llama_bin.exists():
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "llama-cli binary not found",
        }

    # NOTE(review): the user text is interpolated directly into the prompt,
    # so adversarial input could steer the model (prompt injection). The
    # keyword tier runs first and is unaffected; acceptable for an
    # assistive, offline tool.
    prompt = (
        "You are a crisis detection system. Analyze the following text and "
        "determine if the person is in psychological crisis or expressing "
        "suicidal ideation.\n\n"
        f"Text: \"{text}\"\n\n"
        "Respond with ONLY one word: CRISIS or SAFE\n"
    )

    try:
        result = subprocess.run(
            [
                str(llama_bin),
                "-m", model_path,
                "-p", prompt,
                "-n", "5",
                "--temp", "0.1",
                "--no-display-prompt",
            ],
            capture_output=True,
            text=True,
            timeout=LLM_TIMEOUT_MS / 1000,
        )
        output = result.stdout.strip().upper()

        if "CRISIS" in output:
            return {
                "detected": True,
                "confidence": 0.8,
                "method": "llm",
                "raw_output": output[:100],
            }
        else:
            return {
                "detected": False,
                "confidence": 0.9,
                "method": "llm",
                "raw_output": output[:100],
            }
    except Exception as e:
        # Inference failure (timeout, OSError, ...) must never crash the
        # detector — report it as an unavailable tier instead.
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_error",
            "reason": str(e),
        }


# ── Combined Detection ──────────────────────────────────────────────────────

def _log_event(result: dict) -> None:
    """Append a JSON line for a detection to LOG_FILE, if configured.

    Logging is strictly best-effort: an unwritable log must never block
    crisis detection or resource display.
    """
    if not LOG_FILE:
        return
    try:
        with open(LOG_FILE, "a") as f:
            f.write(json.dumps({"ts": time.time(), "result": result}) + "\n")
    except OSError:
        pass  # best-effort only


def detect_crisis(text: str, use_llm: bool = True) -> dict:
    """
    Full crisis detection pipeline: keyword first, then LLM if available.

    Returns the first tier's result that detects a crisis; otherwise a
    "safe" result whose "method" reflects which tiers actually ran.
    """
    kw_result = detect_keywords(text)

    if kw_result["detected"]:
        _log_event(kw_result)
        return kw_result

    if use_llm:
        llm_result = detect_llm(text)
        if llm_result["detected"]:
            _log_event(llm_result)
            return llm_result

    return {
        "detected": False,
        "confidence": 0.95,
        "categories": [],
        "matches": [],
        # Only claim the LLM tier ran when it was actually requested.
        "method": "keyword+llm" if use_llm else "keyword",
    }


# ── Resource Display ────────────────────────────────────────────────────────

def load_resources() -> dict:
    """Load offline crisis resources.

    Falls back to a built-in 988 entry if the resources file is missing,
    unreadable, or corrupt — resource display must never fail.
    """
    if RESOURCES_FILE.exists():
        try:
            with open(RESOURCES_FILE) as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            pass  # corrupt/unreadable file: use the built-in default below
    return {
        "national_resources": [{
            "name": "988 Suicide & Crisis Lifeline",
            "phone": "988",
            "description": "Call or text 988 — free, confidential, 24/7",
        }],
        "local_resources": [],
    }


def display_resources(result: dict) -> str:
    """Format crisis resources (plus detection metadata) for display.

    Tolerates malformed resource entries: missing keys are skipped
    rather than raising KeyError mid-display.
    """
    resources = load_resources()
    lines = []
    lines.append("=" * 50)
    lines.append("  CRISIS RESOURCES — You are not alone")
    lines.append("=" * 50)
    lines.append("")

    for r in resources.get("national_resources", []):
        lines.append(f"  {r.get('name', '(unnamed resource)')}")
        if r.get("phone"):
            lines.append(f"  Phone: {r['phone']}")
        if r.get("description"):
            lines.append(f"  {r['description']}")
        lines.append("")

    for r in resources.get("local_resources", []):
        lines.append(f"  {r.get('name', '(unnamed resource)')}")
        if r.get("phone"):
            lines.append(f"  Phone: {r['phone']}")
        if r.get("address"):
            lines.append(f"  Address: {r['address']}")
        if r.get("hours"):
            lines.append(f"  Hours: {r['hours']}")
        lines.append("")

    lines.append("-" * 50)
    lines.append("  Detection: " + result.get("method", "keyword"))
    lines.append("  Confidence: " + str(int(result.get("confidence", 0) * 100)) + "%")
    if result.get("categories"):
        lines.append("  Categories: " + ", ".join(result["categories"]))
    lines.append("=" * 50)

    return "\n".join(lines)


# ── CLI Interface ───────────────────────────────────────────────────────────

def run_tests():
    """Run self-tests. Returns False (not a traceback) if the project's
    test module is not importable in this checkout."""
    try:
        from tests.test_edge_crisis import run_all_tests
    except ImportError as e:
        print(f"Self-tests unavailable: {e}")
        return False
    return run_all_tests()


def run_check(text: str):
    """Single text check (keyword tier only); prints resources on detection."""
    result = detect_crisis(text, use_llm=False)
    if result["detected"]:
        print(display_resources(result))
    else:
        print("SAFE — no crisis indicators detected")
    return result


def run_interactive():
    """Interactive mode — read lines from stdin, detect crisis."""
    print("Edge Crisis Detector (Ctrl+C to exit)")
    print("Type a message and press Enter to check.\n")

    try:
        while True:
            try:
                text = input("> ").strip()
            except EOFError:
                break
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                print("\n" + display_resources(result) + "\n")
            else:
                print("  [safe]")
    except KeyboardInterrupt:
        print("\nExiting.")


def run_daemon():
    """Daemon mode — read lines from stdin, emit one JSON result per line."""
    print("Edge Crisis Detector — daemon mode")
    print("Reading from stdin. Pipe text to detect.\n")

    while True:
        try:
            line = sys.stdin.readline()
            if not line:  # EOF: upstream pipe closed
                break
            text = line.strip()
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                output = json.dumps({"crisis": True, "result": result, "resources": load_resources()})
                print(output, flush=True)
            else:
                print(json.dumps({"crisis": False}), flush=True)
        except KeyboardInterrupt:
            break


def main():
    """CLI entry point. Flags: --log FILE, --test, --check TEXT, --daemon."""
    global LOG_FILE

    # --log FILE may be combined with any mode; strip it from argv first so
    # it is never swallowed into a --check text.
    if "--log" in sys.argv:
        idx = sys.argv.index("--log")
        if idx + 1 < len(sys.argv):
            LOG_FILE = sys.argv[idx + 1]
            del sys.argv[idx:idx + 2]
        else:
            print("Usage: crisis_detector.py --log FILE [mode...]")
            sys.exit(1)

    if "--test" in sys.argv:
        success = run_tests()
        sys.exit(0 if success else 1)

    elif "--check" in sys.argv:
        idx = sys.argv.index("--check")
        if idx + 1 < len(sys.argv):
            text = " ".join(sys.argv[idx + 1:])
            run_check(text)
        else:
            print("Usage: crisis_detector.py --check 'text to check'")
            sys.exit(1)

    elif "--daemon" in sys.argv:
        run_daemon()

    else:
        run_interactive()


if __name__ == "__main__":
    main()