#!/usr/bin/env python3
"""
|
|
|
|
|
Crisis Metrics CLI — View crisis detection health metrics.
|
|
|
|
|
"""Crisis metrics — aggregate detection data for operators.
|
|
|
|
|
|
|
|
|
|
Tracks crisis detection events and provides summary reports.
|
|
|
|
|
|
|
|
|
|
Usage:
|
|
|
|
|
python3 -m crisis.metrics --summary # weekly report
|
|
|
|
|
python3 -m crisis.metrics --json # raw JSON export
|
|
|
|
|
python3 -m crisis.metrics --today # today only
|
|
|
|
|
python3 -m crisis.metrics --last 7d # last 7 days
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from collections import Counter
|
|
|
|
|
from dataclasses import dataclass, asdict
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
|
|
|
|
# Metrics file location
|
|
|
|
|
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
|
|
|
|
|
# Data directory for metrics storage
|
|
|
|
|
_DATA_DIR = Path(os.getenv("CRISIS_DATA_DIR", str(Path.home() / ".the-door")))
|
|
|
|
|
_METRICS_FILE = _DATA_DIR / "crisis-metrics.jsonl"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_metrics():
|
|
|
|
|
"""Load metrics from file."""
|
|
|
|
|
if not METRICS_FILE.exists():
|
|
|
|
|
return {"detections": [], "stats": {}}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with open(METRICS_FILE) as f:
|
|
|
|
|
return json.load(f)
|
|
|
|
|
except (json.JSONDecodeError, IOError):
|
|
|
|
|
return {"detections": [], "stats": {}}
|
|
|
|
|
@dataclass
|
|
|
|
|
class CrisisEvent:
|
|
|
|
|
"""A single crisis detection event."""
|
|
|
|
|
timestamp: float
|
|
|
|
|
level: str # NONE, LOW, MODERATE, HIGH, CRITICAL
|
|
|
|
|
indicators: list
|
|
|
|
|
session_id: str = ""
|
|
|
|
|
source: str = "" # "chat", "gateway", "cli"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_metrics_summary(days=7):
|
|
|
|
|
"""Get metrics summary for the last N days."""
|
|
|
|
|
data = load_metrics()
|
|
|
|
|
detections = data.get("detections", [])
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class MetricsSummary:
|
|
|
|
|
"""Aggregated metrics summary."""
|
|
|
|
|
period_days: int
|
|
|
|
|
total_events: int
|
|
|
|
|
by_level: Dict[str, int]
|
|
|
|
|
top_indicators: List[tuple]
|
|
|
|
|
sessions_affected: int
|
|
|
|
|
avg_daily: float
|
|
|
|
|
peak_day: str
|
|
|
|
|
peak_count: int
|
|
|
|
|
generated_at: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def log_event(event: CrisisEvent) -> None:
|
|
|
|
|
"""Log a crisis event to the metrics file."""
|
|
|
|
|
_DATA_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
with open(_METRICS_FILE, "a") as f:
|
|
|
|
|
f.write(json.dumps(asdict(event)) + "\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_events(days: int = 7) -> List[CrisisEvent]:
|
|
|
|
|
"""Load crisis events from the last N days."""
|
|
|
|
|
if not _METRICS_FILE.exists():
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
cutoff = time.time() - (days * 86400)
|
|
|
|
|
recent = [d for d in detections if d.get("timestamp", 0) > cutoff]
|
|
|
|
|
|
|
|
|
|
if not recent:
|
|
|
|
|
return {
|
|
|
|
|
"period_days": days,
|
|
|
|
|
"total_detections": 0,
|
|
|
|
|
"by_severity": {},
|
|
|
|
|
"by_source": {},
|
|
|
|
|
"avg_response_time": 0,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
by_severity = {}
|
|
|
|
|
by_source = {}
|
|
|
|
|
total_response_time = 0
|
|
|
|
|
response_count = 0
|
|
|
|
|
|
|
|
|
|
for d in recent:
|
|
|
|
|
severity = d.get("severity", "unknown")
|
|
|
|
|
source = d.get("source", "unknown")
|
|
|
|
|
|
|
|
|
|
by_severity[severity] = by_severity.get(severity, 0) + 1
|
|
|
|
|
by_source[source] = by_source.get(source, 0) + 1
|
|
|
|
|
|
|
|
|
|
if "response_time_ms" in d:
|
|
|
|
|
total_response_time += d["response_time_ms"]
|
|
|
|
|
response_count += 1
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"period_days": days,
|
|
|
|
|
"total_detections": len(recent),
|
|
|
|
|
"by_severity": by_severity,
|
|
|
|
|
"by_source": by_source,
|
|
|
|
|
"avg_response_time_ms": total_response_time / response_count if response_count else 0,
|
|
|
|
|
"first_detection": recent[0].get("timestamp"),
|
|
|
|
|
"last_detection": recent[-1].get("timestamp"),
|
|
|
|
|
}
|
|
|
|
|
events = []
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with open(_METRICS_FILE) as f:
|
|
|
|
|
for line in f:
|
|
|
|
|
line = line.strip()
|
|
|
|
|
if not line:
|
|
|
|
|
continue
|
|
|
|
|
data = json.loads(line)
|
|
|
|
|
if data.get("timestamp", 0) >= cutoff:
|
|
|
|
|
events.append(CrisisEvent(**data))
|
|
|
|
|
except (json.JSONDecodeError, KeyError):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
return events
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_metrics_report(days=7):
|
|
|
|
|
"""Generate a human-readable metrics report."""
|
|
|
|
|
summary = get_metrics_summary(days)
|
|
|
|
|
|
|
|
|
|
lines = []
|
|
|
|
|
lines.append("=" * 50)
|
|
|
|
|
lines.append("CRISIS DETECTION METRICS")
|
|
|
|
|
lines.append(f"Period: Last {days} days")
|
|
|
|
|
lines.append("=" * 50)
|
|
|
|
|
lines.append("")
|
|
|
|
|
|
|
|
|
|
total = summary["total_detections"]
|
|
|
|
|
lines.append(f"Total detections: {total}")
|
|
|
|
|
lines.append("")
|
|
|
|
|
|
|
|
|
|
if total > 0:
|
|
|
|
|
def compute_summary(days: int = 7) -> MetricsSummary:
|
|
|
|
|
"""Compute metrics summary for the given period."""
|
|
|
|
|
events = load_events(days)
|
|
|
|
|
now = time.time()
|
|
|
|
|
|
|
|
|
|
# By level
|
|
|
|
|
by_level = Counter(e.level for e in events)
|
|
|
|
|
|
|
|
|
|
# Top indicators
|
|
|
|
|
indicator_counts = Counter()
|
|
|
|
|
for e in events:
|
|
|
|
|
for ind in e.indicators:
|
|
|
|
|
indicator_counts[ind] += 1
|
|
|
|
|
top_indicators = indicator_counts.most_common(10)
|
|
|
|
|
|
|
|
|
|
# Sessions
|
|
|
|
|
sessions = set(e.session_id for e in events if e.session_id)
|
|
|
|
|
|
|
|
|
|
# Peak day
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
daily = defaultdict(int)
|
|
|
|
|
for e in events:
|
|
|
|
|
day = time.strftime("%Y-%m-%d", time.localtime(e.timestamp))
|
|
|
|
|
daily[day] += 1
|
|
|
|
|
peak_day = max(daily, key=daily.get) if daily else "N/A"
|
|
|
|
|
peak_count = daily.get(peak_day, 0)
|
|
|
|
|
|
|
|
|
|
return MetricsSummary(
|
|
|
|
|
period_days=days,
|
|
|
|
|
total_events=len(events),
|
|
|
|
|
by_level=dict(by_level),
|
|
|
|
|
top_indicators=top_indicators,
|
|
|
|
|
sessions_affected=len(sessions),
|
|
|
|
|
avg_daily=round(len(events) / max(days, 1), 1),
|
|
|
|
|
peak_day=peak_day,
|
|
|
|
|
peak_count=peak_count,
|
|
|
|
|
generated_at=time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_summary(summary: MetricsSummary) -> str:
|
|
|
|
|
"""Format metrics summary as human-readable report."""
|
|
|
|
|
lines = [
|
|
|
|
|
"Crisis Metrics Summary",
|
|
|
|
|
"=" * 40,
|
|
|
|
|
f"Period: Last {summary.period_days} days",
|
|
|
|
|
f"Generated: {summary.generated_at}",
|
|
|
|
|
"",
|
|
|
|
|
f"Total events: {summary.total_events}",
|
|
|
|
|
f"Daily avg: {summary.avg_daily}",
|
|
|
|
|
f"Sessions: {summary.sessions_affected}",
|
|
|
|
|
f"Peak day: {summary.peak_day} ({summary.peak_count} events)",
|
|
|
|
|
"",
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
if summary.by_level:
|
|
|
|
|
lines.append("By severity:")
|
|
|
|
|
for sev, count in sorted(summary["by_severity"].items()):
|
|
|
|
|
pct = (count / total) * 100
|
|
|
|
|
bar = "█" * int(pct / 5)
|
|
|
|
|
lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}")
|
|
|
|
|
for level in ["CRITICAL", "HIGH", "MODERATE", "LOW", "NONE"]:
|
|
|
|
|
count = summary.by_level.get(level, 0)
|
|
|
|
|
if count > 0:
|
|
|
|
|
bar = "█" * min(count, 30)
|
|
|
|
|
lines.append(f" {level:10s} {count:4d} {bar}")
|
|
|
|
|
lines.append("")
|
|
|
|
|
|
|
|
|
|
lines.append("By source:")
|
|
|
|
|
for src, count in sorted(summary["by_source"].items()):
|
|
|
|
|
lines.append(f" {src:20} {count:4}")
|
|
|
|
|
|
|
|
|
|
if summary.top_indicators:
|
|
|
|
|
lines.append("Top indicators:")
|
|
|
|
|
for indicator, count in summary.top_indicators[:5]:
|
|
|
|
|
lines.append(f" {indicator}: {count}")
|
|
|
|
|
lines.append("")
|
|
|
|
|
|
|
|
|
|
avg_ms = summary.get("avg_response_time_ms", 0)
|
|
|
|
|
lines.append(f"Avg response time: {avg_ms:.0f}ms")
|
|
|
|
|
|
|
|
|
|
first = summary.get("first_detection")
|
|
|
|
|
last = summary.get("last_detection")
|
|
|
|
|
if first and last:
|
|
|
|
|
first_dt = datetime.fromtimestamp(first)
|
|
|
|
|
last_dt = datetime.fromtimestamp(last)
|
|
|
|
|
lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}")
|
|
|
|
|
lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}")
|
|
|
|
|
else:
|
|
|
|
|
lines.append("No crisis detections in this period.")
|
|
|
|
|
|
|
|
|
|
lines.append("")
|
|
|
|
|
lines.append("=" * 50)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if summary.total_events == 0:
|
|
|
|
|
lines.append("No crisis events in this period.")
|
|
|
|
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
|
|
|
description="Crisis Detection Metrics CLI",
|
|
|
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
|
|
|
epilog="""
|
|
|
|
|
Examples:
|
|
|
|
|
%(prog)s --summary Weekly summary report
|
|
|
|
|
%(prog)s --today Today only
|
|
|
|
|
%(prog)s --json Raw JSON export
|
|
|
|
|
%(prog)s --days 30 Last 30 days
|
|
|
|
|
""",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
parser.add_argument("--summary", action="store_true", help="Show summary report")
|
|
|
|
|
parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
|
|
|
|
|
parser.add_argument("--today", action="store_true", help="Today only (1 day)")
|
|
|
|
|
parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
|
|
|
|
|
parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
parser = argparse.ArgumentParser(description="Crisis metrics summary")
|
|
|
|
|
parser.add_argument("--summary", action="store_true", help="Print summary report")
|
|
|
|
|
parser.add_argument("--json", action="store_true", dest="as_json", help="Output JSON")
|
|
|
|
|
parser.add_argument("--last", default="7d", help="Time period (e.g., 7d, 30d)")
|
|
|
|
|
parser.add_argument("--log", nargs=2, metavar=("LEVEL", "INDICATOR"), help="Log a test event")
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
|
|
if args.metrics_file:
|
|
|
|
|
global METRICS_FILE
|
|
|
|
|
METRICS_FILE = Path(args.metrics_file)
|
|
|
|
|
|
|
|
|
|
days = 1 if args.today else args.days
|
|
|
|
|
|
|
|
|
|
if args.json_output:
|
|
|
|
|
summary = get_metrics_summary(days)
|
|
|
|
|
print(json.dumps(summary, indent=2, default=str))
|
|
|
|
|
|
|
|
|
|
# Parse period
|
|
|
|
|
period_str = args.last.rstrip("d")
|
|
|
|
|
try:
|
|
|
|
|
days = int(period_str)
|
|
|
|
|
except ValueError:
|
|
|
|
|
days = 7
|
|
|
|
|
|
|
|
|
|
# Log mode
|
|
|
|
|
if args.log:
|
|
|
|
|
level, indicator = args.log
|
|
|
|
|
event = CrisisEvent(
|
|
|
|
|
timestamp=time.time(),
|
|
|
|
|
level=level.upper(),
|
|
|
|
|
indicators=[indicator],
|
|
|
|
|
session_id="cli-test",
|
|
|
|
|
source="cli",
|
|
|
|
|
)
|
|
|
|
|
log_event(event)
|
|
|
|
|
print(f"Logged: {level.upper()} / {indicator}")
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
# Compute summary
|
|
|
|
|
summary = compute_summary(days)
|
|
|
|
|
|
|
|
|
|
if args.as_json:
|
|
|
|
|
print(json.dumps(asdict(summary), indent=2))
|
|
|
|
|
else:
|
|
|
|
|
report = get_metrics_report(days)
|
|
|
|
|
print(report)
|
|
|
|
|
print(format_summary(summary))
|
|
|
|
|
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
main()
|
|
|
|
|
sys.exit(main())
|
|
|
|
|
|