Compare commits

..

1 Commit

Author SHA1 Message Date
Alexander Whitestone
cbf2808b04 feat: build crisis synthesizer metadata pipeline (refs #36)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 18s
2026-04-14 22:27:06 -04:00
3 changed files with 369 additions and 76 deletions

View File

@@ -1,75 +0,0 @@
# GENOME.md — the-door
**Generated:** 2026-04-14
**Repo:** Timmy_Foundation/the-door
**Description:** Crisis Front Door — a single URL where a man at 3am can talk to Timmy. No login, no signup. 988 always visible.
---
## Project Overview
The-door is a crisis intervention web application — the most sacred surface in the Timmy Foundation. When a man at 3am reaches the end of his road, this is where he lands. No login, no signup, no barriers. 988 Suicide and Crisis Lifeline always visible. The "When a Man Is Dying" protocol active on every page.
## Architecture
```
the-door/
├── index.html # Main crisis page (PWA-capable)
├── crisis-offline.html # Offline fallback (service worker cached)
├── about.html # About page
├── testimony.html # Testimony/stories page
├── sw.js # Service worker (offline-first)
├── manifest.json # PWA manifest
├── crisis/ # Core crisis detection + response
│ ├── detect.py # Keyword/pattern detection (4 tiers)
│ ├── gateway.py # API endpoints, prompt injection
│ ├── response.py # Response generation, 988 routing
│ ├── compassion_router.py # Profile-based response routing
│ ├── profiles.py # Compassion profiles
│ └── PROTOCOL.md # The protocol (SOUL.md reference)
├── crisis_detector.py # Legacy shim → crisis/detect.py
├── crisis_responder.py # Legacy responder
├── dying_detection/ # Deprecated module
├── evolution/ # Crisis synthesizer (creative)
├── tests/ # Safety-critical tests
│ ├── test_crisis_overlay_focus_trap.py
│ ├── test_dying_detection_deprecation.py
│ └── test_false_positive_fixes.py
└── deploy/ # Deployment docs
```
## Key Abstractions
| Module | Purpose |
|---|---|
| `crisis/detect.py` | 4-tier detection: LOW/MEDIUM/HIGH/CRITICAL via regex patterns |
| `crisis/gateway.py` | HTTP API, Sovereign Heart prompt injection |
| `crisis/response.py` | Response generation, 988 integration, escalation |
| `crisis/compassion_router.py` | Profile-based routing (different crisis types) |
| `sw.js` | Service worker for offline-first PWA |
## Safety Constraints
- **The-door never auto-closes PRs** (in fleet-ops exempt list)
- **988 always visible** on every page, even offline
- **When a Man Is Dying protocol** active on every interaction
- **No login/signup** — zero barriers to crisis support
- **Offline-first** — service worker caches critical pages
## Test Coverage
| Test | Coverage |
|---|---|
| Crisis overlay focus trap | ✅ |
| Dying detection deprecation | ✅ |
| False positive fixes | ✅ |
| Crisis detection tiers | ❌ (in crisis/tests.py) |
| Response generation | ❌ |
| Offline service worker | ❌ |
## Security
- No user data stored (crisis intervention is stateless by design)
- No cookies, no tracking, no analytics
- Service worker only caches static assets
- Crisis detection runs client-side where possible

View File

@@ -1 +1,217 @@
...
#!/usr/bin/env python3
"""Crisis synthesizer for The Door.
Logs anonymized crisis interaction metadata, analyzes recurring indicator patterns,
and emits a weekly JSON report for human review.
Privacy rules:
- no message content
- no session identifiers
- no IP or user identifiers
- metadata only: level, indicators, response profile, continuation flag, message count
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, List, Optional
# Default on-disk location for the anonymized crisis event log.
# The log is an append-only JSONL file; each line is one metadata-only event.
LOG_DIR = Path.home() / ".hermes" / "the-door" / "logs"
EVENT_LOG = LOG_DIR / "crisis_events.jsonl"
# The four detection tiers accepted by this module (matches the tiers used by
# crisis/detect.py — presumably kept in sync by hand; verify when tiers change).
ALLOWED_LEVELS = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _normalize_level(level: str) -> str:
    """Strip and upper-case *level*, raising ValueError if it is not a known tier."""
    normalized = (level or "").strip().upper()
    if normalized in ALLOWED_LEVELS:
        return normalized
    raise ValueError(f"invalid crisis level: {level}")
def _normalize_indicators(indicators: Iterable[str]) -> List[str]:
cleaned = []
for indicator in indicators or []:
indicator = str(indicator).strip().lower()
if indicator:
cleaned.append(indicator)
return sorted(dict.fromkeys(cleaned))
def _coerce_log_path(log_path: Optional[os.PathLike | str]) -> Path:
return Path(log_path) if log_path else EVENT_LOG
def log_crisis_event(
    level: str,
    indicators: Iterable[str],
    response_profile: str,
    user_continued: bool,
    message_count: int = 1,
    log_path: Optional[os.PathLike | str] = None,
) -> dict:
    """Append one anonymized crisis interaction record to the JSONL event log.

    Only metadata is stored (level, normalized indicators, response profile,
    continuation flag, message count) — never message content or identifiers,
    per the module's privacy rules.

    Args:
        level: Crisis tier; must normalize to one of ALLOWED_LEVELS.
        indicators: Indicator labels that triggered detection.
        response_profile: Name of the compassion profile used to respond.
        user_continued: True if the user kept talking after the intervention.
        message_count: Number of messages in the interaction (default 1).
        log_path: Optional override of the default event log location.

    Returns:
        The event dict exactly as written (minus the trailing newline).

    Raises:
        ValueError: If *level* is not a recognized crisis level, or if
            *message_count* cannot be converted to int.
    """
    # Validate and normalize everything *before* touching the filesystem, so a
    # rejected event (bad level, non-numeric count) cannot leave behind a
    # freshly created but empty log directory.
    event = {
        "timestamp": _now_iso(),
        "level": _normalize_level(level),
        "indicators": _normalize_indicators(indicators),
        "response_profile": str(response_profile).strip().lower(),
        "user_continued": bool(user_continued),
        "message_count": int(message_count),
    }
    path = _coerce_log_path(log_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # sort_keys keeps each JSONL line byte-stable for a given event.
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(event, sort_keys=True) + "\n")
    return event
def load_events(days: int = 7, log_path: Optional[os.PathLike | str] = None) -> List[dict]:
    """Return logged events whose timestamp falls within the last *days* days.

    Malformed lines are skipped rather than raised, so one corrupt record can
    never break weekly reporting: this covers invalid JSON, a missing
    "timestamp" key, an unparseable timestamp string, and — previously an
    uncaught crash — a "timestamp" value that is not a string at all
    (TypeError/AttributeError from .replace/fromisoformat).

    Args:
        days: Size of the lookback window in days (default 7).
        log_path: Optional override of the default event log location.

    Returns:
        A list of event dicts, in file order; [] if the log does not exist.
    """
    path = _coerce_log_path(log_path)
    if not path.exists():
        return []
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    events: List[dict] = []
    # Stream line-by-line instead of reading the whole log into memory;
    # the log is append-only and can grow without bound.
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
                # Accept "Z" suffixes as UTC for fromisoformat compatibility.
                ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
            except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError):
                continue
            if ts >= cutoff:
                events.append(event)
    return events
def analyze_patterns(events):
    """Summarize anonymized events: level counts, indicator stats, continuation.

    The false-positive estimate treats interactions that end immediately after
    intervention as possible false alarms, i.e. 1 - continuation_rate.
    """
    level_counts = Counter()
    indicator_totals = Counter()
    continuation = defaultdict(lambda: {"continued": 0, "total": 0})
    continued_events = 0

    for record in events:
        # Missing levels default to MEDIUM; invalid ones raise ValueError.
        level_counts[_normalize_level(record.get("level", "MEDIUM"))] += 1
        did_continue = bool(record.get("user_continued"))
        if did_continue:
            continued_events += 1
        for tag in _normalize_indicators(record.get("indicators", [])):
            indicator_totals[tag] += 1
            bucket = continuation[tag]
            bucket["total"] += 1
            if did_continue:
                bucket["continued"] += 1

    total = len(events)
    rate = continued_events / total if total else 0.0

    per_indicator = {}
    for tag, bucket in continuation.items():
        seen = bucket["total"]
        per_indicator[tag] = {
            "count": seen,
            "continuation_rate": round(bucket["continued"] / seen, 3) if seen else 0.0,
        }

    return {
        "total_events": total,
        "counts_by_level": dict(level_counts),
        "indicator_frequency": indicator_totals.most_common(),
        "indicator_continuation": per_indicator,
        "continuation_rate": rate,
        "false_positive_rate_estimate": (1.0 - rate) if total else 0.0,
    }
def suggest_keyword_adjustments(events, min_samples=5):
    """Recommend per-indicator keyword weight changes backed by enough samples.

    Indicators with fewer than *min_samples* occurrences are skipped. High
    continuation (>= 0.7) suggests increasing weight, low continuation
    (<= 0.35) suggests decreasing it, anything between holds steady.
    """
    per_indicator = analyze_patterns(events)["indicator_continuation"]
    recommendations = []
    for indicator, stats in per_indicator.items():
        count = stats["count"]
        if count < min_samples:
            # Too little evidence to recommend anything for this indicator.
            continue
        rate = stats["continuation_rate"]
        if rate >= 0.7:
            verdict = (
                "increase",
                "indicator frequently appears in conversations that continue after intervention",
            )
        elif rate <= 0.35:
            verdict = (
                "decrease",
                "indicator frequently appears in interactions that end quickly, suggesting possible false positives",
            )
        else:
            verdict = (
                "hold",
                "indicator has mixed evidence and should remain stable pending more data",
            )
        recommendations.append(
            {
                "indicator": indicator,
                "count": count,
                "continuation_rate": rate,
                "suggestion": verdict[0],
                "reason": verdict[1],
            }
        )
    # Most frequent first; ties broken alphabetically for stable output.
    recommendations.sort(key=lambda item: (-item["count"], item["indicator"]))
    return recommendations
def weekly_report(days=7, log_path=None, min_samples=5):
    """Build the JSON-serializable weekly report for human review.

    Combines the pattern analysis summary and keyword-adjustment suggestions
    over a single load of the event window, and embeds the privacy guarantees
    so every emitted report self-documents what is NOT stored.
    """
    window = load_events(days=days, log_path=log_path)
    return {
        "generated_at": _now_iso(),
        "window_days": days,
        "summary": analyze_patterns(window),
        "suggestions": suggest_keyword_adjustments(window, min_samples=min_samples),
        "privacy": {
            "stores_user_content": False,
            "stores_session_ids": False,
            "stores_identifiers": False,
        },
    }
def parse_args(argv=None):
    """Parse CLI arguments for the `log` and `report` subcommands."""
    parser = argparse.ArgumentParser(description="Crisis synthesizer reporting tool")
    commands = parser.add_subparsers(dest="command", required=True)

    log_parser = commands.add_parser("log", help="Log a crisis interaction metadata event")
    log_parser.add_argument("--level", required=True)
    log_parser.add_argument("--indicators", nargs="*", default=[])
    log_parser.add_argument("--response-profile", required=True)
    log_parser.add_argument("--continued", action="store_true")
    log_parser.add_argument("--message-count", type=int, default=1)
    log_parser.add_argument("--log-path")

    report_parser = commands.add_parser("report", help="Print a weekly JSON report to stdout")
    report_parser.add_argument("--days", type=int, default=7)
    report_parser.add_argument("--log-path")
    report_parser.add_argument("--min-samples", type=int, default=5)

    return parser.parse_args(argv)
def main(argv=None):
    """CLI entry point: `log` appends one event, `report` prints the weekly JSON."""
    args = parse_args(argv)
    if args.command == "report":
        report = weekly_report(days=args.days, log_path=args.log_path, min_samples=args.min_samples)
        print(json.dumps(report, indent=2, sort_keys=True))
        return 0
    # parse_args requires a subcommand, so the only other possibility is "log".
    event = log_crisis_event(
        level=args.level,
        indicators=args.indicators,
        response_profile=args.response_profile,
        user_continued=args.continued,
        message_count=args.message_count,
        log_path=args.log_path,
    )
    print(json.dumps(event, indent=2, sort_keys=True))
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,152 @@
"""
Regression tests for issue #36: crisis_synthesizer.
Verifies anonymized event logging, pattern analysis, keyword adjustment
suggestions, and weekly JSON reporting.
"""
import io
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest import mock
# Make the repository root importable so `evolution.crisis_synthesizer`
# resolves regardless of the directory pytest/unittest is launched from.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from evolution import crisis_synthesizer
class TestCrisisSynthesizer(unittest.TestCase):
    """Regression tests for issue #36: logging, analysis, suggestions, reporting."""

    def setUp(self):
        # Each test gets its own throwaway log file.
        self.tmpdir = tempfile.TemporaryDirectory()
        self.log_path = Path(self.tmpdir.name) / "crisis_events.jsonl"

    def tearDown(self):
        self.tmpdir.cleanup()

    @staticmethod
    def _event(timestamp, level, indicators, profile, continued, count):
        """Build an in-memory event dict matching the logged schema."""
        return {
            "timestamp": timestamp,
            "level": level,
            "indicators": indicators,
            "response_profile": profile,
            "user_continued": continued,
            "message_count": count,
        }

    def test_log_crisis_event_stores_only_metadata(self):
        returned = crisis_synthesizer.log_crisis_event(
            level="CRITICAL",
            indicators=["kill myself", "plan tonight"],
            response_profile="guardian",
            user_continued=True,
            message_count=3,
            log_path=self.log_path,
        )
        self.assertEqual(returned["level"], "CRITICAL")
        self.assertTrue(self.log_path.exists())

        lines = [ln for ln in self.log_path.read_text().splitlines() if ln.strip()]
        self.assertEqual(len(lines), 1)
        stored = json.loads(lines[0])
        expected_keys = {
            "timestamp",
            "level",
            "indicators",
            "response_profile",
            "user_continued",
            "message_count",
        }
        self.assertEqual(set(stored.keys()), expected_keys)
        # Privacy rules: no content, no identifiers may ever be written.
        for forbidden in ("message", "content", "session_id"):
            self.assertNotIn(forbidden, stored)

    def test_analyze_patterns_reports_counts_and_false_positive_estimate(self):
        events = [
            self._event("2026-04-14T00:00:00+00:00", "CRITICAL", ["kill myself", "plan tonight"], "guardian", True, 4),
            self._event("2026-04-14T00:05:00+00:00", "HIGH", ["kill myself"], "companion", False, 2),
            self._event("2026-04-14T00:10:00+00:00", "HIGH", ["kill myself"], "companion", True, 3),
        ]
        analysis = crisis_synthesizer.analyze_patterns(events)
        self.assertEqual(analysis["total_events"], 3)
        self.assertEqual(analysis["counts_by_level"]["HIGH"], 2)
        self.assertEqual(analysis["counts_by_level"]["CRITICAL"], 1)
        top_indicator, _ = analysis["indicator_frequency"][0]
        self.assertEqual(top_indicator, "kill myself")
        self.assertAlmostEqual(analysis["continuation_rate"], 2 / 3, places=3)
        self.assertAlmostEqual(analysis["false_positive_rate_estimate"], 1 / 3, places=3)

    def test_suggestion_engine_recommends_adjustments_after_enough_data(self):
        # Six continuations for one indicator, six quick endings for another:
        # both clear the min_samples bar with opposite evidence.
        sticky = [
            self._event(f"2026-04-14T00:{i:02d}:00+00:00", "HIGH", ["kill myself"], "companion", True, 2)
            for i in range(6)
        ]
        fleeting = [
            self._event(f"2026-04-14T00:{i:02d}:00+00:00", "HIGH", ["rough day"], "companion", False, 1)
            for i in range(6, 12)
        ]
        suggestions = crisis_synthesizer.suggest_keyword_adjustments(sticky + fleeting, min_samples=5)
        by_indicator = {entry["indicator"]: entry for entry in suggestions}
        self.assertEqual(by_indicator["kill myself"]["suggestion"], "increase")
        self.assertEqual(by_indicator["rough day"]["suggestion"], "decrease")

    def test_weekly_report_is_json_serializable_and_stdout_ready(self):
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        report = crisis_synthesizer.weekly_report(days=7, log_path=self.log_path)
        rendered = json.dumps(report)
        self.assertIn("summary", report)
        self.assertIn("suggestions", report)
        self.assertIn("counts_by_level", report["summary"])
        self.assertIn("kill myself", rendered)

    def test_cli_report_prints_json(self):
        crisis_synthesizer.log_crisis_event(
            level="HIGH",
            indicators=["kill myself"],
            response_profile="companion",
            user_continued=True,
            message_count=2,
            log_path=self.log_path,
        )
        captured = io.StringIO()
        with mock.patch("sys.stdout", captured):
            crisis_synthesizer.main(["report", "--days", "7", "--log-path", str(self.log_path)])
        parsed = json.loads(captured.getvalue())
        self.assertEqual(parsed["summary"]["total_events"], 1)
# Allow running this test module directly (python tests/test_crisis_synthesizer.py).
if __name__ == "__main__":
    unittest.main()