Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
cbf2808b04 feat: build crisis synthesizer metadata pipeline (refs #36)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 18s
2026-04-14 22:27:06 -04:00
2 changed files with 369 additions and 1 deletions

View File

@@ -1 +1,217 @@
...
#!/usr/bin/env python3
"""Crisis synthesizer for The Door.
Logs anonymized crisis interaction metadata, analyzes recurring indicator patterns,
and emits a weekly JSON report for human review.
Privacy rules:
- no message content
- no session identifiers
- no IP or user identifiers
- metadata only: level, indicators, response profile, continuation flag, message count
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, List, Optional
# Default on-disk location for anonymized crisis metadata (JSON Lines format).
LOG_DIR = Path.home() / ".hermes" / "the-door" / "logs"
EVENT_LOG = LOG_DIR / "crisis_events.jsonl"
# Closed set of valid crisis severity levels, enforced by _normalize_level.
ALLOWED_LEVELS = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _normalize_level(level: str) -> str:
    """Validate and canonicalize a crisis severity level.

    Returns the stripped, upper-cased level when it belongs to
    ALLOWED_LEVELS; raises ValueError (echoing the original input)
    for anything else, including None or empty strings.
    """
    canonical = (level or "").strip().upper()
    if canonical in ALLOWED_LEVELS:
        return canonical
    raise ValueError(f"invalid crisis level: {level}")
def _normalize_indicators(indicators: Iterable[str]) -> List[str]:
cleaned = []
for indicator in indicators or []:
indicator = str(indicator).strip().lower()
if indicator:
cleaned.append(indicator)
return sorted(dict.fromkeys(cleaned))
def _coerce_log_path(log_path: Optional[os.PathLike | str]) -> Path:
    """Wrap *log_path* in a Path, falling back to EVENT_LOG when falsy."""
    if log_path:
        return Path(log_path)
    return EVENT_LOG
def log_crisis_event(level, indicators, response_profile, user_continued, message_count=1, log_path=None):
    """Append one anonymized crisis metadata event to the JSONL log.

    Only metadata is persisted — severity level, normalized indicators,
    response profile, continuation flag, message count, and a UTC
    timestamp. No message content, session ids, or user identifiers.

    Returns the event dict exactly as written.
    Raises ValueError when the level is invalid or message_count is
    not convertible to int.
    """
    destination = _coerce_log_path(log_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "timestamp": _now_iso(),
        "level": _normalize_level(level),
        "indicators": _normalize_indicators(indicators),
        "response_profile": str(response_profile).strip().lower(),
        "user_continued": bool(user_continued),
        "message_count": int(message_count),
    }
    serialized = json.dumps(record, sort_keys=True)
    with destination.open("a", encoding="utf-8") as sink:
        sink.write(serialized + "\n")
    return record
def load_events(days=7, log_path=None):
    """Load events recorded within the last *days* days from the JSONL log.

    Malformed rows are skipped rather than aborting the whole load. A
    missing log file yields an empty list.

    Fix: the except clause previously missed two corruption modes that
    crashed the loader — a non-string "timestamp" (AttributeError on
    .replace) and a timezone-naive timestamp (TypeError when compared
    against the aware cutoff). Both are now treated like any other bad
    row. The file is also streamed line-by-line instead of being read
    fully into memory.
    """
    path = _coerce_log_path(log_path)
    if not path.exists():
        return []
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    events = []
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
                ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
                if ts >= cutoff:
                    events.append(event)
            except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError):
                # Corrupt or incomplete rows are skipped; the report should
                # survive a partially damaged log.
                continue
    return events
def analyze_patterns(events):
    """Aggregate crisis events into counts, indicator stats, and rates.

    Returns a dict with: total_events, counts_by_level,
    indicator_frequency (most-common ordering), per-indicator
    continuation stats (count + rounded continuation_rate), the overall
    continuation_rate, and false_positive_rate_estimate (1 - rate).
    Raises ValueError if an event carries a level outside ALLOWED_LEVELS.
    """
    level_tally = Counter()
    indicator_tally = Counter()
    per_indicator = defaultdict(lambda: {"continued": 0, "total": 0})
    continued_events = 0
    for event in events:
        level_tally[_normalize_level(event.get("level", "MEDIUM"))] += 1
        continued = bool(event.get("user_continued"))
        if continued:
            continued_events += 1
        for label in _normalize_indicators(event.get("indicators", [])):
            indicator_tally[label] += 1
            bucket = per_indicator[label]
            bucket["total"] += 1
            if continued:
                bucket["continued"] += 1
    total = len(events)
    rate = continued_events / total if total else 0.0
    # With zero events both rates degrade to 0.0 rather than dividing by zero.
    fp_estimate = (1.0 - rate) if total else 0.0
    continuation_stats = {}
    for label, bucket in per_indicator.items():
        seen = bucket["total"]
        continuation_stats[label] = {
            "count": seen,
            "continuation_rate": round(bucket["continued"] / seen, 3) if seen else 0.0,
        }
    return {
        "total_events": total,
        "counts_by_level": dict(level_tally),
        "indicator_frequency": indicator_tally.most_common(),
        "indicator_continuation": continuation_stats,
        "continuation_rate": rate,
        "false_positive_rate_estimate": fp_estimate,
    }
def suggest_keyword_adjustments(events, min_samples=5):
    """Recommend raising, lowering, or holding each indicator's weight.

    Indicators seen fewer than *min_samples* times are skipped. A
    continuation rate >= 0.7 suggests "increase", <= 0.35 suggests
    "decrease", anything between is "hold". Results are ordered by
    descending count, then indicator name.
    """
    stats_by_indicator = analyze_patterns(events)["indicator_continuation"]
    suggestions = []
    for name, stats in stats_by_indicator.items():
        occurrences = stats["count"]
        if occurrences < min_samples:
            continue
        rate = stats["continuation_rate"]
        if rate >= 0.7:
            verdict = "increase"
            why = "indicator frequently appears in conversations that continue after intervention"
        elif rate <= 0.35:
            verdict = "decrease"
            why = "indicator frequently appears in interactions that end quickly, suggesting possible false positives"
        else:
            verdict = "hold"
            why = "indicator has mixed evidence and should remain stable pending more data"
        suggestions.append(
            {
                "indicator": name,
                "count": occurrences,
                "continuation_rate": rate,
                "suggestion": verdict,
                "reason": why,
            }
        )
    return sorted(suggestions, key=lambda item: (-item["count"], item["indicator"]))
def weekly_report(days=7, log_path=None, min_samples=5):
    """Assemble the weekly JSON-serializable report for human review.

    Loads events from the rolling *days* window, attaches the pattern
    summary and keyword suggestions, and records the privacy guarantees
    explicitly so the report is self-describing.
    """
    window_events = load_events(days=days, log_path=log_path)
    report = {
        "generated_at": _now_iso(),
        "window_days": days,
        "summary": analyze_patterns(window_events),
        "suggestions": suggest_keyword_adjustments(window_events, min_samples=min_samples),
        "privacy": {
            "stores_user_content": False,
            "stores_session_ids": False,
            "stores_identifiers": False,
        },
    }
    return report
def parse_args(argv=None):
    """Build the CLI parser (subcommands: log, report) and parse *argv*."""
    parser = argparse.ArgumentParser(description="Crisis synthesizer reporting tool")
    commands = parser.add_subparsers(dest="command", required=True)

    log_parser = commands.add_parser("log", help="Log a crisis interaction metadata event")
    log_parser.add_argument("--level", required=True)
    log_parser.add_argument("--indicators", nargs="*", default=[])
    log_parser.add_argument("--response-profile", required=True)
    log_parser.add_argument("--continued", action="store_true")
    log_parser.add_argument("--message-count", type=int, default=1)
    log_parser.add_argument("--log-path")

    report_parser = commands.add_parser("report", help="Print a weekly JSON report to stdout")
    report_parser.add_argument("--days", type=int, default=7)
    report_parser.add_argument("--log-path")
    report_parser.add_argument("--min-samples", type=int, default=5)

    return parser.parse_args(argv)
def main(argv=None):
    """CLI entry point: dispatch to the `log` or `report` subcommand.

    Prints the written event (log) or the weekly report (report) as
    pretty JSON on stdout and returns 0.
    """
    args = parse_args(argv)
    if args.command == "log":
        written = log_crisis_event(
            level=args.level,
            indicators=args.indicators,
            response_profile=args.response_profile,
            user_continued=args.continued,
            message_count=args.message_count,
            log_path=args.log_path,
        )
        print(json.dumps(written, indent=2, sort_keys=True))
    else:
        # The subparser is required, so any non-"log" command is "report".
        report = weekly_report(days=args.days, log_path=args.log_path, min_samples=args.min_samples)
        print(json.dumps(report, indent=2, sort_keys=True))
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value to the shell as the process exit code.
    raise SystemExit(main())

View File

@@ -0,0 +1,152 @@
"""
Regression tests for issue #36: crisis_synthesizer.
Verifies anonymized event logging, pattern analysis, keyword adjustment
suggestions, and weekly JSON reporting.
"""
import io
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest import mock
# Make the repository root importable so `evolution` resolves without install.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from evolution import crisis_synthesizer
class TestCrisisSynthesizer(unittest.TestCase):
    """Regression suite for the crisis_synthesizer module (issue #36).

    Covers anonymized event logging, pattern analysis, keyword
    adjustment suggestions, weekly reporting, and the CLI report path.
    """

    def setUp(self):
        # Each test gets its own temp JSONL log so runs stay isolated.
        self.tmpdir = tempfile.TemporaryDirectory()
        self.log_path = Path(self.tmpdir.name) / "crisis_events.jsonl"

    def tearDown(self):
        self.tmpdir.cleanup()

    def test_log_crisis_event_stores_only_metadata(self):
        """Rows on disk carry metadata only — never content or session ids."""
        event = crisis_synthesizer.log_crisis_event(
            level="CRITICAL",
            indicators=["kill myself", "plan tonight"],
            response_profile="guardian",
            user_continued=True,
            message_count=3,
            log_path=self.log_path,
        )
        self.assertEqual(event["level"], "CRITICAL")
        self.assertTrue(self.log_path.exists())
        rows = [json.loads(line) for line in self.log_path.read_text().splitlines() if line.strip()]
        self.assertEqual(len(rows), 1)
        row = rows[0]
        # The schema is a closed set: exactly these six metadata keys.
        self.assertEqual(
            set(row.keys()),
            {"timestamp", "level", "indicators", "response_profile", "user_continued", "message_count"},
        )
        # Privacy guarantees: no message text or identifiers are persisted.
        self.assertNotIn("message", row)
        self.assertNotIn("content", row)
        self.assertNotIn("session_id", row)

    def test_analyze_patterns_reports_counts_and_false_positive_estimate(self):
        """Level counts, indicator ranking, and rates computed from 3 events."""
        events = [
            {
                "timestamp": "2026-04-14T00:00:00+00:00",
                "level": "CRITICAL",
                "indicators": ["kill myself", "plan tonight"],
                "response_profile": "guardian",
                "user_continued": True,
                "message_count": 4,
            },
            {
                "timestamp": "2026-04-14T00:05:00+00:00",
                "level": "HIGH",
                "indicators": ["kill myself"],
                "response_profile": "companion",
                "user_continued": False,
                "message_count": 2,
            },
            {
                "timestamp": "2026-04-14T00:10:00+00:00",
                "level": "HIGH",
                "indicators": ["kill myself"],
                "response_profile": "companion",
                "user_continued": True,
                "message_count": 3,
            },
        ]
        analysis = crisis_synthesizer.analyze_patterns(events)
        self.assertEqual(analysis["total_events"], 3)
        self.assertEqual(analysis["counts_by_level"]["HIGH"], 2)
        self.assertEqual(analysis["counts_by_level"]["CRITICAL"], 1)
        # Most frequent indicator appears first in the ranking.
        self.assertEqual(analysis["indicator_frequency"][0][0], "kill myself")
        # 2 of 3 events continued -> continuation 2/3, false-positive 1/3.
        self.assertAlmostEqual(analysis["continuation_rate"], 2 / 3, places=3)
        self.assertAlmostEqual(analysis["false_positive_rate_estimate"], 1 / 3, places=3)

    def test_suggestion_engine_recommends_adjustments_after_enough_data(self):
        """6 continued + 6 non-continued samples cross the min_samples bar."""
        events = []
        # Six events where the conversation continued -> "increase" evidence.
        for i in range(6):
            events.append(
                {
                    "timestamp": f"2026-04-14T00:0{i}:00+00:00",
                    "level": "HIGH",
                    "indicators": ["kill myself"],
                    "response_profile": "companion",
                    "user_continued": True,
                    "message_count": 2,
                }
            )
        # Six events that ended immediately -> "decrease" (false-positive) evidence.
        for i in range(6, 12):
            events.append(
                {
                    "timestamp": f"2026-04-14T00:{i}:00+00:00",
                    "level": "HIGH",
                    "indicators": ["rough day"],
                    "response_profile": "companion",
                    "user_continued": False,
                    "message_count": 1,
                }
            )
        suggestions = crisis_synthesizer.suggest_keyword_adjustments(events, min_samples=5)
        by_indicator = {item["indicator"]: item for item in suggestions}
        self.assertEqual(by_indicator["kill myself"]["suggestion"], "increase")
        self.assertEqual(by_indicator["rough day"]["suggestion"], "decrease")

    def test_weekly_report_is_json_serializable_and_stdout_ready(self):
        """The report round-trips through json.dumps and has the key sections."""
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        report = crisis_synthesizer.weekly_report(days=7, log_path=self.log_path)
        rendered = json.dumps(report)
        self.assertIn("summary", report)
        self.assertIn("suggestions", report)
        self.assertIn("counts_by_level", report["summary"])
        self.assertIn("kill myself", rendered)

    def test_cli_report_prints_json(self):
        """`report` subcommand emits parseable JSON on stdout."""
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        output = io.StringIO()
        # Capture stdout rather than asserting on print side effects directly.
        with mock.patch("sys.stdout", output):
            crisis_synthesizer.main(["report", "--days", "7", "--log-path", str(self.log_path)])
        parsed = json.loads(output.getvalue())
        self.assertEqual(parsed["summary"]["total_events"], 1)
if __name__ == "__main__":
    # Allow running the suite directly: python test_crisis_synthesizer.py
    unittest.main()