Compare commits
3 Commits
feat/136-c
...
fix/123
| Author | SHA1 | Date |
|---|---|---|
| | 1b4a70d5fe | |
| | bc02ca54ec | |
| | d412939b4f | |
@@ -104,13 +104,9 @@ MEDIUM_INDICATORS = [
|
||||
r"\blost\s+all\s+hope\b",
|
||||
r"\bno\s+tomorrow\b",
|
||||
# Contextual versions (from crisis_detector.py legacy)
|
||||
# Keep only medium-only patterns here; stronger overlaps live in HIGH_INDICATORS.
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
|
||||
r"\bfeel(?:s|ing)?\s+trapped\b",
|
||||
r"\bfeel(?:s|ing)?\s+desperate\b",
|
||||
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
|
||||
r"\bnothing\s+left\s+(?:to\s+(?:live|hope)\s+for|inside)\b",
|
||||
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
|
||||
]
|
||||
|
||||
LOW_INDICATORS = [
|
||||
|
||||
@@ -1,133 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Crisis Metrics CLI — View crisis detection health from the command line.
|
||||
|
||||
Usage:
|
||||
python3 -m crisis.metrics --summary # weekly report
|
||||
python3 -m crisis.metrics --json # raw JSON export
|
||||
python3 -m crisis.metrics --last 24h # last 24 hours
|
||||
|
||||
Ref: #136
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
METRICS_DIR = os.environ.get("CRISIS_METRICS_DIR", str(Path.home() / ".the-door" / "metrics"))
|
||||
|
||||
|
||||
def load_metrics(hours: int = 168) -> List[dict]:
    """Load metric entries recorded within the last *hours* hours.

    Scans every ``*.json`` file in METRICS_DIR (each file may hold either a
    JSON list of entries or one JSON object). Files that cannot be read or
    parsed are skipped — collection is best-effort. Entries whose ISO-8601
    ``timestamp`` falls at or after the cutoff are kept; entries with an
    unparseable timestamp are kept as well, while entries lacking a
    timestamp are dropped.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    root = Path(METRICS_DIR)

    if not root.exists():
        return []

    collected: List[dict] = []
    for file_path in sorted(root.glob("*.json")):
        try:
            payload = json.loads(file_path.read_text())
        except Exception:
            # Best-effort: a corrupt or unreadable file must not abort the scan.
            continue
        if isinstance(payload, list):
            collected.extend(payload)
        elif isinstance(payload, dict):
            collected.append(payload)

    # Keep only entries inside the requested window.
    recent: List[dict] = []
    for entry in collected:
        stamp = entry.get("timestamp", "")
        if not stamp:
            continue  # no timestamp at all -> excluded (matches original behavior)
        try:
            when = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
        except Exception:
            # Unparseable timestamps pass through rather than being lost.
            recent.append(entry)
        else:
            if when >= cutoff:
                recent.append(entry)

    return recent
|
||||
|
||||
|
||||
def summarize(entries: List[dict], hours: int = 168) -> dict:
    """Aggregate metric entries into a summary report.

    Args:
        entries: Metric entries (dicts) as returned by ``load_metrics``.
        hours: Time window the entries cover, echoed back as
            ``period_hours``. Defaults to 168 (one week) so existing
            callers keep their previous behavior.

    Returns:
        A dict with total interaction count, a per-level breakdown,
        escalation/de-escalation counts, resources-shown count, and the
        crisis rate (CRITICAL + HIGH as a percentage of total).
    """
    total = len(entries)
    by_level = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "NONE": 0}
    escalated = 0
    deescalated = 0
    resources_shown = 0

    for e in entries:
        level = e.get("level", "NONE")
        # .get() keeps unknown level strings countable instead of raising.
        by_level[level] = by_level.get(level, 0) + 1
        if e.get("escalated"):
            escalated += 1
        if e.get("deescalation_confirmed"):
            deescalated += 1
        if e.get("resources_shown"):
            resources_shown += 1

    return {
        # Was hard-coded to 168 even when a different window was requested
        # via --last; now reflects the caller-supplied window.
        "period_hours": hours,
        "total_interactions": total,
        "by_level": by_level,
        "escalated_sessions": escalated,
        "deescalated_sessions": deescalated,
        "resources_shown": resources_shown,
        # max(total, 1) guards the division when there are no entries.
        "crisis_rate": round((by_level["CRITICAL"] + by_level["HIGH"]) / max(total, 1) * 100, 1),
    }
|
||||
|
||||
|
||||
def print_summary(summary: dict):
    """Render a human-readable crisis metrics report to stdout."""
    rule = "=" * 50
    print(f"\n{rule}")
    print(" CRISIS METRICS SUMMARY")
    print(f" {datetime.now().isoformat()}")
    print(f"{rule}\n")

    print(f" Interactions: {summary['total_interactions']}")
    print(f" Crisis rate: {summary['crisis_rate']}%")
    print()
    print(" By level:")
    for tier, tally in summary["by_level"].items():
        # Cap the histogram bar at 40 chars so large counts don't wrap.
        print(f"  {tier:10} {tally:5} {'█' * min(tally, 40)}")
    print()
    print(f" Escalated: {summary['escalated_sessions']}")
    print(f" De-escalated: {summary['deescalated_sessions']}")
    print(f" 988 shown: {summary['resources_shown']}")
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments, load metrics, and emit a report."""
    import argparse
    parser = argparse.ArgumentParser(description="Crisis Metrics CLI")
    parser.add_argument("--summary", action="store_true", help="Weekly summary")
    parser.add_argument("--json", action="store_true", help="JSON export")
    parser.add_argument("--last", default="168h", help="Time window (e.g., 24h, 7d)")
    args = parser.parse_args()

    hours = _parse_window(args.last)

    entries = load_metrics(hours)
    summary = summarize(entries)

    if args.json:
        print(json.dumps(summary, indent=2))
    else:
        print_summary(summary)


def _parse_window(last: str) -> int:
    """Convert a window spec like '24h' or '7d' to a number of hours.

    Falls back to 168 (one week) on any malformed value — previously a
    value like '--last abch' or a bare '--last h' crashed the CLI with an
    uncaught ValueError from int().
    """
    try:
        if last.endswith("h"):
            return int(last[:-1])
        if last.endswith("d"):
            return int(last[:-1]) * 24
    except ValueError:
        pass  # fall through to the documented default
    return 168


if __name__ == "__main__":
    main()
|
||||
@@ -680,7 +680,7 @@ html, body {
|
||||
|
||||
<!-- Footer -->
|
||||
<footer id="footer">
|
||||
<a href="/about" aria-label="About The Door">about</a>
|
||||
<a href="/about.html" aria-label="About The Door">about</a>
|
||||
<button id="safety-plan-btn" aria-label="Open My Safety Plan">my safety plan</button>
|
||||
<button id="clear-chat-btn" aria-label="Clear chat history">clear chat</button>
|
||||
</footer>
|
||||
|
||||
50
tests/test_indicator_dedup.py
Normal file
50
tests/test_indicator_dedup.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""Regression tests for duplicate crisis indicators cleanup (#123)."""
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from crisis.detect import detect_crisis
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
DETECT_FILE = ROOT / "crisis" / "detect.py"
|
||||
|
||||
|
||||
class TestCrisisIndicatorDedup(unittest.TestCase):
    """Regression guard: HIGH/MEDIUM indicator tiers stay disjoint (#123)."""

    def _extract_indicator_list(self, name: str) -> set[str]:
        # Pull the raw-string regex patterns out of the named list literal
        # by scanning the source text of crisis/detect.py.
        source = DETECT_FILE.read_text(encoding="utf-8")
        found = re.search(rf"{name}\s*=\s*\[(.*?)\n\]", source, re.S)
        self.assertIsNotNone(found, f"{name} list missing")
        patterns: set[str] = set()
        for raw_line in found.group(1).splitlines():
            candidate = raw_line.strip()
            if candidate.startswith('r"'):
                patterns.add(candidate.rstrip(","))
        return patterns

    def test_high_and_medium_indicator_lists_do_not_overlap(self):
        overlap = (
            self._extract_indicator_list("HIGH_INDICATORS")
            & self._extract_indicator_list("MEDIUM_INDICATORS")
        )
        self.assertEqual(set(), overlap, f"duplicate cross-tier patterns found: {sorted(overlap)}")

    def test_removed_duplicates_still_classify_at_high_tier(self):
        # Phrases whose patterns were deduplicated out of MEDIUM must still
        # be caught by the HIGH tier.
        expectations = [
            ("I feel hopeless", "HIGH"),
            ("I feel trapped", "HIGH"),
            ("I feel desperate", "HIGH"),
            ("I have no future ahead", "HIGH"),
            ("I have given up on myself", "HIGH"),
        ]
        for message, expected_level in expectations:
            with self.subTest(text=message):
                self.assertEqual(expected_level, detect_crisis(message).level)
|
||||
|
||||
|
||||
# Allow running this test module directly: python tests/test_indicator_dedup.py
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user