Compare commits

...

2 Commits

Author SHA1 Message Date
af419fb797 feat(#136): Export metrics functions from crisis module
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 13s
Refs #136
2026-04-15 15:26:40 +00:00
d7d40f490a feat(#136): Add CLI command to view crisis metrics summary
CLI entry point for crisis detection metrics:
- python3 -m crisis.metrics --summary (weekly report)
- python3 -m crisis.metrics --json (raw JSON export)
- python3 -m crisis.metrics --today (today only)

Resolves #136
2026-04-15 15:23:28 +00:00
2 changed files with 164 additions and 20 deletions

View File

@@ -1,22 +1,5 @@
"""
Crisis detection and response system for the-door.
"""Crisis detection and metrics module."""
Stands between a broken man and a machine that would tell him to die.
"""
from .metrics import get_metrics_summary, get_metrics_report
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
__all__ = [
"detect_crisis",
"CrisisDetectionResult",
"process_message",
"generate_response",
"CrisisResponse",
"check_crisis",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
]
__all__ = ["get_metrics_summary", "get_metrics_report"]

161
crisis/metrics.py Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""
Crisis Metrics CLI — View crisis detection health metrics.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --today # today only
"""
import argparse
import json
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
# Metrics file location
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
def load_metrics():
"""Load metrics from file."""
if not METRICS_FILE.exists():
return {"detections": [], "stats": {}}
try:
with open(METRICS_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {"detections": [], "stats": {}}
def get_metrics_summary(days=7):
"""Get metrics summary for the last N days."""
data = load_metrics()
detections = data.get("detections", [])
cutoff = time.time() - (days * 86400)
recent = [d for d in detections if d.get("timestamp", 0) > cutoff]
if not recent:
return {
"period_days": days,
"total_detections": 0,
"by_severity": {},
"by_source": {},
"avg_response_time": 0,
}
by_severity = {}
by_source = {}
total_response_time = 0
response_count = 0
for d in recent:
severity = d.get("severity", "unknown")
source = d.get("source", "unknown")
by_severity[severity] = by_severity.get(severity, 0) + 1
by_source[source] = by_source.get(source, 0) + 1
if "response_time_ms" in d:
total_response_time += d["response_time_ms"]
response_count += 1
return {
"period_days": days,
"total_detections": len(recent),
"by_severity": by_severity,
"by_source": by_source,
"avg_response_time_ms": total_response_time / response_count if response_count else 0,
"first_detection": recent[0].get("timestamp"),
"last_detection": recent[-1].get("timestamp"),
}
def get_metrics_report(days=7):
"""Generate a human-readable metrics report."""
summary = get_metrics_summary(days)
lines = []
lines.append("=" * 50)
lines.append("CRISIS DETECTION METRICS")
lines.append(f"Period: Last {days} days")
lines.append("=" * 50)
lines.append("")
total = summary["total_detections"]
lines.append(f"Total detections: {total}")
lines.append("")
if total > 0:
lines.append("By severity:")
for sev, count in sorted(summary["by_severity"].items()):
pct = (count / total) * 100
bar = "" * int(pct / 5)
lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}")
lines.append("")
lines.append("By source:")
for src, count in sorted(summary["by_source"].items()):
lines.append(f" {src:20} {count:4}")
lines.append("")
avg_ms = summary.get("avg_response_time_ms", 0)
lines.append(f"Avg response time: {avg_ms:.0f}ms")
first = summary.get("first_detection")
last = summary.get("last_detection")
if first and last:
first_dt = datetime.fromtimestamp(first)
last_dt = datetime.fromtimestamp(last)
lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}")
else:
lines.append("No crisis detections in this period.")
lines.append("")
lines.append("=" * 50)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Crisis Detection Metrics CLI",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --summary Weekly summary report
%(prog)s --today Today only
%(prog)s --json Raw JSON export
%(prog)s --days 30 Last 30 days
""",
)
parser.add_argument("--summary", action="store_true", help="Show summary report")
parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
parser.add_argument("--today", action="store_true", help="Today only (1 day)")
parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")
args = parser.parse_args()
if args.metrics_file:
global METRICS_FILE
METRICS_FILE = Path(args.metrics_file)
days = 1 if args.today else args.days
if args.json_output:
summary = get_metrics_summary(days)
print(json.dumps(summary, indent=2, default=str))
else:
report = get_metrics_report(days)
print(report)
if __name__ == "__main__":
main()