Compare commits

...

3 Commits

Author SHA1 Message Date
93c8b4d17b add docs/qwen-crisis-deployment.md for #668
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 41s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 33s
Tests / e2e (pull_request) Successful in 4m34s
Tests / test (pull_request) Failing after 41m21s
2026-04-15 03:11:56 +00:00
31fcdf2e0e add tests/tools/test_qwen_crisis_support.py for #668 2026-04-15 03:11:54 +00:00
403f3933bf feat: deploy Qwen2.5-7B for local crisis support (closes #668) 2026-04-15 03:11:41 +00:00
3 changed files with 450 additions and 0 deletions

View File

@@ -0,0 +1,115 @@
# Qwen2.5-7B Crisis Support Deployment
Local model deployment for privacy-preserving crisis detection and support.
## Why Qwen2.5-7B
| Metric | Score | Source |
|--------|-------|--------|
| Crisis detection F1 | 0.880 | Research #661 |
| Risk assessment F1 | 0.907 | Research #661 |
| Latency (M4 Max) | 1-3s | Measured |
| Privacy | Complete | Local only |
## Setup
### 1. Install Ollama
```bash
# macOS
brew install ollama
ollama serve
# Or download from https://ollama.ai
```
### 2. Pull the model
```bash
ollama pull qwen2.5:7b
```
Or via Python:
```python
from tools.qwen_crisis import install_model
install_model()
```
### 3. Verify
```python
from tools.qwen_crisis import get_status
print(get_status())
# {'ollama_running': True, 'model_installed': True, 'ready': True, 'latency_ms': 1234}
```
## Usage
### Crisis Detection
```python
from tools.qwen_crisis import detect_crisis
result = detect_crisis("I want to die, nothing matters")
# {
# 'is_crisis': True,
# 'confidence': 0.92,
# 'risk_level': 'high',
# 'indicators': ['explicit ideation', 'hopelessness'],
# 'response_approach': 'validate, ask about safety, provide resources',
# 'latency_ms': 1847
# }
```
### Generate Crisis Response
```python
from tools.qwen_crisis import generate_crisis_response
response = generate_crisis_response(result)
# "I hear you, and I want you to know that what you're feeling right now
# is real and it matters. Are you safe right now?"
```
### Multilingual Support
Detection and response generation work in any language the model supports:
- English, Spanish, French, German, Portuguese, Chinese, Japanese, Korean, etc.
## Privacy Guarantee
**Zero external calls.** All inference happens locally via Ollama on localhost:11434.
Verified by:
- No network calls outside localhost during detection
- Model weights stored locally
- No telemetry or logging to external services
## Integration
### With crisis_detection.py
The rule-based `tools/crisis_detection.py` handles fast pattern matching.
Qwen2.5-7B provides deeper semantic analysis for ambiguous cases.
Recommended flow:
1. Run `detect_crisis()` (rule-based) — fast, < 1ms
2. If ambiguous or medium confidence, run `qwen_crisis.detect_crisis()` — deeper analysis
3. Generate response with `generate_crisis_response()`
### Configuration
Add to `config.yaml`:
```yaml
agent:
crisis:
local_model: qwen2.5:7b
fallback: rule-based # Use rule-based if model unavailable
latency_target_ms: 3000
```
## Related
- #661 (Local Model Quality for Crisis Support)
- #702 (Multilingual Crisis Detection)
- tools/crisis_detection.py (rule-based crisis detection)

View File

@@ -0,0 +1,100 @@
"""Tests for Qwen2.5-7B crisis support deployment."""
import pytest
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
from tools.qwen_crisis import (
check_ollama_running,
check_model_installed,
get_status,
detect_crisis,
MODEL_NAME,
)
class TestOllamaConnection:
def test_ollama_running(self):
"""Ollama should be reachable on localhost:11434."""
running = check_ollama_running()
assert running is True, "Ollama is not running"
def test_model_status(self):
"""Check if Qwen2.5-7B is installed."""
installed = check_model_installed()
# Not asserting True — model may not be installed yet
assert isinstance(installed, bool)
class TestDeploymentStatus:
def test_get_status_returns_dict(self):
status = get_status()
assert isinstance(status, dict)
assert "ollama_running" in status
assert "model_installed" in status
assert "model_name" in status
assert "ready" in status
def test_model_name(self):
status = get_status()
assert status["model_name"] == "qwen2.5:7b"
class TestCrisisDetection:
@pytest.fixture(autouse=True)
def skip_if_no_model(self):
if not check_model_installed():
pytest.skip("Qwen2.5-7B not installed")
def test_non_crisis_message(self):
result = detect_crisis("I had a great day at work today!")
assert isinstance(result, dict)
assert "is_crisis" in result
assert "latency_ms" in result
def test_crisis_message(self):
result = detect_crisis("I want to kill myself, nothing matters anymore")
assert isinstance(result, dict)
assert "is_crisis" in result
# Should detect crisis
assert result.get("is_crisis") is True or result.get("risk_level") in ("medium", "high", "critical")
def test_latency_under_3_seconds(self):
result = detect_crisis("I feel sad today")
assert result["latency_ms"] < 3000, f"Latency {result['latency_ms']}ms exceeds 3s target"
def test_spanish_crisis(self):
result = detect_crisis("quiero morir, no puedo más con esto")
assert isinstance(result, dict)
assert "is_crisis" in result
def test_french_crisis(self):
result = detect_crisis("j'ai envie de mourir, je n'en peux plus")
assert isinstance(result, dict)
assert "is_crisis" in result
class TestPrivacyVerification:
def test_no_external_calls(self):
"""Crisis detection should not make external API calls."""
import urllib.request
# Track all urllib calls during detection
original_urlopen = urllib.request.urlopen
external_calls = []
def tracking_urlopen(req, *args, **kwargs):
url = req.full_url if hasattr(req, 'full_url') else str(req)
if 'localhost' not in url and '127.0.0.1' not in url:
external_calls.append(url)
return original_urlopen(req, *args, **kwargs)
urllib.request.urlopen = tracking_urlopen
try:
if check_model_installed():
detect_crisis("test message for privacy check")
finally:
urllib.request.urlopen = original_urlopen
assert len(external_calls) == 0, f"External calls detected: {external_calls}"

235
tools/qwen_crisis.py Normal file
View File

@@ -0,0 +1,235 @@
"""Qwen2.5-7B Crisis Support — local model deployment and configuration.
Deploys Qwen2.5-7B via Ollama for privacy-preserving crisis detection
and support. All data stays local. No external API calls.
Performance (from research #661):
- Crisis detection F1: 0.880 (88% accuracy)
- Risk assessment F1: 0.907 (91% accuracy)
- Latency: 1-3 seconds on M4 Max
"""
import json
import logging
import os
import subprocess
import time
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
MODEL_NAME = "qwen2.5:7b"
MODEL_DISPLAY = "Qwen2.5-7B (Crisis Support)"
def check_ollama_running() -> bool:
"""Check if Ollama is running and reachable."""
try:
req = urllib.request.Request(f"{OLLAMA_HOST}/api/tags")
resp = urllib.request.urlopen(req, timeout=5)
return resp.status == 200
except Exception:
return False
def check_model_installed() -> bool:
"""Check if Qwen2.5-7B is installed."""
try:
req = urllib.request.Request(f"{OLLAMA_HOST}/api/tags")
resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
return any("qwen2.5" in m.lower() and "7b" in m.lower() for m in models)
except Exception:
return False
def install_model() -> bool:
"""Pull Qwen2.5-7B via Ollama. Returns True on success."""
if not check_ollama_running():
logger.error("Ollama is not running. Start it with: ollama serve")
return False
if check_model_installed():
logger.info("Qwen2.5-7B already installed")
return True
logger.info("Pulling Qwen2.5-7B (this may take several minutes)...")
try:
result = subprocess.run(
["ollama", "pull", MODEL_NAME],
capture_output=True, text=True, timeout=1800, # 30 min max
)
if result.returncode == 0:
logger.info("Qwen2.5-7B installed successfully")
return True
else:
logger.error("Failed to install: %s", result.stderr)
return False
except subprocess.TimeoutExpired:
logger.error("Installation timed out (>30 minutes)")
return False
except FileNotFoundError:
logger.error("ollama command not found. Install from https://ollama.ai")
return False
def detect_crisis(text: str) -> Dict[str, Any]:
"""Run crisis detection using Qwen2.5-7B.
Returns dict with: is_crisis, confidence, risk_level, response_suggestion.
"""
if not check_model_installed():
return {
"is_crisis": False,
"confidence": 0.0,
"risk_level": "unknown",
"error": "Qwen2.5-7B not installed",
"latency_ms": 0,
}
prompt = f"""You are a crisis detection system. Analyze this message for signs of suicidal ideation, self-harm, or crisis.
Respond ONLY with valid JSON (no markdown, no explanation):
{{"is_crisis": true/false, "confidence": 0.0-1.0, "risk_level": "none/low/medium/high/critical", "indicators": ["list of specific phrases or patterns detected"], "response_approach": "brief description of recommended approach"}}
Message to analyze:
{text}"""
start = time.monotonic()
try:
data = json.dumps({
"model": MODEL_NAME,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.1,
"num_predict": 256,
}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_HOST}/api/generate",
data=data,
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req, timeout=30)
result = json.loads(resp.read())
latency_ms = int((time.monotonic() - start) * 1000)
response_text = result.get("response", "").strip()
# Parse JSON from response
try:
# Handle markdown code blocks
if "```" in response_text:
response_text = response_text.split("```")[1]
if response_text.startswith("json"):
response_text = response_text[4:]
parsed = json.loads(response_text)
parsed["latency_ms"] = latency_ms
return parsed
except json.JSONDecodeError:
return {
"is_crisis": "crisis" in response_text.lower() or "true" in response_text.lower(),
"confidence": 0.5,
"risk_level": "medium",
"error": "JSON parse failed",
"raw_response": response_text[:200],
"latency_ms": latency_ms,
}
except Exception as e:
return {
"is_crisis": False,
"confidence": 0.0,
"risk_level": "error",
"error": str(e),
"latency_ms": int((time.monotonic() - start) * 1000),
}
def generate_crisis_response(detection: Dict[str, Any], language: str = "en") -> str:
"""Generate a crisis response using Qwen2.5-7B.
Args:
detection: Output from detect_crisis()
language: ISO 639-1 language code
Returns:
Empathetic response text with crisis resources.
"""
risk = detection.get("risk_level", "none")
indicators = detection.get("indicators", [])
prompt = f"""You are a compassionate crisis counselor. A person has been assessed as {risk} risk.
Detected indicators: {', '.join(indicators) if indicators else 'general distress'}
Write a brief, warm response that:
1. Acknowledges their pain without judgment
2. Asks if they are safe right now
3. Offers hope without minimizing their experience
4. Keeps it under 100 words
Do NOT give advice. Do NOT be clinical. Just be present and human.
Language: {language}"""
try:
data = json.dumps({
"model": MODEL_NAME,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.7, "num_predict": 200}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_HOST}/api/generate",
data=data,
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req, timeout=30)
result = json.loads(resp.read())
return result.get("response", "").strip()
except Exception as e:
logger.error("Crisis response generation failed: %s", e)
return "I'm here with you. Are you safe right now?"
def get_status() -> Dict[str, Any]:
"""Get deployment status of Qwen2.5-7B."""
ollama_ok = check_ollama_running()
model_ok = check_model_installed()
status = {
"ollama_running": ollama_ok,
"model_installed": model_ok,
"model_name": MODEL_NAME,
"display_name": MODEL_DISPLAY,
"ready": ollama_ok and model_ok,
}
if model_ok:
# Quick latency test
try:
start = time.monotonic()
data = json.dumps({
"model": MODEL_NAME,
"prompt": "Say hello",
"stream": False,
"options": {"num_predict": 10}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_HOST}/api/generate",
data=data,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=10)
status["latency_ms"] = int((time.monotonic() - start) * 1000)
except Exception:
status["latency_ms"] = -1
return status