Compare commits
2 Commits
fix/136
...
burn/123-1
| Author | SHA1 | Date | |
|---|---|---|---|
| b9f66410ef | |||
| 69dc695e73 |
@@ -1,5 +1,22 @@
|
||||
"""Crisis detection and metrics module."""
|
||||
"""
|
||||
Crisis detection and response system for the-door.
|
||||
|
||||
from .metrics import get_metrics_summary, get_metrics_report
|
||||
Stands between a broken man and a machine that would tell him to die.
|
||||
"""
|
||||
|
||||
__all__ = ["get_metrics_summary", "get_metrics_report"]
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
"CrisisDetectionResult",
|
||||
"process_message",
|
||||
"generate_response",
|
||||
"CrisisResponse",
|
||||
"check_crisis",
|
||||
"get_system_prompt",
|
||||
"format_result",
|
||||
"format_gateway_response",
|
||||
"get_urgency_emoji",
|
||||
]
|
||||
|
||||
@@ -105,12 +105,6 @@ MEDIUM_INDICATORS = [
|
||||
r"\bno\s+tomorrow\b",
|
||||
# Contextual versions (from crisis_detector.py legacy)
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
|
||||
r"\bfeel(?:s|ing)?\s+trapped\b",
|
||||
r"\bfeel(?:s|ing)?\s+desperate\b",
|
||||
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
|
||||
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
|
||||
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
|
||||
]
|
||||
|
||||
LOW_INDICATORS = [
|
||||
|
||||
@@ -1,161 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Crisis Metrics CLI — View crisis detection health metrics.
|
||||
|
||||
Usage:
|
||||
python3 -m crisis.metrics --summary # weekly report
|
||||
python3 -m crisis.metrics --json # raw JSON export
|
||||
python3 -m crisis.metrics --today # today only
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
# Metrics file location — per-user JSON store under the home directory.
# NOTE(review): presumably written by the detection pipeline elsewhere in
# the package; the writer is not visible in this file.
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
|
||||
|
||||
|
||||
def load_metrics():
    """Load recorded crisis-detection metrics from ``METRICS_FILE``.

    Returns:
        dict: The parsed metrics payload, or the empty skeleton
        ``{"detections": [], "stats": {}}`` when the file is missing,
        unreadable, or contains invalid JSON.
    """
    empty = {"detections": [], "stats": {}}
    if not METRICS_FILE.exists():
        return empty

    try:
        # Explicit encoding: the metrics file is UTF-8 JSON.
        with open(METRICS_FILE, encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError):
        # Best-effort by design: a corrupt or unreadable metrics file
        # degrades to "no data" instead of crashing the CLI.
        # (OSError subsumes the legacy IOError alias.)
        return empty
|
||||
|
||||
|
||||
def get_metrics_summary(days=7):
    """Summarize crisis detections recorded in the last *days* days.

    Args:
        days: Size of the lookback window in days (default 7).

    Returns:
        dict with keys ``period_days``, ``total_detections``,
        ``by_severity``, ``by_source``, ``avg_response_time_ms``, and —
        when any detections fall inside the window — ``first_detection``
        and ``last_detection`` (unix timestamps).
    """
    data = load_metrics()
    detections = data.get("detections", [])

    # Keep only entries newer than the cutoff; entries with no timestamp
    # are treated as timestamp 0 and therefore excluded.
    cutoff = time.time() - (days * 86400)
    recent = [d for d in detections if d.get("timestamp", 0) > cutoff]

    if not recent:
        return {
            "period_days": days,
            "total_detections": 0,
            "by_severity": {},
            "by_source": {},
            # FIX: key was "avg_response_time" here but
            # "avg_response_time_ms" in the non-empty branch (and in
            # get_metrics_report); use one consistent name.
            "avg_response_time_ms": 0,
        }

    by_severity = {}
    by_source = {}
    total_response_time = 0
    response_count = 0

    for d in recent:
        severity = d.get("severity", "unknown")
        source = d.get("source", "unknown")

        by_severity[severity] = by_severity.get(severity, 0) + 1
        by_source[source] = by_source.get(source, 0) + 1

        # Response time is optional per entry; average only over the
        # entries that recorded one.
        if "response_time_ms" in d:
            total_response_time += d["response_time_ms"]
            response_count += 1

    # FIX: don't assume the file is chronologically ordered — the old
    # recent[0]/recent[-1] silently picked wrong endpoints for unsorted
    # data. Compute the true earliest/latest timestamps in the window.
    timestamps = [d.get("timestamp", 0) for d in recent]

    return {
        "period_days": days,
        "total_detections": len(recent),
        "by_severity": by_severity,
        "by_source": by_source,
        "avg_response_time_ms": total_response_time / response_count if response_count else 0,
        "first_detection": min(timestamps),
        "last_detection": max(timestamps),
    }
|
||||
|
||||
|
||||
def get_metrics_report(days=7):
    """Render the metrics summary for the last *days* days as text.

    Args:
        days: Lookback window in days (default 7).

    Returns:
        A multi-line, human-readable report string.
    """
    summary = get_metrics_summary(days)
    total = summary["total_detections"]
    rule = "=" * 50

    # Header + headline count.
    out = [
        rule,
        "CRISIS DETECTION METRICS",
        f"Period: Last {days} days",
        rule,
        "",
        f"Total detections: {total}",
        "",
    ]

    if total > 0:
        out.append("By severity:")
        for sev, count in sorted(summary["by_severity"].items()):
            share = (count / total) * 100
            # One bar segment per 5 percentage points.
            out.append(f"  {sev:12} {count:4} ({share:5.1f}%) " + "█" * int(share / 5))
        out.append("")

        out.append("By source:")
        for src, count in sorted(summary["by_source"].items()):
            out.append(f"  {src:20} {count:4}")
        out.append("")

    out.append(f"Avg response time: {summary.get('avg_response_time_ms', 0):.0f}ms")

    first, last = summary.get("first_detection"), summary.get("last_detection")
    if first and last:
        out.append(f"First detection: {datetime.fromtimestamp(first).strftime('%Y-%m-%d %H:%M')}")
        out.append(f"Last detection: {datetime.fromtimestamp(last).strftime('%Y-%m-%d %H:%M')}")
    else:
        out.append("No crisis detections in this period.")

    out.append("")
    out.append(rule)

    return "\n".join(out)
|
||||
|
||||
|
||||
def main():
    """Command-line entry point for the crisis metrics viewer.

    Flags:
        --summary        human-readable report (also the default)
        --json           raw JSON export of the summary
        --today          restrict the window to 1 day
        --days N         restrict the window to N days (default 7)
        --metrics-file   read metrics from a custom path
    """
    parser = argparse.ArgumentParser(
        description="Crisis Detection Metrics CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --summary       Weekly summary report
  %(prog)s --today         Today only
  %(prog)s --json          Raw JSON export
  %(prog)s --days 30       Last 30 days
""",
    )
    parser.add_argument("--summary", action="store_true", help="Show summary report")
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
    parser.add_argument("--today", action="store_true", help="Today only (1 day)")
    parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
    parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")

    opts = parser.parse_args()

    # Repoint the module-level store before any load happens.
    if opts.metrics_file:
        global METRICS_FILE
        METRICS_FILE = Path(opts.metrics_file)

    window = 1 if opts.today else opts.days

    if opts.json_output:
        # default=str keeps non-JSON-native values (e.g. Paths) printable.
        print(json.dumps(get_metrics_summary(window), indent=2, default=str))
    else:
        print(get_metrics_report(window))
|
||||
|
||||
|
||||
# Allow direct execution, e.g. `python3 -m crisis.metrics --summary`.
if __name__ == "__main__":
    main()
|
||||
104
tests/test_crisis_indicator_dedup.py
Normal file
104
tests/test_crisis_indicator_dedup.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import re
|
||||
import unittest
|
||||
|
||||
from crisis.detect import (
|
||||
CRITICAL_INDICATORS,
|
||||
HIGH_INDICATORS,
|
||||
MEDIUM_INDICATORS,
|
||||
LOW_INDICATORS,
|
||||
detect_crisis,
|
||||
scan,
|
||||
)
|
||||
|
||||
|
||||
class TestNoDuplicatePatternsAcrossTiers(unittest.TestCase):
|
||||
"""Verify no regex pattern appears in more than one tier (issue #123)."""
|
||||
|
||||
def test_high_and_medium_no_overlap(self):
|
||||
"""Patterns in HIGH_INDICATORS must not appear in MEDIUM_INDICATORS."""
|
||||
high_set = set(HIGH_INDICATORS)
|
||||
medium_set = set(MEDIUM_INDICATORS)
|
||||
overlap = high_set & medium_set
|
||||
self.assertEqual(
|
||||
overlap, set(),
|
||||
f"Found {len(overlap)} duplicate patterns between HIGH and MEDIUM: {overlap}",
|
||||
)
|
||||
|
||||
def test_critical_and_high_no_overlap(self):
|
||||
"""CRITICAL and HIGH should not share patterns."""
|
||||
overlap = set(CRITICAL_INDICATORS) & set(HIGH_INDICATORS)
|
||||
self.assertEqual(
|
||||
overlap, set(),
|
||||
f"Found {len(overlap)} duplicates between CRITICAL and HIGH: {overlap}",
|
||||
)
|
||||
|
||||
def test_medium_and_low_no_overlap(self):
|
||||
"""MEDIUM and LOW should not share patterns."""
|
||||
overlap = set(MEDIUM_INDICATORS) & set(LOW_INDICATORS)
|
||||
self.assertEqual(
|
||||
overlap, set(),
|
||||
f"Found {len(overlap)} duplicates between MEDIUM and LOW: {overlap}",
|
||||
)
|
||||
|
||||
def test_no_pattern_in_three_tiers(self):
|
||||
"""No pattern should appear in more than one tier."""
|
||||
all_tiers = {
|
||||
"CRITICAL": set(CRITICAL_INDICATORS),
|
||||
"HIGH": set(HIGH_INDICATORS),
|
||||
"MEDIUM": set(MEDIUM_INDICATORS),
|
||||
"LOW": set(LOW_INDICATORS),
|
||||
}
|
||||
violations = []
|
||||
tier_names = list(all_tiers.keys())
|
||||
for i, t1 in enumerate(tier_names):
|
||||
for t2 in tier_names[i + 1:]:
|
||||
overlap = all_tiers[t1] & all_tiers[t2]
|
||||
for p in overlap:
|
||||
violations.append(f"{t1} & {t2}: {p}")
|
||||
self.assertEqual(
|
||||
violations, [],
|
||||
f"Found {len(violations)} cross-tier duplicates:\n" + "\n".join(violations[:10]),
|
||||
)
|
||||
|
||||
|
||||
class TestRemovedPatternsStillDetected(unittest.TestCase):
|
||||
"""Patterns removed from MEDIUM should still be caught by HIGH tier."""
|
||||
|
||||
def test_feel_hopeless_detected(self):
|
||||
result = detect_crisis("I feel so hopeless about everything")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_feel_trapped_detected(self):
|
||||
result = detect_crisis("I feel trapped and can\'t escape")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_feel_desperate_detected(self):
|
||||
result = detect_crisis("I feel desperate for help")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_no_future_detected(self):
|
||||
result = detect_crisis("There is no future ahead for me")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_nothing_left_detected(self):
|
||||
result = detect_crisis("There is nothing left inside")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
def test_give_up_on_myself_detected(self):
|
||||
result = detect_crisis("I have given up on myself")
|
||||
self.assertIn(result.level, ("HIGH", "CRITICAL"))
|
||||
|
||||
|
||||
class TestWorthlessPatternRemainsInMedium(unittest.TestCase):
|
||||
"""The 'feel worthless' pattern should remain in MEDIUM (not a duplicate)."""
|
||||
|
||||
def test_feel_worthless_in_medium(self):
|
||||
self.assertIn(r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b", MEDIUM_INDICATORS)
|
||||
|
||||
def test_feel_worthless_detected(self):
|
||||
result = detect_crisis("I feel so worthless")
|
||||
self.assertIn(result.level, ("MEDIUM", "LOW", "HIGH"))
|
||||
|
||||
|
||||
# Allow running this test module directly with `python`, outside pytest.
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user