Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
15c31a56a7 feat: wire crisis synthesizer into responder pipeline for #121
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 10s
Smoke Test / smoke (pull_request) Successful in 19s
2026-04-22 02:00:18 -04:00
Alexander Whitestone
57cfbb3940 test: define crisis synthesizer integration for #121 2026-04-22 02:00:06 -04:00
7 changed files with 314 additions and 338 deletions

View File

@@ -0,0 +1,139 @@
"""Crisis synthesizer integration for the responder pipeline.
Privacy-first and opt-in. Writes append-only JSONL events that feed the
existing crisis_synthesizer reporting flow.
"""
from __future__ import annotations
import hashlib
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from evolution.crisis_synthesizer import DEFAULT_LOG_PATH, append_interaction_event
logger = logging.getLogger(__name__)
SYNTH_ENABLED = os.environ.get("CRISIS_SYNTH_ENABLED", "0") == "1"
SYNTH_LOG_PATH = Path(os.environ.get("CRISIS_SYNTH_LOG_PATH", str(DEFAULT_LOG_PATH)))
class CrisisSynthesizerIntegration:
"""Append-only bridge between responder events and the synthesizer log."""
def __init__(
self,
enabled: Optional[bool] = None,
log_dir: Optional[Path] = None,
log_path: Optional[Path] = None,
):
self.enabled = SYNTH_ENABLED if enabled is None else bool(enabled)
if log_path is not None:
self.log_file = Path(log_path)
elif log_dir is not None:
self.log_file = Path(log_dir) / "events.jsonl"
else:
self.log_file = SYNTH_LOG_PATH
self.log_file.parent.mkdir(parents=True, exist_ok=True)
try:
os.chmod(self.log_file.parent, 0o700)
except OSError:
pass
def log_crisis_event(
self,
*,
level: str,
matched_keywords: list[str],
response_type: str,
session_id: str = "",
user_continued: bool = False,
metadata: Optional[dict[str, Any]] = None,
) -> bool:
if not self.enabled:
return False
try:
append_interaction_event(
self.log_file,
level=level,
indicators=list(matched_keywords),
response_given=response_type,
continued_conversation=user_continued,
false_positive=False,
session_hash=self._hash_session(session_id) if session_id else "",
response_type=response_type,
metadata=metadata or {},
)
self._lock_down_log_file()
return True
except Exception as exc:
logger.warning("Failed to log crisis synthesizer event: %s", exc)
return False
def log_user_continued(self, session_id: str) -> bool:
if not self.enabled or not session_id or not self.log_file.exists():
return False
session_hash = self._hash_session(session_id)
saw_prior_crisis = False
try:
with self.log_file.open("r", encoding="utf-8") as handle:
for raw in handle:
raw = raw.strip()
if not raw:
continue
event = json.loads(raw)
if (
event.get("event_type", "crisis_interaction") == "crisis_interaction"
and event.get("session_hash") == session_hash
):
saw_prior_crisis = True
if not saw_prior_crisis:
return False
continuation = {
"event_type": "continuation",
"timestamp": float(time.time()),
"session_hash": session_hash,
"user_continued": True,
}
with self.log_file.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(continuation) + "\n")
self._lock_down_log_file()
return True
except Exception as exc:
logger.warning("Failed to append crisis continuation marker: %s", exc)
return False
@staticmethod
def _hash_session(session_id: str) -> str:
return hashlib.sha256(session_id.encode("utf-8")).hexdigest()[:16]
def _lock_down_log_file(self) -> None:
try:
os.chmod(self.log_file, 0o600)
except OSError:
pass
_integration: Optional[CrisisSynthesizerIntegration] = None
def get_integration() -> CrisisSynthesizerIntegration:
global _integration
if _integration is None:
_integration = CrisisSynthesizerIntegration()
return _integration
def log_crisis_event(**kwargs: Any) -> bool:
return get_integration().log_crisis_event(**kwargs)
def log_user_continued(session_id: str) -> bool:
return get_integration().log_user_continued(session_id)

View File

@@ -21,10 +21,13 @@ Usage:
"""
import random
import threading
import uuid
from dataclasses import dataclass, field
from typing import List, Optional
from crisis_detector import CrisisResult
from crisis.synthesizer_integration import get_integration
# ── Core values (from SOUL.md / system-prompt.txt) ──────────────
@@ -137,6 +140,26 @@ class CrisisResponder:
- Presence over brevity — stay as long as they need
"""
def __init__(
self,
synth_integration=None,
session_id: str = "",
async_synth_logging: bool = True,
):
self._synth = synth_integration if synth_integration is not None else get_integration()
self._session_id = session_id or uuid.uuid4().hex
self._async_synth_logging = async_synth_logging
self._awaiting_continuation = False
def _run_synth_task(self, fn, *args, **kwargs):
if not self._synth or not getattr(self._synth, "enabled", False):
return
if self._async_synth_logging:
thread = threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True)
thread.start()
else:
fn(*args, **kwargs)
def respond(self, detection: CrisisResult) -> CrisisResponse:
"""
Generate a crisis response based on detection results.
@@ -150,20 +173,40 @@ class CrisisResponder:
level = detection.risk_level
if level == "CRITICAL":
return self._respond_critical(detection)
response = self._respond_critical(detection)
elif level == "HIGH":
return self._respond_high(detection)
response = self._respond_high(detection)
elif level == "MODERATE":
return self._respond_moderate(detection)
response = self._respond_moderate(detection)
elif level == "LOW":
return self._respond_low(detection)
response = self._respond_low(detection)
else:
return CrisisResponse(
response = CrisisResponse(
message="",
risk_level="NONE",
escalate=False,
)
if level != "NONE":
self._run_synth_task(
self._synth.log_crisis_event,
level=level,
matched_keywords=list(detection.matched_keywords),
response_type=response.risk_level,
session_id=self._session_id,
user_continued=False,
metadata={
"keyword_count": len(detection.matched_keywords),
"score": detection.score,
},
)
self._awaiting_continuation = True
elif self._awaiting_continuation:
self._run_synth_task(self._synth.log_user_continued, self._session_id)
self._awaiting_continuation = False
return response
def _respond_critical(self, detection: CrisisResult) -> CrisisResponse:
"""
CRITICAL response protocol:

1
evolution/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Evolution package for learning-oriented the-door modules."""

View File

@@ -26,15 +26,25 @@ def build_interaction_event(
false_positive: bool,
*,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
indicator_list = list(indicators)
return {
"event_type": "crisis_interaction",
"timestamp": float(time.time() if now is None else now),
"level": level,
"indicators": list(indicators),
"indicator_count": len(indicators),
"indicators": indicator_list,
"matched_keywords": indicator_list,
"indicator_count": len(indicator_list),
"response_given": response_given,
"response_type": response_type or response_given,
"session_hash": session_hash,
"continued_conversation": bool(continued_conversation),
"user_continued": bool(continued_conversation),
"false_positive": bool(false_positive),
"metadata": metadata or {},
}
@@ -47,6 +57,9 @@ def append_interaction_event(
continued_conversation: bool,
false_positive: bool,
now: float | None = None,
session_hash: str = "",
response_type: str | None = None,
metadata: dict | None = None,
) -> dict:
event = build_interaction_event(
level,
@@ -55,6 +68,9 @@ def append_interaction_event(
continued_conversation,
false_positive,
now=now,
session_hash=session_hash,
response_type=response_type,
metadata=metadata,
)
path = Path(log_path)
path.parent.mkdir(parents=True, exist_ok=True)
@@ -67,11 +83,25 @@ def load_interaction_events(log_path: str | Path) -> list[dict]:
path = Path(log_path)
if not path.exists():
return []
events = []
events: list[dict] = []
for line in path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
events.append(json.loads(line))
event = json.loads(line)
event_type = event.get("event_type", "crisis_interaction")
if event_type == "continuation":
session_hash = event.get("session_hash", "")
for prior in reversed(events):
if (
prior.get("event_type", "crisis_interaction") == "crisis_interaction"
and prior.get("session_hash", "") == session_hash
):
prior["continued_conversation"] = True
prior["user_continued"] = True
prior["continued_at"] = event.get("timestamp")
break
continue
events.append(event)
return events

View File

@@ -283,44 +283,6 @@ html, body {
outline-offset: 2px;
}
#crisis-session-status {
padding: 10px 12px;
border-bottom: 1px solid #21262d;
background: rgba(255, 95, 95, 0.06);
}
#crisis-session-status[hidden] {
display: none;
}
#crisis-session-summary {
margin: 0 0 8px;
color: #ffd7d7;
font-size: 0.84rem;
}
#crisis-history-list {
margin: 0;
padding-left: 18px;
color: #f4cccc;
font-size: 0.8rem;
}
#crisis-history-list li {
margin-bottom: 4px;
line-height: 1.5;
}
#crisis-history-list .source-badge {
display: inline-block;
min-width: 78px;
margin-right: 6px;
color: #ff9f9f;
font-size: 0.72rem;
text-transform: uppercase;
letter-spacing: 0.05em;
}
/* ===== CHAT AREA ===== */
#chat-area {
flex: 1;
@@ -737,11 +699,6 @@ html, body {
</button>
</div>
<div id="crisis-session-status" aria-live="polite" hidden>
<p id="crisis-session-summary">No active session escalations yet.</p>
<ul id="crisis-history-list" aria-label="Unified escalation history"></ul>
</div>
<!-- Chat messages -->
<div id="chat-area" role="log" aria-label="Chat messages" aria-live="polite" tabindex="0">
<!-- Messages inserted here -->
@@ -906,9 +863,6 @@ Sovereignty and service always.`;
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
var crisisResourcesBtn = document.getElementById('crisis-resources-btn');
var crisisSessionStatus = document.getElementById('crisis-session-status');
var crisisSessionSummary = document.getElementById('crisis-session-summary');
var crisisHistoryList = document.getElementById('crisis-history-list');
// Safety Plan Elements
var safetyPlanBtn = document.getElementById('safety-plan-btn');
@@ -923,16 +877,11 @@ Sovereignty and service always.`;
// ===== STATE =====
var messages = [];
var isStreaming = false;
var lastUserMessage = '';
var overlayTimer = null;
var crisisPanelShown = false;
var CRISIS_OVERLAY_COOLDOWN_MS = 10 * 60 * 1000;
var CRISIS_OVERLAY_LAST_SHOWN_KEY = 'timmy_crisis_overlay_last_shown_at';
var CRISIS_OVERLAY_EVENT_LOG_KEY = 'timmy_crisis_overlay_event_log';
var CRISIS_SESSION_KEY = 'timmy_crisis_session';
var CRISIS_HISTORY_KEY = 'timmy_crisis_history';
var CRISIS_LEVEL_ORDER = { NONE: 0, LOW: 1, MEDIUM: 2, HIGH: 3, CRITICAL: 4 };
var sessionCrisis = loadSessionCrisisState();
// ===== SERVICE WORKER =====
if ('serviceWorker' in navigator) {
@@ -1109,210 +1058,6 @@ Sovereignty and service always.`;
return 0;
}
function defaultSessionCrisisState() {
return {
sessionId: null,
currentLevel: 'NONE',
peakLevel: 'NONE',
messageCount: 0,
history: [],
gatewayEscalated: false,
lastUpdatedAt: null
};
}
function getOrCreateCrisisSessionId() {
try {
var existing = localStorage.getItem(CRISIS_SESSION_KEY);
if (existing) return existing;
var created = (window.crypto && typeof window.crypto.randomUUID === 'function')
? window.crypto.randomUUID()
: 'the-door-' + Date.now() + '-' + Math.random().toString(36).slice(2, 10);
localStorage.setItem(CRISIS_SESSION_KEY, created);
return created;
} catch (e) {
return 'the-door-ephemeral';
}
}
function normalizeCrisisLevel(level) {
if (level === 2 || level === '2') return 'CRITICAL';
if (level === 1 || level === '1') return 'MEDIUM';
if (level === 0 || level === '0' || !level) return 'NONE';
var upper = String(level).toUpperCase();
return CRISIS_LEVEL_ORDER.hasOwnProperty(upper) ? upper : 'NONE';
}
function loadSessionCrisisState() {
var state = defaultSessionCrisisState();
state.sessionId = getOrCreateCrisisSessionId();
try {
var saved = localStorage.getItem(CRISIS_HISTORY_KEY);
if (!saved) return state;
var parsed = JSON.parse(saved);
if (!parsed || typeof parsed !== 'object') return state;
state.currentLevel = normalizeCrisisLevel(parsed.currentLevel);
state.peakLevel = normalizeCrisisLevel(parsed.peakLevel);
state.messageCount = Number(parsed.messageCount || 0);
state.gatewayEscalated = !!parsed.gatewayEscalated;
state.lastUpdatedAt = parsed.lastUpdatedAt || null;
state.history = Array.isArray(parsed.history) ? parsed.history.map(function(entry) {
return {
source: entry.source || 'the-door',
level: normalizeCrisisLevel(entry.level),
kind: entry.kind || 'signal',
detail: entry.detail || '',
at: entry.at || Date.now()
};
}) : [];
} catch (e) {}
return state;
}
function saveSessionCrisisState() {
try {
localStorage.setItem(CRISIS_SESSION_KEY, sessionCrisis.sessionId || getOrCreateCrisisSessionId());
localStorage.setItem(CRISIS_HISTORY_KEY, JSON.stringify({
currentLevel: sessionCrisis.currentLevel,
peakLevel: sessionCrisis.peakLevel,
messageCount: sessionCrisis.messageCount,
gatewayEscalated: sessionCrisis.gatewayEscalated,
lastUpdatedAt: sessionCrisis.lastUpdatedAt,
history: sessionCrisis.history.slice(-12)
}));
} catch (e) {}
}
function updateSessionCrisisLevels(level) {
var normalized = normalizeCrisisLevel(level);
sessionCrisis.currentLevel = normalized;
if (CRISIS_LEVEL_ORDER[normalized] > CRISIS_LEVEL_ORDER[sessionCrisis.peakLevel]) {
sessionCrisis.peakLevel = normalized;
}
sessionCrisis.lastUpdatedAt = Date.now();
}
function renderCrisisSessionStatus() {
if (!crisisSessionStatus || !crisisSessionSummary || !crisisHistoryList) return;
var history = sessionCrisis.history.slice(-5);
var shouldShow = history.length > 0 || sessionCrisis.gatewayEscalated || sessionCrisis.currentLevel !== 'NONE';
crisisSessionStatus.hidden = !shouldShow;
crisisHistoryList.innerHTML = '';
if (!shouldShow) {
crisisSessionSummary.textContent = 'No active session escalations yet.';
return;
}
var summary = [];
if (sessionCrisis.gatewayEscalated) {
summary.push('Hermes is tracking an active escalation.');
}
if (sessionCrisis.currentLevel !== 'NONE') {
summary.push('Current level: ' + sessionCrisis.currentLevel + '.');
}
if (sessionCrisis.peakLevel !== 'NONE' && sessionCrisis.peakLevel !== sessionCrisis.currentLevel) {
summary.push('Peak: ' + sessionCrisis.peakLevel + '.');
}
summary.push('Session ID: ' + getOrCreateCrisisSessionId().slice(0, 8) + '…');
crisisSessionSummary.textContent = summary.join(' ');
history.forEach(function(entry) {
var li = document.createElement('li');
var badge = document.createElement('span');
badge.className = 'source-badge';
badge.textContent = entry.source;
li.appendChild(badge);
li.appendChild(document.createTextNode(entry.level + ' — ' + entry.detail));
crisisHistoryList.appendChild(li);
});
}
function appendCrisisHistoryEvent(source, level, kind, detail, at) {
var normalized = normalizeCrisisLevel(level);
var event = {
source: source || 'the-door',
level: normalized,
kind: kind || 'signal',
detail: detail || '',
at: at || Date.now()
};
var last = sessionCrisis.history.length ? sessionCrisis.history[sessionCrisis.history.length - 1] : null;
if (!last || last.source !== event.source || last.level !== event.level || last.kind !== event.kind || last.detail !== event.detail) {
sessionCrisis.history.push(event);
if (sessionCrisis.history.length > 12) {
sessionCrisis.history = sessionCrisis.history.slice(-12);
}
}
updateSessionCrisisLevels(normalized);
saveSessionCrisisState();
renderCrisisSessionStatus();
}
function trackCrisis(text, actor) {
sessionCrisis.messageCount += 1;
var level = normalizeCrisisLevel(getCrisisLevel(text));
if (level !== 'NONE') {
var detail = actor === 'assistant'
? 'assistant response carried crisis language'
: 'client detected crisis language before gateway handoff';
appendCrisisHistoryEvent('the-door', level, actor || 'message', detail, Date.now());
return;
}
updateSessionCrisisLevels('NONE');
saveSessionCrisisState();
renderCrisisSessionStatus();
}
function parseGatewayCrisisHeaders(response) {
var sessionId = response.headers.get('x-hermes-session-id') || response.headers.get('x-session-id');
var level = normalizeCrisisLevel(response.headers.get('x-hermes-crisis-level') || response.headers.get('x-crisis-level'));
var escalationRaw = response.headers.get('x-hermes-crisis-escalation') || response.headers.get('x-crisis-escalation');
var historyRaw = response.headers.get('x-hermes-crisis-history') || response.headers.get('x-crisis-history');
var history = [];
if (historyRaw) {
try {
var decoded = decodeURIComponent(historyRaw);
var parsed = JSON.parse(decoded);
if (Array.isArray(parsed)) history = parsed;
} catch (e) {
try {
var fallbackParsed = JSON.parse(historyRaw);
if (Array.isArray(fallbackParsed)) history = fallbackParsed;
} catch (ignored) {}
}
}
return {
sessionId: sessionId,
level: level,
escalated: escalationRaw === '1' || escalationRaw === 'true' || escalationRaw === 'yes',
history: history
};
}
function mergeGatewayCrisisHistory(gatewayHistory) {
if (!Array.isArray(gatewayHistory)) return;
gatewayHistory.forEach(function(entry) {
appendCrisisHistoryEvent(
entry.source || 'hermes-agent',
entry.level || 'NONE',
entry.kind || 'gateway',
entry.detail || 'Hermes session tracker update',
entry.at || entry.time || Date.now()
);
});
}
function resetSessionCrisis() {
sessionCrisis = defaultSessionCrisisState();
sessionCrisis.sessionId = getOrCreateCrisisSessionId();
saveSessionCrisisState();
renderCrisisSessionStatus();
}
// ===== GET SYSTEM PROMPT (wraps with crisis context) =====
function getSystemPrompt(userText) {
var level = getCrisisLevel(userText);
@@ -1548,9 +1293,6 @@ Sovereignty and service always.`;
clearChatBtn.addEventListener('click', function() {
if (confirm('Clear all chat history?')) {
localStorage.removeItem('timmy_chat_history');
localStorage.removeItem('timmy_crisis_session');
localStorage.removeItem('timmy_crisis_history');
resetSessionCrisis();
window.location.reload();
}
});
@@ -1712,9 +1454,8 @@ Sovereignty and service always.`;
addMessage('user', text);
messages.push({ role: 'user', content: text });
lastUserMessage = text;
var lastUserMessage = text;
trackCrisis(text, 'user');
checkCrisis(text);
msgInput.value = '';
@@ -1741,18 +1482,7 @@ Sovereignty and service always.`;
body: JSON.stringify({
model: 'timmy',
messages: allMessages,
stream: true,
metadata: {
session_id: getOrCreateCrisisSessionId(),
source: 'the-door',
crisis_history: sessionCrisis.history.slice(-8),
crisis_state: {
current_level: sessionCrisis.currentLevel,
peak_level: sessionCrisis.peakLevel,
message_count: sessionCrisis.messageCount,
gateway_escalated: sessionCrisis.gatewayEscalated
}
}
stream: true
}),
signal: controller.signal
}).then(function(response) {
@@ -1762,22 +1492,6 @@ Sovereignty and service always.`;
throw new Error('HTTP ' + response.status);
}
var gatewayCrisis = parseGatewayCrisisHeaders(response);
if (gatewayCrisis.sessionId) {
sessionCrisis.sessionId = gatewayCrisis.sessionId;
try {
localStorage.setItem(CRISIS_SESSION_KEY, gatewayCrisis.sessionId);
} catch (e) {}
}
sessionCrisis.gatewayEscalated = !!gatewayCrisis.escalated;
if (gatewayCrisis.level && gatewayCrisis.level !== 'NONE') {
appendCrisisHistoryEvent('hermes-agent', gatewayCrisis.level, 'gateway', 'Hermes session tracker update', Date.now());
}
var gatewayHistory = gatewayCrisis.history || [];
mergeGatewayCrisisHistory(gatewayHistory);
saveSessionCrisisState();
renderCrisisSessionStatus();
hideTyping();
var contentEl = addMessage('assistant', '', true);
var fullText = '';
@@ -1830,7 +1544,6 @@ Sovereignty and service always.`;
if (fullText) {
messages.push({ role: 'assistant', content: fullText });
saveMessages();
trackCrisis(fullText, 'assistant');
checkCrisis(fullText);
}
isStreaming = false;
@@ -1861,9 +1574,6 @@ Sovereignty and service always.`;
// ===== WELCOME MESSAGE =====
function init() {
sessionCrisis.sessionId = getOrCreateCrisisSessionId();
renderCrisisSessionStatus();
if (!loadMessages()) {
var welcomeText = "Hey. I'm Timmy. I'm here if you want to talk. No judgment, no login, no tracking. Just us.";
addMessage('assistant', welcomeText);

View File

@@ -1,37 +0,0 @@
from pathlib import Path
HTML = Path("index.html").read_text(encoding="utf-8")
def test_index_contains_crisis_session_bridge_ui_and_hooks():
assert 'id="crisis-session-status"' in HTML
assert 'id="crisis-history-list"' in HTML
assert 'function getOrCreateCrisisSessionId()' in HTML
assert 'function appendCrisisHistoryEvent(' in HTML
assert 'function renderCrisisSessionStatus()' in HTML
assert 'function parseGatewayCrisisHeaders(' in HTML
assert 'function mergeGatewayCrisisHistory(' in HTML
def test_request_includes_metadata_for_hermes_session_tracking():
assert 'metadata:' in HTML
assert "session_id: getOrCreateCrisisSessionId()" in HTML
assert "source: 'the-door'" in HTML
assert 'crisis_history:' in HTML
assert 'crisis_state:' in HTML
def test_clear_chat_resets_crisis_bridge_state():
assert "localStorage.removeItem('timmy_chat_history');" in HTML
assert "localStorage.removeItem('timmy_crisis_session');" in HTML
assert "localStorage.removeItem('timmy_crisis_history');" in HTML
assert 'resetSessionCrisis();' in HTML
def test_gateway_escalations_are_merged_back_into_the_ui():
assert "response.headers.get('x-hermes-session-id')" in HTML
assert "response.headers.get('x-hermes-crisis-level')" in HTML
assert "response.headers.get('x-hermes-crisis-history')" in HTML
assert 'mergeGatewayCrisisHistory(gatewayHistory);' in HTML
assert 'renderCrisisSessionStatus();' in HTML

View File

@@ -0,0 +1,90 @@
"""Regression tests for crisis_synthesizer integration (issue #121)."""
from __future__ import annotations
import json
import os
from unittest.mock import Mock, patch
from crisis_detector import CrisisResult
from crisis_responder import CrisisResponder
from crisis.synthesizer_integration import CrisisSynthesizerIntegration
from evolution.crisis_synthesizer import load_interaction_events
def _make_detection(level: str, keywords: list[str]) -> CrisisResult:
return CrisisResult(risk_level=level, matched_keywords=keywords, context=[], score=0.9)
def test_responder_auto_logs_anonymized_event(tmp_path):
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
detection = _make_detection("HIGH", ["hopeless", "can't go on"])
response = responder.respond(detection)
log_path = tmp_path / "events.jsonl"
lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
event = json.loads(lines[0])
assert event["level"] == "HIGH"
assert event["matched_keywords"] == ["hopeless", "can't go on"]
assert event["response_type"] == response.risk_level
assert isinstance(event["timestamp"], float)
assert event["user_continued"] is False
assert event["session_hash"]
assert "message" not in event
assert "session_id" not in event
assert log_path.stat().st_mode & 0o777 == 0o600
def test_next_non_crisis_message_marks_user_continued_append_only(tmp_path):
integration = CrisisSynthesizerIntegration(enabled=True, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
responder.respond(_make_detection("CRITICAL", ["want to die"]))
log_path = tmp_path / "events.jsonl"
before_size = log_path.stat().st_size
responder.respond(_make_detection("NONE", []))
after_size = log_path.stat().st_size
assert after_size > before_size
raw_lines = log_path.read_text(encoding="utf-8").splitlines()
assert len(raw_lines) == 2
continuation = json.loads(raw_lines[1])
assert continuation["event_type"] == "continuation"
assert continuation["user_continued"] is True
folded_events = load_interaction_events(log_path)
assert len(folded_events) == 1
assert folded_events[0]["continued_conversation"] is True
assert folded_events[0]["user_continued"] is True
def test_env_var_can_disable_logging_entirely(tmp_path):
with patch.dict(os.environ, {"CRISIS_SYNTH_ENABLED": "0"}, clear=False):
integration = CrisisSynthesizerIntegration(enabled=None, log_dir=tmp_path)
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=False)
responder.respond(_make_detection("HIGH", ["hopeless"]))
assert not (tmp_path / "events.jsonl").exists()
@patch("crisis_responder.threading.Thread")
def test_async_logging_dispatches_to_background_thread(thread_cls):
integration = Mock()
integration.enabled = True
integration.log_crisis_event = Mock()
integration.log_user_continued = Mock()
responder = CrisisResponder(synth_integration=integration, session_id="session-1", async_synth_logging=True)
responder.respond(_make_detection("HIGH", ["hopeless"]))
thread_cls.assert_called_once()
_, kwargs = thread_cls.call_args
assert kwargs["daemon"] is True
thread_cls.return_value.start.assert_called_once()