diff --git a/crisis/metrics.py b/crisis/metrics.py new file mode 100644 index 0000000..0289c5e --- /dev/null +++ b/crisis/metrics.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +""" +Crisis Metrics CLI — View crisis detection health metrics. + +Usage: + python3 -m crisis.metrics --summary # weekly report + python3 -m crisis.metrics --json # raw JSON export + python3 -m crisis.metrics --today # today only +""" + +import argparse +import json +import sys +import time +from datetime import datetime, timedelta +from pathlib import Path + +# Metrics file location +METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json" + + +def load_metrics(): + """Load metrics from file.""" + if not METRICS_FILE.exists(): + return {"detections": [], "stats": {}} + + try: + with open(METRICS_FILE) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {"detections": [], "stats": {}} + + +def get_metrics_summary(days=7): + """Get metrics summary for the last N days.""" + data = load_metrics() + detections = data.get("detections", []) + + cutoff = time.time() - (days * 86400) + recent = [d for d in detections if d.get("timestamp", 0) > cutoff] + + if not recent: + return { + "period_days": days, + "total_detections": 0, + "by_severity": {}, + "by_source": {}, + "avg_response_time": 0, + } + + by_severity = {} + by_source = {} + total_response_time = 0 + response_count = 0 + + for d in recent: + severity = d.get("severity", "unknown") + source = d.get("source", "unknown") + + by_severity[severity] = by_severity.get(severity, 0) + 1 + by_source[source] = by_source.get(source, 0) + 1 + + if "response_time_ms" in d: + total_response_time += d["response_time_ms"] + response_count += 1 + + return { + "period_days": days, + "total_detections": len(recent), + "by_severity": by_severity, + "by_source": by_source, + "avg_response_time_ms": total_response_time / response_count if response_count else 0, + "first_detection": recent[0].get("timestamp"), + "last_detection": recent[-1].get("timestamp"), + } + + +def get_metrics_report(days=7): + """Generate a human-readable metrics report.""" + summary = get_metrics_summary(days) + + lines = [] + lines.append("=" * 50) + lines.append("CRISIS DETECTION METRICS") + lines.append(f"Period: Last {days} days") + lines.append("=" * 50) + lines.append("") + + total = summary["total_detections"] + lines.append(f"Total detections: {total}") + lines.append("") + + if total > 0: + lines.append("By severity:") + for sev, count in sorted(summary["by_severity"].items()): + pct = (count / total) * 100 + bar = "█" * int(pct / 5) + lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}") + lines.append("") + + lines.append("By source:") + for src, count in sorted(summary["by_source"].items()): + lines.append(f" {src:20} {count:4}") + lines.append("") + + avg_ms = summary.get("avg_response_time_ms", 0) + lines.append(f"Avg response time: {avg_ms:.0f}ms") + + first = summary.get("first_detection") + last = summary.get("last_detection") + if first and last: + first_dt = datetime.fromtimestamp(first) + last_dt = datetime.fromtimestamp(last) + lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}") + lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}") + else: + lines.append("No crisis detections in this period.") + + lines.append("") + lines.append("=" * 50) + + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser( + description="Crisis Detection Metrics CLI", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s --summary Weekly summary report + %(prog)s --today Today only + %(prog)s --json Raw JSON export + %(prog)s --days 30 Last 30 days + """, + ) + + parser.add_argument("--summary", action="store_true", help="Show summary report") + parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON") + parser.add_argument("--today", action="store_true", help="Today only (1 day)") + parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)") + parser.add_argument("--metrics-file", type=str, help="Custom metrics file path") + + args = parser.parse_args() + + if args.metrics_file: + global METRICS_FILE + METRICS_FILE = Path(args.metrics_file) + + days = 1 if args.today else args.days + + if args.json_output: + summary = get_metrics_summary(days) + print(json.dumps(summary, indent=2, default=str)) + else: + report = get_metrics_report(days) + print(report) + + +if __name__ == "__main__": + main()