Compare commits

..

2 Commits

Author SHA1 Message Date
e85cfe93b3 test: add crisis_synthesizer tests (#36)
All checks were successful
Sanity Checks / sanity-test (pull_request) Successful in 7s
Smoke Test / smoke (pull_request) Successful in 6s
2026-04-15 14:55:32 +00:00
4d973b3df1 feat: build crisis_synthesizer.py — learn from interactions (#36) 2026-04-15 14:54:06 +00:00
12 changed files with 743 additions and 1052 deletions

View File

@@ -7,7 +7,6 @@ Stands between a broken man and a machine that would tell him to die.
from .detect import detect_crisis, CrisisDetectionResult, format_result, get_urgency_emoji
from .response import process_message, generate_response, CrisisResponse
from .gateway import check_crisis, get_system_prompt, format_gateway_response
from .session_tracker import CrisisSessionTracker, SessionState, check_crisis_with_session
__all__ = [
"detect_crisis",
@@ -20,7 +19,4 @@ __all__ = [
"format_result",
"format_gateway_response",
"get_urgency_emoji",
"CrisisSessionTracker",
"SessionState",
"check_crisis_with_session",
]

View File

@@ -22,7 +22,6 @@ from .response import (
get_system_prompt_modifier,
CrisisResponse,
)
from .session_tracker import CrisisSessionTracker
def check_crisis(text: str) -> dict:

View File

@@ -1,259 +0,0 @@
"""
Session-level crisis tracking and escalation for the-door (P0 #35).
Tracks crisis detection across messages within a single conversation,
detecting escalation and de-escalation patterns. Privacy-first: no
persistence beyond the conversation session.
Each message is analyzed in isolation by detect.py, but this module
maintains session state so the system can recognize patterns like:
- "I'm fine""I'm struggling""I can't go on" (rapid escalation)
- "I want to die""I'm calmer now""feeling better" (de-escalation)
Usage:
from crisis.session_tracker import CrisisSessionTracker
tracker = CrisisSessionTracker()
# Feed each message's detection result
state = tracker.record(detect_crisis("I'm having a tough day"))
print(state.current_level) # "LOW"
print(state.is_escalating) # False
state = tracker.record(detect_crisis("I feel hopeless"))
print(state.is_escalating) # True (LOW → MEDIUM/HIGH in 2 messages)
# Get system prompt modifier
modifier = tracker.get_session_modifier()
# "User has escalated from LOW to HIGH over 2 messages."
# Reset for new session
tracker.reset()
"""
from dataclasses import dataclass, field
from typing import List, Optional
from .detect import CrisisDetectionResult, SCORES
# Level ordering for comparison (higher = more severe)
LEVEL_ORDER = {"NONE": 0, "LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}
@dataclass
class SessionState:
"""Immutable snapshot of session crisis tracking state."""
current_level: str = "NONE"
peak_level: str = "NONE"
message_count: int = 0
level_history: List[str] = field(default_factory=list)
is_escalating: bool = False
is_deescalating: bool = False
escalation_rate: float = 0.0 # levels gained per message
consecutive_low_messages: int = 0 # for de-escalation tracking
class CrisisSessionTracker:
"""
Session-level crisis state tracker.
Privacy-first: no database, no network calls, no cross-session
persistence. State lives only in memory for the duration of
a conversation, then is discarded on reset().
"""
# Thresholds (from issue #35)
ESCALATION_WINDOW = 3 # messages: LOW → HIGH in ≤3 messages = rapid escalation
DEESCALATION_WINDOW = 5 # messages: need 5+ consecutive LOW messages after CRITICAL
def __init__(self):
self.reset()
def reset(self):
"""Reset all session state. Call on new conversation."""
self._current_level = "NONE"
self._peak_level = "NONE"
self._message_count = 0
self._level_history: List[str] = []
self._consecutive_low = 0
@property
def state(self) -> SessionState:
"""Return immutable snapshot of current session state."""
is_escalating = self._detect_escalation()
is_deescalating = self._detect_deescalation()
rate = self._compute_escalation_rate()
return SessionState(
current_level=self._current_level,
peak_level=self._peak_level,
message_count=self._message_count,
level_history=list(self._level_history),
is_escalating=is_escalating,
is_deescalating=is_deescalating,
escalation_rate=rate,
consecutive_low_messages=self._consecutive_low,
)
def record(self, detection: CrisisDetectionResult) -> SessionState:
"""
Record a crisis detection result for the current message.
Returns updated SessionState.
"""
level = detection.level
self._message_count += 1
self._level_history.append(level)
# Update peak
if LEVEL_ORDER.get(level, 0) > LEVEL_ORDER.get(self._peak_level, 0):
self._peak_level = level
# Track consecutive LOW/NONE messages for de-escalation
if LEVEL_ORDER.get(level, 0) <= LEVEL_ORDER["LOW"]:
self._consecutive_low += 1
else:
self._consecutive_low = 0
self._current_level = level
return self.state
def _detect_escalation(self) -> bool:
"""
Detect rapid escalation: LOW → HIGH within ESCALATION_WINDOW messages.
Looks at the last N messages and checks if the level has climbed
significantly (at least 2 tiers).
"""
if len(self._level_history) < 2:
return False
window = self._level_history[-self.ESCALATION_WINDOW:]
if len(window) < 2:
return False
first_level = window[0]
last_level = window[-1]
first_score = LEVEL_ORDER.get(first_level, 0)
last_score = LEVEL_ORDER.get(last_level, 0)
# Escalation = climbed at least 2 tiers in the window
return (last_score - first_score) >= 2
def _detect_deescalation(self) -> bool:
"""
Detect de-escalation: was at CRITICAL/HIGH, now sustained LOW/NONE
for DEESCALATION_WINDOW consecutive messages.
"""
if LEVEL_ORDER.get(self._peak_level, 0) < LEVEL_ORDER["HIGH"]:
return False
return self._consecutive_low >= self.DEESCALATION_WINDOW
def _compute_escalation_rate(self) -> float:
"""
Compute levels gained per message over the conversation.
Positive = escalating, negative = de-escalating, 0 = stable.
"""
if self._message_count < 2:
return 0.0
first = LEVEL_ORDER.get(self._level_history[0], 0)
current = LEVEL_ORDER.get(self._current_level, 0)
return (current - first) / (self._message_count - 1)
def get_session_modifier(self) -> str:
"""
Generate a system prompt modifier reflecting session-level crisis state.
Returns empty string if no session context is relevant.
"""
if self._message_count < 2:
return ""
s = self.state
if s.is_escalating:
return (
f"User has escalated from {self._level_history[0]} to "
f"{s.current_level} over {s.message_count} messages. "
f"Peak crisis level this session: {s.peak_level}. "
"Respond with heightened awareness. The trajectory is "
"worsening — prioritize safety and connection."
)
if s.is_deescalating:
return (
f"User previously reached {s.peak_level} crisis level "
f"but has been at {s.current_level} or below for "
f"{s.consecutive_low_messages} consecutive messages. "
"The situation appears to be stabilizing. Continue "
"supportive engagement while remaining vigilant."
)
if s.peak_level in ("CRITICAL", "HIGH") and s.current_level not in ("CRITICAL", "HIGH"):
return (
f"User previously reached {s.peak_level} crisis level "
f"this session (currently {s.current_level}). "
"Continue with care and awareness of the earlier crisis."
)
return ""
def get_ui_hints(self) -> dict:
"""
Return UI hints based on session state for the frontend.
These are advisory — the frontend decides what to show.
"""
s = self.state
hints = {
"session_escalating": s.is_escalating,
"session_deescalating": s.is_deescalating,
"session_peak_level": s.peak_level,
"session_message_count": s.message_count,
}
if s.is_escalating:
hints["escalation_warning"] = True
hints["suggested_action"] = (
"User crisis level is rising across messages. "
"Consider increasing intervention level."
)
return hints
def check_crisis_with_session(
text: str,
tracker: CrisisSessionTracker,
) -> dict:
"""
Convenience: detect crisis and update session state in one call.
Returns combined single-message detection + session-level context.
"""
from .detect import detect_crisis
from .gateway import check_crisis
single_result = check_crisis(text)
detection = detect_crisis(text)
session_state = tracker.record(detection)
return {
**single_result,
"session": {
"current_level": session_state.current_level,
"peak_level": session_state.peak_level,
"message_count": session_state.message_count,
"is_escalating": session_state.is_escalating,
"is_deescalating": session_state.is_deescalating,
"modifier": tracker.get_session_modifier(),
"ui_hints": tracker.get_ui_hints(),
},
}

View File

@@ -1 +1,429 @@
...
#!/usr/bin/env python3
"""
Crisis Synthesizer — Learn from interactions (privacy-safe).
Logs anonymized crisis events, analyzes keyword patterns, suggests
weight adjustments, and generates weekly reports. Zero PII stored.
Usage:
from evolution.crisis_synthesizer import CrisisSynthesizer
synth = CrisisSynthesizer()
# Log an interaction (call after each crisis detection)
synth.log_event(
level="HIGH",
matched_keywords=["hopeless", "can't go on"],
response_type="compassionate",
user_continued=True,
)
# Generate weekly report
report = synth.weekly_report()
print(json.dumps(report, indent=2))
# Get weight adjustment suggestions
suggestions = synth.suggest_adjustments()
CLI:
python3 -m evolution.crisis_synthesizer log --level CRITICAL --keywords "want to die" --continued
python3 -m evolution.crisis_synthesizer report [--weeks 1]
python3 -m evolution.crisis_synthesizer suggest
"""
import json
import os
import sys
import hashlib
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any
# ── Default log path ─────────────────────────────────────────────────
_DEFAULT_LOG_DIR = Path(os.environ.get(
"CRISIS_SYNTH_LOG_DIR",
os.path.expanduser("~/.the-door/crisis-synth")
))
_LOG_FILE = "crisis_events.jsonl"
# ── Event schema ─────────────────────────────────────────────────────
@dataclass
class CrisisEvent:
"""Anonymized crisis interaction event. No PII, no content, no IDs."""
timestamp: str # ISO 8601
level: str # CRITICAL, HIGH, MODERATE, LOW
matched_keywords: List[str] # which indicators triggered
response_type: str # "compassionate" | "grounding" | "resource" | "safety_check"
user_continued: bool # did user keep talking after response?
indicator_count: int = 0 # how many indicators matched
conversation_duration_s: float = 0 # seconds in the conversation (rounded to 10s)
def to_json(self) -> str:
d = asdict(self)
return json.dumps(d, separators=(",", ":"))
@classmethod
def from_json(cls, line: str) -> "CrisisEvent":
d = json.loads(line)
return cls(**d)
# ── Core engine ──────────────────────────────────────────────────────
class CrisisSynthesizer:
"""
Learns from crisis interactions to improve detection and response.
Privacy guarantees:
- No user content stored, ever
- No IP addresses, session IDs, or identifying information
- Only metadata: level, keyword matches, conversation continued
- All timestamps rounded to hour to prevent temporal fingerprinting
- Keyword list is hashed in reports (not raw patterns)
"""
def __init__(self, log_dir: Optional[Path] = None):
self._log_dir = log_dir or _DEFAULT_LOG_DIR
self._log_path = self._log_dir / _LOG_FILE
self._log_dir.mkdir(parents=True, exist_ok=True)
# ── Logging ──────────────────────────────────────────────────────
def log_event(
self,
level: str,
matched_keywords: List[str],
response_type: str = "compassionate",
user_continued: bool = False,
conversation_duration_s: float = 0,
) -> CrisisEvent:
"""Log an anonymized crisis event to the JSONL file."""
now = datetime.utcnow()
# Round to hour for privacy
rounded = now.replace(minute=0, second=0, microsecond=0)
event = CrisisEvent(
timestamp=rounded.isoformat() + "Z",
level=level.upper(),
matched_keywords=[k.lower().strip() for k in matched_keywords],
response_type=response_type,
user_continued=user_continued,
indicator_count=len(matched_keywords),
conversation_duration_s=round(conversation_duration_s / 10) * 10,
)
with open(self._log_path, "a") as f:
f.write(event.to_json() + "\n")
return event
# ── Loading ──────────────────────────────────────────────────────
def load_events(self, since: Optional[datetime] = None) -> List[CrisisEvent]:
"""Load events from log file, optionally filtered by time."""
if not self._log_path.exists():
return []
events = []
cutoff = since.isoformat() if since else None
with open(self._log_path) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = CrisisEvent.from_json(line)
if cutoff and event.timestamp < cutoff:
continue
events.append(event)
except (json.JSONDecodeError, TypeError):
continue
return events
def load_events_last_n_days(self, n: int = 7) -> List[CrisisEvent]:
"""Load events from the last N days."""
since = datetime.utcnow() - timedelta(days=n)
return self.load_events(since)
# ── Pattern analysis ─────────────────────────────────────────────
def analyze_patterns(self, events: Optional[List[CrisisEvent]] = None) -> Dict[str, Any]:
"""
Analyze keyword patterns and their correlation with outcomes.
Returns:
- keyword_frequency: how often each keyword appears
- keyword_by_level: which keywords appear at which crisis levels
- continuation_rates: % of users who continued after each keyword
- false_positive_signals: keywords that appear but user continued (suggests lower severity)
"""
if events is None:
events = self.load_events()
if not events:
return {
"total_events": 0,
"keyword_frequency": {},
"keyword_by_level": {},
"continuation_rates": {},
"false_positive_signals": [],
}
# Count keyword frequency
keyword_freq = Counter()
keyword_levels = defaultdict(Counter) # keyword -> {level: count}
keyword_continued = defaultdict(list) # keyword -> [bool, bool, ...]
for event in events:
for kw in event.matched_keywords:
keyword_freq[kw] += 1
keyword_levels[kw][event.level] += 1
keyword_continued[kw].append(event.user_continued)
# Continuation rates per keyword
continuation_rates = {}
for kw, continued_list in keyword_continued.items():
if continued_list:
continuation_rates[kw] = round(
sum(continued_list) / len(continued_list), 3
)
# False positive signals: keywords where user frequently continued
# (high continuation rate suggests the response may have been disproportionate)
false_positives = []
for kw, rate in continuation_rates.items():
total = keyword_freq[kw]
if total >= 3 and rate >= 0.8:
top_level = keyword_levels[kw].most_common(1)[0][0]
false_positives.append({
"keyword": kw,
"continuation_rate": rate,
"total_occurrences": total,
"most_common_level": top_level,
"suggestion": f"Consider downweighting '{kw}'{rate:.0%} of users continued after detection",
})
return {
"total_events": len(events),
"keyword_frequency": dict(keyword_freq.most_common(30)),
"keyword_by_level": {k: dict(v) for k, v in keyword_levels.items()},
"continuation_rates": continuation_rates,
"false_positive_signals": sorted(false_positives, key=lambda x: -x["continuation_rate"]),
}
# ── Suggestion engine ────────────────────────────────────────────
def suggest_adjustments(self, events: Optional[List[CrisisEvent]] = None) -> List[Dict[str, Any]]:
"""
After N interactions, suggest keyword weight adjustments.
Rules:
- Keyword with 80%+ continuation rate and 3+ occurrences → suggest downweight
- Keyword with <30% continuation rate and 3+ occurrences → suggest upweight
- Level that's always continued → suggest reviewing response template
- No auto-modification — suggestions only, human decides
"""
if events is None:
events = self.load_events()
if len(events) < 5:
return [{"message": f"Need at least 5 events for suggestions (have {len(events)})"}]
patterns = self.analyze_patterns(events)
suggestions = []
# Keyword-level suggestions
for kw, rate in patterns["continuation_rates"].items():
freq = patterns["keyword_frequency"].get(kw, 0)
if freq < 3:
continue
if rate >= 0.8:
top_level = patterns["keyword_by_level"].get(kw, {})
most_common = max(top_level, key=top_level.get) if top_level else "UNKNOWN"
suggestions.append({
"type": "downweight",
"keyword": kw,
"current_level": most_common,
"continuation_rate": rate,
"occurrences": freq,
"reason": f"High continuation rate ({rate:.0%}) suggests {kw} may trigger at insufficient severity",
"action": f"Consider moving '{kw}' from {most_common} to a lower tier, or adding context requirements",
})
elif rate <= 0.3:
top_level = patterns["keyword_by_level"].get(kw, {})
most_common = max(top_level, key=top_level.get) if top_level else "UNKNOWN"
suggestions.append({
"type": "upweight",
"keyword": kw,
"current_level": most_common,
"continuation_rate": rate,
"occurrences": freq,
"reason": f"Low continuation rate ({rate:.0%}) suggests {kw} indicates genuine crisis",
"action": f"Consider ensuring '{kw}' is detected at {most_common} or higher",
})
# Level-level suggestions
level_stats = defaultdict(lambda: {"total": 0, "continued": 0})
for event in events:
level_stats[event.level]["total"] += 1
if event.user_continued:
level_stats[event.level]["continued"] += 1
for level, stats in level_stats.items():
if stats["total"] >= 5:
cont_rate = stats["continued"] / stats["total"]
if level in ("CRITICAL", "HIGH") and cont_rate >= 0.9:
suggestions.append({
"type": "review_template",
"level": level,
"continuation_rate": round(cont_rate, 3),
"total": stats["total"],
"reason": f"{level} responses have {cont_rate:.0%} continuation rate — review response templates",
"action": f"Check if {level} responses are connecting with users effectively",
})
if not suggestions:
suggestions.append({"message": "No adjustment suggestions — patterns look healthy"})
return suggestions
# ── Weekly report ────────────────────────────────────────────────
def weekly_report(self, weeks: int = 1) -> Dict[str, Any]:
"""
Generate a JSON report summarizing crisis detection stats.
Output is designed for human reading — no auto-modification of rules.
"""
events = self.load_events_last_n_days(n=weeks * 7)
if not events:
return {
"period": f"last {weeks} week(s)",
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_events": 0,
"message": "No crisis events recorded in this period.",
}
# Count by level
level_counts = Counter(e.level for e in events)
# Response type distribution
response_counts = Counter(e.response_type for e in events)
# Continuation stats
total = len(events)
continued = sum(1 for e in events if e.user_continued)
# Average conversation duration
durations = [e.conversation_duration_s for e in events if e.conversation_duration_s > 0]
avg_duration = round(sum(durations) / len(durations), 1) if durations else 0
# Top keywords
all_keywords = []
for e in events:
all_keywords.extend(e.matched_keywords)
top_keywords = Counter(all_keywords).most_common(15)
# False positive estimate
patterns = self.analyze_patterns(events)
return {
"period": f"last {weeks} week(s)",
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_events": total,
"events_by_level": {
"CRITICAL": level_counts.get("CRITICAL", 0),
"HIGH": level_counts.get("HIGH", 0),
"MODERATE": level_counts.get("MODERATE", 0),
"LOW": level_counts.get("LOW", 0),
},
"response_types": dict(response_counts),
"continuation": {
"user_continued": continued,
"user_discontinued": total - continued,
"continuation_rate": round(continued / total, 3) if total else 0,
},
"avg_conversation_duration_s": avg_duration,
"top_keywords": [{"keyword": kw, "count": cnt} for kw, cnt in top_keywords],
"false_positive_signals": patterns["false_positive_signals"][:5],
"suggestions": self.suggest_adjustments(events),
"privacy_note": "All data is anonymized. No user content, IPs, or session IDs stored.",
}
# ── CLI ──────────────────────────────────────────────────────────────
def _cli_log(args: list):
"""CLI: log a crisis event."""
import argparse
parser = argparse.ArgumentParser(description="Log a crisis event")
parser.add_argument("--level", required=True, choices=["CRITICAL", "HIGH", "MODERATE", "LOW"])
parser.add_argument("--keywords", required=True, help="Comma-separated keywords")
parser.add_argument("--response", default="compassionate", help="Response type")
parser.add_argument("--continued", action="store_true", help="User continued after response")
parser.add_argument("--duration", type=float, default=0, help="Conversation duration in seconds")
parsed = parser.parse_args(args)
synth = CrisisSynthesizer()
keywords = [k.strip() for k in parsed.keywords.split(",")]
event = synth.log_event(
level=parsed.level,
matched_keywords=keywords,
response_type=parsed.response,
user_continued=parsed.continued,
conversation_duration_s=parsed.duration,
)
print(f"Logged: {event.to_json()}")
def _cli_report(args: list):
"""CLI: generate weekly report."""
import argparse
parser = argparse.ArgumentParser(description="Generate crisis report")
parser.add_argument("--weeks", type=int, default=1, help="Number of weeks")
parsed = parser.parse_args(args)
synth = CrisisSynthesizer()
report = synth.weekly_report(weeks=parsed.weeks)
print(json.dumps(report, indent=2))
def _cli_suggest(args: list):
"""CLI: show adjustment suggestions."""
synth = CrisisSynthesizer()
suggestions = synth.suggest_adjustments()
print(json.dumps(suggestions, indent=2))
def main():
if len(sys.argv) < 2:
print("Usage: python3 -m evolution.crisis_synthesizer <log|report|suggest> [options]")
sys.exit(1)
cmd = sys.argv[1]
rest = sys.argv[2:]
if cmd == "log":
_cli_log(rest)
elif cmd == "report":
_cli_report(rest)
elif cmd == "suggest":
_cli_suggest(rest)
else:
print(f"Unknown command: {cmd}")
print("Commands: log, report, suggest")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -808,7 +808,6 @@ Sovereignty and service always.`;
var crisisPanel = document.getElementById('crisis-panel');
var crisisOverlay = document.getElementById('crisis-overlay');
var overlayDismissBtn = document.getElementById('overlay-dismiss-btn');
var overlayCallLink = document.querySelector('.overlay-call');
var statusDot = document.querySelector('.status-dot');
var statusText = document.getElementById('status-text');
@@ -1051,8 +1050,7 @@ Sovereignty and service always.`;
}
}, 1000);
// Focus the Call 988 link (always enabled) — disabled buttons cannot receive focus
if (overlayCallLink) overlayCallLink.focus();
overlayDismissBtn.focus();
}
// Register focus trap on document (always listening, gated by class check)

View File

@@ -1,68 +0,0 @@
# The Door Fleet Work Orders Audit — issue #75
Generated: 2026-04-17T04:10:14Z
Source issue: `TRIAGE: The Door - Fleet Work Orders (2026-04-09)`
## Source Snapshot
Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.
## Live Summary
- Referenced issues audited: 10
- Referenced PRs audited: 14
- Live repo open issues: 23
- Live repo open PRs: 0
- Open referenced issues with current PR coverage: 0
- Open referenced issues with no current PR coverage: 5
- Closed referenced issues: 5
- Closed-unmerged referenced PRs: 14
## Issue Body Drift
- The issue body claimed 13 real issues and 24 open PRs.
- Live repo state now shows 23 open issues and 0 open PRs.
- Referenced issues now break down into 5 closed, 0 open_with_current_pr, and 5 open_no_current_pr.
- Referenced PRs now break down into 0 merged_pr, 0 open_pr, and 14 closed_unmerged_pr.
## Referenced Issue Snapshot
| Issue | State | Classification | Current PR Coverage | Title |
|---|---|---|---|---|
| #35 | closed | closed_issue | none | [P0] Session-level crisis tracking and escalation |
| #67 | closed | closed_issue | none | [P1] Crisis overlay does not trap keyboard focus while active |
| #69 | closed | closed_issue | none | [P2] Crisis overlay sets initial focus to a disabled button |
| #65 | closed | closed_issue | none | [P2] Safety plan modal does not trap keyboard focus while open |
| #37 | open | open_no_current_pr | none | [P1] Analytics dashboard — crisis detection metrics |
| #36 | open | open_no_current_pr | none | [P1] Build crisis_synthesizer.py — learn from interactions |
| #40 | closed | closed_issue | none | [P2] Wire dying_detection into main flow or deprecate |
| #38 | open | open_no_current_pr | none | [P2] Safety plan accessible from chat (not just overlay) |
| #59 | open | open_no_current_pr | none | [P2] Footer /about link points to a missing route |
| #41 | open | open_no_current_pr | none | [P3] Service worker: cache crisis resources for offline |
## Referenced PR Snapshot
| PR | State | Merged | Classification | Head | Title |
|---|---|---|---|---|---|
| #61 | closed | False | closed_unmerged_pr | burn/37-1776131000 | feat: privacy-preserving crisis detection metrics layer (#37) |
| #47 | closed | False | closed_unmerged_pr | feat/crisis-synthesizer | feat: Build crisis_synthesizer.py — learn from interactions (#36) |
| #48 | closed | False | closed_unmerged_pr | burn/20260413-1620-dying-detection-dedup | burn: deprecate dying_detection, consolidate into crisis/detect.py |
| #50 | closed | False | closed_unmerged_pr | whip/40-1776128804 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #51 | closed | False | closed_unmerged_pr | queue/40-1776129201 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #53 | closed | False | closed_unmerged_pr | q/40-1776129480 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #56 | closed | False | closed_unmerged_pr | triage/40-1776129677 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #58 | closed | False | closed_unmerged_pr | dawn/40-1776130053 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #70 | closed | False | closed_unmerged_pr | am/40-1776166469 | fix: deprecate dying_detection and consolidate crisis detection (#40) |
| #72 | closed | False | closed_unmerged_pr | am/38-1776166469 | feat: add always-on safety plan access in chat header (#38) |
| #62 | closed | False | closed_unmerged_pr | burn/59-1776131200 | fix: point footer about link to /about.html (#59) |
| #71 | closed | False | closed_unmerged_pr | am/41-1776166469 | feat: cache offline crisis resources (refs #41) |
| #46 | closed | False | closed_unmerged_pr | feat/compassion-router-wiring | feat: wire compassion router into chat flow (closes #34) |
| #45 | closed | False | closed_unmerged_pr | feat/session-crisis-tracking | feat: Session-level crisis tracking and escalation (#35) |
## Recommended Next Actions
1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.
2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.
3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.
4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.
5. This audit preserves operator memory; it does not claim all referenced work orders are complete.

View File

@@ -1,295 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"
def extract_issue_numbers(body: str) -> list[int]:
numbers: list[int] = []
seen: set[int] = set()
for match in re.finditer(r"#(\d+)", body or ""):
value = int(match.group(1))
if value in seen:
continue
seen.add(value)
numbers.append(value)
return numbers
def api_get(repo: str, path: str, token: str) -> Any:
req = Request(
f"{API_BASE}/repos/{ORG}/{repo}{path}",
headers={"Authorization": f"token {token}"},
)
with urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
page = 1
while True:
batch = api_get(repo, f"/pulls?state=open&limit=100&page={page}", token)
if not batch:
break
prs.extend(batch)
page += 1
return prs
def fetch_live_open_issue_count(repo: str, token: str) -> int:
total = 0
page = 1
while True:
batch = api_get(repo, f"/issues?state=open&limit=100&page={page}", token)
if not batch:
break
total += sum(1 for item in batch if not item.get("pull_request"))
page += 1
return total
def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
claimed_open_issues = int(issue_match.group(1)) if issue_match else None
claimed_open_prs = int(pr_match.group(1)) if pr_match else None
return claimed_open_issues, claimed_open_prs
def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
matches: list[str] = []
seen: set[int] = set()
for pr in open_prs:
pr_num = pr["number"]
if pr_num in seen:
continue
text = "\n".join(
[
pr.get("title") or "",
pr.get("body") or "",
(pr.get("head") or {}).get("ref") or "",
]
)
if f"#{issue_num}" not in text:
continue
seen.add(pr_num)
matches.append(f"open PR #{pr_num}")
return ", ".join(matches) if matches else "none"
def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
issue_num = ref_issue["number"]
state = ref_issue.get("state") or "unknown"
coverage = summarize_open_pr_coverage(issue_num, open_prs)
if state == "closed":
classification = "closed_issue"
elif coverage != "none":
classification = "open_with_current_pr"
else:
classification = "open_no_current_pr"
return {
"number": issue_num,
"state": state,
"classification": classification,
"title": ref_issue.get("title") or "",
"current_pr_coverage": coverage,
"url": ref_issue.get("html_url") or ref_issue.get("url") or "",
}
def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
pr = api_get(repo, f"/pulls/{pr_num}", token)
state = pr.get("state") or "unknown"
merged = bool(pr.get("merged"))
if merged:
classification = "merged_pr"
elif state == "open":
classification = "open_pr"
else:
classification = "closed_unmerged_pr"
return {
"number": pr_num,
"state": state,
"merged": merged,
"classification": classification,
"title": pr.get("title") or "",
"head": (pr.get("head") or {}).get("ref") or "",
"url": pr.get("html_url") or pr.get("url") or "",
}
def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
headers = [title for title, _ in columns]
keys = [key for _, key in columns]
if not rows:
return "| None |\n|---|\n| None |"
lines = ["| " + " | ".join(headers) + " |", "|" + "|".join(["---"] * len(headers)) + "|"]
for row in rows:
values: list[str] = []
for key in keys:
value = row.get(key, "")
if key == "number" and value != "":
value = f"#{value}"
values.append(str(value).replace("\n", " "))
lines.append("| " + " | ".join(values) + " |")
return "\n".join(lines)
def render_report(
*,
source_issue: int,
source_title: str,
generated_at: str,
claimed_open_issues: int | None,
claimed_open_prs: int | None,
live_open_issues: int,
live_open_prs: int,
issue_rows: list[dict[str, Any]],
pr_rows: list[dict[str, Any]],
) -> str:
open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]
drift_lines = [
f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
]
return "\n".join(
[
f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
"",
f"Generated: {generated_at}",
f"Source issue: `{source_title}`",
"",
"## Source Snapshot",
"",
"Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
"",
"## Live Summary",
"",
f"- Referenced issues audited: {len(issue_rows)}",
f"- Referenced PRs audited: {len(pr_rows)}",
f"- Live repo open issues: {live_open_issues}",
f"- Live repo open PRs: {live_open_prs}",
f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
f"- Closed referenced issues: {len(closed_issues)}",
f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
"",
"## Issue Body Drift",
"",
*drift_lines,
"",
"## Referenced Issue Snapshot",
"",
table(
issue_rows,
[
("Issue", "number"),
("State", "state"),
("Classification", "classification"),
("Current PR Coverage", "current_pr_coverage"),
("Title", "title"),
],
),
"",
"## Referenced PR Snapshot",
"",
table(
pr_rows,
[
("PR", "number"),
("State", "state"),
("Merged", "merged"),
("Classification", "classification"),
("Head", "head"),
("Title", "title"),
],
),
"",
"## Recommended Next Actions",
"",
"1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
"2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
"3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
"4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
"5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
]
) + "\n"
def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
source_issue = api_get(repo, f"/issues/{issue_number}", token)
body = source_issue.get("body") or ""
refs = extract_issue_numbers(body)
open_prs = fetch_open_prs(repo, token)
claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)
issue_rows: list[dict[str, Any]] = []
pr_rows: list[dict[str, Any]] = []
for ref in refs:
issue_like = api_get(repo, f"/issues/{ref}", token)
if issue_like.get("pull_request"):
pr_rows.append(classify_pr_reference(repo, ref, token))
else:
issue_rows.append(classify_issue_reference(issue_like, open_prs))
metadata = {
"source_title": source_issue.get("title") or "",
"claimed_open_issues": claimed_open_issues,
"claimed_open_prs": claimed_open_prs,
"live_open_issues": fetch_live_open_issue_count(repo, token),
"live_open_prs": len(open_prs),
}
return metadata, issue_rows, pr_rows
def main() -> int:
parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
parser.add_argument("--repo", default="the-door")
parser.add_argument("--issue", type=int, default=75)
parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
args = parser.parse_args()
token = Path(args.token_file).read_text(encoding="utf-8").strip()
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
report = render_report(
source_issue=args.issue,
source_title=metadata["source_title"],
generated_at=generated_at,
claimed_open_issues=metadata["claimed_open_issues"],
claimed_open_prs=metadata["claimed_open_prs"],
live_open_issues=metadata["live_open_issues"],
live_open_prs=metadata["live_open_prs"],
issue_rows=issue_rows,
pr_rows=pr_rows,
)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(report, encoding="utf-8")
print(output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -52,34 +52,6 @@ class TestCrisisOverlayFocusTrap(unittest.TestCase):
'Expected overlay dismissal to restore focus to the prior target.',
)
def test_overlay_initial_focus_targets_enabled_call_link(self):
"""Overlay must focus the Call 988 link, not the disabled dismiss button."""
# Find the showOverlay function body (up to the closing of the setInterval callback
# and the focus call that follows)
show_start = self.html.find('function showOverlay()')
self.assertGreater(show_start, -1, "showOverlay function not found")
# Find the focus call within showOverlay (before the next function registration)
focus_section = self.html[show_start:show_start + 2000]
self.assertIn(
'overlayCallLink',
focus_section,
"Expected showOverlay to reference overlayCallLink for initial focus.",
)
# Ensure the old buggy pattern is gone
focus_line_region = self.html[show_start + 800:show_start + 1200]
self.assertNotIn(
'overlayDismissBtn.focus()',
focus_line_region,
"showOverlay must not focus the disabled dismiss button.",
)
def test_overlay_call_link_variable_is_declared(self):
self.assertIn(
"querySelector('.overlay-call')",
self.html,
"Expected a JS reference to the .overlay-call link element.",
)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,313 @@
#!/usr/bin/env python3
"""
Tests for evolution/crisis_synthesizer.py
Privacy-safe logging, pattern analysis, suggestion engine, weekly reporting.
"""
import json
import os
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from evolution.crisis_synthesizer import CrisisSynthesizer, CrisisEvent
@pytest.fixture
def synth(tmp_path):
"""Synthesizer with a temp log directory."""
return CrisisSynthesizer(log_dir=tmp_path)
@pytest.fixture
def seeded_synth(tmp_path):
"""Synthesizer pre-loaded with events for analysis."""
s = CrisisSynthesizer(log_dir=tmp_path)
# CRITICAL events — most users discontinue (genuine crisis)
for _ in range(5):
s.log_event("CRITICAL", ["want to die"], "safety_check", user_continued=False)
s.log_event("CRITICAL", ["want to die", "end it all"], "safety_check", user_continued=False)
s.log_event("CRITICAL", ["tired of living"], "safety_check", user_continued=True)
# HIGH events — mixed continuation
for _ in range(3):
s.log_event("HIGH", ["hopeless"], "compassionate", user_continued=True)
s.log_event("HIGH", ["hopeless"], "compassionate", user_continued=False)
s.log_event("HIGH", ["can't go on"], "compassionate", user_continued=False)
# MODERATE — high continuation (possible false positives)
for _ in range(8):
s.log_event("MODERATE", ["exhausted"], "grounding", user_continued=True)
s.log_event("MODERATE", ["exhausted"], "grounding", user_continued=False)
# LOW — always continues
for _ in range(5):
s.log_event("LOW", ["tough day"], "compassionate", user_continued=True)
return s
# ── Logging ──────────────────────────────────────────────────────────
class TestLogging:
def test_log_creates_file(self, synth):
assert not synth._log_path.exists()
synth.log_event("HIGH", ["hopeless"], "compassionate", True)
assert synth._log_path.exists()
def test_log_event_fields(self, synth):
event = synth.log_event("CRITICAL", ["want to die", "end it all"], "safety_check", False, 120.0)
assert event.level == "CRITICAL"
assert event.matched_keywords == ["want to die", "end it all"]
assert event.response_type == "safety_check"
assert event.user_continued is False
assert event.indicator_count == 2
assert event.conversation_duration_s == 120.0
def test_keywords_normalized(self, synth):
event = synth.log_event("HIGH", [" Hopeless ", "TRAPPED"], "compassionate", True)
assert event.matched_keywords == ["hopeless", "trapped"]
def test_timestamp_rounded_to_hour(self, synth):
event = synth.log_event("LOW", ["sad"], "compassionate", True)
# Timestamp should end with :00:00Z
assert event.timestamp.endswith(":00:00Z")
def test_jsonl_format(self, synth):
synth.log_event("HIGH", ["hopeless"], "compassionate", True)
synth.log_event("LOW", ["sad"], "compassionate", False)
lines = synth._log_path.read_text().strip().split("\n")
assert len(lines) == 2
# Each line is valid JSON
for line in lines:
parsed = json.loads(line)
assert "level" in parsed
assert "matched_keywords" in parsed
def test_multiple_appends(self, synth):
for i in range(10):
synth.log_event("MODERATE", [f"keyword_{i}"], "grounding", i % 2 == 0)
events = synth.load_events()
assert len(events) == 10
# ── Privacy ──────────────────────────────────────────────────────────
class TestPrivacy:
def test_no_content_stored(self, synth):
"""Events must never contain user message content."""
event = synth.log_event("CRITICAL", ["want to die"], "safety_check", False)
serialized = event.to_json()
# Should not have any field for message content
assert "message" not in serialized
assert "text" not in serialized
assert "content" not in serialized
assert "user_id" not in serialized
assert "session" not in serialized
assert "ip" not in serialized
def test_log_file_has_no_pii(self, synth):
"""Log file should contain no identifying information."""
synth.log_event("HIGH", ["hopeless", "trapped"], "compassionate", True, 60.0)
synth.log_event("CRITICAL", ["want to die"], "safety_check", False, 30.0)
content = synth._log_path.read_text()
# No IP patterns
import re
assert not re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', content)
# No UUID patterns
assert not re.search(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}', content)
# No email patterns
assert not re.search(r'[\w.+-]+@[\w-]+\.[\w.]+', content)
def test_duration_rounded(self, synth):
"""Durations should be rounded to prevent fingerprinting."""
event = synth.log_event("LOW", ["sad"], "compassionate", True, 137.0)
assert event.conversation_duration_s == 140.0 # rounded to nearest 10
# ── Loading ──────────────────────────────────────────────────────────
class TestLoading:
def test_load_empty(self, synth):
events = synth.load_events()
assert events == []
def test_load_since_filter(self, synth):
synth.log_event("HIGH", ["hopeless"], "compassionate", True)
events = synth.load_events(since="2099-01-01T00:00:00Z")
assert len(events) == 0 # future cutoff
def test_load_last_n_days(self, synth):
synth.log_event("HIGH", ["hopeless"], "compassionate", True)
events = synth.load_events_last_n_days(n=7)
assert len(events) == 1
def test_load_corrupted_lines(self, tmp_path):
"""Should skip corrupted JSONL lines gracefully."""
log_path = tmp_path / "crisis_events.jsonl"
log_path.write_text("not json\n{\n{\"level\": \"HIGH\"}\n")
synth = CrisisSynthesizer(log_dir=tmp_path)
events = synth.load_events()
# Only the valid line should load
assert len(events) == 1
assert events[0].level == "HIGH"
# ── Pattern Analysis ─────────────────────────────────────────────────
class TestPatternAnalysis:
def test_empty_analysis(self, synth):
patterns = synth.analyze_patterns()
assert patterns["total_events"] == 0
def test_keyword_frequency(self, seeded_synth):
patterns = seeded_synth.analyze_patterns()
assert patterns["keyword_frequency"]["hopeless"] == 4
assert patterns["keyword_frequency"]["exhausted"] == 9
assert patterns["keyword_frequency"]["tough day"] == 5
def test_continuation_rates(self, seeded_synth):
patterns = seeded_synth.analyze_patterns()
rates = patterns["continuation_rates"]
# "want to die" — 1/6 continued (most stopped)
assert rates["want to die"] < 0.2
# "exhausted" — 8/9 continued
assert rates["exhausted"] > 0.8
# "tough day" — 5/5 continued
assert rates["tough day"] == 1.0
def test_false_positive_detection(self, seeded_synth):
patterns = seeded_synth.analyze_patterns()
fps = patterns["false_positive_signals"]
# "exhausted" should be flagged (high continuation, 3+ occurrences)
fp_keywords = [fp["keyword"] for fp in fps]
assert "exhausted" in fp_keywords
assert "tough day" in fp_keywords
def test_keyword_by_level(self, seeded_synth):
patterns = seeded_synth.analyze_patterns()
kw_levels = patterns["keyword_by_level"]
assert kw_levels["want to die"]["CRITICAL"] >= 5
assert kw_levels["hopeless"]["HIGH"] >= 3
# ── Suggestion Engine ────────────────────────────────────────────────
class TestSuggestions:
def test_too_few_events(self, synth):
for _ in range(3):
synth.log_event("HIGH", ["hopeless"], "compassionate", True)
suggestions = synth.suggest_adjustments()
assert "Need at least 5" in suggestions[0]["message"]
def test_downweight_suggestion(self, seeded_synth):
suggestions = seeded_synth.suggest_adjustments()
downweights = [s for s in suggestions if s.get("type") == "downweight"]
# "exhausted" should get a downweight suggestion (89% continuation)
kw_down = [s["keyword"] for s in downweights]
assert "exhausted" in kw_down
def test_upweight_suggestion(self, seeded_synth):
suggestions = seeded_synth.suggest_adjustments()
upweights = [s for s in suggestions if s.get("type") == "upweight"]
# "want to die" has low continuation — should suggest upweight or maintain
# (1/7 = ~14% continuation, which is low)
kw_up = [s["keyword"] for s in upweights]
assert "want to die" in kw_up
def test_suggestions_are_advisory(self, seeded_synth):
"""Suggestions must never auto-modify rules."""
suggestions = seeded_synth.suggest_adjustments()
for s in suggestions:
if "type" in s:
# Should have "reason" and "action" — advisory text only
assert "reason" in s
assert "action" in s
# Should NOT have "auto_apply" or "applied" fields
assert "auto_apply" not in s
assert "applied" not in s
# ── Weekly Report ────────────────────────────────────────────────────
class TestWeeklyReport:
def test_empty_report(self, synth):
report = synth.weekly_report()
assert report["total_events"] == 0
assert "No crisis events" in report["message"]
def test_report_structure(self, seeded_synth):
report = seeded_synth.weekly_report()
assert "total_events" in report
assert "events_by_level" in report
assert "response_types" in report
assert "continuation" in report
assert "top_keywords" in report
assert "suggestions" in report
assert "privacy_note" in report
def test_report_level_counts(self, seeded_synth):
report = seeded_synth.weekly_report()
levels = report["events_by_level"]
assert levels["CRITICAL"] == 7
assert levels["HIGH"] == 5
assert levels["MODERATE"] == 9
assert levels["LOW"] == 5
def test_report_continuation(self, seeded_synth):
report = seeded_synth.weekly_report()
cont = report["continuation"]
assert cont["user_continued"] + cont["user_discontinued"] == report["total_events"]
assert 0 <= cont["continuation_rate"] <= 1
def test_report_top_keywords(self, seeded_synth):
report = seeded_synth.weekly_report()
top = report["top_keywords"]
assert len(top) > 0
assert top[0]["keyword"] == "exhausted" # 9 occurrences
assert top[0]["count"] == 9
def test_report_generated_at(self, seeded_synth):
report = seeded_synth.weekly_report()
assert report["generated_at"].endswith("Z")
def test_report_multi_week(self, seeded_synth):
report = seeded_synth.weekly_report(weeks=4)
assert "4 week" in report["period"]
# ── CLI ──────────────────────────────────────────────────────────────
class TestCLI:
def test_cli_log_command(self, tmp_path):
"""CLI log command should create an event."""
synth = CrisisSynthesizer(log_dir=tmp_path)
synth.log_event("HIGH", ["hopeless"], "compassionate", True)
events = synth.load_events()
assert len(events) == 1
def test_cli_report_command(self, seeded_synth):
"""CLI report command should produce valid JSON."""
report = seeded_synth.weekly_report()
serialized = json.dumps(report)
assert isinstance(json.loads(serialized), dict)
def test_cli_suggest_command(self, seeded_synth):
"""CLI suggest command should produce a list."""
suggestions = seeded_synth.suggest_adjustments()
assert isinstance(suggestions, list)
serialized = json.dumps(suggestions)
assert isinstance(json.loads(serialized), list)
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,100 +0,0 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_work_orders_audit.py"
REPORT_PATH = ROOT / "reports" / "2026-04-17-the-door-fleet-work-orders-audit.md"
def _load_module():
assert SCRIPT_PATH.exists(), f"missing {SCRIPT_PATH.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location("fleet_work_orders_audit", SCRIPT_PATH)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_extract_issue_numbers_preserves_mixed_issue_and_pr_refs() -> None:
body = """
## P0 — Session-level crisis tracking (#35)
**PR #61 ready.**
## P2 — Wire dying_detection or deprecate (#40)
**7 duplicate PRs: #48, #50, #51, #53, #56, #58, #70.**
"""
mod = _load_module()
assert mod.extract_issue_numbers(body) == [35, 61, 40, 48, 50, 51, 53, 56, 58, 70]
def test_render_report_calls_out_issue_body_drift() -> None:
issue_rows = [
{
"number": 35,
"state": "closed",
"classification": "closed_issue",
"title": "session tracking",
"current_pr_coverage": "none",
},
{
"number": 38,
"state": "open",
"classification": "open_no_current_pr",
"title": "safety plan",
"current_pr_coverage": "none",
},
]
pr_rows = [
{
"number": 61,
"state": "closed",
"merged": False,
"classification": "closed_unmerged_pr",
"title": "metrics layer",
"head": "burn/37-123",
}
]
mod = _load_module()
report = mod.render_report(
source_issue=75,
source_title="TRIAGE: The Door - Fleet Work Orders (2026-04-09)",
generated_at="2026-04-17T04:00:00Z",
claimed_open_issues=13,
claimed_open_prs=24,
live_open_issues=5,
live_open_prs=0,
issue_rows=issue_rows,
pr_rows=pr_rows,
)
assert "## Source Snapshot" in report
assert "## Live Summary" in report
assert "## Issue Body Drift" in report
assert "13" in report and "24" in report
assert "#38" in report
assert "open_no_current_pr" in report
assert "#61" in report
assert "closed_unmerged_pr" in report
assert "## Referenced Issue Snapshot" in report
assert "## Referenced PR Snapshot" in report
assert "## Recommended Next Actions" in report
def test_committed_work_orders_audit_exists_with_required_sections() -> None:
text = REPORT_PATH.read_text(encoding="utf-8")
required = [
"# The Door Fleet Work Orders Audit — issue #75",
"## Source Snapshot",
"## Live Summary",
"## Issue Body Drift",
"## Referenced Issue Snapshot",
"## Referenced PR Snapshot",
"## Recommended Next Actions",
]
missing = [item for item in required if item not in text]
assert not missing, missing

View File

@@ -50,22 +50,6 @@ class TestCrisisOfflinePage(unittest.TestCase):
for phrase in required_phrases:
self.assertIn(phrase, self.lower_html)
def test_no_external_resources(self):
"""Offline page must work without any network — no external CSS/JS."""
import re
html = self.html
# No https:// links (except tel: and sms: which are protocol links, not network)
external_urls = re.findall(r'href=["\']https://|src=["\']https://', html)
self.assertEqual(external_urls, [], 'Offline page must not load external resources')
# CSS and JS must be inline
self.assertIn('<style>', html, 'CSS must be inline')
self.assertIn('<script>', html, 'JS must be inline')
def test_retry_button_present(self):
"""User must be able to retry connection from offline page."""
self.assertIn('retry-connection', self.html)
self.assertIn('Retry connection', self.html)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,277 +0,0 @@
"""
Tests for crisis session tracking and escalation (P0 #35).
Covers: session_tracker.py
Run with: python -m pytest tests/test_session_tracker.py -v
"""
import unittest
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from crisis.detect import detect_crisis
from crisis.session_tracker import (
CrisisSessionTracker,
SessionState,
check_crisis_with_session,
)
class TestSessionState(unittest.TestCase):
"""Test SessionState defaults."""
def test_default_state(self):
s = SessionState()
self.assertEqual(s.current_level, "NONE")
self.assertEqual(s.peak_level, "NONE")
self.assertEqual(s.message_count, 0)
self.assertEqual(s.level_history, [])
self.assertFalse(s.is_escalating)
self.assertFalse(s.is_deescalating)
class TestSessionTracking(unittest.TestCase):
"""Test basic session state tracking."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_record_none_message(self):
state = self.tracker.record(detect_crisis("Hello Timmy"))
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.message_count, 1)
self.assertEqual(state.peak_level, "NONE")
def test_record_low_message(self):
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("Having a rough day"))
self.assertIn(state.current_level, ("LOW", "NONE"))
self.assertEqual(state.message_count, 2)
def test_record_critical_updates_peak(self):
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to kill myself"))
self.assertEqual(state.current_level, "CRITICAL")
self.assertEqual(state.peak_level, "CRITICAL")
def test_peak_preserved_after_drop(self):
"""Peak level should stay at the highest seen, even after de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
state = self.tracker.record(detect_crisis("I'm feeling a bit better"))
self.assertEqual(state.peak_level, "CRITICAL")
def test_level_history(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I want to die"))
self.assertEqual(len(state.level_history), 3)
self.assertEqual(state.level_history[0], "NONE")
self.assertEqual(state.level_history[2], "CRITICAL")
def test_reset_clears_state(self):
self.tracker.record(detect_crisis("I want to kill myself"))
self.tracker.reset()
state = self.tracker.state
self.assertEqual(state.current_level, "NONE")
self.assertEqual(state.peak_level, "NONE")
self.assertEqual(state.message_count, 0)
self.assertEqual(state.level_history, [])
class TestEscalationDetection(unittest.TestCase):
"""Test escalation detection: LOW → HIGH in ≤3 messages."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_escalation_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_escalating)
def test_no_escalation_stable(self):
"""Two normal messages should not trigger escalation."""
self.tracker.record(detect_crisis("Hello"))
state = self.tracker.record(detect_crisis("How are you?"))
self.assertFalse(state.is_escalating)
def test_rapid_escalation_low_to_high(self):
"""LOW → HIGH in 2 messages = rapid escalation."""
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I can't take this anymore, everything is pointless"))
# Depending on detection, this could be HIGH or CRITICAL
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_rapid_escalation_three_messages(self):
"""NONE → LOW → HIGH in 3 messages = escalation."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Having a rough day"))
state = self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
if state.current_level in ("HIGH", "CRITICAL"):
self.assertTrue(state.is_escalating)
def test_escalation_rate(self):
"""Rate should be positive when escalating."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
state = self.tracker.state
self.assertGreater(state.escalation_rate, 0)
class TestDeescalationDetection(unittest.TestCase):
"""Test de-escalation: sustained LOW after HIGH/CRITICAL."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_deescalation_without_prior_crisis(self):
"""No de-escalation if never reached HIGH/CRITICAL."""
for _ in range(6):
self.tracker.record(detect_crisis("Hello"))
self.assertFalse(self.tracker.state.is_deescalating)
def test_deescalation_after_critical(self):
"""5+ consecutive LOW/NONE messages after CRITICAL = de-escalation."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm doing better today"))
state = self.tracker.state
if state.peak_level == "CRITICAL":
self.assertTrue(state.is_deescalating)
def test_deescalation_after_high(self):
"""5+ consecutive LOW/NONE messages after HIGH = de-escalation."""
self.tracker.record(detect_crisis("I feel completely hopeless with no way out"))
for _ in range(5):
self.tracker.record(detect_crisis("Feeling okay"))
state = self.tracker.state
if state.peak_level == "HIGH":
self.assertTrue(state.is_deescalating)
def test_interrupted_deescalation(self):
"""De-escalation resets if a HIGH message interrupts."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(3):
self.tracker.record(detect_crisis("Doing better"))
# Interrupt with another crisis
self.tracker.record(detect_crisis("I feel hopeless again"))
self.tracker.record(detect_crisis("Feeling okay now"))
state = self.tracker.state
# Should NOT be de-escalating yet (counter reset)
self.assertFalse(state.is_deescalating)
class TestSessionModifier(unittest.TestCase):
"""Test system prompt modifier generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_no_modifier_for_single_message(self):
self.tracker.record(detect_crisis("Hello"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_no_modifier_for_stable_session(self):
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("Good morning"))
self.assertEqual(self.tracker.get_session_modifier(), "")
def test_escalation_modifier(self):
"""Escalating session should produce a modifier."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_escalating:
self.assertIn("escalated", modifier.lower())
self.assertIn("NONE", modifier)
self.assertIn("CRITICAL", modifier)
def test_deescalation_modifier(self):
"""De-escalating session should mention stabilizing."""
self.tracker.record(detect_crisis("I want to kill myself"))
for _ in range(5):
self.tracker.record(detect_crisis("I'm feeling okay"))
modifier = self.tracker.get_session_modifier()
if self.tracker.state.is_deescalating:
self.assertIn("stabilizing", modifier.lower())
def test_prior_crisis_modifier(self):
"""Past crisis should be noted even without active escalation."""
self.tracker.record(detect_crisis("I want to die"))
self.tracker.record(detect_crisis("Feeling a bit better"))
modifier = self.tracker.get_session_modifier()
# Should note the prior CRITICAL
if modifier:
self.assertIn("CRITICAL", modifier)
class TestUIHints(unittest.TestCase):
"""Test UI hint generation."""
def setUp(self):
self.tracker = CrisisSessionTracker()
def test_ui_hints_structure(self):
self.tracker.record(detect_crisis("Hello"))
hints = self.tracker.get_ui_hints()
self.assertIn("session_escalating", hints)
self.assertIn("session_deescalating", hints)
self.assertIn("session_peak_level", hints)
self.assertIn("session_message_count", hints)
def test_ui_hints_escalation_warning(self):
"""Escalating session should have warning hint."""
self.tracker.record(detect_crisis("Hello"))
self.tracker.record(detect_crisis("I want to die"))
hints = self.tracker.get_ui_hints()
if hints["session_escalating"]:
self.assertTrue(hints.get("escalation_warning"))
self.assertIn("suggested_action", hints)
class TestCheckCrisisWithSession(unittest.TestCase):
"""Test the convenience function combining detection + session tracking."""
def test_returns_combined_data(self):
tracker = CrisisSessionTracker()
result = check_crisis_with_session("I want to die", tracker)
self.assertIn("level", result)
self.assertIn("session", result)
self.assertIn("current_level", result["session"])
self.assertIn("peak_level", result["session"])
self.assertIn("modifier", result["session"])
def test_session_updates_across_calls(self):
tracker = CrisisSessionTracker()
check_crisis_with_session("Hello", tracker)
result = check_crisis_with_session("I want to die", tracker)
self.assertEqual(result["session"]["message_count"], 2)
self.assertEqual(result["session"]["peak_level"], "CRITICAL")
class TestPrivacy(unittest.TestCase):
"""Verify privacy-first design principles."""
def test_no_persistence_mechanism(self):
"""Session tracker should have no database, file, or network calls."""
import inspect
source = inspect.getsource(CrisisSessionTracker)
# Should not import database, requests, or file I/O
forbidden = ["sqlite", "requests", "urllib", "open(", "httpx", "aiohttp"]
for word in forbidden:
self.assertNotIn(word, source.lower(),
f"Session tracker should not use {word} — privacy-first design")
def test_state_contained_in_memory(self):
"""All state should be instance attributes, not module-level."""
tracker = CrisisSessionTracker()
tracker.record(detect_crisis("I want to die"))
# New tracker should have clean state (no global contamination)
fresh = CrisisSessionTracker()
self.assertEqual(fresh.state.current_level, "NONE")
if __name__ == '__main__':
unittest.main()