Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2b91272f4e feat: CLI command to view crisis metrics summary (#136)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 14s
New crisis/metrics.py:
- CrisisEvent dataclass for individual events
- log_event(): append to ~/.the-door/crisis-metrics.jsonl
- load_events(): load last N days from JSONL
- compute_summary(): aggregate by level, top indicators,
  sessions affected, daily average, peak day
- MetricsSummary dataclass
- format_summary(): human-readable report with bar chart
- CLI: python3 -m crisis.metrics --summary/--json/--last 7d/--log

Makefile targets:
- make metrics    — weekly summary report
- make metrics-json — raw JSON export

Closes #136
2026-04-15 12:41:07 -04:00
4 changed files with 210 additions and 123 deletions

View File

@@ -46,3 +46,12 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
# Crisis metrics
.PHONY: metrics metrics-json
metrics: ## Show crisis metrics summary (last 7 days)
python3 -m crisis.metrics --summary
metrics-json: ## Export crisis metrics as JSON
python3 -m crisis.metrics --json

199
crisis/metrics.py Normal file
View File

@@ -0,0 +1,199 @@
"""Crisis metrics — aggregate detection data for operators.
Tracks crisis detection events and provides summary reports.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --last 7d # last 7 days
"""
from __future__ import annotations
import json
import os
import sys
import time
from collections import Counter
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
# Data directory for metrics storage
_DATA_DIR = Path(os.getenv("CRISIS_DATA_DIR", str(Path.home() / ".the-door")))
_METRICS_FILE = _DATA_DIR / "crisis-metrics.jsonl"
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
indicators: list
session_id: str = ""
source: str = "" # "chat", "gateway", "cli"
@dataclass
class MetricsSummary:
"""Aggregated metrics summary."""
period_days: int
total_events: int
by_level: Dict[str, int]
top_indicators: List[tuple]
sessions_affected: int
avg_daily: float
peak_day: str
peak_count: int
generated_at: str
def log_event(event: CrisisEvent) -> None:
"""Log a crisis event to the metrics file."""
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_METRICS_FILE, "a") as f:
f.write(json.dumps(asdict(event)) + "\n")
def load_events(days: int = 7) -> List[CrisisEvent]:
"""Load crisis events from the last N days."""
if not _METRICS_FILE.exists():
return []
cutoff = time.time() - (days * 86400)
events = []
try:
with open(_METRICS_FILE) as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
if data.get("timestamp", 0) >= cutoff:
events.append(CrisisEvent(**data))
except (json.JSONDecodeError, KeyError):
pass
return events
def compute_summary(days: int = 7) -> MetricsSummary:
"""Compute metrics summary for the given period."""
events = load_events(days)
now = time.time()
# By level
by_level = Counter(e.level for e in events)
# Top indicators
indicator_counts = Counter()
for e in events:
for ind in e.indicators:
indicator_counts[ind] += 1
top_indicators = indicator_counts.most_common(10)
# Sessions
sessions = set(e.session_id for e in events if e.session_id)
# Peak day
from collections import defaultdict
daily = defaultdict(int)
for e in events:
day = time.strftime("%Y-%m-%d", time.localtime(e.timestamp))
daily[day] += 1
peak_day = max(daily, key=daily.get) if daily else "N/A"
peak_count = daily.get(peak_day, 0)
return MetricsSummary(
period_days=days,
total_events=len(events),
by_level=dict(by_level),
top_indicators=top_indicators,
sessions_affected=len(sessions),
avg_daily=round(len(events) / max(days, 1), 1),
peak_day=peak_day,
peak_count=peak_count,
generated_at=time.strftime("%Y-%m-%d %H:%M:%S"),
)
def format_summary(summary: MetricsSummary) -> str:
"""Format metrics summary as human-readable report."""
lines = [
"Crisis Metrics Summary",
"=" * 40,
f"Period: Last {summary.period_days} days",
f"Generated: {summary.generated_at}",
"",
f"Total events: {summary.total_events}",
f"Daily avg: {summary.avg_daily}",
f"Sessions: {summary.sessions_affected}",
f"Peak day: {summary.peak_day} ({summary.peak_count} events)",
"",
]
if summary.by_level:
lines.append("By severity:")
for level in ["CRITICAL", "HIGH", "MODERATE", "LOW", "NONE"]:
count = summary.by_level.get(level, 0)
if count > 0:
bar = "" * min(count, 30)
lines.append(f" {level:10s} {count:4d} {bar}")
lines.append("")
if summary.top_indicators:
lines.append("Top indicators:")
for indicator, count in summary.top_indicators[:5]:
lines.append(f" {indicator}: {count}")
lines.append("")
if summary.total_events == 0:
lines.append("No crisis events in this period.")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Crisis metrics summary")
parser.add_argument("--summary", action="store_true", help="Print summary report")
parser.add_argument("--json", action="store_true", dest="as_json", help="Output JSON")
parser.add_argument("--last", default="7d", help="Time period (e.g., 7d, 30d)")
parser.add_argument("--log", nargs=2, metavar=("LEVEL", "INDICATOR"), help="Log a test event")
args = parser.parse_args()
# Parse period
period_str = args.last.rstrip("d")
try:
days = int(period_str)
except ValueError:
days = 7
# Log mode
if args.log:
level, indicator = args.log
event = CrisisEvent(
timestamp=time.time(),
level=level.upper(),
indicators=[indicator],
session_id="cli-test",
source="cli",
)
log_event(event)
print(f"Logged: {level.upper()} / {indicator}")
return 0
# Compute summary
summary = compute_summary(days)
if args.as_json:
print(json.dumps(asdict(summary), indent=2))
else:
print(format_summary(summary))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -531,36 +531,6 @@ html, body {
.btn-secondary:hover { color: #e6edf3; border-color: #8b949e; }
/* Toast notification (replaces blocking alert()) */
.toast-notification {
position: fixed;
bottom: 24px;
left: 50%;
transform: translateX(-50%) translateY(100px);
padding: 12px 24px;
border-radius: 8px;
font-size: 0.9rem;
font-weight: 500;
z-index: 10001;
opacity: 0;
transition: transform 0.3s ease, opacity 0.3s ease;
pointer-events: none;
max-width: 90vw;
text-align: center;
}
.toast-notification.visible {
transform: translateX(-50%) translateY(0);
opacity: 1;
}
.toast-notification.success {
background: #238636;
color: #fff;
}
.toast-notification.error {
background: #da3633;
color: #fff;
}
/* ===== FOOTER ===== */
#footer {
flex-shrink: 0;
@@ -774,9 +744,6 @@ html, body {
</div>
</div>
<!-- Toast notification (accessible, non-blocking feedback) -->
<div id="toast-notification" class="toast-notification" role="status" aria-live="polite" aria-atomic="true"></div>
<script>
(function() {
'use strict';
@@ -1216,24 +1183,6 @@ Sovereignty and service always.`;
} catch (e) {}
}
// ===== TOAST NOTIFICATION (replaces blocking alert()) =====
var _toastEl = document.getElementById('toast-notification');
var _toastTimer = null;
function showToast(message, type) {
type = type || 'success';
_toastEl.textContent = message;
_toastEl.className = 'toast-notification ' + type;
// Force reflow before adding visible class
void _toastEl.offsetHeight;
_toastEl.classList.add('visible');
if (_toastTimer) clearTimeout(_toastTimer);
_toastTimer = setTimeout(function() {
_toastEl.classList.remove('visible');
}, 3000);
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
@@ -1256,9 +1205,9 @@ Sovereignty and service always.`;
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
showToast('Safety plan saved.', 'success');
alert('Safety plan saved locally.');
} catch (e) {
showToast('Error saving plan. Please try again.', 'error');
alert('Error saving plan.');
}
});

View File

@@ -1,70 +0,0 @@
import pathlib
import re
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestSafetyPlanToast(unittest.TestCase):
"""Verify safety plan save feedback uses non-blocking toast instead of alert()."""
@classmethod
def setUpClass(cls):
cls.html = INDEX_HTML.read_text()
def test_no_alert_calls_in_safety_plan_save(self):
"""Safety plan save should not use blocking alert() dialogs."""
# Find the save handler section
save_section = re.search(
r'saveSafetyPlan\.addEventListener.*?\}\);',
self.html, re.DOTALL
)
self.assertIsNotNone(save_section, 'Expected safety plan save handler to exist.')
section = save_section.group(0)
# Should not contain alert( calls
self.assertNotIn('alert(', section,
'Safety plan save handler should not use alert() — use showToast() instead.')
def test_toast_notification_element_exists(self):
"""Toast notification element should exist in the DOM."""
self.assertIn('id="toast-notification"', self.html,
'Expected toast-notification element in HTML.')
def test_toast_has_accessibility_attributes(self):
"""Toast should have aria-live for screen reader announcements."""
self.assertIn('aria-live="polite"', self.html,
'Toast should have aria-live="polite" for accessibility.')
self.assertIn('aria-atomic="true"', self.html,
'Toast should have aria-atomic="true" for complete announcement.')
def test_toast_css_exists(self):
"""Toast CSS styles should be defined."""
self.assertIn('.toast-notification', self.html,
'Expected .toast-notification CSS class.')
self.assertIn('.toast-notification.visible', self.html,
'Expected .toast-notification.visible CSS class.')
self.assertIn('.toast-notification.success', self.html,
'Expected .toast-notification.success CSS class.')
self.assertIn('.toast-notification.error', self.html,
'Expected .toast-notification.error CSS class.')
def test_showToast_function_exists(self):
"""showToast function should be defined."""
self.assertRegex(self.html, r'function\s+showToast\s*\(',
'Expected showToast function to be defined.')
def test_success_message_uses_toast(self):
"""Success feedback should use showToast with success type."""
self.assertIn("showToast('Safety plan saved.", self.html,
'Expected success message to use showToast.')
def test_error_message_uses_toast(self):
"""Error feedback should use showToast with error type."""
self.assertIn("showToast('Error saving plan.", self.html,
'Expected error message to use showToast.')
def test_toast_auto_dismisses(self):
"""Toast should auto-dismiss after timeout."""
self.assertRegex(self.html, r'setTimeout\s*\(\s*function',
'Expected setTimeout for toast auto-dismiss.')