Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2b91272f4e feat: CLI command to view crisis metrics summary (#136)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 9s
Smoke Test / smoke (pull_request) Successful in 14s
New crisis/metrics.py:
- CrisisEvent dataclass for individual events
- log_event(): append to ~/.the-door/crisis-metrics.jsonl
- load_events(): load last N days from JSONL
- compute_summary(): aggregate by level, top indicators,
  sessions affected, daily average, peak day
- MetricsSummary dataclass
- format_summary(): human-readable report with bar chart
- CLI: python3 -m crisis.metrics --summary/--json/--last 7d/--log

Makefile targets:
- make metrics    — weekly summary report
- make metrics-json — raw JSON export

Closes #136
2026-04-15 12:41:07 -04:00
3 changed files with 198 additions and 134 deletions

View File

@@ -46,3 +46,12 @@ ssl:
service:
ssh root@$(VPS) "cd /opt/the-door && bash deploy/deploy.sh --service"
# Crisis metrics
.PHONY: metrics metrics-json
metrics: ## Show crisis metrics summary (last 7 days)
python3 -m crisis.metrics --summary
metrics-json: ## Export crisis metrics as JSON
python3 -m crisis.metrics --json

View File

@@ -1,5 +1,22 @@
"""Crisis detection and metrics module."""
"""
Crisis detection and response system for the-door.
from .metrics import get_metrics_summary, get_metrics_report
Stands between a broken man and a machine that would tell him to die.
"""
__all__ = ["get_metrics_summary", "get_metrics_report"]
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
__all__ = [
"detect_crisis",
"CrisisDetectionResult",
"process_message",
"generate_response",
"CrisisResponse",
"check_crisis",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
]

View File

@@ -1,161 +1,199 @@
#!/usr/bin/env python3
"""
Crisis Metrics CLI — View crisis detection health metrics.
"""Crisis metrics — aggregate detection data for operators.
Tracks crisis detection events and provides summary reports.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --today # today only
python3 -m crisis.metrics --last 7d # last 7 days
"""
import argparse
from __future__ import annotations
import json
import os
import sys
import time
from datetime import datetime, timedelta
from collections import Counter
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
# Metrics file location
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
# Data directory for metrics storage
_DATA_DIR = Path(os.getenv("CRISIS_DATA_DIR", str(Path.home() / ".the-door")))
_METRICS_FILE = _DATA_DIR / "crisis-metrics.jsonl"
def load_metrics():
"""Load metrics from file."""
if not METRICS_FILE.exists():
return {"detections": [], "stats": {}}
try:
with open(METRICS_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {"detections": [], "stats": {}}
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
indicators: list
session_id: str = ""
source: str = "" # "chat", "gateway", "cli"
def get_metrics_summary(days=7):
"""Get metrics summary for the last N days."""
data = load_metrics()
detections = data.get("detections", [])
@dataclass
class MetricsSummary:
"""Aggregated metrics summary."""
period_days: int
total_events: int
by_level: Dict[str, int]
top_indicators: List[tuple]
sessions_affected: int
avg_daily: float
peak_day: str
peak_count: int
generated_at: str
def log_event(event: CrisisEvent) -> None:
"""Log a crisis event to the metrics file."""
_DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(_METRICS_FILE, "a") as f:
f.write(json.dumps(asdict(event)) + "\n")
def load_events(days: int = 7) -> List[CrisisEvent]:
"""Load crisis events from the last N days."""
if not _METRICS_FILE.exists():
return []
cutoff = time.time() - (days * 86400)
recent = [d for d in detections if d.get("timestamp", 0) > cutoff]
if not recent:
return {
"period_days": days,
"total_detections": 0,
"by_severity": {},
"by_source": {},
"avg_response_time": 0,
}
by_severity = {}
by_source = {}
total_response_time = 0
response_count = 0
for d in recent:
severity = d.get("severity", "unknown")
source = d.get("source", "unknown")
by_severity[severity] = by_severity.get(severity, 0) + 1
by_source[source] = by_source.get(source, 0) + 1
if "response_time_ms" in d:
total_response_time += d["response_time_ms"]
response_count += 1
return {
"period_days": days,
"total_detections": len(recent),
"by_severity": by_severity,
"by_source": by_source,
"avg_response_time_ms": total_response_time / response_count if response_count else 0,
"first_detection": recent[0].get("timestamp"),
"last_detection": recent[-1].get("timestamp"),
}
events = []
try:
with open(_METRICS_FILE) as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
if data.get("timestamp", 0) >= cutoff:
events.append(CrisisEvent(**data))
except (json.JSONDecodeError, KeyError):
pass
return events
def get_metrics_report(days=7):
"""Generate a human-readable metrics report."""
summary = get_metrics_summary(days)
lines = []
lines.append("=" * 50)
lines.append("CRISIS DETECTION METRICS")
lines.append(f"Period: Last {days} days")
lines.append("=" * 50)
lines.append("")
total = summary["total_detections"]
lines.append(f"Total detections: {total}")
lines.append("")
if total > 0:
def compute_summary(days: int = 7) -> MetricsSummary:
"""Compute metrics summary for the given period."""
events = load_events(days)
now = time.time()
# By level
by_level = Counter(e.level for e in events)
# Top indicators
indicator_counts = Counter()
for e in events:
for ind in e.indicators:
indicator_counts[ind] += 1
top_indicators = indicator_counts.most_common(10)
# Sessions
sessions = set(e.session_id for e in events if e.session_id)
# Peak day
from collections import defaultdict
daily = defaultdict(int)
for e in events:
day = time.strftime("%Y-%m-%d", time.localtime(e.timestamp))
daily[day] += 1
peak_day = max(daily, key=daily.get) if daily else "N/A"
peak_count = daily.get(peak_day, 0)
return MetricsSummary(
period_days=days,
total_events=len(events),
by_level=dict(by_level),
top_indicators=top_indicators,
sessions_affected=len(sessions),
avg_daily=round(len(events) / max(days, 1), 1),
peak_day=peak_day,
peak_count=peak_count,
generated_at=time.strftime("%Y-%m-%d %H:%M:%S"),
)
def format_summary(summary: MetricsSummary) -> str:
"""Format metrics summary as human-readable report."""
lines = [
"Crisis Metrics Summary",
"=" * 40,
f"Period: Last {summary.period_days} days",
f"Generated: {summary.generated_at}",
"",
f"Total events: {summary.total_events}",
f"Daily avg: {summary.avg_daily}",
f"Sessions: {summary.sessions_affected}",
f"Peak day: {summary.peak_day} ({summary.peak_count} events)",
"",
]
if summary.by_level:
lines.append("By severity:")
for sev, count in sorted(summary["by_severity"].items()):
pct = (count / total) * 100
bar = "" * int(pct / 5)
lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}")
for level in ["CRITICAL", "HIGH", "MODERATE", "LOW", "NONE"]:
count = summary.by_level.get(level, 0)
if count > 0:
bar = "" * min(count, 30)
lines.append(f" {level:10s} {count:4d} {bar}")
lines.append("")
lines.append("By source:")
for src, count in sorted(summary["by_source"].items()):
lines.append(f" {src:20} {count:4}")
if summary.top_indicators:
lines.append("Top indicators:")
for indicator, count in summary.top_indicators[:5]:
lines.append(f" {indicator}: {count}")
lines.append("")
avg_ms = summary.get("avg_response_time_ms", 0)
lines.append(f"Avg response time: {avg_ms:.0f}ms")
first = summary.get("first_detection")
last = summary.get("last_detection")
if first and last:
first_dt = datetime.fromtimestamp(first)
last_dt = datetime.fromtimestamp(last)
lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}")
else:
lines.append("No crisis detections in this period.")
lines.append("")
lines.append("=" * 50)
if summary.total_events == 0:
lines.append("No crisis events in this period.")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Crisis Detection Metrics CLI",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --summary Weekly summary report
%(prog)s --today Today only
%(prog)s --json Raw JSON export
%(prog)s --days 30 Last 30 days
""",
)
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
parser.add_argument("--today", action="store_true", help="Today only (1 day)")
parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")
import argparse
parser = argparse.ArgumentParser(description="Crisis metrics summary")
parser.add_argument("--summary", action="store_true", help="Print summary report")
parser.add_argument("--json", action="store_true", dest="as_json", help="Output JSON")
parser.add_argument("--last", default="7d", help="Time period (e.g., 7d, 30d)")
parser.add_argument("--log", nargs=2, metavar=("LEVEL", "INDICATOR"), help="Log a test event")
args = parser.parse_args()
if args.metrics_file:
global METRICS_FILE
METRICS_FILE = Path(args.metrics_file)
days = 1 if args.today else args.days
if args.json_output:
summary = get_metrics_summary(days)
print(json.dumps(summary, indent=2, default=str))
# Parse period
period_str = args.last.rstrip("d")
try:
days = int(period_str)
except ValueError:
days = 7
# Log mode
if args.log:
level, indicator = args.log
event = CrisisEvent(
timestamp=time.time(),
level=level.upper(),
indicators=[indicator],
session_id="cli-test",
source="cli",
)
log_event(event)
print(f"Logged: {level.upper()} / {indicator}")
return 0
# Compute summary
summary = compute_summary(days)
if args.as_json:
print(json.dumps(asdict(summary), indent=2))
else:
report = get_metrics_report(days)
print(report)
print(format_summary(summary))
return 0
if __name__ == "__main__":
main()
sys.exit(main())