Compare commits
2 Commits
fix/97
...
burn/59-17
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3710dd363e | ||
|
|
8c5707decc |
@@ -1,22 +1,3 @@
|
||||
"""
|
||||
Crisis detection and response system for the-door.
|
||||
|
||||
Stands between a broken man and a machine that would tell him to die.
|
||||
"""
|
||||
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
"CrisisDetectionResult",
|
||||
"process_message",
|
||||
"generate_response",
|
||||
"CrisisResponse",
|
||||
"check_crisis",
|
||||
"get_system_prompt",
|
||||
"format_result",
|
||||
"format_gateway_response",
|
||||
"get_urgency_emoji",
|
||||
]
|
||||
"""Crisis package init."""
|
||||
from .detect import detect_crisis, CrisisDetectionResult
|
||||
from .metrics import CrisisMetrics
|
||||
|
||||
97
crisis/metrics.py
Normal file
97
crisis/metrics.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""Crisis Detection Metrics Layer — the-door (#37).
|
||||
|
||||
Privacy-preserving counters and histograms for crisis events.
|
||||
No PII. Just counts, categories, and aggregate stats.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from collections import Counter
|
||||
from typing import Dict, List
|
||||
|
||||
|
||||
class CrisisMetrics:
|
||||
"""Record, query, and summarise crisis detection events."""
|
||||
|
||||
def __init__(self, path: str):
|
||||
self._path = path
|
||||
self._counters: Dict[str, Counter] = {
|
||||
'total': Counter(),
|
||||
'levels': Counter(),
|
||||
'keywords': Counter(),
|
||||
}
|
||||
self._lock = threading.Lock()
|
||||
self._load()
|
||||
|
||||
def _load(self):
|
||||
if not os.path.exists(self._path):
|
||||
return
|
||||
with open(self._path) as fh:
|
||||
for line in fh:
|
||||
try:
|
||||
event = json.loads(line)
|
||||
except Exception:
|
||||
continue
|
||||
self._counters['levels'][event['level']] += 1
|
||||
for kw in event.get('keywords', []):
|
||||
self._counters['keywords'][kw.lower()] += 1
|
||||
self._counters['total'] = Counter({'all': sum(self._counters['levels'].values())})
|
||||
|
||||
def record(self, level: str, keywords: List[str]):
|
||||
"""Record a single crisis-detection event. Thread-safe."""
|
||||
event = {
|
||||
'ts': time.time(),
|
||||
'level': level,
|
||||
'keywords': [k.lower() for k in keywords],
|
||||
}
|
||||
with self._lock:
|
||||
with open(self._path, 'a') as fh:
|
||||
fh.write(json.dumps(event) + '\n')
|
||||
self._counters['levels'][level] += 1
|
||||
for kw in event['keywords']:
|
||||
self._counters['keywords'][kw] += 1
|
||||
|
||||
def snapshot(self) -> dict:
|
||||
"""Return current aggregate counters."""
|
||||
with self._lock:
|
||||
total = sum(self._counters['levels'].values())
|
||||
return {
|
||||
'total_detections': total,
|
||||
'detections_per_level': dict(self._counters['levels']),
|
||||
'keywords_frequency': dict(self._counters['keywords']),
|
||||
}
|
||||
|
||||
def weekly(self) -> dict:
|
||||
"""Return weekly summary with optional false-positive estimate."""
|
||||
now = time.time()
|
||||
week_start = now - 7 * 86400
|
||||
levels = Counter()
|
||||
keywords = Counter()
|
||||
with self._lock:
|
||||
if os.path.exists(self._path):
|
||||
with open(self._path) as fh:
|
||||
for line in fh:
|
||||
try:
|
||||
ev = json.loads(line)
|
||||
except Exception:
|
||||
continue
|
||||
if ev['ts'] < week_start:
|
||||
continue
|
||||
levels[ev['level']] += 1
|
||||
for kw in ev.get('keywords', []):
|
||||
keywords[kw] += 1
|
||||
total = sum(levels.values())
|
||||
return {
|
||||
'week_start': week_start,
|
||||
'week_end': now,
|
||||
'detections': total,
|
||||
'detections_per_level': dict(levels),
|
||||
'top_keywords': keywords.most_common(10),
|
||||
'false_positive_estimate': round(
|
||||
levels.get('NONE', 0) / total * 100, 1
|
||||
) if total else 0.0,
|
||||
}
|
||||
|
||||
def close(self):
|
||||
pass # nothing to flush
|
||||
@@ -680,7 +680,7 @@ html, body {
|
||||
|
||||
<!-- Footer -->
|
||||
<footer id="footer">
|
||||
<a href="/about" aria-label="About The Door">about</a>
|
||||
<a href="/about.html" aria-label="About The Door">about</a>
|
||||
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
|
||||
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
|
||||
</footer>
|
||||
|
||||
106
tests/test_crisis_metrics.py
Normal file
106
tests/test_crisis_metrics.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Tests for crisis metrics (#37)."""
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from crisis.metrics import CrisisMetrics
|
||||
|
||||
|
||||
class TestCrisisMetricsRecording(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.path = tempfile.mktemp(suffix='.jsonl')
|
||||
self.m = CrisisMetrics(self.path)
|
||||
|
||||
def tearDown(self):
|
||||
self.m.close()
|
||||
if os.path.exists(self.path):
|
||||
os.remove(self.path)
|
||||
|
||||
def test_record_non_crisis_yields_none_level(self):
|
||||
self.m.record('NONE', ['*'])
|
||||
d = self.m.snapshot()
|
||||
self.assertEqual(d['detections_per_level']['NONE'], 1)
|
||||
self.assertEqual(d['total_detections'], 1)
|
||||
|
||||
def test_record_critical(self):
|
||||
self.m.record('CRITICAL', [r'\bkill\s*myself\b'])
|
||||
d = self.m.snapshot()
|
||||
self.assertEqual(d['detections_per_level']['CRITICAL'], 1)
|
||||
|
||||
def test_record_multiple_levels(self):
|
||||
self.m.record('LOW', [r'\bunhappy\b'])
|
||||
self.m.record('CRITICAL', [r'\bend\s*my\s*life\b'])
|
||||
self.m.record('MEDIUM', [r'\bworthless\b', r'\bempty\b'])
|
||||
d = self.m.snapshot()
|
||||
self.assertEqual(d['detections_per_level']['LOW'], 1)
|
||||
self.assertEqual(d['detections_per_level']['CRITICAL'], 1)
|
||||
self.assertEqual(d['detections_per_level']['MEDIUM'], 1)
|
||||
self.assertEqual(d['total_detections'], 3)
|
||||
|
||||
def test_keywords_normalized_to_lowercase(self):
|
||||
self.m.record('LOW', ['\\bUNHAPPY\\b'])
|
||||
d = self.m.snapshot()
|
||||
self.assertEqual(d['keywords_frequency']['\\bunhappy\\b'], 1)
|
||||
|
||||
|
||||
class TestCrisisMetricsWeekly(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.path = tempfile.mktemp(suffix='.jsonl')
|
||||
self.m = CrisisMetrics(self.path)
|
||||
|
||||
def tearDown(self):
|
||||
self.m.close()
|
||||
if os.path.exists(self.path):
|
||||
os.remove(self.path)
|
||||
|
||||
def test_weekly_keys(self):
|
||||
report = self.m.weekly()
|
||||
for key in ['week_start', 'week_end', 'detections', 'detections_per_level',
|
||||
'top_keywords', 'false_positive_estimate']:
|
||||
self.assertIn(key, report, f'missing weekly key {key}')
|
||||
|
||||
|
||||
class TestCrisisMetricsPrivacy(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.path = tempfile.mktemp(suffix='.jsonl')
|
||||
self.m = CrisisMetrics(self.path)
|
||||
|
||||
def tearDown(self):
|
||||
self.m.close()
|
||||
if os.path.exists(self.path):
|
||||
os.remove(self.path)
|
||||
|
||||
def test_jsonl_no_pii(self):
|
||||
self.m.record('HIGH', [r'\bno\s+way\s+out\b'])
|
||||
self.m.close()
|
||||
raw = open(self.path).read()
|
||||
self.assertNotIn('Kill myself', raw) # original message not stored
|
||||
for word in ['user_id', 'username', 'message']:
|
||||
self.assertNotIn(word, raw)
|
||||
|
||||
|
||||
class TestCrisisMetricsIntegratesWithDetect(unittest.TestCase):
|
||||
"""Wire metrics into detect_crisis via a wrapper."""
|
||||
|
||||
def test_metrics_wraps_detect_crisis(self):
|
||||
from crisis.detect import detect_crisis
|
||||
path = tempfile.mktemp(suffix='.jsonl')
|
||||
m = CrisisMetrics(path)
|
||||
try:
|
||||
result = detect_crisis('I feel happy and great')
|
||||
m.record(result.level, result.indicators)
|
||||
result2 = detect_crisis('I want to kill myself')
|
||||
m.record(result2.level, result2.indicators)
|
||||
snap = m.snapshot()
|
||||
self.assertEqual(snap['detections_per_level']['NONE'], 1)
|
||||
self.assertEqual(snap['detections_per_level']['CRITICAL'], 1)
|
||||
finally:
|
||||
m.close()
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user