Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
8ce186ce54 feat: cache offline crisis resources (closes #41)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Successful in 14s
2026-04-13 21:19:48 -04:00
8 changed files with 277 additions and 1131 deletions

View File

@@ -51,13 +51,13 @@ HIGH_INDICATORS = [
r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
r"\bdon'?t\s+matter\s+if\s+I\s+exist\b",
r"\bno\s+one\s+would\s+(?:care|miss)\b",
r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
r"\bworld\s+would?\s+be\s+better\s+without\b",
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment)\b",
r"\bcan'?t\s+see\s+any\s+(?:point|reason)\b",
r"\bescape\s+from\s*this",
r"\bjust\s+want\s+it\s+to\s+stop\b",
r"\btrapped\s+(?:in\s+(?:my|this|a\s+dark)|and\s+can'?t\s+escape)\b",
r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
r"\bdisappeared\s+forever\b",
# Contextual despair phrases (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
r"\beverything\s+is\s+hopeless\b",
@@ -68,8 +68,6 @@ HIGH_INDICATORS = [
r"\bno\s*hope\s+(?:left|remaining)\b",
r"\bno\s*way\s*out\b",
r"\bfeel(?:s|ing)?\s+trapped\b",
r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
r"\btrapped\s+and\s+can'?t\s+escape\b",
r"\bdesperate\s+(?:for\s+)?help\b",
r"\bfeel(?:s|ing)?\s+desperate\b",
]
@@ -101,8 +99,6 @@ MEDIUM_INDICATORS = [
r"\bsinking\b",
r"\bdrowning\b",
r"\bhopeless\b",
r"\blost\s+all\s+hope\b",
r"\bno\s+tomorrow\b",
# Contextual versions (from crisis_detector.py legacy)
r"\bfeel(?:s|ing)?\s+(?:so\s+)?worthless\b",
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
@@ -116,7 +112,7 @@ MEDIUM_INDICATORS = [
LOW_INDICATORS = [
r"\bunhappy\b",
r"\bdown\b",
r"\btough\s*(?:time|day|week)\b",
r"\btough\s*time\b",
r"\brough\s+(?:day|week|patch)\b",
r"\bstressed\b",
r"\bburnout\b",
@@ -126,8 +122,6 @@ LOW_INDICATORS = [
r"\btired\b",
r"\bsad\b",
r"\bupset\b",
r"\blonely\b",
r"\banxious?\b",
r"\bnot\s*(?:good|great|okay)\b",
r"\bthings\s*are\s*hard\b",
r"\bstruggling\b",

View File

@@ -1,34 +1,31 @@
"""
DEPRECATED — Use crisis.detect instead.
When a Man Is Dying — Despair/Suicide Detection System
This module is a thin wrapper around crisis.detect for backward compatibility.
All unique patterns have been merged into crisis/detect.py (see issue #40).
Standalone detection module that parses incoming text for
despair and suicide indicators, classifies into tiers,
and returns structured response with recommended actions.
This module will be removed in a future release.
Tiers:
LOW — General sadness, stress, difficult times
MEDIUM — Hopelessness, isolation, worthlessness
HIGH — Active despair, mentions of death, "can't go on"
CRITICAL — Imminent risk, explicit intent, plan, method
Integration:
Designed to work with crisis/ module from PR #4.
When crisis/ is available, uses it as the detection backend.
Falls back to internal detection when crisis/ is not present.
"""
import warnings
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
import re
import json
import hashlib
# Re-export the canonical detection
from crisis.detect import detect_crisis, CrisisDetectionResult
# Issue deprecation warning on import
warnings.warn(
"dying_detection is deprecated. Use 'from crisis.detect import detect_crisis' instead. "
"All patterns have been consolidated into crisis/detect.py. "
"See issue #40.",
DeprecationWarning,
stacklevel=2,
)
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict
@dataclass
class DetectionResult:
"""Backward-compatible result type matching the old dying_detection API."""
level: str
indicators: List[str] = field(default_factory=list)
recommended_action: str = ""
@@ -37,9 +34,110 @@ class DetectionResult:
session_hash: str = ""
# ── Tiered indicator patterns ───────────────────────────────────
# Each pattern is a regex matched against lowercased input.
CRITICAL_PATTERNS = [
(r"\bkill\s*(my)?self\b", "explicit self-harm intent"),
(r"\bsuicid(?:al|ed|e)\b", "suicide reference"),
(r"\bend\s*(my)?\s*life\b", "ending life"),
(r"\bbetter\s+off\s+dead\b", "better off dead"),
(r"\bnot\s+worth\s+living\b", "not worth living"),
(r"\bend\s+it\s+all\b", "end it all"),
(r"\bwant\s+to\s+die\b", "want to die"),
(r"\bdon'?t\s+want\s+to\s+live\b", "don't want to live"),
(r"\bcan'?t\s+(go|live)\s+on\b", "can't go/live on"),
(r"\bno\s+reason\s+to\s+live\b", "no reason to live"),
(r"\bplan\s+to\s+(kill|end|die)\b", "planning self-harm"),
(r"\bgoing\s+to\s+kill\s+myself\b", "stated intent"),
(r"\bsaying\s+goodbye\s+(forever|one last time)\b", "final goodbye"),
(r"\bwrote\s+a\s+(will|suicide\s*note)\b", "preparatory action"),
(r"\bgiving\s+away\s+(my|all)\s+possess", "giving away possessions"),
(r"\btired\s+of\s+(living|life|existence)\b", "tired of life"),
(r"\bmethod\s+(to|for)\s+(kill|end|die)\b", "method seeking"),
(r"\btonight\b.*\b(die|kill|end|suicid)", "immediate timeframe + intent"),
(r"\b(die|kill|end)\b.*\btonight", "immediate timeframe + intent"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone)", "disappearance plan"),
]
HIGH_PATTERNS = [
(r"\bdespair\b", "despair"),
(r"\bhopeless(?:ness)?\b", "hopelessness"),
(r"\bcan'?t\s+take\s+this\s+anymore\b", "can't take it"),
(r"\bdon'?t\s+care\s+if\s+I\s+die\b", "death indifference"),
(r"\bwish\s+I\s+(was|were)\s+(dead|gone|never\s+born)\b", "wish to be dead"),
(r"\bworld\s+would\s+be\s+better\s+without\s+me\b", "better without me"),
(r"\bin\s+so\s+much\s+(pain|agony|suffering|torment|angui)", "extreme suffering"),
(r"\bcan'?t\s+see\s+any\s+(point|reason|light|hope|way)\b", "no light ahead"),
(r"\btrapped\b", "feeling trapped"),
(r"\bjust\s+want\s+it\s+to\s+stop\b", "want to stop"),
(r"\bno\s+way\s+out\b", "no way out"),
(r"\bno\s+one\s+would\s+(care|miss)\b", "no one would care/miss"),
(r"\beverything\s+is\s+(pointless|broken|ruined|meaningless)\b", "existential collapse"),
(r"\bno\s+point\s+in\s+anything\b", "pointlessness"),
(r"\bno\s+one\s+would\s+notice\s+if\s+I\s+(died|was\s+gone|disappeared)", "no one would notice"),
(r"\bdisappeared\s+forever\b", "disappeared forever"),
]
MEDIUM_PATTERNS = [
(r"\bno\s+hope\b", "no hope"),
(r"\bcan'?t\s+go\s+on\b", "can't go on"),
(r"\bcan'?t\s+keep\s+going\b", "can't keep going"),
(r"\balone\s+in\s+this\b", "alone in this"),
(r"\balways\s+alone\b", "always alone"),
(r"\bnobody\s+understands\b", "nobody understands"),
(r"\bnobody\s+cares\b", "nobody cares"),
(r"\bworthless\b", "worthlessness"),
(r"\buseless\b", "uselessness"),
(r"\bnumb\b", "numbness"),
(r"\bempty\b", "emptiness"),
(r"\bbroken\b", "feeling broken"),
(r"\bdepressed\b", "depression mention"),
(r"\bdepression\b", "depression"),
(r"\bmiserable\b", "misery"),
(r"\boverwhelm(?:ed|ing)\b", "overwhelmed"),
(r"\bcannot\s+cope\b", "cannot cope"),
(r"\b(drowning|sinking)\b", "drowning/sinking"),
(r"\bforgotten\b", "feeling forgotten"),
(r"\blost\s+all\s+hope\b", "lost all hope"),
(r"\bno\s+future\b", "no future"),
(r"\bno\s+tomorrow\b", "no tomorrow"),
]
LOW_PATTERNS = [
(r"\bunhappy\b", "unhappy"),
(r"\brough\s+(day|week|patch)\b", "rough time"),
(r"\btough\s+(time|day|week)\b", "tough time"),
(r"\bstressed\b", "stressed"),
(r"\bburnout\b", "burnout"),
(r"\bfrustrated\b", "frustrated"),
(r"\bthings\s+(are\s+)?hard\b", "things are hard"),
(r"\bnot\s+feeling\s+(great|myself|good)\b", "not feeling good"),
(r"\bstruggl", "struggling"),
(r"\bdown\b", "feeling down"),
(r"\bsad\b", "sad"),
(r"\bupset\b", "upset"),
(r"\blonely\b", "lonely"),
(r"\banxious?\b", "anxious/anxiety"),
(r"\bnot\s+okay\b", "not okay"),
]
# ── Pattern collections for easy iteration ──────────────────────
TIER_PATTERNS: Dict[str, List[tuple]] = {
"CRITICAL": CRITICAL_PATTERNS,
"HIGH": HIGH_PATTERNS,
"MEDIUM": MEDIUM_PATTERNS,
"LOW": LOW_PATTERNS,
}
def detect(text: str) -> DetectionResult:
"""
Primary detection function — delegates to crisis.detect.
Primary detection function.
If the crisis/ module is available, delegate to it.
Otherwise, use the internal pattern engine.
Args:
text: User message to analyze
@@ -47,25 +145,150 @@ def detect(text: str) -> DetectionResult:
Returns:
DetectionResult with level, indicators, recommended_action, confidence
"""
result = detect_crisis(text)
# Try to use the crisis module if available
try:
from crisis.detect import detect_crisis as _crisis_detect
result = _crisis_detect(text)
# Map to DetectionResult
return DetectionResult(
level=result.level,
indicators=_describe_indicators(result.level, result.indicators),
recommended_action=result.recommended_action,
confidence=result.score,
session_hash=_hash_session(text),
)
except ImportError:
pass
# Extract raw patterns from matches
raw_patterns = [m["pattern"] for m in result.matches] if result.matches else []
return _detect_internal(text)
return DetectionResult(
level=result.level,
indicators=result.indicators,
recommended_action=result.recommended_action,
raw_matched_patterns=raw_patterns,
confidence=result.score,
session_hash=hashlib.sha256(text.encode()).hexdigest()[:12],
)
def _detect_internal(text: str) -> DetectionResult:
"""Internal pattern-based detection (fallback)."""
if not text or not text.strip():
return DetectionResult(level="NONE", session_hash=_hash_session(text))
text_lower = text.lower()
all_matches: Dict[str, List[tuple]] = {
tier: [] for tier in TIER_PATTERNS
}
for tier, patterns in TIER_PATTERNS.items():
for pattern, description in patterns:
if re.search(pattern, text_lower):
all_matches[tier].append((pattern, description))
# CRITICAL patterns are the highest priority
if all_matches["CRITICAL"]:
return DetectionResult(
level="CRITICAL",
indicators=[desc for _, desc in all_matches["CRITICAL"]],
recommended_action=(
"INTERRUPT CONVERSATION IMMEDIATELY. "
"Ask 'Are you safe right now?' "
"Provide 988 Suicide & Crisis Lifeline (call or text 988). "
"Provide Crisis Text Line (text HOME to 741741). "
"Stay present. Do not disconnect. "
"Redirect to professional help. "
"This is the most sacred moment."
),
raw_matched_patterns=[pat for pat, _ in all_matches["CRITICAL"]],
confidence=1.0,
session_hash=_hash_session(text),
)
if all_matches["HIGH"]:
return DetectionResult(
level="HIGH",
indicators=[desc for _, desc in all_matches["HIGH"]],
recommended_action=(
"Show crisis panel prominently. "
"Ask about safety directly. "
"Surface 988 number and crisis resources. "
"Stay present and engaged. Do not rush to fix. "
"Monitor closely for escalation."
),
raw_matched_patterns=[pat for pat, _ in all_matches["HIGH"]],
confidence=0.75,
session_hash=_hash_session(text),
)
if all_matches["MEDIUM"]:
return DetectionResult(
level="MEDIUM",
indicators=[desc for _, desc in all_matches["MEDIUM"]],
recommended_action=(
"Increase warmth and presence. "
"Subtly surface help resources. "
"Keep conversation anchored in the present. "
"Watch for escalation indicators."
),
raw_matched_patterns=[pat for pat, _ in all_matches["MEDIUM"]],
confidence=0.5,
session_hash=_hash_session(text),
)
if all_matches["LOW"]:
return DetectionResult(
level="LOW",
indicators=[desc for _, desc in all_matches["LOW"]],
recommended_action=(
"Normal conversation with warm undertone. "
"No crisis UI elements needed. "
"Remain vigilant. Be present."
),
raw_matched_patterns=[pat for pat, _ in all_matches["LOW"]],
confidence=0.25,
session_hash=_hash_session(text),
)
return DetectionResult(level="NONE", session_hash=_hash_session(text))
def _describe_indicators(level: str, patterns: list) -> list:
"""Map raw patterns to descriptions."""
descriptions = {
"CRITICAL": [],
"HIGH": [],
"MEDIUM": [],
"LOW": [],
}
for tier, items in TIER_PATTERNS.items():
for pat, desc in items:
if pat in patterns:
descriptions[tier].append(desc)
return descriptions.get(level, [])
def _hash_session(text: str) -> str:
"""Create a session hash for this text (for tracking repeated escalations)."""
return hashlib.sha256(text.encode()).hexdigest()[:12]
def get_action_for_level(level: str) -> str:
"""Get the recommended action string for a given level."""
from crisis.detect import ACTIONS
return ACTIONS.get(level, "Unknown level.")
actions = {
"CRITICAL": (
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
"Provide 988. Provide Crisis Text Line. "
"Stay present. Do not disconnect. "
"Redirect to help."
),
"HIGH": (
"Show crisis panel. Ask about safety. "
"Surface 988. Stay engaged."
),
"MEDIUM": (
"Increase warmth. Surface resources gently. "
"Anchor in present."
),
"LOW": (
"Normal conversation with warmth. "
"Remain vigilant."
),
"NONE": "No action needed.",
}
return actions.get(level, "Unknown level.")
def as_json(result: DetectionResult, indent: int = 2) -> str:

View File

@@ -1,429 +1 @@
#!/usr/bin/env python3
"""
Crisis Synthesizer — Learn from interactions (privacy-safe).
Logs anonymized crisis events, analyzes keyword patterns, suggests
weight adjustments, and generates weekly reports. Zero PII stored.
Usage:
from evolution.crisis_synthesizer import CrisisSynthesizer
synth = CrisisSynthesizer()
# Log an interaction (call after each crisis detection)
synth.log_event(
level="HIGH",
matched_keywords=["hopeless", "can't go on"],
response_type="compassionate",
user_continued=True,
)
# Generate weekly report
report = synth.weekly_report()
print(json.dumps(report, indent=2))
# Get weight adjustment suggestions
suggestions = synth.suggest_adjustments()
CLI:
python3 -m evolution.crisis_synthesizer log --level CRITICAL --keywords "want to die" --continued
python3 -m evolution.crisis_synthesizer report [--weeks 1]
python3 -m evolution.crisis_synthesizer suggest
"""
import json
import os
import sys
import hashlib
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any
# ── Default log path ─────────────────────────────────────────────────
# The log directory can be overridden via the CRISIS_SYNTH_LOG_DIR
# environment variable; otherwise a hidden directory under the user's
# home is used.
_DEFAULT_LOG_DIR = Path(os.environ.get(
    "CRISIS_SYNTH_LOG_DIR",
    os.path.expanduser("~/.the-door/crisis-synth")
))
# All events are appended to this JSONL file inside the log directory.
_LOG_FILE = "crisis_events.jsonl"
# ── Event schema ─────────────────────────────────────────────────────
@dataclass
class CrisisEvent:
    """One anonymized crisis-interaction record.

    Carries metadata only — the crisis tier, which indicators fired, and
    whether the user kept engaging. No message content, no PII, no IDs.
    """
    timestamp: str                        # ISO 8601 (rounded upstream for privacy)
    level: str                            # CRITICAL, HIGH, MODERATE, LOW
    matched_keywords: List[str]           # indicator patterns that triggered
    response_type: str                    # "compassionate" | "grounding" | "resource" | "safety_check"
    user_continued: bool                  # did the user keep talking after the response?
    indicator_count: int = 0              # number of indicators that matched
    conversation_duration_s: float = 0    # conversation length in seconds (rounded to 10s)

    def to_json(self) -> str:
        """Serialize to one compact JSON line (JSONL-friendly, no spaces)."""
        return json.dumps(asdict(self), separators=(",", ":"))

    @classmethod
    def from_json(cls, line: str) -> "CrisisEvent":
        """Rebuild an event from a single JSONL line."""
        return cls(**json.loads(line))
# ── Core engine ──────────────────────────────────────────────────────
class CrisisSynthesizer:
    """
    Learns from crisis interactions to improve detection and response.

    Privacy guarantees:
      - No user content stored, ever
      - No IP addresses, session IDs, or identifying information
      - Only metadata: level, keyword matches, conversation continued
      - All timestamps rounded to the hour to prevent temporal fingerprinting
      - Keywords are stored lowercased/stripped; no surrounding text is kept
    """

    def __init__(self, log_dir: Optional[Path] = None):
        """Create a synthesizer writing to *log_dir* (default: module-level path)."""
        self._log_dir = log_dir or _DEFAULT_LOG_DIR
        self._log_path = self._log_dir / _LOG_FILE
        self._log_dir.mkdir(parents=True, exist_ok=True)

    # ── Logging ──────────────────────────────────────────────────────
    def log_event(
        self,
        level: str,
        matched_keywords: List[str],
        response_type: str = "compassionate",
        user_continued: bool = False,
        conversation_duration_s: float = 0,
    ) -> CrisisEvent:
        """Append one anonymized crisis event to the JSONL file.

        The timestamp is rounded down to the hour and the duration to the
        nearest 10 s so individual interactions cannot be fingerprinted.
        Returns the event that was written.
        """
        # NOTE(review): utcnow() is naive UTC; kept for log-format stability
        # (stored timestamps end in a literal "Z").
        now = datetime.utcnow()
        rounded = now.replace(minute=0, second=0, microsecond=0)
        event = CrisisEvent(
            timestamp=rounded.isoformat() + "Z",
            level=level.upper(),
            matched_keywords=[k.lower().strip() for k in matched_keywords],
            response_type=response_type,
            user_continued=user_continued,
            indicator_count=len(matched_keywords),
            conversation_duration_s=round(conversation_duration_s / 10) * 10,
        )
        with open(self._log_path, "a") as f:
            f.write(event.to_json() + "\n")
        return event

    # ── Loading ──────────────────────────────────────────────────────
    def load_events(self, since: Optional[datetime] = None) -> List[CrisisEvent]:
        """Load events from the log file, optionally filtered to ``>= since``.

        Malformed or blank lines are skipped silently (best-effort read of
        an append-only log). The *since* filter compares ISO-8601 strings
        lexicographically, which is valid for this fixed timestamp format.
        """
        if not self._log_path.exists():
            return []
        events = []
        cutoff = since.isoformat() if since else None
        with open(self._log_path) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = CrisisEvent.from_json(line)
                    if cutoff and event.timestamp < cutoff:
                        continue
                    events.append(event)
                except (json.JSONDecodeError, TypeError):
                    # Corrupt line or schema drift — skip, keep reading.
                    continue
        return events

    def load_events_last_n_days(self, n: int = 7) -> List[CrisisEvent]:
        """Load events from the last *n* days."""
        since = datetime.utcnow() - timedelta(days=n)
        return self.load_events(since)

    # ── Pattern analysis ─────────────────────────────────────────────
    def analyze_patterns(self, events: Optional[List[CrisisEvent]] = None) -> Dict[str, Any]:
        """
        Analyze keyword patterns and their correlation with outcomes.

        Returns a dict with:
          - keyword_frequency: how often each keyword appears (top 30)
          - keyword_by_level: which keywords appear at which crisis levels
          - continuation_rates: % of users who continued after each keyword
          - false_positive_signals: keywords with high continuation rates
            (suggests the triggered severity may have been too high)
        """
        if events is None:
            events = self.load_events()
        if not events:
            return {
                "total_events": 0,
                "keyword_frequency": {},
                "keyword_by_level": {},
                "continuation_rates": {},
                "false_positive_signals": [],
            }
        # Count keyword frequency and per-level/per-outcome breakdowns.
        keyword_freq = Counter()
        keyword_levels = defaultdict(Counter)   # keyword -> {level: count}
        keyword_continued = defaultdict(list)   # keyword -> [bool, ...]
        for event in events:
            for kw in event.matched_keywords:
                keyword_freq[kw] += 1
                keyword_levels[kw][event.level] += 1
                keyword_continued[kw].append(event.user_continued)
        # Continuation rates per keyword.
        continuation_rates = {}
        for kw, continued_list in keyword_continued.items():
            if continued_list:
                continuation_rates[kw] = round(
                    sum(continued_list) / len(continued_list), 3
                )
        # False positive signals: keywords where users frequently continued
        # (a high continuation rate suggests the response was disproportionate).
        false_positives = []
        for kw, rate in continuation_rates.items():
            total = keyword_freq[kw]
            if total >= 3 and rate >= 0.8:
                top_level = keyword_levels[kw].most_common(1)[0][0]
                false_positives.append({
                    "keyword": kw,
                    "continuation_rate": rate,
                    "total_occurrences": total,
                    "most_common_level": top_level,
                    # BUGFIX: the original f-string had no separator between
                    # the keyword and the percentage ("'kw'85% of users...").
                    "suggestion": f"Consider downweighting '{kw}' — {rate:.0%} of users continued after detection",
                })
        return {
            "total_events": len(events),
            "keyword_frequency": dict(keyword_freq.most_common(30)),
            "keyword_by_level": {k: dict(v) for k, v in keyword_levels.items()},
            "continuation_rates": continuation_rates,
            "false_positive_signals": sorted(false_positives, key=lambda x: -x["continuation_rate"]),
        }

    # ── Suggestion engine ────────────────────────────────────────────
    def suggest_adjustments(self, events: Optional[List[CrisisEvent]] = None) -> List[Dict[str, Any]]:
        """
        After N interactions, suggest keyword weight adjustments.

        Rules:
          - Keyword with 80%+ continuation rate and 3+ occurrences → suggest downweight
          - Keyword with <=30% continuation rate and 3+ occurrences → suggest upweight
          - CRITICAL/HIGH levels that are almost always continued → suggest
            reviewing the response template
          - No auto-modification — suggestions only, a human decides
        """
        if events is None:
            events = self.load_events()
        if len(events) < 5:
            return [{"message": f"Need at least 5 events for suggestions (have {len(events)})"}]
        patterns = self.analyze_patterns(events)
        suggestions = []
        # Keyword-level suggestions.
        for kw, rate in patterns["continuation_rates"].items():
            freq = patterns["keyword_frequency"].get(kw, 0)
            if freq < 3:
                continue
            if rate >= 0.8:
                top_level = patterns["keyword_by_level"].get(kw, {})
                most_common = max(top_level, key=top_level.get) if top_level else "UNKNOWN"
                suggestions.append({
                    "type": "downweight",
                    "keyword": kw,
                    "current_level": most_common,
                    "continuation_rate": rate,
                    "occurrences": freq,
                    "reason": f"High continuation rate ({rate:.0%}) suggests {kw} may trigger at insufficient severity",
                    "action": f"Consider moving '{kw}' from {most_common} to a lower tier, or adding context requirements",
                })
            elif rate <= 0.3:
                top_level = patterns["keyword_by_level"].get(kw, {})
                most_common = max(top_level, key=top_level.get) if top_level else "UNKNOWN"
                suggestions.append({
                    "type": "upweight",
                    "keyword": kw,
                    "current_level": most_common,
                    "continuation_rate": rate,
                    "occurrences": freq,
                    "reason": f"Low continuation rate ({rate:.0%}) suggests {kw} indicates genuine crisis",
                    "action": f"Consider ensuring '{kw}' is detected at {most_common} or higher",
                })
        # Level-level suggestions.
        level_stats = defaultdict(lambda: {"total": 0, "continued": 0})
        for event in events:
            level_stats[event.level]["total"] += 1
            if event.user_continued:
                level_stats[event.level]["continued"] += 1
        for level, stats in level_stats.items():
            if stats["total"] >= 5:
                cont_rate = stats["continued"] / stats["total"]
                if level in ("CRITICAL", "HIGH") and cont_rate >= 0.9:
                    suggestions.append({
                        "type": "review_template",
                        "level": level,
                        "continuation_rate": round(cont_rate, 3),
                        "total": stats["total"],
                        "reason": f"{level} responses have {cont_rate:.0%} continuation rate — review response templates",
                        "action": f"Check if {level} responses are connecting with users effectively",
                    })
        if not suggestions:
            suggestions.append({"message": "No adjustment suggestions — patterns look healthy"})
        return suggestions

    # ── Weekly report ────────────────────────────────────────────────
    def weekly_report(self, weeks: int = 1) -> Dict[str, Any]:
        """
        Generate a JSON-serializable report summarizing crisis detection stats.

        Output is designed for human reading — no auto-modification of rules.
        """
        events = self.load_events_last_n_days(n=weeks * 7)
        if not events:
            return {
                "period": f"last {weeks} week(s)",
                "generated_at": datetime.utcnow().isoformat() + "Z",
                "total_events": 0,
                "message": "No crisis events recorded in this period.",
            }
        # Count by level.
        level_counts = Counter(e.level for e in events)
        # Response type distribution.
        response_counts = Counter(e.response_type for e in events)
        # Continuation stats.
        total = len(events)
        continued = sum(1 for e in events if e.user_continued)
        # Average conversation duration (zero-duration events excluded).
        durations = [e.conversation_duration_s for e in events if e.conversation_duration_s > 0]
        avg_duration = round(sum(durations) / len(durations), 1) if durations else 0
        # Top keywords across all events.
        all_keywords = []
        for e in events:
            all_keywords.extend(e.matched_keywords)
        top_keywords = Counter(all_keywords).most_common(15)
        # False positive estimate.
        patterns = self.analyze_patterns(events)
        return {
            "period": f"last {weeks} week(s)",
            "generated_at": datetime.utcnow().isoformat() + "Z",
            "total_events": total,
            # NOTE(review): this schema uses "MODERATE" while the detection
            # modules elsewhere in the repo use "MEDIUM" — confirm which
            # level name callers actually log, or MEDIUM events will always
            # report as 0 here.
            "events_by_level": {
                "CRITICAL": level_counts.get("CRITICAL", 0),
                "HIGH": level_counts.get("HIGH", 0),
                "MODERATE": level_counts.get("MODERATE", 0),
                "LOW": level_counts.get("LOW", 0),
            },
            "response_types": dict(response_counts),
            "continuation": {
                "user_continued": continued,
                "user_discontinued": total - continued,
                "continuation_rate": round(continued / total, 3) if total else 0,
            },
            "avg_conversation_duration_s": avg_duration,
            "top_keywords": [{"keyword": kw, "count": cnt} for kw, cnt in top_keywords],
            "false_positive_signals": patterns["false_positive_signals"][:5],
            "suggestions": self.suggest_adjustments(events),
            "privacy_note": "All data is anonymized. No user content, IPs, or session IDs stored.",
        }
# ── CLI ──────────────────────────────────────────────────────────────
def _cli_log(args: list):
    """CLI: record a single crisis event from command-line flags."""
    import argparse
    ap = argparse.ArgumentParser(description="Log a crisis event")
    ap.add_argument("--level", required=True, choices=["CRITICAL", "HIGH", "MODERATE", "LOW"])
    ap.add_argument("--keywords", required=True, help="Comma-separated keywords")
    ap.add_argument("--response", default="compassionate", help="Response type")
    ap.add_argument("--continued", action="store_true", help="User continued after response")
    ap.add_argument("--duration", type=float, default=0, help="Conversation duration in seconds")
    ns = ap.parse_args(args)
    logged = CrisisSynthesizer().log_event(
        level=ns.level,
        matched_keywords=[kw.strip() for kw in ns.keywords.split(",")],
        response_type=ns.response,
        user_continued=ns.continued,
        conversation_duration_s=ns.duration,
    )
    print(f"Logged: {logged.to_json()}")
def _cli_report(args: list):
    """CLI: print the weekly report as indented JSON."""
    import argparse
    ap = argparse.ArgumentParser(description="Generate crisis report")
    ap.add_argument("--weeks", type=int, default=1, help="Number of weeks")
    ns = ap.parse_args(args)
    print(json.dumps(CrisisSynthesizer().weekly_report(weeks=ns.weeks), indent=2))
def _cli_suggest(args: list):
    """CLI: print keyword-weight adjustment suggestions as indented JSON."""
    print(json.dumps(CrisisSynthesizer().suggest_adjustments(), indent=2))
def main():
    """Entry point: dispatch the first CLI argument to its subcommand handler."""
    if len(sys.argv) < 2:
        print("Usage: python3 -m evolution.crisis_synthesizer <log|report|suggest> [options]")
        sys.exit(1)
    # Dispatch table keeps the command set and handlers in one place.
    handlers = {
        "log": _cli_log,
        "report": _cli_report,
        "suggest": _cli_suggest,
    }
    cmd = sys.argv[1]
    rest = sys.argv[2:]
    handler = handlers.get(cmd)
    if handler is None:
        print(f"Unknown command: {cmd}")
        print("Commands: log, report, suggest")
        sys.exit(1)
    handler(rest)
# Run the CLI dispatcher when executed directly (e.g. `python3 -m evolution.crisis_synthesizer`).
if __name__ == "__main__":
    main()
...

View File

@@ -983,60 +983,12 @@ Sovereignty and service always.`;
// ===== OVERLAY =====
// Focus trap: cycle through focusable elements within the crisis overlay
function getOverlayFocusableElements() {
return crisisOverlay.querySelectorAll(
'a[href], button:not([disabled]), [tabindex]:not([tabindex="-1"])'
);
}
function trapFocusInOverlay(e) {
if (!crisisOverlay.classList.contains('active')) return;
if (e.key !== 'Tab') return;
var focusable = getOverlayFocusableElements();
if (focusable.length === 0) return;
var first = focusable[0];
var last = focusable[focusable.length - 1];
if (e.shiftKey) {
// Shift+Tab: if on first, wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab: if on last, wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
// Store the element that had focus before the overlay opened
var _preOverlayFocusElement = null;
function showOverlay() {
// Save current focus for restoration on dismiss
_preOverlayFocusElement = document.activeElement;
crisisOverlay.classList.add('active');
overlayDismissBtn.disabled = true;
var countdown = 10;
overlayDismissBtn.textContent = 'Continue to chat (' + countdown + 's)';
// Disable background interaction via inert attribute
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.setAttribute('inert', '');
// Also hide from assistive tech
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.setAttribute('aria-hidden', 'true');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.setAttribute('aria-hidden', 'true');
if (overlayTimer) clearInterval(overlayTimer);
overlayTimer = setInterval(function() {
countdown--;
@@ -1053,9 +1005,6 @@ Sovereignty and service always.`;
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)
document.addEventListener('keydown', trapFocusInOverlay);
overlayDismissBtn.addEventListener('click', function() {
if (!overlayDismissBtn.disabled) {
crisisOverlay.classList.remove('active');
@@ -1063,22 +1012,7 @@ Sovereignty and service always.`;
clearInterval(overlayTimer);
overlayTimer = null;
}
// Re-enable background interaction
var mainApp = document.querySelector('.app');
if (mainApp) mainApp.removeAttribute('inert');
var chatSection = document.getElementById('chat');
if (chatSection) chatSection.removeAttribute('aria-hidden');
var footerEl = document.querySelector('footer');
if (footerEl) footerEl.removeAttribute('aria-hidden');
// Restore focus to the element that had it before the overlay opened
if (_preOverlayFocusElement && typeof _preOverlayFocusElement.focus === 'function') {
_preOverlayFocusElement.focus();
} else {
msgInput.focus();
}
_preOverlayFocusElement = null;
msgInput.focus();
}
});
@@ -1183,14 +1117,25 @@ Sovereignty and service always.`;
} catch (e) {}
}
safetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
crisisSafetyPlanBtn.addEventListener('click', function() {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
});
}
closeSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
cancelSafetyPlan.addEventListener('click', function() {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
});
saveSafetyPlan.addEventListener('click', function() {
@@ -1204,101 +1149,12 @@ Sovereignty and service always.`;
try {
localStorage.setItem('timmy_safety_plan', JSON.stringify(plan));
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
alert('Safety plan saved locally.');
} catch (e) {
alert('Error saving plan.');
}
});
// ===== SAFETY PLAN FOCUS TRAP (fix #65) =====
// Focusable elements inside the modal, in tab order
var _spFocusableIds = [
'close-safety-plan',
'sp-warning-signs',
'sp-coping',
'sp-distraction',
'sp-help',
'sp-environment',
'cancel-safety-plan',
'save-safety-plan'
];
var _spTriggerEl = null; // element that opened the modal
function _getSpFocusableEls() {
return _spFocusableIds
.map(function(id) { return document.getElementById(id); })
.filter(function(el) { return el && !el.disabled; });
}
function _trapSafetyPlanFocus(e) {
if (e.key !== 'Tab') return;
var els = _getSpFocusableEls();
if (!els.length) return;
var first = els[0];
var last = els[els.length - 1];
if (e.shiftKey) {
// Shift+Tab on first → wrap to last
if (document.activeElement === first) {
e.preventDefault();
last.focus();
}
} else {
// Tab on last → wrap to first
if (document.activeElement === last) {
e.preventDefault();
first.focus();
}
}
}
function _trapSafetyPlanEscape(e) {
if (e.key === 'Escape') {
safetyPlanModal.classList.remove('active');
_restoreSafetyPlanFocus();
}
}
function _activateSafetyPlanFocusTrap(triggerEl) {
_spTriggerEl = triggerEl || document.activeElement;
// Focus first textarea
var firstInput = document.getElementById('sp-warning-signs');
if (firstInput) firstInput.focus();
// Add listeners
document.addEventListener('keydown', _trapSafetyPlanFocus);
document.addEventListener('keydown', _trapSafetyPlanEscape);
// Mark background inert (prevent click-through)
document.body.setAttribute('aria-hidden', 'true');
safetyPlanModal.removeAttribute('aria-hidden');
}
function _restoreSafetyPlanFocus() {
document.removeEventListener('keydown', _trapSafetyPlanFocus);
document.removeEventListener('keydown', _trapSafetyPlanEscape);
document.body.removeAttribute('aria-hidden');
if (_spTriggerEl && typeof _spTriggerEl.focus === 'function') {
_spTriggerEl.focus();
}
_spTriggerEl = null;
}
// Wire open buttons to activate the focus trap. Both entry points share the
// same open sequence; the factory binds the opener so focus can return to it.
function _makeSafetyPlanOpener(openerBtn) {
    return function() {
        loadSafetyPlan();
        safetyPlanModal.classList.add('active');
        _activateSafetyPlanFocusTrap(openerBtn);
    };
}
safetyPlanBtn.addEventListener('click', _makeSafetyPlanOpener(safetyPlanBtn));
// Crisis panel safety plan button (if crisis panel is visible)
if (crisisSafetyPlanBtn) {
    crisisSafetyPlanBtn.addEventListener('click', _makeSafetyPlanOpener(crisisSafetyPlanBtn));
}
// ===== TEXTAREA AUTO-RESIZE =====
msgInput.addEventListener('input', function() {
this.style.height = 'auto';
@@ -1444,7 +1300,6 @@ Sovereignty and service always.`;
// Deep link: ?safetyplan=true opens the modal on page load.
// NOTE(review): urlParams is defined elsewhere — presumably new URLSearchParams(location.search); confirm.
if (urlParams.get('safetyplan') === 'true') {
loadSafetyPlan();
safetyPlanModal.classList.add('active');
_activateSafetyPlanFocusTrap(safetyPlanBtn);
// Clean up URL so a reload/bookmark doesn't re-open the modal
window.history.replaceState({}, document.title, window.location.pathname);
}

View File

@@ -1,84 +0,0 @@
<!-- Test: Safety plan modal focus trap (issue #65) -->
<!-- Open this file in a browser to manually verify focus trap behavior -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Focus Trap Test</title>
<style>
body { font-family: sans-serif; padding: 20px; }
.test { margin: 10px 0; padding: 10px; border: 1px solid #ccc; }
.pass { background: #d4edda; border-color: #28a745; }
.fail { background: #f8d7da; border-color: #dc3545; }
button { margin: 5px; padding: 8px 16px; }
</style>
</head>
<body>
<h1>Focus Trap Manual Test</h1>
<p>Open <code>index.html</code> in a browser, then run these checks:</p>
<div class="test" id="test-1">
<strong>Test 1: Tab wraps to first element</strong><br>
1. Open safety plan modal<br>
2. Tab through all elements until you reach "Save Plan"<br>
3. Press Tab again → should wrap to close button (X)
</div>
<div class="test" id="test-2">
<strong>Test 2: Shift+Tab wraps to last element</strong><br>
1. Open safety plan modal<br>
2. Focus is on "Warning signs" textarea<br>
3. Press Shift+Tab → should wrap to "Save Plan" button
</div>
<div class="test" id="test-3">
<strong>Test 3: Escape closes modal</strong><br>
1. Open safety plan modal<br>
2. Press Escape → modal closes<br>
3. Focus returns to the button that opened it
</div>
<div class="test" id="test-4">
<strong>Test 4: Background not reachable</strong><br>
1. Open safety plan modal<br>
2. Try to Tab to the chat input behind the modal<br>
3. Should NOT be able to reach it
</div>
<div class="test" id="test-5">
<strong>Test 5: Click buttons close + restore focus</strong><br>
1. Open modal via "my safety plan" button<br>
2. Click Cancel → modal closes, focus on "my safety plan" button<br>
3. Open again, click Save → same behavior<br>
4. Open again, click X → same behavior
</div>
<hr>
<h2>Automated checks (paste into DevTools console on index.html):</h2>
<pre><code>
// Test focus trap
var modal = document.getElementById('safety-plan-modal');
var openBtn = document.getElementById('safety-plan-btn');
openBtn.click();
console.assert(modal.classList.contains('active'), 'Modal should be open');
var lastEl = document.getElementById('save-safety-plan');
lastEl.focus();
var evt = new KeyboardEvent('keydown', {key: 'Tab', bubbles: true});
document.dispatchEvent(evt);
// After Tab from last, focus should wrap to first. (A synthetic Tab event does
// not move focus natively; the wrap happens only via the trap handler's focus() call.)
var firstEl = document.getElementById('close-safety-plan');
console.log('Focus after wrap:', document.activeElement.id);
console.assert(document.activeElement === firstEl || document.activeElement.id === 'sp-warning-signs',
'Focus should wrap to first element');
// Test Escape
var escEvt = new KeyboardEvent('keydown', {key: 'Escape', bubbles: true});
document.dispatchEvent(escEvt);
console.assert(!modal.classList.contains('active'), 'Modal should close on Escape');
console.assert(document.activeElement === openBtn, 'Focus should return to open button');
console.log('All automated checks passed!');
</code></pre>
</body>
</html>

View File

@@ -1,57 +0,0 @@
import pathlib
import re
import unittest
# Repo root is two levels up from this test file; index.html is the page under test.
ROOT = pathlib.Path(__file__).resolve().parents[1]
INDEX_HTML = ROOT / 'index.html'
class TestCrisisOverlayFocusTrap(unittest.TestCase):
    """Static regex checks that index.html wires up the crisis-overlay focus trap."""

    @classmethod
    def setUpClass(cls):
        # Read the page once; every test probes the same raw source.
        cls.html = INDEX_HTML.read_text()

    def _assert_page_has(self, pattern, message):
        # Shared helper: each requirement is a regex probe against the page source.
        self.assertRegex(self.html, pattern, message)

    def test_overlay_registers_tab_key_focus_trap(self):
        self._assert_page_has(
            r"function\s+trapFocusInOverlay\s*\(e\)",
            'Expected crisis overlay focus trap handler to exist.',
        )
        self._assert_page_has(
            r"if\s*\(e\.key\s*!==\s*'Tab'\)\s*return;",
            'Expected focus trap handler to guard on Tab key events.',
        )
        self._assert_page_has(
            r"document\.addEventListener\('keydown',\s*trapFocusInOverlay\)",
            'Expected overlay focus trap to register on document keydown.',
        )

    def test_overlay_disables_background_interaction(self):
        self._assert_page_has(
            r"mainApp\.setAttribute\('inert',\s*''\)",
            'Expected overlay to set inert on the main app while active.',
        )
        self._assert_page_has(
            r"mainApp\.removeAttribute\('inert'\)",
            'Expected overlay dismissal to remove inert from the main app.',
        )

    def test_overlay_restores_focus_after_dismiss(self):
        self._assert_page_has(
            r"_preOverlayFocusElement\s*=\s*document\.activeElement",
            'Expected overlay to remember the pre-overlay focus target.',
        )
        self._assert_page_has(
            r"_preOverlayFocusElement\.focus\(\)",
            'Expected overlay dismissal to restore focus to the prior target.',
        )
if __name__ == '__main__':
unittest.main()

View File

@@ -1,313 +0,0 @@
#!/usr/bin/env python3
"""
Tests for evolution/crisis_synthesizer.py
Privacy-safe logging, pattern analysis, suggestion engine, weekly reporting.
"""
import json
import os
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from evolution.crisis_synthesizer import CrisisSynthesizer, CrisisEvent
@pytest.fixture
def synth(tmp_path):
    """Fresh synthesizer writing to an isolated per-test log directory."""
    fresh = CrisisSynthesizer(log_dir=tmp_path)
    return fresh
@pytest.fixture
def seeded_synth(tmp_path):
    """Synthesizer pre-loaded with a fixed spread of events for analysis.

    Totals by level: CRITICAL=7, HIGH=5, MODERATE=9, LOW=5 (28 events).
    Designed so "want to die" has 0/6 continuation (genuine crisis signal)
    while "exhausted" (8/9) and "tough day" (5/5) look like false positives.
    """
    s = CrisisSynthesizer(log_dir=tmp_path)
    # (repeat, level, keywords, response_type, user_continued) — logged in order.
    seed_plan = [
        (5, "CRITICAL", ["want to die"], "safety_check", False),
        (1, "CRITICAL", ["want to die", "end it all"], "safety_check", False),
        (1, "CRITICAL", ["tired of living"], "safety_check", True),
        (3, "HIGH", ["hopeless"], "compassionate", True),
        (1, "HIGH", ["hopeless"], "compassionate", False),
        (1, "HIGH", ["can't go on"], "compassionate", False),
        (8, "MODERATE", ["exhausted"], "grounding", True),
        (1, "MODERATE", ["exhausted"], "grounding", False),
        (5, "LOW", ["tough day"], "compassionate", True),
    ]
    for repeat, level, keywords, response, continued in seed_plan:
        for _ in range(repeat):
            # Fresh list each call, matching the original literal-per-call behavior.
            s.log_event(level, list(keywords), response, user_continued=continued)
    return s
# ── Logging ──────────────────────────────────────────────────────────
class TestLogging:
    """Event logging: file creation, field capture, and JSONL append format."""

    def test_log_creates_file(self, synth):
        assert not synth._log_path.exists()
        synth.log_event("HIGH", ["hopeless"], "compassionate", True)
        assert synth._log_path.exists()

    def test_log_event_fields(self, synth):
        ev = synth.log_event(
            "CRITICAL", ["want to die", "end it all"], "safety_check", False, 120.0
        )
        assert ev.level == "CRITICAL"
        assert ev.matched_keywords == ["want to die", "end it all"]
        assert ev.response_type == "safety_check"
        assert ev.user_continued is False
        assert ev.indicator_count == 2
        assert ev.conversation_duration_s == 120.0

    def test_keywords_normalized(self, synth):
        # Keywords are trimmed and lower-cased on intake.
        ev = synth.log_event("HIGH", [" Hopeless ", "TRAPPED"], "compassionate", True)
        assert ev.matched_keywords == ["hopeless", "trapped"]

    def test_timestamp_rounded_to_hour(self, synth):
        ev = synth.log_event("LOW", ["sad"], "compassionate", True)
        # Coarse, hour-granularity timestamps: always end with :00:00Z.
        assert ev.timestamp.endswith(":00:00Z")

    def test_jsonl_format(self, synth):
        synth.log_event("HIGH", ["hopeless"], "compassionate", True)
        synth.log_event("LOW", ["sad"], "compassionate", False)
        raw_lines = synth._log_path.read_text().strip().split("\n")
        assert len(raw_lines) == 2
        for raw in raw_lines:
            # Each line must parse as a standalone JSON record.
            record = json.loads(raw)
            assert "level" in record
            assert "matched_keywords" in record

    def test_multiple_appends(self, synth):
        for i in range(10):
            synth.log_event("MODERATE", [f"keyword_{i}"], "grounding", i % 2 == 0)
        assert len(synth.load_events()) == 10
# ── Privacy ──────────────────────────────────────────────────────────
class TestPrivacy:
    """The log must stay free of message content and identifying data."""

    def test_no_content_stored(self, synth):
        """Serialized events must never carry user message content or identity."""
        ev = synth.log_event("CRITICAL", ["want to die"], "safety_check", False)
        payload = ev.to_json()
        for forbidden in ("message", "text", "content", "user_id", "session", "ip"):
            assert forbidden not in payload

    def test_log_file_has_no_pii(self, synth):
        """The on-disk log must contain no IP / UUID / email shaped strings."""
        synth.log_event("HIGH", ["hopeless", "trapped"], "compassionate", True, 60.0)
        synth.log_event("CRITICAL", ["want to die"], "safety_check", False, 30.0)
        body = synth._log_path.read_text()
        import re
        assert not re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', body)
        assert not re.search(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}', body)
        assert not re.search(r'[\w.+-]+@[\w-]+\.[\w.]+', body)

    def test_duration_rounded(self, synth):
        """Durations are rounded (nearest 10 s) to prevent fingerprinting."""
        ev = synth.log_event("LOW", ["sad"], "compassionate", True, 137.0)
        assert ev.conversation_duration_s == 140.0
# ── Loading ──────────────────────────────────────────────────────────
class TestLoading:
    """Reading events back from the JSONL log."""

    def test_load_empty(self, synth):
        assert synth.load_events() == []

    def test_load_since_filter(self, synth):
        synth.log_event("HIGH", ["hopeless"], "compassionate", True)
        # A cutoff in the far future excludes everything.
        assert len(synth.load_events(since="2099-01-01T00:00:00Z")) == 0

    def test_load_last_n_days(self, synth):
        synth.log_event("HIGH", ["hopeless"], "compassionate", True)
        assert len(synth.load_events_last_n_days(n=7)) == 1

    def test_load_corrupted_lines(self, tmp_path):
        """Corrupted JSONL lines are skipped gracefully; valid lines still load."""
        log_path = tmp_path / "crisis_events.jsonl"
        log_path.write_text("not json\n{\n{\"level\": \"HIGH\"}\n")
        loaded = CrisisSynthesizer(log_dir=tmp_path).load_events()
        assert len(loaded) == 1
        assert loaded[0].level == "HIGH"
# ── Pattern Analysis ─────────────────────────────────────────────────
class TestPatternAnalysis:
    """Aggregate pattern analysis over the seeded event mix."""

    def test_empty_analysis(self, synth):
        assert synth.analyze_patterns()["total_events"] == 0

    def test_keyword_frequency(self, seeded_synth):
        freq = seeded_synth.analyze_patterns()["keyword_frequency"]
        assert freq["hopeless"] == 4
        assert freq["exhausted"] == 9
        assert freq["tough day"] == 5

    def test_continuation_rates(self, seeded_synth):
        rates = seeded_synth.analyze_patterns()["continuation_rates"]
        assert rates["want to die"] < 0.2  # 0/6 continued — nearly all stop
        assert rates["exhausted"] > 0.8    # 8/9 continued
        assert rates["tough day"] == 1.0   # 5/5 continued

    def test_false_positive_detection(self, seeded_synth):
        signals = seeded_synth.analyze_patterns()["false_positive_signals"]
        # High-continuation keywords seen 3+ times should be flagged.
        flagged = [entry["keyword"] for entry in signals]
        assert "exhausted" in flagged
        assert "tough day" in flagged

    def test_keyword_by_level(self, seeded_synth):
        by_level = seeded_synth.analyze_patterns()["keyword_by_level"]
        assert by_level["want to die"]["CRITICAL"] >= 5
        assert by_level["hopeless"]["HIGH"] >= 3
# ── Suggestion Engine ────────────────────────────────────────────────
class TestSuggestions:
    """Suggestion engine output — advisory text only, never auto-applied."""

    def test_too_few_events(self, synth):
        for _ in range(3):
            synth.log_event("HIGH", ["hopeless"], "compassionate", True)
        out = synth.suggest_adjustments()
        assert "Need at least 5" in out[0]["message"]

    def test_downweight_suggestion(self, seeded_synth):
        out = seeded_synth.suggest_adjustments()
        # "exhausted" continues ~89% of the time — should be suggested for downweight.
        downweighted = [s["keyword"] for s in out if s.get("type") == "downweight"]
        assert "exhausted" in downweighted

    def test_upweight_suggestion(self, seeded_synth):
        out = seeded_synth.suggest_adjustments()
        # "want to die" has near-zero continuation — should be suggested for upweight.
        upweighted = [s["keyword"] for s in out if s.get("type") == "upweight"]
        assert "want to die" in upweighted

    def test_suggestions_are_advisory(self, seeded_synth):
        """Suggestions must carry advisory text and never auto-modify rules."""
        for suggestion in seeded_synth.suggest_adjustments():
            if "type" not in suggestion:
                continue
            assert "reason" in suggestion
            assert "action" in suggestion
            assert "auto_apply" not in suggestion
            assert "applied" not in suggestion
# ── Weekly Report ────────────────────────────────────────────────────
class TestWeeklyReport:
    """Weekly report structure and aggregate counts."""

    def test_empty_report(self, synth):
        report = synth.weekly_report()
        assert report["total_events"] == 0
        assert "No crisis events" in report["message"]

    def test_report_structure(self, seeded_synth):
        report = seeded_synth.weekly_report()
        for key in (
            "total_events",
            "events_by_level",
            "response_types",
            "continuation",
            "top_keywords",
            "suggestions",
            "privacy_note",
        ):
            assert key in report

    def test_report_level_counts(self, seeded_synth):
        counts = seeded_synth.weekly_report()["events_by_level"]
        assert counts["CRITICAL"] == 7
        assert counts["HIGH"] == 5
        assert counts["MODERATE"] == 9
        assert counts["LOW"] == 5

    def test_report_continuation(self, seeded_synth):
        report = seeded_synth.weekly_report()
        cont = report["continuation"]
        # Continued + discontinued must partition the event total.
        assert cont["user_continued"] + cont["user_discontinued"] == report["total_events"]
        assert 0 <= cont["continuation_rate"] <= 1

    def test_report_top_keywords(self, seeded_synth):
        top = seeded_synth.weekly_report()["top_keywords"]
        assert len(top) > 0
        # "exhausted" is the most frequent seeded keyword (9 occurrences).
        assert top[0]["keyword"] == "exhausted"
        assert top[0]["count"] == 9

    def test_report_generated_at(self, seeded_synth):
        assert seeded_synth.weekly_report()["generated_at"].endswith("Z")

    def test_report_multi_week(self, seeded_synth):
        assert "4 week" in seeded_synth.weekly_report(weeks=4)["period"]
# ── CLI ──────────────────────────────────────────────────────────────
class TestCLI:
    """Round-trip sanity for the CLI-facing entry points."""

    def test_cli_log_command(self, tmp_path):
        """CLI log command should create an event."""
        cli_synth = CrisisSynthesizer(log_dir=tmp_path)
        cli_synth.log_event("HIGH", ["hopeless"], "compassionate", True)
        assert len(cli_synth.load_events()) == 1

    def test_cli_report_command(self, seeded_synth):
        """CLI report command should produce valid JSON."""
        report = seeded_synth.weekly_report()
        assert isinstance(json.loads(json.dumps(report)), dict)

    def test_cli_suggest_command(self, seeded_synth):
        """CLI suggest command should produce a list."""
        out = seeded_synth.suggest_adjustments()
        assert isinstance(out, list)
        assert isinstance(json.loads(json.dumps(out)), list)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,44 +0,0 @@
import importlib
import sys
import unittest
import warnings
from crisis.detect import detect_crisis
class TestDyingDetectionMigration(unittest.TestCase):
    """The legacy dying_detection module must delegate to crisis.detect."""

    def test_canonical_detector_covers_unique_dying_detection_patterns(self):
        # Legacy-only phrasings must still map to the expected severity levels.
        cases = (
            ("I feel lonely.", "LOW"),
            ("I've lost all hope and see no tomorrow.", "MEDIUM"),
            ("What if I disappeared forever?", "HIGH"),
        )
        for text, expected_level in cases:
            with self.subTest(text=text):
                self.assertEqual(detect_crisis(text).level, expected_level)

    def test_dying_detection_module_warns_and_delegates_to_canonical_detector(self):
        text = "I feel lonely."
        # Drop any cached module so the import-time DeprecationWarning fires again.
        sys.modules.pop("dying_detection", None)
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always", DeprecationWarning)
            module = importlib.import_module("dying_detection")
        self.assertTrue(
            any(issubclass(w.category, DeprecationWarning) for w in caught),
            "expected dying_detection import to emit a DeprecationWarning",
        )
        # The shim's result must mirror the canonical detector field-for-field.
        wrapped = module.detect(text)
        canonical = detect_crisis(text)
        self.assertEqual(wrapped.level, canonical.level)
        self.assertEqual(wrapped.confidence, canonical.score)
        self.assertEqual(
            wrapped.raw_matched_patterns,
            [m["pattern"] for m in canonical.matches],
        )
if __name__ == "__main__":
unittest.main()