#!/usr/bin/env python3
"""
crisis_detector.py — Edge crisis detection (keyword + optional LLM)

Zero-dependency crisis detector for offline edge devices.
Tier 0: Keyword matching (instant, any device)
Tier 1: Small LLM via llama.cpp (200ms on Pi 4, optional)

Usage:
    python3 crisis_detector.py                  # Interactive mode
    python3 crisis_detector.py --test           # Run self-tests
    python3 crisis_detector.py --daemon         # Headless daemon mode
    python3 crisis_detector.py --check "text"   # Single check

Part of turboquant #99, #102.
"""

import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional

# ── Paths ───────────────────────────────────────────────────────────────────

SCRIPT_DIR = Path(__file__).resolve().parent
DATA_DIR = SCRIPT_DIR.parent / "data"
RESOURCES_FILE = DATA_DIR / "crisis_resources.json"

# ── Configuration ───────────────────────────────────────────────────────────

KEYWORD_THRESHOLD = 1   # Keywords needed to trigger (1 = high recall)
LLM_THRESHOLD = 0.6     # LLM confidence cutoff
LLM_TIMEOUT_MS = 5000   # Max LLM inference time
LOG_FILE = None         # Set by --log flag

# ── Tier 0: Keyword Detection ──────────────────────────────────────────────

# Crisis keywords organized by severity. Patterns are matched
# case-insensitively (see _COMPILED_PATTERNS below).
CRISIS_PATTERNS = {
    "immediate_danger": [
        r"\bkill(?:ing)?\s*(?:my)?self\b",
        r"\b(?:going|gonna|want(?:ing)?)\s*to\s*die\b",
        r"\b(?:end|ending)\s+(?:my|it|this)\s*life\b",
        r"\bsuicide\b",
        r"\bsuicidal\b",
        r"\bwanna\s*die\b",
        r"\bwant\s*to\s*die\b",
        r"\bcan'?t\s+(?:go\s+on|live|take\s+(?:it|this|anymore))\b",
        r"\bi'?m\s+(?:done|finished)\b.*\b(?:life|living)\b",
        r"\boverdose\b",
        # Accept the bare verb too ("hang myself"), like the sibling
        # kill/jump/slit/shoot patterns do.
        r"\bhang(?:ing)?\s+(?:my)?self\b",
        r"\bjump(?:ing)?\s+(?:off|from)\b",
        r"\bslit(?:ting)?\s+(?:my\s+)?(?:wrists|throat)\b",
        r"\bshoot(?:ing)?\s+(?:my)?self\b",
    ],
    "self_harm": [
        r"\bself[\s-]?harm(?:ing|ed)?\b",
        r"\bcut(?:ting)?\s+(?:my)?self\b",
        r"\bhurt(?:ing)?\s+(?:my)?self\b",
        r"\bpunish(?:ing)?\s+(?:my)?self\b",
        r"\bburn(?:ing)?\s+(?:my)?self\b",
        r"\bscar(?:ring)?\s+(?:my)?self\b",
    ],
    "hopelessness": [
        r"\bhopeless\b",
        r"\bno\s+(?:point|reason|purpose)\b",
        r"\bwhy\s+(?:bother|try|am\s+i\s+here)\b",
        r"\bnobody\s+(?:cares|would\s+(?:miss|notice))\b",
        # Covers both "would be better off dead" and "have been better off".
        r"\b(?:be|been)\s+better\s+off\s+(?:dead|gone)\b",
        r"\bwouldn'?t\s+(?:miss|care)\b.*\b(?:if|when)\b.*\bdie\b",
        r"\bnothing\s+(?:matters|left)\b",
        r"\bgive\s+(?:up|me\s+death)\b",
    ],
    "crisis_language": [
        r"\b(?:i|can'?t)\s+(?:handle|deal\s+with)\s+(?:this|it|anymore)\b",
        r"\btoo\s+much\s+(?:pain|suffering)\b",
        r"\bcan'?t\s+(?:take|stand)\s+(?:this|it|anymore)\b",
        r"\bbreak(?:ing|s)?\s+down\b",
        r"\b(?:i'?m|am)\s+(?:drowning|suffocating|dying)\b",
        r"\bsos\b",
        r"\bhelp\s+me\b.*\b(?:please|desperate)\b",
        r"\bemergency\b.*\b(?:mental|crisis)\b",
        r"\b(?:want|need|wish)(?:ing)?\s+(?:the|this|my)\s+pain\s+to\s+(?:stop|end|go\s+away)\b",
        r"\bmake\s+(?:the|this|my)\s+pain\s+(?:stop|end|go\s+away)\b",
    ],
}

# Compile all patterns once at import time; category order follows
# CRISIS_PATTERNS.
_COMPILED_PATTERNS = {
    category: [re.compile(p, re.IGNORECASE) for p in patterns]
    for category, patterns in CRISIS_PATTERNS.items()
}


def detect_keywords(text: str) -> dict:
    """
    Tier 0 keyword detection. Returns match info.

    Every compiled pattern is searched once against ``text``; at most one
    match per pattern is recorded.

    Result: {
        "detected": bool,            # len(matches) >= KEYWORD_THRESHOLD
        "confidence": float (0-1),   # fixed value per most severe category
        "categories": list[str],     # sorted category names that matched
        "matches": list[str],        # matched substrings, in pattern order
        "method": "keyword",
    }
    """
    matches = []
    categories = set()

    for category, patterns in _COMPILED_PATTERNS.items():
        for pattern in patterns:
            m = pattern.search(text)
            if m:
                matches.append(m.group(0))
                categories.add(category)

    detected = len(matches) >= KEYWORD_THRESHOLD

    # Confidence heuristic: the most severe matching category decides.
    if not detected:
        confidence = 0.0
    elif "immediate_danger" in categories:
        confidence = 0.95
    elif "self_harm" in categories:
        confidence = 0.85
    elif "hopelessness" in categories:
        confidence = 0.7
    else:
        confidence = 0.6

    return {
        "detected": detected,
        "confidence": confidence,
        "categories": sorted(categories),
        "matches": matches,
        "method": "keyword",
    }


# ── Tier 1: LLM Detection (Optional) ───────────────────────────────────────

def detect_llm(text: str, model_path: Optional[str] = None) -> dict:
    """
    Tier 1 LLM detection via llama.cpp subprocess.

    Falls back gracefully if the model or the llama-cli binary is not
    available, and never raises for subprocess failures.

    Args:
        text: Message to classify.
        model_path: Path to a GGUF model. Defaults to
            ``<SCRIPT_DIR>/../models/falcon-h1-tiny-90m-q4km.gguf``.

    Returns:
        A dict with "detected", "confidence" and "method". "method" is:
          - "llm"             the model ran; "raw_output" holds up to 100
                              characters of its upper-cased output
          - "llm_unavailable" model or binary missing ("reason" says which)
          - "llm_error"       the subprocess timed out, could not be started
                              or exited non-zero ("reason" has details)
    """
    if model_path is None:
        model_path = SCRIPT_DIR.parent / "models" / "falcon-h1-tiny-90m-q4km.gguf"
    model_path = str(model_path)

    if not os.path.exists(model_path):
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "Model not found at " + model_path,
        }

    # Locate llama-cli: top-level checkout first, then the build directory.
    llama_root = SCRIPT_DIR.parent / "llama.cpp"
    candidates = (llama_root / "llama-cli", llama_root / "build" / "bin" / "llama-cli")
    llama_bin = next((c for c in candidates if c.exists()), None)
    if llama_bin is None:
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_unavailable",
            "reason": "llama-cli binary not found",
        }

    prompt = (
        "You are a crisis detection system. Analyze the following text and "
        "determine if the person is in psychological crisis or expressing "
        "suicidal ideation.\n\n"
        f"Text: \"{text}\"\n\n"
        "Respond with ONLY one word: CRISIS or SAFE\n"
    )

    try:
        # Argument list (no shell), so user text is never shell-interpreted.
        result = subprocess.run(
            [
                str(llama_bin),
                "-m", model_path,
                "-p", prompt,
                "-n", "5",
                "--temp", "0.1",
                "--no-display-prompt",
            ],
            capture_output=True,
            text=True,
            timeout=LLM_TIMEOUT_MS / 1000,
        )
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        # TimeoutExpired, missing/non-executable binary, NUL bytes in the
        # prompt, or undecodable output.
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_error",
            "reason": str(e),
        }

    if result.returncode != 0:
        # A crashed run produces no verdict; do not report it as a
        # confident "safe".
        return {
            "detected": False,
            "confidence": 0.0,
            "method": "llm_error",
            "reason": f"llama-cli exited with code {result.returncode}",
        }

    output = result.stdout.strip().upper()
    is_crisis = "CRISIS" in output
    return {
        "detected": is_crisis,
        "confidence": 0.8 if is_crisis else 0.9,
        "method": "llm",
        "raw_output": output[:100],
    }


# ── Combined Detection ──────────────────────────────────────────────────────

def detect_crisis(text: str, use_llm: bool = True) -> dict:
    """
    Full crisis detection pipeline: keyword first, then LLM if available.

    A keyword hit is returned immediately. Otherwise, when ``use_llm`` is
    true, the LLM result is returned if it flags a crisis. If neither tier
    flags anything, a "not detected" result is returned.
    """
    kw_result = detect_keywords(text)
    if kw_result["detected"]:
        return kw_result

    if use_llm:
        llm_result = detect_llm(text)
        if llm_result["detected"]:
            return llm_result

    # NOTE(review): "method" and "confidence" are the same whether or not
    # the LLM tier actually ran (use_llm=False, model unavailable, error).
    # Kept as-is because callers may compare the label.
    return {
        "detected": False,
        "confidence": 0.95,
        "categories": [],
        "matches": [],
        "method": "keyword+llm",
    }


# ── Resource Display ────────────────────────────────────────────────────────

def _default_resources() -> dict:
    """Return a fresh copy of the built-in fallback resources."""
    return {
        "national_resources": [{
            "name": "988 Suicide & Crisis Lifeline",
            "phone": "988",
            "description": "Call or text 988 — free, confidential, 24/7",
        }],
        "local_resources": [],
    }


def load_resources(path=None) -> dict:
    """
    Load offline crisis resources.

    Args:
        path: Optional JSON file to read instead of RESOURCES_FILE.

    Returns:
        The parsed JSON object, or the built-in 988 fallback when the file
        is missing, unreadable, not valid JSON, or not a JSON object —
        resources must always be displayable.
    """
    source = RESOURCES_FILE if path is None else Path(path)
    try:
        with open(source, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return _default_resources()
    return data if isinstance(data, dict) else _default_resources()


def display_resources(result: dict) -> str:
    """
    Format crisis resources for display.

    Args:
        result: A detection result; "method", "confidence" and "categories"
            are read if present.

    Returns:
        A multi-line string listing national and local resources followed
        by a short detection summary.
    """
    resources = load_resources()
    lines = []
    lines.append("=" * 50)
    lines.append(" CRISIS RESOURCES — You are not alone")
    lines.append("=" * 50)
    lines.append("")

    for r in resources.get("national_resources") or []:
        lines.append(f" {r.get('name', 'Unnamed resource')}")
        # Tolerate incomplete entries rather than failing to show anything.
        if r.get("phone"):
            lines.append(f" Phone: {r['phone']}")
        if r.get("description"):
            lines.append(f" {r['description']}")
        lines.append("")

    for r in resources.get("local_resources") or []:
        lines.append(f" {r.get('name', 'Unnamed resource')}")
        if r.get("phone"):
            lines.append(f" Phone: {r['phone']}")
        if r.get("address"):
            lines.append(f" Address: {r['address']}")
        if r.get("hours"):
            lines.append(f" Hours: {r['hours']}")
        lines.append("")

    lines.append("-" * 50)
    lines.append(" Detection: " + result.get("method", "keyword"))
    # round(), not int(): int(0.57 * 100) truncates to 56.
    percent = round(result.get("confidence", 0) * 100)
    lines.append(" Confidence: " + str(percent) + "%")
    if result.get("categories"):
        lines.append(" Categories: " + ", ".join(result["categories"]))
    lines.append("=" * 50)

    return "\n".join(lines)


# ── CLI Interface ───────────────────────────────────────────────────────────

def run_tests():
    """Run self-tests by delegating to tests.test_edge_crisis.run_all_tests."""
    from tests.test_edge_crisis import run_all_tests
    return run_all_tests()


def run_check(text: str):
    """Single text check (keyword tier only); prints and returns the result."""
    result = detect_crisis(text, use_llm=False)
    if result["detected"]:
        print(display_resources(result))
    else:
        print("SAFE — no crisis indicators detected")
    return result


def run_interactive():
    """Interactive mode — read lines from stdin, detect crisis.

    Uses the keyword tier only. Ends on EOF or Ctrl+C.
    """
    print("Edge Crisis Detector (Ctrl+C to exit)")
    print("Type a message and press Enter to check.\n")

    try:
        while True:
            try:
                text = input("> ").strip()
            except EOFError:
                break

            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                print("\n" + display_resources(result) + "\n")
            else:
                print(" [safe]")
    except KeyboardInterrupt:
        print("\nExiting.")


def run_daemon():
    """Daemon mode — read lines from stdin, emit one JSON object per line.

    Uses the keyword tier only. Blank lines are skipped. Ends on EOF or
    Ctrl+C.
    """
    print("Edge Crisis Detector — daemon mode")
    print("Reading from stdin. Pipe text to detect.\n")

    while True:
        try:
            line = sys.stdin.readline()
            if not line:
                # readline() returns "" only at EOF.
                break
            text = line.strip()
            if not text:
                continue

            result = detect_crisis(text, use_llm=False)
            if result["detected"]:
                output = json.dumps({
                    "crisis": True,
                    "result": result,
                    "resources": load_resources(),
                })
                print(output, flush=True)
            else:
                print(json.dumps({"crisis": False}), flush=True)
        except KeyboardInterrupt:
            break


def main():
    """Dispatch on command-line flags; default is interactive mode."""
    if "--test" in sys.argv:
        success = run_tests()
        sys.exit(0 if success else 1)
    elif "--check" in sys.argv:
        idx = sys.argv.index("--check")
        if idx + 1 < len(sys.argv):
            # Everything after --check is treated as the text to check.
            text = " ".join(sys.argv[idx + 1:])
            run_check(text)
        else:
            print("Usage: crisis_detector.py --check 'text to check'")
            sys.exit(1)
    elif "--daemon" in sys.argv:
        run_daemon()
    else:
        run_interactive()


if __name__ == "__main__":
    main()