Compare commits
1 Commits
fix/136
...
burn/36-17
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbf2808b04 |
@@ -1,5 +1,22 @@
|
||||
"""Crisis detection and metrics module."""
|
||||
"""
|
||||
Crisis detection and response system for the-door.
|
||||
|
||||
from .metrics import get_metrics_summary, get_metrics_report
|
||||
Stands between a broken man and a machine that would tell him to die.
|
||||
"""
|
||||
|
||||
__all__ = ["get_metrics_summary", "get_metrics_report"]
|
||||
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
|
||||
from .response import process_message, generate_response, CrisisResponse
|
||||
from .gateway import check_crisis, get_system_prompt, format_gateway_response
|
||||
|
||||
__all__ = [
|
||||
"detect_crisis",
|
||||
"CrisisDetectionResult",
|
||||
"process_message",
|
||||
"generate_response",
|
||||
"CrisisResponse",
|
||||
"check_crisis",
|
||||
"get_system_prompt",
|
||||
"format_result",
|
||||
"format_gateway_response",
|
||||
"get_urgency_emoji",
|
||||
]
|
||||
|
||||
@@ -1,161 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Crisis Metrics CLI — View crisis detection health metrics.
|
||||
|
||||
Usage:
|
||||
python3 -m crisis.metrics --summary # weekly report
|
||||
python3 -m crisis.metrics --json # raw JSON export
|
||||
python3 -m crisis.metrics --today # today only
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
# Metrics file location
# JSON store of recorded crisis detections under the user's home directory.
# main() may re-point this at runtime via the --metrics-file flag.
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
|
||||
|
||||
|
||||
def load_metrics():
    """Read the metrics store, treating any failure as "no data yet".

    Returns a dict with at least ``detections`` (list) and ``stats``
    (dict). A missing, unreadable, or corrupt file yields the empty
    structure instead of raising.
    """
    if not METRICS_FILE.exists():
        return {"detections": [], "stats": {}}

    try:
        with open(METRICS_FILE) as handle:
            return json.load(handle)
    except (json.JSONDecodeError, IOError):
        # Corrupt or unreadable store — degrade to an empty dataset.
        return {"detections": [], "stats": {}}
|
||||
|
||||
|
||||
def get_metrics_summary(days=7):
    """Summarize crisis detections recorded within the last *days* days.

    Returns a dict with: ``period_days``, ``total_detections``,
    ``by_severity``, ``by_source``, ``avg_response_time_ms``, and — when
    any detections exist — ``first_detection`` / ``last_detection``
    epoch timestamps.
    """
    data = load_metrics()
    detections = data.get("detections", [])

    cutoff = time.time() - (days * 86400)
    recent = [d for d in detections if d.get("timestamp", 0) > cutoff]

    if not recent:
        return {
            "period_days": days,
            "total_detections": 0,
            "by_severity": {},
            "by_source": {},
            # FIX: this key was "avg_response_time", inconsistent with the
            # populated branch below ("avg_response_time_ms"); consumers
            # reading the _ms key saw a missing value for empty periods.
            "avg_response_time_ms": 0,
        }

    # FIX: sort chronologically so first/last detection are correct even if
    # the stored detections list is not ordered by timestamp.
    recent.sort(key=lambda d: d.get("timestamp", 0))

    by_severity = {}
    by_source = {}
    total_response_time = 0
    response_count = 0

    for d in recent:
        severity = d.get("severity", "unknown")
        source = d.get("source", "unknown")

        by_severity[severity] = by_severity.get(severity, 0) + 1
        by_source[source] = by_source.get(source, 0) + 1

        # Response time is optional per detection; average only over the
        # detections that actually carry it.
        if "response_time_ms" in d:
            total_response_time += d["response_time_ms"]
            response_count += 1

    return {
        "period_days": days,
        "total_detections": len(recent),
        "by_severity": by_severity,
        "by_source": by_source,
        "avg_response_time_ms": total_response_time / response_count if response_count else 0,
        "first_detection": recent[0].get("timestamp"),
        "last_detection": recent[-1].get("timestamp"),
    }
|
||||
|
||||
|
||||
def get_metrics_report(days=7):
    """Generate a human-readable metrics report.

    Renders the summary from get_metrics_summary() as a banner-framed
    text block: totals, severity/source breakdowns with percentage bars,
    average response time, and first/last detection timestamps.

    NOTE(review): indentation was lost in transit; this layout assumes the
    detail sections (severity, source, timing) all sit under ``if total > 0``
    and the final ``else`` pairs with it — confirm against file history.
    """
    summary = get_metrics_summary(days)

    lines = []
    lines.append("=" * 50)
    lines.append("CRISIS DETECTION METRICS")
    lines.append(f"Period: Last {days} days")
    lines.append("=" * 50)
    lines.append("")

    total = summary["total_detections"]
    lines.append(f"Total detections: {total}")
    lines.append("")

    if total > 0:
        lines.append("By severity:")
        for sev, count in sorted(summary["by_severity"].items()):
            pct = (count / total) * 100
            # One bar character per 5 percentage points (max 20 chars).
            bar = "█" * int(pct / 5)
            lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}")
        lines.append("")

        lines.append("By source:")
        for src, count in sorted(summary["by_source"].items()):
            lines.append(f" {src:20} {count:4}")
        lines.append("")

        # .get() with default: empty-period summaries may lack this key.
        avg_ms = summary.get("avg_response_time_ms", 0)
        lines.append(f"Avg response time: {avg_ms:.0f}ms")

        first = summary.get("first_detection")
        last = summary.get("last_detection")
        if first and last:
            first_dt = datetime.fromtimestamp(first)
            last_dt = datetime.fromtimestamp(last)
            lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}")
            lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}")
    else:
        lines.append("No crisis detections in this period.")

    lines.append("")
    lines.append("=" * 50)

    return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse flags and print metrics as text or JSON."""
    parser = argparse.ArgumentParser(
        description="Crisis Detection Metrics CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # NOTE(review): column alignment inside this epilog was lost in
        # transit — confirm exact spacing against the original file.
        epilog="""
Examples:
  %(prog)s --summary Weekly summary report
  %(prog)s --today Today only
  %(prog)s --json Raw JSON export
  %(prog)s --days 30 Last 30 days
""",
    )

    # --summary is accepted but never read below: the text report is already
    # the default output mode, so the flag is effectively a no-op.
    parser.add_argument("--summary", action="store_true", help="Show summary report")
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
    parser.add_argument("--today", action="store_true", help="Today only (1 day)")
    parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
    parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")

    args = parser.parse_args()

    if args.metrics_file:
        # Re-point the module-level store so load_metrics() reads the
        # user-supplied file instead of the default under ~/.the-door.
        global METRICS_FILE
        METRICS_FILE = Path(args.metrics_file)

    # --today overrides --days with a 1-day window.
    days = 1 if args.today else args.days

    if args.json_output:
        summary = get_metrics_summary(days)
        # default=str keeps non-JSON-native values (e.g. Paths) printable.
        print(json.dumps(summary, indent=2, default=str))
    else:
        report = get_metrics_report(days)
        print(report)


if __name__ == "__main__":
    main()
|
||||
@@ -1 +1,217 @@
|
||||
...
|
||||
#!/usr/bin/env python3
|
||||
"""Crisis synthesizer for The Door.
|
||||
|
||||
Logs anonymized crisis interaction metadata, analyzes recurring indicator patterns,
|
||||
and emits a weekly JSON report for human review.
|
||||
|
||||
Privacy rules:
|
||||
- no message content
|
||||
- no session identifiers
|
||||
- no IP or user identifiers
|
||||
- metadata only: level, indicators, response profile, continuation flag, message count
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List, Optional
|
||||
|
||||
# Where anonymized crisis events are persisted (JSONL, one event per line).
LOG_DIR = Path.home() / ".hermes" / "the-door" / "logs"
EVENT_LOG = LOG_DIR / "crisis_events.jsonl"
# The only crisis levels accepted by _normalize_level(); anything else raises.
ALLOWED_LEVELS = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
|
||||
|
||||
|
||||
def _now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _normalize_level(level: str) -> str:
    """Trim and uppercase *level*; raise ValueError if not an allowed level."""
    candidate = (level or "").strip().upper()
    if candidate in ALLOWED_LEVELS:
        return candidate
    raise ValueError(f"invalid crisis level: {level}")
|
||||
|
||||
|
||||
def _normalize_indicators(indicators: Iterable[str]) -> List[str]:
|
||||
cleaned = []
|
||||
for indicator in indicators or []:
|
||||
indicator = str(indicator).strip().lower()
|
||||
if indicator:
|
||||
cleaned.append(indicator)
|
||||
return sorted(dict.fromkeys(cleaned))
|
||||
|
||||
|
||||
def _coerce_log_path(log_path: Optional[os.PathLike | str]) -> Path:
|
||||
return Path(log_path) if log_path else EVENT_LOG
|
||||
|
||||
|
||||
def log_crisis_event(level, indicators, response_profile, user_continued, message_count=1, log_path=None):
    """Append one anonymized crisis event to the JSONL log and return it.

    Only metadata is persisted (level, indicators, response profile,
    continuation flag, message count, timestamp) — never message content
    or identifiers. Raises ValueError via _normalize_level on bad levels.
    """
    destination = _coerce_log_path(log_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    record = {
        "timestamp": _now_iso(),
        "level": _normalize_level(level),
        "indicators": _normalize_indicators(indicators),
        "response_profile": str(response_profile).strip().lower(),
        "user_continued": bool(user_continued),
        "message_count": int(message_count),
    }

    serialized = json.dumps(record, sort_keys=True)
    with destination.open("a", encoding="utf-8") as sink:
        sink.write(serialized + "\n")
    return record
|
||||
|
||||
|
||||
def load_events(days=7, log_path=None):
    """Return logged events newer than *days* days, skipping malformed lines.

    A missing log yields an empty list. Lines that are blank, not JSON,
    or lack a parseable "timestamp" are silently skipped.
    """
    source = _coerce_log_path(log_path)
    if not source.exists():
        return []

    horizon = datetime.now(timezone.utc) - timedelta(days=days)
    kept = []
    for raw in source.read_text(encoding="utf-8").splitlines():
        raw = raw.strip()
        if not raw:
            continue
        try:
            record = json.loads(raw)
            # Tolerate "Z" suffix, which fromisoformat rejects before 3.11.
            stamp = datetime.fromisoformat(record["timestamp"].replace("Z", "+00:00"))
            if stamp >= horizon:
                kept.append(record)
        except (json.JSONDecodeError, KeyError, ValueError):
            continue
    return kept
|
||||
|
||||
|
||||
def analyze_patterns(events):
    """Aggregate level counts, indicator frequencies, and continuation stats.

    continuation_rate is the share of events where the user kept talking
    after intervention; false_positive_rate_estimate is its complement.
    Raises ValueError (via _normalize_level) on an unknown stored level.
    """
    level_tally = Counter()
    indicator_tally = Counter()
    per_indicator = defaultdict(lambda: {"continued": 0, "total": 0})
    continued_events = 0

    for record in events:
        level_tally[_normalize_level(record.get("level", "MEDIUM"))] += 1
        continued = bool(record.get("user_continued"))
        if continued:
            continued_events += 1
        for label in _normalize_indicators(record.get("indicators", [])):
            indicator_tally[label] += 1
            slot = per_indicator[label]
            slot["total"] += 1
            if continued:
                slot["continued"] += 1

    total = len(events)
    continuation_rate = (continued_events / total) if total else 0.0
    false_positive_rate_estimate = 1.0 - continuation_rate if total else 0.0

    indicator_continuation = {}
    for label, slot in per_indicator.items():
        rate = round(slot["continued"] / slot["total"], 3) if slot["total"] else 0.0
        indicator_continuation[label] = {
            "count": slot["total"],
            "continuation_rate": rate,
        }

    return {
        "total_events": total,
        "counts_by_level": dict(level_tally),
        "indicator_frequency": indicator_tally.most_common(),
        "indicator_continuation": indicator_continuation,
        "continuation_rate": continuation_rate,
        "false_positive_rate_estimate": false_positive_rate_estimate,
    }
|
||||
|
||||
|
||||
def suggest_keyword_adjustments(events, min_samples=5):
    """Recommend per-indicator weight changes once enough samples exist.

    Indicators with fewer than *min_samples* occurrences are skipped.
    Results are sorted by descending count, then indicator name.
    """
    stats_by_indicator = analyze_patterns(events)["indicator_continuation"]

    recommendations = []
    for label, stats in stats_by_indicator.items():
        samples = stats["count"]
        if samples < min_samples:
            continue

        rate = stats["continuation_rate"]
        # Thresholds: >=0.7 continuation => indicator earns more weight;
        # <=0.35 => likely noise; in between => leave as-is.
        if rate >= 0.7:
            verdict = "increase"
            rationale = "indicator frequently appears in conversations that continue after intervention"
        elif rate <= 0.35:
            verdict = "decrease"
            rationale = "indicator frequently appears in interactions that end quickly, suggesting possible false positives"
        else:
            verdict = "hold"
            rationale = "indicator has mixed evidence and should remain stable pending more data"

        recommendations.append(
            {
                "indicator": label,
                "count": samples,
                "continuation_rate": rate,
                "suggestion": verdict,
                "reason": rationale,
            }
        )

    recommendations.sort(key=lambda item: (-item["count"], item["indicator"]))
    return recommendations
|
||||
|
||||
|
||||
def weekly_report(days=7, log_path=None, min_samples=5):
    """Build the weekly JSON-serializable report for human review.

    NOTE(review): suggest_keyword_adjustments re-runs analyze_patterns
    internally, so analysis happens twice per report — cheap for small
    logs, but a candidate for sharing if logs grow.
    """
    window_events = load_events(days=days, log_path=log_path)
    return {
        "generated_at": _now_iso(),
        "window_days": days,
        "summary": analyze_patterns(window_events),
        "suggestions": suggest_keyword_adjustments(window_events, min_samples=min_samples),
        # Explicit privacy attestation shipped inside every report.
        "privacy": {
            "stores_user_content": False,
            "stores_session_ids": False,
            "stores_identifiers": False,
        },
    }
|
||||
|
||||
|
||||
def parse_args(argv=None):
    """Parse CLI arguments for the `log` and `report` subcommands."""
    parser = argparse.ArgumentParser(description="Crisis synthesizer reporting tool")
    commands = parser.add_subparsers(dest="command", required=True)

    log_parser = commands.add_parser("log", help="Log a crisis interaction metadata event")
    log_parser.add_argument("--level", required=True)
    log_parser.add_argument("--indicators", nargs="*", default=[])
    log_parser.add_argument("--response-profile", required=True)
    log_parser.add_argument("--continued", action="store_true")
    log_parser.add_argument("--message-count", type=int, default=1)
    log_parser.add_argument("--log-path")

    report_parser = commands.add_parser("report", help="Print a weekly JSON report to stdout")
    report_parser.add_argument("--days", type=int, default=7)
    report_parser.add_argument("--log-path")
    report_parser.add_argument("--min-samples", type=int, default=5)

    return parser.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv=None):
    """CLI dispatcher: `log` appends an event, `report` prints the weekly report.

    Both paths print pretty-printed, key-sorted JSON and return exit code 0.
    """
    args = parse_args(argv)

    if args.command == "log":
        payload = log_crisis_event(
            level=args.level,
            indicators=args.indicators,
            response_profile=args.response_profile,
            user_continued=args.continued,
            message_count=args.message_count,
            log_path=args.log_path,
        )
    else:
        payload = weekly_report(days=args.days, log_path=args.log_path, min_samples=args.min_samples)

    print(json.dumps(payload, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|
||||
|
||||
152
tests/test_crisis_synthesizer.py
Normal file
152
tests/test_crisis_synthesizer.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""
|
||||
Regression tests for issue #36: crisis_synthesizer.
|
||||
|
||||
Verifies anonymized event logging, pattern analysis, keyword adjustment
|
||||
suggestions, and weekly JSON reporting.
|
||||
"""
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from evolution import crisis_synthesizer
|
||||
|
||||
|
||||
class TestCrisisSynthesizer(unittest.TestCase):
    """End-to-end checks for anonymized logging, analysis, and reporting."""

    def setUp(self):
        # Each test gets an isolated temp JSONL log so the real
        # ~/.hermes/the-door location is never touched.
        self.tmpdir = tempfile.TemporaryDirectory()
        self.log_path = Path(self.tmpdir.name) / "crisis_events.jsonl"

    def tearDown(self):
        self.tmpdir.cleanup()

    def test_log_crisis_event_stores_only_metadata(self):
        """Persisted rows carry exactly the metadata keys — no content/IDs."""
        event = crisis_synthesizer.log_crisis_event(
            level="CRITICAL",
            indicators=["kill myself", "plan tonight"],
            response_profile="guardian",
            user_continued=True,
            message_count=3,
            log_path=self.log_path,
        )
        self.assertEqual(event["level"], "CRITICAL")
        self.assertTrue(self.log_path.exists())

        rows = [json.loads(line) for line in self.log_path.read_text().splitlines() if line.strip()]
        self.assertEqual(len(rows), 1)
        row = rows[0]
        # Privacy contract: this exact key set and nothing else.
        self.assertEqual(
            set(row.keys()),
            {"timestamp", "level", "indicators", "response_profile", "user_continued", "message_count"},
        )
        self.assertNotIn("message", row)
        self.assertNotIn("content", row)
        self.assertNotIn("session_id", row)

    def test_analyze_patterns_reports_counts_and_false_positive_estimate(self):
        """Level counts, top indicator, and the two rate metrics line up."""
        # 3 events, 2 continued => continuation 2/3, false-positive est. 1/3.
        events = [
            {
                "timestamp": "2026-04-14T00:00:00+00:00",
                "level": "CRITICAL",
                "indicators": ["kill myself", "plan tonight"],
                "response_profile": "guardian",
                "user_continued": True,
                "message_count": 4,
            },
            {
                "timestamp": "2026-04-14T00:05:00+00:00",
                "level": "HIGH",
                "indicators": ["kill myself"],
                "response_profile": "companion",
                "user_continued": False,
                "message_count": 2,
            },
            {
                "timestamp": "2026-04-14T00:10:00+00:00",
                "level": "HIGH",
                "indicators": ["kill myself"],
                "response_profile": "companion",
                "user_continued": True,
                "message_count": 3,
            },
        ]
        analysis = crisis_synthesizer.analyze_patterns(events)
        self.assertEqual(analysis["total_events"], 3)
        self.assertEqual(analysis["counts_by_level"]["HIGH"], 2)
        self.assertEqual(analysis["counts_by_level"]["CRITICAL"], 1)
        self.assertEqual(analysis["indicator_frequency"][0][0], "kill myself")
        self.assertAlmostEqual(analysis["continuation_rate"], 2 / 3, places=3)
        self.assertAlmostEqual(analysis["false_positive_rate_estimate"], 1 / 3, places=3)

    def test_suggestion_engine_recommends_adjustments_after_enough_data(self):
        """With >= min_samples, high/low continuation maps to increase/decrease."""
        events = []
        # 6 always-continued events for "kill myself" => rate 1.0 => increase.
        for i in range(6):
            events.append(
                {
                    "timestamp": f"2026-04-14T00:0{i}:00+00:00",
                    "level": "HIGH",
                    "indicators": ["kill myself"],
                    "response_profile": "companion",
                    "user_continued": True,
                    "message_count": 2,
                }
            )
        # 6 never-continued events for "rough day" => rate 0.0 => decrease.
        for i in range(6, 12):
            events.append(
                {
                    "timestamp": f"2026-04-14T00:{i}:00+00:00",
                    "level": "HIGH",
                    "indicators": ["rough day"],
                    "response_profile": "companion",
                    "user_continued": False,
                    "message_count": 1,
                }
            )

        suggestions = crisis_synthesizer.suggest_keyword_adjustments(events, min_samples=5)
        by_indicator = {item["indicator"]: item for item in suggestions}
        self.assertEqual(by_indicator["kill myself"]["suggestion"], "increase")
        self.assertEqual(by_indicator["rough day"]["suggestion"], "decrease")

    def test_weekly_report_is_json_serializable_and_stdout_ready(self):
        """weekly_report must round-trip through json.dumps unchanged."""
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        report = crisis_synthesizer.weekly_report(days=7, log_path=self.log_path)
        rendered = json.dumps(report)
        self.assertIn("summary", report)
        self.assertIn("suggestions", report)
        self.assertIn("counts_by_level", report["summary"])
        self.assertIn("kill myself", rendered)

    def test_cli_report_prints_json(self):
        """`main(["report", ...])` emits parseable JSON on stdout."""
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        output = io.StringIO()
        # Capture stdout rather than asserting on return value alone.
        with mock.patch("sys.stdout", output):
            crisis_synthesizer.main(["report", "--days", "7", "--log-path", str(self.log_path)])
        parsed = json.loads(output.getvalue())
        self.assertEqual(parsed["summary"]["total_events"], 1)


if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user