Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
cbf2808b04 feat: build crisis synthesizer metadata pipeline (refs #36)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 18s
2026-04-14 22:27:06 -04:00
4 changed files with 389 additions and 165 deletions

View File

@@ -1,5 +1,22 @@
"""Crisis detection and metrics module."""
"""
Crisis detection and response system for the-door.
from .metrics import get_metrics_summary, get_metrics_report
Stands between a broken man and a machine that would tell him to die.
"""
__all__ = ["get_metrics_summary", "get_metrics_report"]
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
__all__ = [
"detect_crisis",
"CrisisDetectionResult",
"process_message",
"generate_response",
"CrisisResponse",
"check_crisis",
"get_system_prompt",
"format_result",
"format_gateway_response",
"get_urgency_emoji",
]

View File

@@ -1,161 +0,0 @@
#!/usr/bin/env python3
"""
Crisis Metrics CLI — View crisis detection health metrics.
Usage:
python3 -m crisis.metrics --summary # weekly report
python3 -m crisis.metrics --json # raw JSON export
python3 -m crisis.metrics --today # today only
"""
import argparse
import json
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
# Metrics file location
# Persistent JSON store of detection events, kept under the user's home dir;
# load_metrics() tolerates the file being absent or corrupt.
METRICS_FILE = Path.home() / ".the-door" / "crisis_metrics.json"
def load_metrics():
    """Read the metrics store, returning an empty structure on any failure.

    A missing, unreadable, or corrupt file is treated the same way: the
    caller always gets a dict with "detections" and "stats" keys.
    """
    fallback = {"detections": [], "stats": {}}
    if not METRICS_FILE.exists():
        return fallback
    try:
        with open(METRICS_FILE) as handle:
            data = json.load(handle)
    except (json.JSONDecodeError, IOError):
        return fallback
    return data
def get_metrics_summary(days=7):
    """Summarize crisis detections recorded within the last *days* days.

    Returns a dict with the detection total, per-severity and per-source
    breakdowns, the average response time in milliseconds, and the first/last
    detection timestamps in the window.
    """
    data = load_metrics()
    detections = data.get("detections", [])
    cutoff = time.time() - (days * 86400)
    recent = [d for d in detections if d.get("timestamp", 0) > cutoff]
    if not recent:
        return {
            "period_days": days,
            "total_detections": 0,
            "by_severity": {},
            "by_source": {},
            # Bug fix: this branch previously returned "avg_response_time"
            # while the populated branch returns "avg_response_time_ms", so
            # consumers of the _ms key saw a missing field on empty windows.
            # The legacy key is kept for backward compatibility.
            "avg_response_time": 0,
            "avg_response_time_ms": 0,
            "first_detection": None,
            "last_detection": None,
        }
    by_severity = {}
    by_source = {}
    total_response_time = 0
    response_count = 0
    for d in recent:
        severity = d.get("severity", "unknown")
        source = d.get("source", "unknown")
        by_severity[severity] = by_severity.get(severity, 0) + 1
        by_source[source] = by_source.get(source, 0) + 1
        # Not every event records a response time; average only over those
        # that do.
        if "response_time_ms" in d:
            total_response_time += d["response_time_ms"]
            response_count += 1
    return {
        "period_days": days,
        "total_detections": len(recent),
        "by_severity": by_severity,
        "by_source": by_source,
        "avg_response_time_ms": total_response_time / response_count if response_count else 0,
        # NOTE(review): first/last are positional — assumes detections are
        # appended in chronological order; confirm against the writer.
        "first_detection": recent[0].get("timestamp"),
        "last_detection": recent[-1].get("timestamp"),
    }
def get_metrics_report(days=7):
    """Render a human-readable metrics report for the last *days* days."""
    summary = get_metrics_summary(days)
    lines = []
    lines.append("=" * 50)
    lines.append("CRISIS DETECTION METRICS")
    lines.append(f"Period: Last {days} days")
    lines.append("=" * 50)
    lines.append("")
    total = summary["total_detections"]
    lines.append(f"Total detections: {total}")
    lines.append("")
    if total > 0:
        lines.append("By severity:")
        for sev, count in sorted(summary["by_severity"].items()):
            pct = (count / total) * 100
            # Bug fix: the bar glyph was an empty string (almost certainly a
            # "█" lost in an encoding round-trip), so every histogram bar
            # rendered blank. One glyph per 5 percentage points.
            bar = "█" * int(pct / 5)
            lines.append(f" {sev:12} {count:4} ({pct:5.1f}%) {bar}")
        lines.append("")
        lines.append("By source:")
        for src, count in sorted(summary["by_source"].items()):
            lines.append(f" {src:20} {count:4}")
        lines.append("")
        avg_ms = summary.get("avg_response_time_ms", 0)
        lines.append(f"Avg response time: {avg_ms:.0f}ms")
        first = summary.get("first_detection")
        last = summary.get("last_detection")
        if first and last:
            first_dt = datetime.fromtimestamp(first)
            last_dt = datetime.fromtimestamp(last)
            lines.append(f"First detection: {first_dt.strftime('%Y-%m-%d %H:%M')}")
            lines.append(f"Last detection: {last_dt.strftime('%Y-%m-%d %H:%M')}")
    else:
        lines.append("No crisis detections in this period.")
    lines.append("")
    lines.append("=" * 50)
    return "\n".join(lines)
def main():
    """CLI entry point: parse flags and print a text or JSON metrics view."""
    parser = argparse.ArgumentParser(
        description="Crisis Detection Metrics CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --summary Weekly summary report
%(prog)s --today Today only
%(prog)s --json Raw JSON export
%(prog)s --days 30 Last 30 days
""",
    )
    # --summary is accepted for discoverability; the text report is also the
    # default when no output flag is given.
    parser.add_argument("--summary", action="store_true", help="Show summary report")
    parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
    parser.add_argument("--today", action="store_true", help="Today only (1 day)")
    parser.add_argument("--days", type=int, default=7, help="Number of days (default: 7)")
    parser.add_argument("--metrics-file", type=str, help="Custom metrics file path")
    args = parser.parse_args()
    if args.metrics_file:
        # Rebind the module-level path so load_metrics() reads the custom file.
        global METRICS_FILE
        METRICS_FILE = Path(args.metrics_file)
    # --today overrides --days.
    days = 1 if args.today else args.days
    if args.json_output:
        summary = get_metrics_summary(days)
        # default=str keeps the dump robust if non-JSON types ever appear.
        print(json.dumps(summary, indent=2, default=str))
    else:
        report = get_metrics_report(days)
        print(report)


if __name__ == "__main__":
    main()

View File

@@ -1 +1,217 @@
...
#!/usr/bin/env python3
"""Crisis synthesizer for The Door.
Logs anonymized crisis interaction metadata, analyzes recurring indicator patterns,
and emits a weekly JSON report for human review.
Privacy rules:
- no message content
- no session identifiers
- no IP or user identifiers
- metadata only: level, indicators, response profile, continuation flag, message count
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, List, Optional
# Default append-only JSON Lines log of anonymized crisis events
# (one JSON object per line); overridable per call via log_path.
LOG_DIR = Path.home() / ".hermes" / "the-door" / "logs"
EVENT_LOG = LOG_DIR / "crisis_events.jsonl"
# Closed set of accepted severity levels; _normalize_level rejects all others.
ALLOWED_LEVELS = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _normalize_level(level: str) -> str:
    """Return *level* trimmed and upper-cased; reject unknown severities.

    Raises ValueError when the value is not one of ALLOWED_LEVELS.
    """
    candidate = (level or "").strip().upper()
    if candidate in ALLOWED_LEVELS:
        return candidate
    raise ValueError(f"invalid crisis level: {level}")
def _normalize_indicators(indicators: Iterable[str]) -> List[str]:
cleaned = []
for indicator in indicators or []:
indicator = str(indicator).strip().lower()
if indicator:
cleaned.append(indicator)
return sorted(dict.fromkeys(cleaned))
def _coerce_log_path(log_path: Optional[os.PathLike | str]) -> Path:
return Path(log_path) if log_path else EVENT_LOG
def log_crisis_event(
    level: str,
    indicators: Iterable[str],
    response_profile: str,
    user_continued: bool,
    message_count: int = 1,
    log_path: Optional[os.PathLike | str] = None,
) -> dict:
    """Append one anonymized crisis event to the JSONL log and return it.

    Only metadata is persisted — severity level, normalized indicators,
    response profile, continuation flag, and message count — never message
    content or identifiers.

    Raises ValueError if *level* is not one of ALLOWED_LEVELS.

    (Change: added type annotations and docstring for consistency with the
    module's other annotated functions; behavior is unchanged.)
    """
    path = _coerce_log_path(log_path)
    # Create the log directory lazily so first use works on a fresh machine.
    path.parent.mkdir(parents=True, exist_ok=True)
    event = {
        "timestamp": _now_iso(),
        "level": _normalize_level(level),
        "indicators": _normalize_indicators(indicators),
        "response_profile": str(response_profile).strip().lower(),
        "user_continued": bool(user_continued),
        "message_count": int(message_count),
    }
    # sort_keys gives stable, diff-friendly lines in the log.
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(event, sort_keys=True) + "\n")
    return event
def load_events(days=7, log_path=None):
    """Load events from the JSONL log recorded within the last *days* days.

    Malformed lines are skipped rather than failing the whole report: bad
    JSON, a missing or unparseable timestamp, and (fix) a naive timestamp
    whose comparison against the aware cutoff raises TypeError.
    """
    path = _coerce_log_path(log_path)
    if not path.exists():
        return []
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    events = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
            # Tolerate a trailing "Z", which fromisoformat() rejects before 3.11.
            ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
            if ts >= cutoff:
                events.append(event)
        except (json.JSONDecodeError, KeyError, ValueError, TypeError):
            # TypeError added: a tz-naive timestamp (e.g. hand-edited log line)
            # cannot be compared with the aware cutoff; skip it instead of
            # crashing the report.
            continue
    return events
def analyze_patterns(events):
    """Aggregate level counts, indicator frequencies, and continuation rates.

    The continuation rate (share of interactions the user stayed in after
    intervention) doubles as a rough inverse proxy for false positives.
    """
    counts_by_level = Counter()
    indicator_counts = Counter()
    continuation_by_indicator = defaultdict(lambda: {"continued": 0, "total": 0})
    continued_events = 0
    for event in events:
        counts_by_level[_normalize_level(event.get("level", "MEDIUM"))] += 1
        continued = bool(event.get("user_continued"))
        if continued:
            continued_events += 1
        for indicator in _normalize_indicators(event.get("indicators", [])):
            indicator_counts[indicator] += 1
            bucket = continuation_by_indicator[indicator]
            bucket["total"] += 1
            if continued:
                bucket["continued"] += 1
    total = len(events)
    if total:
        continuation_rate = continued_events / total
        false_positive_rate_estimate = 1.0 - continuation_rate
    else:
        continuation_rate = 0.0
        false_positive_rate_estimate = 0.0
    indicator_continuation = {}
    for indicator, bucket in continuation_by_indicator.items():
        rate = round(bucket["continued"] / bucket["total"], 3) if bucket["total"] else 0.0
        indicator_continuation[indicator] = {
            "count": bucket["total"],
            "continuation_rate": rate,
        }
    return {
        "total_events": total,
        "counts_by_level": dict(counts_by_level),
        "indicator_frequency": indicator_counts.most_common(),
        "indicator_continuation": indicator_continuation,
        "continuation_rate": continuation_rate,
        "false_positive_rate_estimate": false_positive_rate_estimate,
    }
def suggest_keyword_adjustments(events, min_samples=5):
    """Propose weight changes for indicators with at least *min_samples* observations.

    High continuation (>= 0.7) suggests raising an indicator's weight, low
    continuation (<= 0.35) suggests lowering it, anything in between holds.
    Results are sorted by descending count, then indicator name.
    """
    stats_by_indicator = analyze_patterns(events)["indicator_continuation"]
    suggestions = []
    for indicator, stats in stats_by_indicator.items():
        count = stats["count"]
        # Too little evidence — no recommendation either way.
        if count < min_samples:
            continue
        rate = stats["continuation_rate"]
        if rate >= 0.7:
            verdict = "increase"
            why = "indicator frequently appears in conversations that continue after intervention"
        elif rate <= 0.35:
            verdict = "decrease"
            why = "indicator frequently appears in interactions that end quickly, suggesting possible false positives"
        else:
            verdict = "hold"
            why = "indicator has mixed evidence and should remain stable pending more data"
        suggestions.append(
            {
                "indicator": indicator,
                "count": count,
                "continuation_rate": rate,
                "suggestion": verdict,
                "reason": why,
            }
        )
    return sorted(suggestions, key=lambda item: (-item["count"], item["indicator"]))
def weekly_report(days=7, log_path=None, min_samples=5):
    """Assemble the JSON-serializable weekly report for human review."""
    window_events = load_events(days=days, log_path=log_path)
    report = {
        "generated_at": _now_iso(),
        "window_days": days,
        "summary": analyze_patterns(window_events),
        "suggestions": suggest_keyword_adjustments(window_events, min_samples=min_samples),
        # Explicit privacy attestation carried in every report.
        "privacy": {
            "stores_user_content": False,
            "stores_session_ids": False,
            "stores_identifiers": False,
        },
    }
    return report
def parse_args(argv=None):
    """Build the CLI parser and parse *argv* (defaults to sys.argv[1:])."""
    parser = argparse.ArgumentParser(description="Crisis synthesizer reporting tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    log_parser = subparsers.add_parser("log", help="Log a crisis interaction metadata event")
    log_parser.add_argument("--level", required=True)
    log_parser.add_argument("--indicators", nargs="*", default=[])
    log_parser.add_argument("--response-profile", required=True)
    log_parser.add_argument("--continued", action="store_true")
    log_parser.add_argument("--message-count", type=int, default=1)
    log_parser.add_argument("--log-path")

    report_parser = subparsers.add_parser("report", help="Print a weekly JSON report to stdout")
    report_parser.add_argument("--days", type=int, default=7)
    report_parser.add_argument("--log-path")
    report_parser.add_argument("--min-samples", type=int, default=5)

    return parser.parse_args(argv)
def main(argv=None):
    """CLI entry point: dispatch to the log or report subcommand.

    Always returns exit code 0; argparse itself exits non-zero on bad input.
    """
    args = parse_args(argv)
    if args.command == "log":
        logged = log_crisis_event(
            level=args.level,
            indicators=args.indicators,
            response_profile=args.response_profile,
            user_continued=args.continued,
            message_count=args.message_count,
            log_path=args.log_path,
        )
        print(json.dumps(logged, indent=2, sort_keys=True))
    else:
        # Subparsers are required, so anything that is not "log" is "report".
        weekly = weekly_report(days=args.days, log_path=args.log_path, min_samples=args.min_samples)
        print(json.dumps(weekly, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,152 @@
"""
Regression tests for issue #36: crisis_synthesizer.
Verifies anonymized event logging, pattern analysis, keyword adjustment
suggestions, and weekly JSON reporting.
"""
import io
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from evolution import crisis_synthesizer
class TestCrisisSynthesizer(unittest.TestCase):
    """Regression tests for the anonymized crisis_synthesizer pipeline (issue #36)."""

    def setUp(self):
        # Each test gets its own throwaway JSONL log so tests stay independent
        # and never touch the real EVENT_LOG under the user's home directory.
        self.tmpdir = tempfile.TemporaryDirectory()
        self.log_path = Path(self.tmpdir.name) / "crisis_events.jsonl"

    def tearDown(self):
        self.tmpdir.cleanup()

    def test_log_crisis_event_stores_only_metadata(self):
        # Privacy contract: the persisted row must contain exactly the six
        # metadata fields and never message content or identifiers.
        event = crisis_synthesizer.log_crisis_event(
            level="CRITICAL",
            indicators=["kill myself", "plan tonight"],
            response_profile="guardian",
            user_continued=True,
            message_count=3,
            log_path=self.log_path,
        )
        self.assertEqual(event["level"], "CRITICAL")
        self.assertTrue(self.log_path.exists())
        rows = [json.loads(line) for line in self.log_path.read_text().splitlines() if line.strip()]
        self.assertEqual(len(rows), 1)
        row = rows[0]
        self.assertEqual(
            set(row.keys()),
            {"timestamp", "level", "indicators", "response_profile", "user_continued", "message_count"},
        )
        self.assertNotIn("message", row)
        self.assertNotIn("content", row)
        self.assertNotIn("session_id", row)

    def test_analyze_patterns_reports_counts_and_false_positive_estimate(self):
        # Three events, two continued: continuation_rate 2/3 and the
        # false-positive estimate is its complement, 1/3.
        events = [
            {
                "timestamp": "2026-04-14T00:00:00+00:00",
                "level": "CRITICAL",
                "indicators": ["kill myself", "plan tonight"],
                "response_profile": "guardian",
                "user_continued": True,
                "message_count": 4,
            },
            {
                "timestamp": "2026-04-14T00:05:00+00:00",
                "level": "HIGH",
                "indicators": ["kill myself"],
                "response_profile": "companion",
                "user_continued": False,
                "message_count": 2,
            },
            {
                "timestamp": "2026-04-14T00:10:00+00:00",
                "level": "HIGH",
                "indicators": ["kill myself"],
                "response_profile": "companion",
                "user_continued": True,
                "message_count": 3,
            },
        ]
        analysis = crisis_synthesizer.analyze_patterns(events)
        self.assertEqual(analysis["total_events"], 3)
        self.assertEqual(analysis["counts_by_level"]["HIGH"], 2)
        self.assertEqual(analysis["counts_by_level"]["CRITICAL"], 1)
        # most_common() puts the most frequent indicator first.
        self.assertEqual(analysis["indicator_frequency"][0][0], "kill myself")
        self.assertAlmostEqual(analysis["continuation_rate"], 2 / 3, places=3)
        self.assertAlmostEqual(analysis["false_positive_rate_estimate"], 1 / 3, places=3)

    def test_suggestion_engine_recommends_adjustments_after_enough_data(self):
        # Six always-continued events for one indicator (rate 1.0 -> increase)
        # and six never-continued for another (rate 0.0 -> decrease); both
        # clear the min_samples=5 threshold.
        events = []
        for i in range(6):
            events.append(
                {
                    "timestamp": f"2026-04-14T00:0{i}:00+00:00",
                    "level": "HIGH",
                    "indicators": ["kill myself"],
                    "response_profile": "companion",
                    "user_continued": True,
                    "message_count": 2,
                }
            )
        for i in range(6, 12):
            events.append(
                {
                    "timestamp": f"2026-04-14T00:{i}:00+00:00",
                    "level": "HIGH",
                    "indicators": ["rough day"],
                    "response_profile": "companion",
                    "user_continued": False,
                    "message_count": 1,
                }
            )
        suggestions = crisis_synthesizer.suggest_keyword_adjustments(events, min_samples=5)
        by_indicator = {item["indicator"]: item for item in suggestions}
        self.assertEqual(by_indicator["kill myself"]["suggestion"], "increase")
        self.assertEqual(by_indicator["rough day"]["suggestion"], "decrease")

    def test_weekly_report_is_json_serializable_and_stdout_ready(self):
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        report = crisis_synthesizer.weekly_report(days=7, log_path=self.log_path)
        # json.dumps raising would fail the test — the report must serialize.
        rendered = json.dumps(report)
        self.assertIn("summary", report)
        self.assertIn("suggestions", report)
        self.assertIn("counts_by_level", report["summary"])
        self.assertIn("kill myself", rendered)

    def test_cli_report_prints_json(self):
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        # Capture stdout so the CLI's printed report can be parsed back.
        output = io.StringIO()
        with mock.patch("sys.stdout", output):
            crisis_synthesizer.main(["report", "--days", "7", "--log-path", str(self.log_path)])
        parsed = json.loads(output.getvalue())
        self.assertEqual(parsed["summary"]["total_events"], 1)


if __name__ == "__main__":
    unittest.main()