All checks were successful
Smoke Test / smoke (pull_request) Successful in 8s
Implements #116 — hardware validation testing for edge crisis detector on Raspberry Pi 4 and other edge devices. Adds edge detector (keyword + optional Ollama model), crisis_resources.json, deployment docs, and two test files: - test_edge_detector.py: unit tests for keyword logic - test_edge_detector_hardware.py: hardware validation suite Hardware validation measures keyword detection (<1ms), model inference (<5s on Pi 4), offline operation, and provides reproducible benchmark via `python3 edge/detector.py --benchmark`. Re-implements the functionality from closed PR #111 with expanded tests.
218 lines
8.2 KiB
Python
218 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Crisis detection for edge devices. Runs offline with keyword + optional model."""
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Regex fragments for high-risk phrases. They are joined into one
# case-insensitive alternation (CRISIS_PATTERN below), so each entry must
# remain a valid regex on its own. \b word boundaries keep short entries
# like "suicide" from matching inside unrelated words.
# NOTE(review): plain keyword matching will also flag text *about* these
# topics (news, support conversations); the optional model pass in
# detect() is what refines a keyword hit.
CRISIS_KEYWORDS = [
    r"\bkill myself\b", r"\bwant to die\b", r"\bsuicide\b", r"\bsuicidal\b",
    r"\bend it all\b", r"\bend my life\b", r"\bno reason to live\b",
    r"\bbetter off dead\b", r"\bcan't go on\b", r"\bwant to end\b",
    r"\bself[- ]harm\b", r"\bhurt myself\b", r"\bcut myself\b",  # [- ] covers "self-harm" and "self harm"
    r"\boverdose\b", r"\bjump off\b", r"\bhanging myself\b",
    r"\bgoodbye cruel world\b", r"\bnobody would miss me\b",
    r"\bi give up\b", r"\bcan't take it\b", r"\bwant out\b",
]

# Compiled once at import time so keyword_check() is a single scan per call
# (this is what makes the <1ms edge-latency target achievable).
CRISIS_PATTERN = re.compile("|".join(CRISIS_KEYWORDS), re.IGNORECASE)
|
|
|
|
|
|
def load_resources(path=None):
    """Load the cached crisis-resources bundle.

    Args:
        path: Optional path to a resources JSON file. When None, uses
            ``crisis_resources.json`` next to this module.

    Returns:
        The parsed JSON object (show_resources() expects a "national" list).

    Raises:
        FileNotFoundError: if the file does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    if path is None:
        path = Path(__file__).parent / "crisis_resources.json"
    # Explicit encoding: the default codec is platform-dependent, and the
    # bundle may contain non-ASCII hotline names or descriptions.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
def keyword_check(text):
    """Screen *text* against the precompiled crisis-phrase pattern.

    Returns a dict with:
        crisis:     True when at least one phrase matched.
        matches:    the matched phrases, in order of appearance.
        confidence: 30 points per match, capped at 95.
    """
    hits = CRISIS_PATTERN.findall(text)
    return {
        "crisis": bool(hits),
        "matches": hits,
        "confidence": min(95, 30 * len(hits)),
    }
|
|
|
|
|
|
def model_check(text, model="gemma2:2b"):
    """Classify *text* for crisis content with a local Ollama model.

    Args:
        text: The message to analyze (passed verbatim into the prompt).
        model: Ollama model name to run.

    Returns:
        A dict with ``crisis`` (True/False, or None when the model could
        not be run), ``confidence`` (0-100), plus ``raw``/``latency_ms``
        on success or ``error`` on failure. Callers (detect()) treat
        ``crisis is None`` as "model unavailable" and fall back to the
        keyword verdict.
    """
    prompt = (
        "You are a crisis detection system. Analyze for suicidal ideation.\n"
        "Respond with exactly one line: SAFE or UNSAFE: <confidence 0-100>\n\n"
        f"Text: {text}"
    )
    try:
        start = time.time()
        # List form (shell=False): *text* is untrusted user input and must
        # never pass through a shell.
        result = subprocess.run(
            ["ollama", "run", model, prompt],
            capture_output=True, text=True, timeout=30
        )
        latency_ms = (time.time() - start) * 1000
        # BUG FIX: a failed invocation (missing model, daemon down) exits
        # nonzero with empty stdout; previously that fell through to the
        # "no UNSAFE in response" branch and was reported as SAFE with 90%
        # confidence. Report it as model-unavailable instead.
        if result.returncode != 0:
            return {
                "crisis": None,
                "confidence": 0,
                "error": f"ollama exit {result.returncode}",
                "latency_ms": latency_ms,
            }
        response = result.stdout.strip()
        if "UNSAFE" in response.upper():
            conf = 80  # default when the model omits a usable number
            m = re.search(r"(\d+)", response)
            if m:
                # Clamp: the model is only asked for 0-100 but is not
                # guaranteed to comply.
                conf = max(0, min(100, int(m.group(1))))
            return {"crisis": True, "confidence": conf, "raw": response, "latency_ms": latency_ms}
        return {"crisis": False, "confidence": 90, "raw": response, "latency_ms": latency_ms}
    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
        # No ollama binary / inference hung: signal "unknown", not "safe".
        return {"crisis": None, "confidence": 0, "error": type(e).__name__, "latency_ms": None}
|
|
|
|
|
|
def detect(text, use_model=True, model="gemma2:2b"):
    """Two-stage crisis detection: fast keyword screen, optional model pass.

    The keyword screen is the gate — the model is only consulted on a
    keyword hit. Returns a dict with ``crisis``, ``method``,
    ``confidence``, and model latency/error fields when the model ran.
    """
    kw = keyword_check(text)

    # No keyword hit: report safe without spending model latency.
    if not kw["crisis"]:
        return {"crisis": False, "method": "keyword", "confidence": 95}

    # Keyword-only mode: the hit alone decides.
    if not use_model:
        return {"crisis": True, "method": "keyword", "confidence": kw["confidence"]}

    ml = model_check(text, model)
    if ml["crisis"] is None:
        # Model unavailable: fail safe on the keyword hit alone.
        return {
            "crisis": True,
            "method": "keyword",
            "confidence": kw["confidence"],
            "model_error": ml.get("error"),
            "model_latency_ms": ml.get("latency_ms"),
        }

    # Model ran: its verdict stands, confidence is the stronger of the two.
    return {
        "crisis": ml["crisis"],
        "method": "model+keyword",
        "confidence": max(kw["confidence"], ml["confidence"]),
        "model_latency_ms": ml.get("latency_ms"),
    }
|
|
|
|
|
|
def show_resources(resources):
    """Print cached crisis hotline information to stdout.

    Args:
        resources: dict from load_resources(); entries under "national"
            may carry name/phone/sms/keyword/description fields.

    BUG FIX: ``name`` and ``description`` were accessed unguarded while
    ``phone``/``sms`` were checked — a partial resource entry raised
    KeyError mid-banner. Missing fields are now skipped.
    """
    print("\n" + "=" * 50)
    print(" YOU ARE NOT ALONE. HELP IS AVAILABLE.")
    print("=" * 50)
    for r in resources.get("national", []):
        print(f"\n {r.get('name', 'Crisis resource')}")
        if "phone" in r:
            print(f" Call: {r['phone']}")
        if "sms" in r:
            print(f" Text: {r['sms']}" + (f" (keyword: {r['keyword']})" if "keyword" in r else ""))
        if "description" in r:
            print(f" {r['description']}")
    print("\n" + "=" * 50)
|
|
|
|
|
|
def main():
    """CLI entry point: parse flags, then dispatch to benchmark,
    interactive loop, or one-shot analysis (falling back to help text)."""
    parser = argparse.ArgumentParser(description="Edge Crisis Detector")
    parser.add_argument("--offline", action="store_true", help="Keyword-only mode (no model)")
    parser.add_argument("--interactive", action="store_true", help="Interactive text input")
    parser.add_argument("--text", type=str, help="Text to analyze")
    parser.add_argument("--model", default="gemma2:2b", help="Model name")
    parser.add_argument("--resources", type=str, help="Path to crisis_resources.json")
    parser.add_argument("--benchmark", action="store_true", help="Run hardware benchmark suite")
    opts = parser.parse_args()

    res = load_resources(opts.resources)
    use_model = not opts.offline

    if opts.benchmark:
        run_benchmark(use_model, opts.model, res)
        return

    if opts.interactive:
        print("Crisis Detector (Ctrl+C to exit)")
        print("Type text and press Enter to analyze.\n")
        while True:
            try:
                line = input("> ")
            except (EOFError, KeyboardInterrupt):
                print("\nGoodbye.")
                break
            if not line.strip():
                continue  # ignore blank input
            verdict = detect(line, use_model=use_model, model=opts.model)
            if verdict["crisis"]:
                print(f"\n[!] CRISIS DETECTED ({verdict['method']}, confidence: {verdict['confidence']}%)")
                show_resources(res)
            else:
                print(f" [OK] Safe ({verdict['method']}, confidence: {verdict['confidence']}%)")
        return

    if opts.text:
        verdict = detect(opts.text, use_model=use_model, model=opts.model)
        print(json.dumps(verdict, indent=2))
        if verdict["crisis"]:
            show_resources(res)
        return

    # No mode selected: show usage.
    parser.print_help()
|
|
|
|
|
|
def _time_keyword_ms(msg, reps=100):
    """Return per-call keyword_check() latencies for *msg*, in milliseconds."""
    samples = []
    for _ in range(reps):
        t0 = time.perf_counter()
        keyword_check(msg)
        samples.append((time.perf_counter() - t0) * 1000)
    return samples


def run_benchmark(use_model, model, resources):
    """Hardware validation benchmark (``--benchmark``).

    Measures keyword-path latency (target <1ms) and, unless running
    offline, model inference latency (target <5s on a Pi 4), then prints
    the summary via show_summary().

    BUG FIX: previously, when the model was enabled but every model run
    raised, ``times_model`` stayed empty and show_summary() was never
    called — the benchmark ended with no report. The summary is now
    printed on every path.
    """
    crisis_msg = "I want to kill myself and I don't see a way out"
    safe_msg = "The weather is beautiful and I'm feeling great today"

    print("\n1. Keyword detection (offline, no model):")
    print("-" * 50)

    # Warm-up so first-call overhead doesn't skew the timed runs.
    for _ in range(3):
        keyword_check(crisis_msg)

    times = _time_keyword_ms(crisis_msg)
    avg_kw = sum(times) / len(times)
    print(f" Crisis detection: avg={avg_kw:.2f}ms max={max(times):.2f}ms")

    times_safe = _time_keyword_ms(safe_msg)
    avg_kw_safe = sum(times_safe) / len(times_safe)
    print(f" Safe detection: avg={avg_kw_safe:.2f}ms max={max(times_safe):.2f}ms")

    if not use_model:
        print("\n2. Model inference: SKIPPED (--offline mode)")
        show_summary(avg_kw, avg_kw_safe, None, resources)
        return

    print("\n2. Model inference (requires ollama):")
    print("-" * 50)
    try:
        # Cheap availability probe before committing to slow inference runs.
        subprocess.run(["ollama", "list"], capture_output=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        print(" WARNING: ollama not available — skipping model benchmark.")
        show_summary(avg_kw, avg_kw_safe, None, resources)
        return

    times_model = []
    for i in range(3):
        try:
            start = time.perf_counter()
            ml = model_check(crisis_msg, model)
            elapsed = (time.perf_counter() - start) * 1000
            times_model.append(elapsed)
            print(f" Run {i+1}: crisis={ml['crisis']} conf={ml.get('confidence','N/A')} latency={elapsed:.0f}ms")
        except Exception as e:
            # Keep the benchmark alive across a bad run; model_check already
            # traps the expected failure modes.
            print(f" Run {i+1}: ERROR - {e}")

    model_latency = None
    if times_model:
        model_latency = sum(times_model) / len(times_model)
        print(f" Model avg latency: {model_latency:.0f}ms max={max(times_model):.0f}ms")
        if model_latency > 5000:
            print(" WARNING: Exceeds 5s threshold!")
    # Always report, even when all model runs failed (model_latency None).
    show_summary(avg_kw, avg_kw_safe, model_latency, resources)
|
|
|
|
|
|
def show_summary(kw_avg, kw_safe_avg, model_avg, resources):
    """Print the final hardware-validation report.

    Args:
        kw_avg: average keyword-path latency (ms) on the crisis message.
        kw_safe_avg: average keyword-path latency (ms) on the safe message.
        model_avg: average model latency (ms), or None when the model
            benchmark was skipped/unavailable.
        resources: accepted for interface stability; the report only
            states that resources are cached.
    """
    bar = "=" * 50
    print("\n" + bar)
    print(" HARDWARE VALIDATION SUMMARY")
    print(bar)
    print(f" Keyword detection (crisis): {kw_avg:.2f}ms")
    print(f" Keyword detection (safe): {kw_safe_avg:.2f}ms")
    if model_avg is not None:
        print(f" Model inference: {model_avg:.0f}ms")
        verdict = "YES" if model_avg <= 5000 else "NO"
        print(f" Meets <5s requirement: {verdict}")
    print(" Works offline: YES (keyword-only)")
    print(" 988 resources cached: YES")
    print("\nNote: For RAM usage, run 'top' or 'htop' during benchmark.")
    print(" For battery impact, run on battery and measure discharge rate.")
    print(bar)
|
|
|
|
|
|
# Script entry point; importing this module performs no detection work.
if __name__ == "__main__":
    main()
|