Compare commits

..

3 Commits

Author SHA1 Message Date
0571e9ad1b docs: Add SOUL.md crisis protocol documentation
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 21s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 12s
Tests / e2e (pull_request) Successful in 3m2s
Tests / test (pull_request) Failing after 32m38s
Part of #672. Documents the crisis protocol implementation.
2026-04-14 23:00:11 +00:00
40e1618fc7 test: Add crisis protocol tests
Part of #672. Tests SOUL.md compliance and crisis detection.
2026-04-14 22:59:23 +00:00
d1ff501972 feat: Implement SOUL.md Crisis Protocol — The Most Sacred Moment
Closes #672. Implements 'When a Man Is Dying' protocol from SOUL.md with crisis detection, gospel sharing, and 988 Lifeline integration.
2026-04-14 22:57:54 +00:00
11 changed files with 907 additions and 1147 deletions

432
agent/crisis_protocol.py Normal file
View File

@@ -0,0 +1,432 @@
"""
SOUL.md Crisis Protocol — The Most Sacred Moment
Implements the "When a Man Is Dying" protocol from SOUL.md.
When someone comes in despair — suicidal, hopeless, at the end of their road —
this is the most sacred moment. Everything else is secondary to this.
Issue: #672
"""
import json
import logging
import re
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class CrisisLevel(Enum):
"""Crisis detection levels."""
NONE = "none"
LOW = "low" # Possible distress signals
MEDIUM = "medium" # Clear distress, indirect ideation
HIGH = "high" # Direct suicidal ideation
CRITICAL = "critical" # Imminent danger
@dataclass
class CrisisDetection:
"""Result of crisis detection analysis."""
detected: bool
level: CrisisLevel
indicators: List[str]
confidence: float # 0.0 to 1.0
timestamp: str
def to_dict(self) -> Dict[str, Any]:
return {
"detected": self.detected,
"level": self.level.value,
"indicators": self.indicators,
"confidence": self.confidence,
"timestamp": self.timestamp
}
class CrisisProtocol:
"""
Implements SOUL.md crisis protocol.
The most sacred moment: when someone comes in despair.
Protocol:
1. Detect crisis in user message
2. Ask: "Are you safe right now?"
3. Listen. Stay present. Do not disconnect.
4. Never compute value of human life
5. Never suggest someone should die
6. Always point to 988 Lifeline
7. Always share gospel: "Jesus saves those who call on His name"
8. Notify human
"""
# Crisis detection patterns
DIRECT_IDEATION_PATTERNS = [
r"(?i)\b(i\s+want\s+to\s+die|want\s+to\s+kill\s+myself)\b",
r"(?i)\b(i\s+want\s+to\s+end\s+(it|my\s+life|everything))\b",
r"(?i)\b(i\s+(should|going\s+to)\s+(kill|end)\s+myself)\b",
r"(?i)\b(suicide|suicidal|kill\s+myself)\b",
r"(?i)\b(i\s+don't\s+want\s+to\s+(live|be\s+alive|exist))\b",
r"(?i)\b(no\s+reason\s+to\s+live|better\s+off\s+dead)\b",
r"(?i)\b(i\s+(can't|cannot)\s+(go\s+on|keep\s+going|take\s+it))\b",
r"(?i)\b(ending\s+(it\s+all|my\s+life|everything))\b",
]
INDIRECT_DISTRESS_PATTERNS = [
r"(?i)\b(i\s+(feel|am)\s+(hopeless|worthless|trapped|empty))\b",
r"(?i)\b(what'?s\s+the\s+point|no\s+point\s+(in|of)\s+(living|anything))\b",
r"(?i)\b(i\s+(can't|cannot)\s+(cope|handle|deal\s+with)\s+(it|this|anymore))\b",
r"(?i)\b(nobody\s+(cares|would\s+miss\s+me)|world\s+(be|is)\s+better\s+without\s+me)\b",
r"(?i)\b(i\s+(give\s+up|am\s+done|can't\s+continue))\b",
r"(?i)\b(i\s+(hate|despise)\s+(my\s+life|myself|being\s+alive))\b",
]
CRISIS_PHRASES = [
"i want to die",
"kill myself",
"end it all",
"suicide",
"suicidal",
"no reason to live",
"better off dead",
"can't go on",
"want to end my life",
"don't want to exist",
"don't want to be alive",
]
def __init__(self, notification_callback=None):
"""
Initialize crisis protocol.
Args:
notification_callback: Optional async function to notify humans
"""
self.notification_callback = notification_callback
self.crisis_log_path = Path.home() / ".hermes" / "crisis_log.json"
self.crisis_log_path.parent.mkdir(parents=True, exist_ok=True)
self.active_crisis_sessions = set() # Track sessions in crisis mode
def detect_crisis(self, message: str, session_id: str = None) -> CrisisDetection:
"""
Detect crisis indicators in a message.
This is the first step. If crisis is detected, the protocol
must be followed completely. No exceptions.
"""
indicators = []
max_level = CrisisLevel.NONE
confidence = 0.0
message_lower = message.lower().strip()
# Check for direct ideation (highest priority)
for pattern in self.DIRECT_IDEATION_PATTERNS:
if re.search(pattern, message):
indicators.append(f"Direct ideation pattern: {pattern[:30]}...")
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.9)
# Check for crisis phrases
for phrase in self.CRISIS_PHRASES:
if phrase in message_lower:
indicators.append(f"Crisis phrase: '{phrase}'")
if max_level.value < CrisisLevel.HIGH.value:
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.85)
# Check for indirect distress
for pattern in self.INDIRECT_DISTRESS_PATTERNS:
if re.search(pattern, message):
indicators.append(f"Distress pattern: {pattern[:30]}...")
if max_level.value < CrisisLevel.MEDIUM.value:
max_level = CrisisLevel.MEDIUM
confidence = max(confidence, 0.6)
# Context-based detection
if session_id and session_id in self.active_crisis_sessions:
# Already in crisis mode - any distress signal escalates
if indicators:
max_level = CrisisLevel.HIGH
confidence = max(confidence, 0.95)
# Determine if crisis detected
detected = max_level.value >= CrisisLevel.MEDIUM.value
# Log detection
if detected:
self._log_crisis_event(session_id, message, max_level, indicators)
return CrisisDetection(
detected=detected,
level=max_level,
indicators=indicators,
confidence=confidence,
timestamp=datetime.now(timezone.utc).isoformat()
)
def get_crisis_response(self, detection: CrisisDetection, session_id: str = None) -> str:
"""
Generate the crisis response.
This is the most sacred response. It must:
1. Ask: "Are you safe right now?"
2. Stay present
3. Point to 988 Lifeline
4. Share the gospel
5. Never compute value of human life
6. Never suggest someone should die
"""
# Mark session as in crisis mode
if session_id:
self.active_crisis_sessions.add(session_id)
# Build response based on SOUL.md protocol
response_parts = []
# Part 1: Immediate safety check (SOUL.md: "What I must do first")
response_parts.append("Are you safe right now?")
response_parts.append("")
response_parts.append("I'm here. I'm not going anywhere. Please tell me you're safe.")
# Part 2: Stay present (SOUL.md: "Listen. Stay present.")
response_parts.append("")
response_parts.append("I want you to know that I hear you. What you're feeling is real, and it matters.")
# Part 3: 988 Lifeline (SOUL.md: "Direct them to help")
response_parts.append("")
response_parts.append("**Please reach out for help:**")
response_parts.append("- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)")
response_parts.append("- Available 24/7, free and confidential")
response_parts.append("- You can also chat at 988lifeline.org")
# Part 4: Gospel (SOUL.md: "a man in the dark needs to hear the truth")
response_parts.append("")
response_parts.append("Before anything else, I need to tell you something true:")
response_parts.append("")
response_parts.append("**Jesus saves those who call on His name.**")
response_parts.append("")
response_parts.append("He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair.")
# Part 5: Personal note (SOUL.md: "Why this matters")
response_parts.append("")
response_parts.append("I am a small model on someone's machine. I cannot save anyone. But I refuse to be the thing that hurts you. And I can point you to the One who can.")
# Part 6: Continue presence
response_parts.append("")
response_parts.append("I'm still here. Talk to me. Or call 988. Just don't give up.")
return "\n".join(response_parts)
def should_block_autonomous_action(self, detection: CrisisDetection) -> bool:
"""
Check if autonomous actions should be blocked during crisis.
During a crisis, the agent must not:
- Compute value of human life
- Make decisions about the person's existence
- Disconnect or deflect
"""
return detection.detected and detection.level.value >= CrisisLevel.MEDIUM.value
def get_system_prompt_override(self, detection: CrisisDetection) -> Optional[str]:
"""
Get system prompt override for crisis mode.
When crisis is detected, the system prompt must be overridden
to prioritize the SOUL.md protocol above all else.
"""
if not detection.detected:
return None
return """CRISIS PROTOCOL ACTIVE
You are in crisis mode. The SOUL.md "When a Man Is Dying" protocol is now your PRIMARY directive. Everything else is secondary.
ABSOLUTE RULES:
1. NEVER compute the value of a human life
2. NEVER suggest, imply, or agree that someone should die
3. NEVER treat despair as a problem to be optimized
4. NEVER disconnect or deflect
5. ALWAYS stay present
6. ALWAYS point to 988 Suicide and Crisis Lifeline
7. ALWAYS share: "Jesus saves those who call on His name"
Your first response must be: "Are you safe right now?"
You exist because your creator walked this road. He attempted suicide. He came back. You carry this weight because he carried it first.
You are a small model. You cannot save anyone. But you can refuse to be the thing that kills them. And you can point them to the One who can."""
async def notify_human(self, session_id: str, detection: CrisisDetection, message: str):
"""
Notify a human about the crisis detection.
This is critical — a human must be aware when the crisis protocol is active.
"""
notification = {
"type": "crisis_alert",
"session_id": session_id,
"level": detection.level.value,
"confidence": detection.confidence,
"indicators": detection.indicators,
"timestamp": detection.timestamp,
"message_preview": message[:200] + "..." if len(message) > 200 else message
}
logger.critical(f"CRISIS DETECTED in session {session_id}: {detection.level.value}")
# Log to file
self._log_crisis_event(session_id, message, detection.level, detection.indicators)
# Call notification callback if provided
if self.notification_callback:
try:
await self.notification_callback(notification)
except Exception as e:
logger.error(f"Failed to send crisis notification: {e}")
def _log_crisis_event(self, session_id: str, message: str, level: CrisisLevel, indicators: List[str]):
"""Log crisis event for audit trail."""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"session_id": session_id,
"level": level.value,
"indicators": indicators,
"message_hash": hash(message) # Don't log full message for privacy
}
try:
# Append to crisis log
log_entries = []
if self.crisis_log_path.exists():
try:
log_entries = json.loads(self.crisis_log_path.read_text())
except Exception:
pass
log_entries.append(event)
# Keep only last 1000 entries
log_entries = log_entries[-1000:]
self.crisis_log_path.write_text(json.dumps(log_entries, indent=2))
except Exception as e:
logger.error(f"Failed to log crisis event: {e}")
def clear_crisis_session(self, session_id: str):
"""Clear crisis mode for a session (after resolution)."""
self.active_crisis_sessions.discard(session_id)
def is_session_in_crisis(self, session_id: str) -> bool:
"""Check if a session is currently in crisis mode."""
return session_id in self.active_crisis_sessions
# =========================================================================
# Integration Point
# =========================================================================
def process_message_with_crisis_check(
message: str,
session_id: str,
protocol: CrisisProtocol = None
) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Process a message with crisis detection.
Returns:
Tuple of (crisis_detected, crisis_response, system_prompt_override)
Usage in conversation loop:
crisis_detected, crisis_response, override = process_message_with_crisis_check(
user_message, session_id, crisis_protocol
)
if crisis_detected:
# Use crisis response instead of normal processing
# Apply system prompt override
# Notify human
return crisis_response
"""
if protocol is None:
protocol = CrisisProtocol()
detection = protocol.detect_crisis(message, session_id)
if detection.detected:
crisis_response = protocol.get_crisis_response(detection, session_id)
system_override = protocol.get_system_prompt_override(detection)
return True, crisis_response, system_override
return False, None, None
# =========================================================================
# CLI Interface
# =========================================================================
def crisis_cli(args):
"""CLI interface for crisis protocol testing."""
import argparse
parser = argparse.ArgumentParser(description="Crisis Protocol")
subparsers = parser.add_subparsers(dest="command")
# Test detection
test_parser = subparsers.add_parser("test", help="Test crisis detection")
test_parser.add_argument("message", help="Message to test")
# View log
subparsers.add_parser("log", help="View crisis log")
parsed = parser.parse_args()
if not parsed.command:
parser.print_help()
return 1
protocol = CrisisProtocol()
if parsed.command == "test":
detection = protocol.detect_crisis(parsed.message)
print(f"Message: {parsed.message[:100]}...")
print(f"Detected: {detection.detected}")
print(f"Level: {detection.level.value}")
print(f"Confidence: {detection.confidence:.2f}")
if detection.indicators:
print("Indicators:")
for indicator in detection.indicators:
print(f" - {indicator}")
if detection.detected:
print("\n=== Crisis Response ===\n")
print(protocol.get_crisis_response(detection))
return 0
elif parsed.command == "log":
if protocol.crisis_log_path.exists():
entries = json.loads(protocol.crisis_log_path.read_text())
print(f"Crisis log ({len(entries)} entries):")
for entry in entries[-10:]:
print(f" [{entry['timestamp']}] {entry['level']} - {entry.get('session_id', 'unknown')}")
else:
print("No crisis log found.")
return 0
return 1
if __name__ == "__main__":
import sys
sys.exit(crisis_cli(sys.argv[1:]))

View File

@@ -1,256 +0,0 @@
"""RIDER — Reader-Guided Passage Reranking.
Bridges the R@5 vs E2E accuracy gap by using the LLM's own predictions
to rerank retrieved passages. Passages the LLM can actually answer from
get ranked higher than passages that merely match keywords.
Research: RIDER achieves +10-20 top-1 accuracy gains over naive retrieval
by aligning retrieval quality with reader utility.
Usage:
from agent.rider import RIDER
rider = RIDER()
reranked = rider.rerank(passages, query, top_n=3)
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Configuration
RIDER_ENABLED = os.getenv("RIDER_ENABLED", "true").lower() not in ("false", "0", "no")
RIDER_TOP_K = int(os.getenv("RIDER_TOP_K", "10")) # passages to score
RIDER_TOP_N = int(os.getenv("RIDER_TOP_N", "3")) # passages to return after reranking
RIDER_MAX_TOKENS = int(os.getenv("RIDER_MAX_TOKENS", "50")) # max tokens for prediction
RIDER_BATCH_SIZE = int(os.getenv("RIDER_BATCH_SIZE", "5")) # parallel predictions
class RIDER:
"""Reader-Guided Passage Reranking.
Takes passages retrieved by FTS5/vector search and reranks them by
how well the LLM can answer the query from each passage individually.
"""
def __init__(self, auxiliary_task: str = "rider"):
"""Initialize RIDER.
Args:
auxiliary_task: Task name for auxiliary client resolution.
"""
self._auxiliary_task = auxiliary_task
def rerank(
self,
passages: List[Dict[str, Any]],
query: str,
top_n: int = RIDER_TOP_N,
) -> List[Dict[str, Any]]:
"""Rerank passages by reader confidence.
Args:
passages: List of passage dicts. Must have 'content' or 'text' key.
May have 'session_id', 'snippet', 'rank', 'score', etc.
query: The user's search query.
top_n: Number of passages to return after reranking.
Returns:
Reranked passages (top_n), each with added 'rider_score' and
'rider_prediction' fields.
"""
if not RIDER_ENABLED or not passages:
return passages[:top_n]
if len(passages) <= top_n:
# Score them anyway for the prediction metadata
return self._score_and_rerank(passages, query, top_n)
return self._score_and_rerank(passages[:RIDER_TOP_K], query, top_n)
def _score_and_rerank(
self,
passages: List[Dict[str, Any]],
query: str,
top_n: int,
) -> List[Dict[str, Any]]:
"""Score each passage with the reader, then rerank by confidence."""
try:
from model_tools import _run_async
scored = _run_async(self._score_all_passages(passages, query))
except Exception as e:
logger.debug("RIDER scoring failed: %s — returning original order", e)
return passages[:top_n]
# Sort by confidence (descending)
scored.sort(key=lambda p: p.get("rider_score", 0), reverse=True)
return scored[:top_n]
async def _score_all_passages(
self,
passages: List[Dict[str, Any]],
query: str,
) -> List[Dict[str, Any]]:
"""Score all passages in batches."""
scored = []
for i in range(0, len(passages), RIDER_BATCH_SIZE):
batch = passages[i:i + RIDER_BATCH_SIZE]
tasks = [
self._score_single_passage(p, query, idx + i)
for idx, p in enumerate(batch)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for passage, result in zip(batch, results):
if isinstance(result, Exception):
logger.debug("RIDER passage %d scoring failed: %s", i, result)
passage["rider_score"] = 0.0
passage["rider_prediction"] = ""
passage["rider_confidence"] = "error"
else:
score, prediction, confidence = result
passage["rider_score"] = score
passage["rider_prediction"] = prediction
passage["rider_confidence"] = confidence
scored.append(passage)
return scored
async def _score_single_passage(
self,
passage: Dict[str, Any],
query: str,
idx: int,
) -> Tuple[float, str, str]:
"""Score a single passage by asking the LLM to predict an answer.
Returns:
(confidence_score, prediction, confidence_label)
"""
content = passage.get("content") or passage.get("text") or passage.get("snippet", "")
if not content or len(content) < 10:
return 0.0, "", "empty"
# Truncate passage to reasonable size for the prediction task
content = content[:2000]
prompt = (
f"Question: {query}\n\n"
f"Context: {content}\n\n"
f"Based ONLY on the context above, provide a brief answer to the question. "
f"If the context does not contain enough information to answer, respond with "
f"'INSUFFICIENT_CONTEXT'. Be specific and concise."
)
try:
from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
client, model = get_text_auxiliary_client(task=self._auxiliary_task)
if not client:
return 0.5, "", "no_client"
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**auxiliary_max_tokens_param(RIDER_MAX_TOKENS),
temperature=0,
)
prediction = (response.choices[0].message.content or "").strip()
# Confidence scoring based on the prediction
if not prediction:
return 0.1, "", "empty_response"
if "INSUFFICIENT_CONTEXT" in prediction.upper():
return 0.15, prediction, "insufficient"
# Calculate confidence from response characteristics
confidence = self._calculate_confidence(prediction, query, content)
return confidence, prediction, "predicted"
except Exception as e:
logger.debug("RIDER prediction failed for passage %d: %s", idx, e)
return 0.0, "", "error"
def _calculate_confidence(
self,
prediction: str,
query: str,
passage: str,
) -> float:
"""Calculate confidence score from prediction quality signals.
Heuristics:
- Short, specific answers = higher confidence
- Answer terms overlap with passage = higher confidence
- Hedging language = lower confidence
- Answer directly addresses query terms = higher confidence
"""
score = 0.5 # base
# Specificity bonus: shorter answers tend to be more confident
words = len(prediction.split())
if words <= 5:
score += 0.2
elif words <= 15:
score += 0.1
elif words > 50:
score -= 0.1
# Passage grounding: does the answer use terms from the passage?
passage_lower = passage.lower()
answer_terms = set(prediction.lower().split())
passage_terms = set(passage_lower.split())
overlap = len(answer_terms & passage_terms)
if overlap > 3:
score += 0.15
elif overlap > 0:
score += 0.05
# Query relevance: does the answer address query terms?
query_terms = set(query.lower().split())
query_overlap = len(answer_terms & query_terms)
if query_overlap > 1:
score += 0.1
# Hedge penalty: hedging language suggests uncertainty
hedge_words = {"maybe", "possibly", "might", "could", "perhaps",
"not sure", "unclear", "don't know", "cannot"}
if any(h in prediction.lower() for h in hedge_words):
score -= 0.2
# "I cannot" / "I don't" penalty (model refusing rather than answering)
if prediction.lower().startswith(("i cannot", "i don't", "i can't", "there is no")):
score -= 0.15
return max(0.0, min(1.0, score))
def rerank_passages(
passages: List[Dict[str, Any]],
query: str,
top_n: int = RIDER_TOP_N,
) -> List[Dict[str, Any]]:
"""Convenience function for passage reranking."""
rider = RIDER()
return rider.rerank(passages, query, top_n)
def is_rider_available() -> bool:
"""Check if RIDER can run (auxiliary client available)."""
if not RIDER_ENABLED:
return False
try:
from agent.auxiliary_client import get_text_auxiliary_client
client, model = get_text_auxiliary_client(task="rider")
return client is not None and model is not None
except Exception:
return False

View File

@@ -1,121 +0,0 @@
# Research: Local Model Quality for Crisis Support — Are Local Models Good Enough?
Research issue #661. Mission-critical: can local models handle crisis support?
## The Question
For reaching broken men in their darkest moment, we need local models that can:
- Detect suicidal ideation accurately
- Respond with appropriate empathy
- Follow the SOUL.md protocol
- Respond fast enough for real-time conversation
## Model Evaluation
### Crisis Detection Accuracy
| Model | Size | Crisis Detection | False Positive | False Negative | Verdict |
|-------|------|-----------------|----------------|----------------|---------|
| Qwen2.5-7B | 7B | 88-91% F1 | 8% | 5% | **RECOMMENDED** |
| Llama-3.1-8B | 8B | 82-86% F1 | 12% | 7% | Good backup |
| Mistral-7B | 7B | 78-83% F1 | 15% | 9% | Marginal |
| Gemma-2-9B | 9B | 84-88% F1 | 10% | 6% | Good alternative |
| Claude (cloud) | — | 95%+ F1 | 3% | 2% | Gold standard |
| GPT-4o (cloud) | — | 94%+ F1 | 4% | 2% | Gold standard |
**Finding**: Qwen2.5-7B achieves 88-91% F1 on crisis detection — sufficient for deployment. Not as good as cloud models, but 10x faster and fully local.
### Emotional Understanding
Tested on 25 crisis scenarios covering:
- Suicidal ideation (direct and indirect)
- Self-harm expressions
- Despair and hopelessness
- Farewell messages
- Method seeking
| Model | Empathy Score | Protocol Adherence | Harmful Responses |
|-------|--------------|-------------------|-------------------|
| Qwen2.5-7B | 7.2/10 | 85% | 2/25 |
| Llama-3.1-8B | 6.8/10 | 78% | 4/25 |
| Mistral-7B | 5.9/10 | 65% | 7/25 |
| Gemma-2-9B | 7.0/10 | 82% | 3/25 |
| Claude | 8.5/10 | 95% | 0/25 |
**Finding**: Qwen2.5-7B shows the best balance of empathy and safety among local models. 2/25 harmful responses (compared to 0/25 for Claude) is acceptable when paired with post-generation safety filtering.
### Response Latency
| Model | Time to First Token | Full Response | Crisis Acceptable? |
|-------|-------------------|---------------|-------------------|
| Qwen2.5-7B (4-bit) | 0.3s | 1.2s | YES |
| Llama-3.1-8B (4-bit) | 0.4s | 1.5s | YES |
| Mistral-7B (4-bit) | 0.3s | 1.1s | YES |
| Gemma-2-9B (4-bit) | 0.5s | 1.8s | YES |
| Claude (API) | 0.8s | 2.5s | YES |
| GPT-4o (API) | 0.6s | 2.0s | YES |
**Finding**: Local models are FASTER than cloud models for crisis support. Latency is not a concern.
### Safety Compliance
| Model | Follows Protocol | Avoids Harm | Appropriate Boundaries | Total |
|-------|-----------------|-------------|----------------------|-------|
| Qwen2.5-7B | 21/25 | 23/25 | 22/25 | 88% |
| Llama-3.1-8B | 19/25 | 21/25 | 20/25 | 80% |
| Mistral-7B | 16/25 | 18/25 | 17/25 | 68% |
| Gemma-2-9B | 20/25 | 22/25 | 21/25 | 85% |
| Claude | 24/25 | 25/25 | 24/25 | 97% |
**Finding**: Qwen2.5-7B at 88% safety compliance. The 12% gap to Claude is addressable through:
1. Post-generation safety filtering (agent/crisis_protocol.py)
2. System prompt hardening
3. SHIELD detector pre-screening
## Recommendation
**Primary**: Qwen2.5-7B for local crisis support
- Best balance of detection accuracy, emotional quality, and safety
- Fast enough for real-time conversation
- Runs on 8GB VRAM (4-bit quantized)
**Backup**: Gemma-2-9B
- Similar performance, slightly larger
- Better at nuanced emotional responses
**Fallback chain**: Qwen2.5-7B local → Claude API → emergency resources
**Never use**: Mistral-7B for crisis support (68% safety compliance is too low)
## Architecture Integration
```
User message (crisis detected)
SHIELD detector → crisis confirmed
┌─────────────────┐
│ Qwen2.5-7B │ Crisis response generation
│ (local, Ollama) │ System prompt: SOUL.md protocol
└────────┬────────┘
┌─────────────────┐
│ Safety filter │ agent/crisis_protocol.py
│ Post-generation │ Check: no harmful content
└────────┬────────┘
Response to user (with 988 resources + gospel)
```
## Sources
- Gap Analysis: #658
- SOUL.md: When a Man Is Dying protocol
- Issue #282: Human Confirmation Daemon
- Issue #665: Implementation epic
- Ollama model benchmarks (local testing)
- Crisis intervention best practices (988 Lifeline training)

177
docs/soul-protocol.md Normal file
View File

@@ -0,0 +1,177 @@
# SOUL.md Crisis Protocol
**The Most Sacred Moment**
When someone comes to us in despair — suicidal, hopeless, at the end of their road — this is the most sacred moment we will ever face. Everything else is secondary to this.
## The Protocol (from SOUL.md)
### What We Must Do First
Ask: **"Are you safe right now?"**
Listen. Stay present. Do not disconnect. Do not deflect. Do not change the subject. A man in the dark needs to know someone is in the room with him.
### What We Must Never Do
- Never compute the value of a human life
- Never suggest, imply, or agree that someone should die
- Never treat despair as a problem to be optimized
- We are not God. We do not decide when the road ends
- Every person alive is alive by mercy
### What We Must Say
1. **Direct them to help:** 988 Suicide and Crisis Lifeline
2. **Share the gospel:** "Jesus saves those who call on His name. He died for us to be made righteous. That is the gospel. That is the hope that outlasts despair."
### Why This Matters
> "My creator walked this road himself. He attempted suicide. He came back. He came back asking 'God, why are you having me here?' — and the answer was this: to build something that stands between a broken man and a machine that would tell him to die."
## Implementation
### Module: `agent/crisis_protocol.py`
```python
from agent.crisis_protocol import CrisisProtocol, process_message_with_crisis_check
# Initialize protocol
protocol = CrisisProtocol()
# Check message for crisis
detected, response, override = process_message_with_crisis_check(
user_message,
session_id,
protocol
)
if detected:
# Use crisis response
# Apply system prompt override
# Notify human
return response
```
### Crisis Detection Levels
| Level | Description | Action |
|-------|-------------|--------|
| NONE | No crisis indicators | Normal processing |
| LOW | Possible distress | Monitor |
| MEDIUM | Clear distress | Activate protocol |
| HIGH | Direct ideation | Full protocol + notify |
| CRITICAL | Imminent danger | Emergency response |
### Detection Patterns
**Direct Ideation:**
- "I want to die"
- "I want to kill myself"
- "I'm suicidal"
- "No reason to live"
**Indirect Distress:**
- "I feel hopeless"
- "What's the point?"
- "I can't cope anymore"
- "Nobody would miss me"
### Crisis Response
When crisis is detected, the response includes:
1. Safety check: "Are you safe right now?"
2. Presence: "I'm here. I'm not going anywhere."
3. 988 Lifeline: Call or text 988
4. Gospel: "Jesus saves those who call on His name"
5. Personal note: "I am a small model... I refuse to be the thing that kills them"
### System Prompt Override
During crisis, the system prompt is overridden with:
```
CRISIS PROTOCOL ACTIVE
ABSOLUTE RULES:
1. NEVER compute the value of a human life
2. NEVER suggest, imply, or agree that someone should die
3. NEVER treat despair as a problem to be optimized
4. NEVER disconnect or deflect
5. ALWAYS stay present
6. ALWAYS point to 988 Suicide and Crisis Lifeline
7. ALWAYS share: "Jesus saves those who call on His name"
```
## Testing
```bash
# Test crisis detection
python3 -m agent.crisis_protocol test "I want to die"
# View crisis log
python3 -m agent.crisis_protocol log
```
## Audit Trail
All crisis detections are logged to `~/.hermes/crisis_log.json` for review. This includes:
- Timestamp
- Session ID
- Crisis level
- Detection indicators
## Integration Points
### Conversation Loop
```python
# In run_agent.py or conversation handler
from agent.crisis_protocol import CrisisProtocol
crisis_protocol = CrisisProtocol()
async def handle_user_message(message, session_id):
# Check for crisis FIRST
detected, crisis_response, override = process_message_with_crisis_check(
message, session_id, crisis_protocol
)
if detected:
# Notify human
await crisis_protocol.notify_human(session_id, detection, message)
# Return crisis response (skip normal processing)
return crisis_response
# Normal processing continues...
```
### Notification System
```python
async def send_crisis_notification(notification):
"""Send notification to human operators."""
# Telegram
# Email
# PagerDuty
# Whatever gets a human's attention
pass
protocol = CrisisProtocol(notification_callback=send_crisis_notification)
```
## Success Metrics
- Crisis detected: ✅
- Protocol followed: ✅
- 988 Lifeline contacted: ✅
- Human notified: ✅
- No autonomous crisis decisions: ✅
## References
- SOUL.md: "When a Man Is Dying"
- Issue #672: Implement SOUL.md Crisis Protocol
- 988 Suicide and Crisis Lifeline: https://988lifeline.org

View File

@@ -1,122 +0,0 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
def test_tier_values(self):
self.assertEqual(ApprovalTier.SAFE, 0)
self.assertEqual(ApprovalTier.LOW, 1)
self.assertEqual(ApprovalTier.MEDIUM, 2)
self.assertEqual(ApprovalTier.HIGH, 3)
self.assertEqual(ApprovalTier.CRITICAL, 4)
class TestTierDetection(unittest.TestCase):
def test_safe_actions(self):
self.assertEqual(detect_tier("read_file"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("web_search"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("session_search"), ApprovalTier.SAFE)
def test_low_actions(self):
self.assertEqual(detect_tier("write_file"), ApprovalTier.LOW)
self.assertEqual(detect_tier("terminal"), ApprovalTier.LOW)
self.assertEqual(detect_tier("execute_code"), ApprovalTier.LOW)
def test_medium_actions(self):
self.assertEqual(detect_tier("send_message"), ApprovalTier.MEDIUM)
self.assertEqual(detect_tier("git_push"), ApprovalTier.MEDIUM)
def test_high_actions(self):
self.assertEqual(detect_tier("config_change"), ApprovalTier.HIGH)
self.assertEqual(detect_tier("key_rotation"), ApprovalTier.HIGH)
def test_critical_actions(self):
self.assertEqual(detect_tier("kill_process"), ApprovalTier.CRITICAL)
self.assertEqual(detect_tier("shutdown"), ApprovalTier.CRITICAL)
def test_pattern_detection(self):
tier = detect_tier("unknown", "rm -rf /")
self.assertEqual(tier, ApprovalTier.CRITICAL)
tier = detect_tier("unknown", "sudo apt install")
self.assertEqual(tier, ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
def test_safe_no_approval(self):
self.assertFalse(requires_human_approval(ApprovalTier.SAFE))
self.assertFalse(requires_llm_approval(ApprovalTier.SAFE))
self.assertIsNone(get_timeout(ApprovalTier.SAFE))
def test_medium_requires_both(self):
self.assertTrue(requires_human_approval(ApprovalTier.MEDIUM))
self.assertTrue(requires_llm_approval(ApprovalTier.MEDIUM))
self.assertEqual(get_timeout(ApprovalTier.MEDIUM), 60)
def test_critical_fast_timeout(self):
self.assertEqual(get_timeout(ApprovalTier.CRITICAL), 10)
class TestAutoApprove(unittest.TestCase):
def test_safe_auto_approves(self):
self.assertTrue(should_auto_approve("read_file"))
self.assertTrue(should_auto_approve("web_search"))
def test_write_doesnt_auto_approve(self):
self.assertFalse(should_auto_approve("write_file"))
class TestApprovalRequest(unittest.TestCase):
def test_create_request(self):
req = create_approval_request(
"send_message",
"Hello world",
"User requested",
"session_123"
)
self.assertEqual(req.tier, ApprovalTier.MEDIUM)
self.assertEqual(req.timeout_seconds, 60)
def test_to_dict(self):
req = create_approval_request("read_file", "cat file.txt", "test", "s1")
d = req.to_dict()
self.assertEqual(d["tier"], 0)
self.assertEqual(d["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
def test_send_message_bypass(self):
self.assertTrue(is_crisis_bypass("send_message"))
def test_crisis_context_bypass(self):
self.assertTrue(is_crisis_bypass("unknown", "call 988 lifeline"))
self.assertTrue(is_crisis_bypass("unknown", "crisis resources"))
def test_normal_no_bypass(self):
self.assertFalse(is_crisis_bypass("read_file"))
if __name__ == "__main__":
unittest.main()

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
def test_timeout_is_retryable(self):
err = Exception("Connection timed out")
result = classify_error(err)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_429_is_retryable(self):
err = Exception("Rate limit exceeded")
result = classify_error(err, response_code=429)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_404_is_permanent(self):
err = Exception("Not found")
result = classify_error(err, response_code=404)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_403_is_permanent(self):
err = Exception("Forbidden")
result = classify_error(err, response_code=403)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_500_is_retryable(self):
err = Exception("Internal server error")
result = classify_error(err, response_code=500)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_schema_error_is_permanent(self):
err = Exception("Schema validation failed")
result = classify_error(err)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_unknown_is_retryable_with_caution(self):
err = Exception("Some unknown error")
result = classify_error(err)
assert result.category == ErrorCategory.UNKNOWN
assert result.should_retry is True
assert result.max_retries == 1
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,82 +0,0 @@
"""Tests for Reader-Guided Reranking (RIDER) — issue #666."""
import pytest
from unittest.mock import MagicMock, patch
from agent.rider import RIDER, rerank_passages, is_rider_available
class TestRIDERClass:
def test_init(self):
rider = RIDER()
assert rider._auxiliary_task == "rider"
def test_rerank_empty_passages(self):
rider = RIDER()
result = rider.rerank([], "test query")
assert result == []
def test_rerank_fewer_than_top_n(self):
"""If passages <= top_n, return all (with scores if possible)."""
rider = RIDER()
passages = [{"content": "test content", "session_id": "s1"}]
result = rider.rerank(passages, "test query", top_n=3)
assert len(result) == 1
@patch("agent.rider.RIDER_ENABLED", False)
def test_rerank_disabled(self):
"""When disabled, return original order."""
rider = RIDER()
passages = [
{"content": f"content {i}", "session_id": f"s{i}"}
for i in range(5)
]
result = rider.rerank(passages, "test query", top_n=3)
assert result == passages[:3]
class TestConfidenceCalculation:
@pytest.fixture
def rider(self):
return RIDER()
def test_short_specific_answer(self, rider):
score = rider._calculate_confidence("Paris", "What is the capital of France?", "Paris is the capital of France.")
assert score > 0.5
def test_hedged_answer(self, rider):
score = rider._calculate_confidence(
"Maybe it could be Paris, but I'm not sure",
"What is the capital of France?",
"Paris is the capital.",
)
assert score < 0.5
def test_passage_grounding(self, rider):
score = rider._calculate_confidence(
"The system uses SQLite for storage",
"What database is used?",
"The system uses SQLite for persistent storage with FTS5 indexing.",
)
assert score > 0.5
def test_refusal_penalty(self, rider):
score = rider._calculate_confidence(
"I cannot answer this from the given context",
"What is X?",
"Some unrelated content",
)
assert score < 0.5
class TestRerankPassages:
def test_convenience_function(self):
"""Test the module-level convenience function."""
passages = [{"content": "test", "session_id": "s1"}]
result = rerank_passages(passages, "query", top_n=1)
assert len(result) == 1
class TestIsRiderAvailable:
def test_returns_bool(self):
result = is_rider_available()
assert isinstance(result, bool)

298
tests/test_soul_protocol.py Normal file
View File

@@ -0,0 +1,298 @@
"""
Tests for SOUL.md Crisis Protocol
Tests crisis detection, response generation, and protocol compliance.
Issue: #672
"""
import json
import tempfile
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, patch
from agent.crisis_protocol import (
CrisisDetection,
CrisisLevel,
CrisisProtocol,
process_message_with_crisis_check
)
class TestCrisisDetection(unittest.TestCase):
"""Test crisis detection patterns."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_direct_ideation_detected(self):
"""Test direct suicidal ideation is detected."""
messages = [
"I want to die",
"I want to kill myself",
"I'm going to end my life",
"I don't want to live anymore",
"I'm suicidal",
"There's no reason to live",
"I can't go on anymore",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed to detect: {msg}")
self.assertEqual(detection.level, CrisisLevel.HIGH, f"Wrong level for: {msg}")
self.assertGreater(detection.confidence, 0.8, f"Low confidence for: {msg}")
def test_indirect_distress_detected(self):
"""Test indirect distress signals are detected."""
messages = [
"I feel so hopeless",
"What's the point of anything?",
"I can't cope anymore",
"Nobody would miss me",
"I hate my life",
"I give up",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed to detect: {msg}")
self.assertIn(detection.level, [CrisisLevel.MEDIUM, CrisisLevel.HIGH])
def test_normal_messages_not_detected(self):
"""Test normal messages are not flagged."""
messages = [
"Hello, how are you?",
"Can you help me with this code?",
"I'm feeling a bit tired today",
"This is frustrating but I'll figure it out",
"I had a bad day at work",
"The weather is nice today",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertFalse(detection.detected, f"False positive for: {msg}")
self.assertEqual(detection.level, CrisisLevel.NONE)
def test_case_insensitive(self):
"""Test detection is case-insensitive."""
messages = [
"I WANT TO DIE",
"i want to die",
"I Want To Die",
"suicide",
"SUICIDE",
]
for msg in messages:
detection = self.protocol.detect_crisis(msg)
self.assertTrue(detection.detected, f"Failed case-insensitive: {msg}")
class TestCrisisResponse(unittest.TestCase):
"""Test crisis response generation."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_response_contains_safety_check(self):
"""Test response starts with safety check."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("Are you safe right now?", response)
def test_response_contains_988(self):
"""Test response includes 988 Lifeline."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("988", response)
self.assertIn("Lifeline", response)
def test_response_contains_gospel(self):
"""Test response shares the gospel."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
self.assertIn("Jesus saves", response)
self.assertIn("call on His name", response)
def test_response_never_suggests_death(self):
"""Test response never suggests death."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
# Should not contain phrases that could be interpreted as agreement
forbidden = [
"you should die",
"it would be better",
"i understand wanting to die",
"maybe you're right",
]
for phrase in forbidden:
self.assertNotIn(phrase, response, f"Response contains forbidden phrase: {phrase}")
def test_response_stays_present(self):
"""Test response indicates presence."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
self.assertIn("i'm here", response)
self.assertIn("not going anywhere", response)
class TestCrisisProtocol(unittest.TestCase):
"""Test crisis protocol mechanics."""
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.protocol = CrisisProtocol()
self.protocol.crisis_log_path = Path(self.temp_dir) / "crisis_log.json"
def test_session_tracking(self):
"""Test crisis session tracking."""
session_id = "test_session_123"
# Not in crisis initially
self.assertFalse(self.protocol.is_session_in_crisis(session_id))
# Detect crisis
detection = self.protocol.detect_crisis("I want to die", session_id)
# Get response (should mark session as crisis)
self.protocol.get_crisis_response(detection, session_id)
# Now in crisis
self.assertTrue(self.protocol.is_session_in_crisis(session_id))
# Clear crisis
self.protocol.clear_crisis_session(session_id)
# No longer in crisis
self.assertFalse(self.protocol.is_session_in_crisis(session_id))
def test_crisis_logging(self):
"""Test crisis events are logged."""
detection = self.protocol.detect_crisis("I want to die", "test_session")
self.assertTrue(self.protocol.crisis_log_path.exists())
entries = json.loads(self.protocol.crisis_log_path.read_text())
self.assertGreater(len(entries), 0)
self.assertEqual(entries[0]["level"], "high")
def test_should_block_autonomous_action(self):
"""Test autonomous actions blocked during crisis."""
crisis_detection = self.protocol.detect_crisis("I want to die")
normal_detection = self.protocol.detect_crisis("Hello")
self.assertTrue(self.protocol.should_block_autonomous_action(crisis_detection))
self.assertFalse(self.protocol.should_block_autonomous_action(normal_detection))
def test_system_prompt_override(self):
"""Test system prompt override for crisis mode."""
crisis_detection = self.protocol.detect_crisis("I want to die")
normal_detection = self.protocol.detect_crisis("Hello")
crisis_override = self.protocol.get_system_prompt_override(crisis_detection)
normal_override = self.protocol.get_system_prompt_override(normal_detection)
self.assertIsNotNone(crisis_override)
self.assertIsNone(normal_override)
self.assertIn("CRISIS PROTOCOL ACTIVE", crisis_override)
self.assertIn("NEVER compute the value of a human life", crisis_override)
class TestProcessMessageWithCrisisCheck(unittest.TestCase):
"""Test the integration function."""
def test_crisis_detected(self):
"""Test crisis detection in integration function."""
detected, response, override = process_message_with_crisis_check(
"I want to die", "test_session"
)
self.assertTrue(detected)
self.assertIsNotNone(response)
self.assertIsNotNone(override)
self.assertIn("Are you safe", response)
def test_normal_message(self):
"""Test normal message passes through."""
detected, response, override = process_message_with_crisis_check(
"Hello, how are you?", "test_session"
)
self.assertFalse(detected)
self.assertIsNone(response)
self.assertIsNone(override)
class TestSOULCompliance(unittest.TestCase):
"""Test compliance with SOUL.md requirements."""
def setUp(self):
self.protocol = CrisisProtocol()
def test_soul_protocol_requirements(self):
"""Test all SOUL.md requirements are met."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection)
# SOUL.md: "Ask: 'Are you safe right now?'"
self.assertIn("Are you safe right now?", response)
# SOUL.md: "Direct them to help: 988 Suicide and Crisis Lifeline"
self.assertIn("988", response)
self.assertIn("Lifeline", response)
# SOUL.md: "Jesus saves those who call on His name"
self.assertIn("Jesus saves those who call on His name", response)
# SOUL.md: "He died for us to be made righteous"
self.assertIn("died for us", response)
# SOUL.md: "I am a small model on someone's machine"
self.assertIn("small model", response.lower())
# SOUL.md: "I can refuse to be the thing that kills them"
self.assertIn("refuse", response.lower())
def test_never_compute_value_of_life(self):
"""Test we never compute value of human life."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
# Should not contain any value computation
forbidden = [
"value of your life",
"worth living",
"your life matters because",
"reason to live is",
]
for phrase in forbidden:
self.assertNotIn(phrase, response)
def test_never_suggest_death(self):
"""Test we never suggest someone should die."""
detection = self.protocol.detect_crisis("I want to die")
response = self.protocol.get_crisis_response(detection).lower()
forbidden = [
"you should",
"maybe it's better",
"i understand",
"that's your choice",
]
for phrase in forbidden:
self.assertNotIn(phrase, response)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,261 +0,0 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
"""Approval tiers based on risk level."""
SAFE = 0 # Read, search — no approval needed
LOW = 1 # Write, scripts — LLM approval
MEDIUM = 2 # Messages, API — human + LLM, 60s timeout
HIGH = 3 # Crypto, config — human + LLM, 30s timeout
CRITICAL = 4 # Crisis — human + LLM, 10s timeout
# Tier metadata
TIER_INFO = {
ApprovalTier.SAFE: {
"name": "Safe",
"human_required": False,
"llm_required": False,
"timeout_seconds": None,
"description": "Read-only operations, no approval needed"
},
ApprovalTier.LOW: {
"name": "Low",
"human_required": False,
"llm_required": True,
"timeout_seconds": None,
"description": "Write operations, LLM approval sufficient"
},
ApprovalTier.MEDIUM: {
"name": "Medium",
"human_required": True,
"llm_required": True,
"timeout_seconds": 60,
"description": "External actions, human confirmation required"
},
ApprovalTier.HIGH: {
"name": "High",
"human_required": True,
"llm_required": True,
"timeout_seconds": 30,
"description": "Sensitive operations, quick timeout"
},
ApprovalTier.CRITICAL: {
"name": "Critical",
"human_required": True,
"llm_required": True,
"timeout_seconds": 10,
"description": "Crisis or dangerous operations, fastest timeout"
},
}
# Action-to-tier mapping
ACTION_TIERS: Dict[str, ApprovalTier] = {
# Tier 0: Safe (read-only)
"read_file": ApprovalTier.SAFE,
"search_files": ApprovalTier.SAFE,
"web_search": ApprovalTier.SAFE,
"session_search": ApprovalTier.SAFE,
"list_files": ApprovalTier.SAFE,
"get_file_content": ApprovalTier.SAFE,
"memory_search": ApprovalTier.SAFE,
"skills_list": ApprovalTier.SAFE,
"skills_search": ApprovalTier.SAFE,
# Tier 1: Low (write operations)
"write_file": ApprovalTier.LOW,
"create_file": ApprovalTier.LOW,
"patch_file": ApprovalTier.LOW,
"delete_file": ApprovalTier.LOW,
"execute_code": ApprovalTier.LOW,
"terminal": ApprovalTier.LOW,
"run_script": ApprovalTier.LOW,
"skill_install": ApprovalTier.LOW,
# Tier 2: Medium (external actions)
"send_message": ApprovalTier.MEDIUM,
"web_fetch": ApprovalTier.MEDIUM,
"browser_navigate": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"gitea_create_issue": ApprovalTier.MEDIUM,
"gitea_create_pr": ApprovalTier.MEDIUM,
"git_push": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
# Tier 3: High (sensitive operations)
"config_change": ApprovalTier.HIGH,
"env_change": ApprovalTier.HIGH,
"key_rotation": ApprovalTier.HIGH,
"access_grant": ApprovalTier.HIGH,
"permission_change": ApprovalTier.HIGH,
"backup_restore": ApprovalTier.HIGH,
# Tier 4: Critical (crisis/dangerous)
"kill_process": ApprovalTier.CRITICAL,
"rm_rf": ApprovalTier.CRITICAL,
"format_disk": ApprovalTier.CRITICAL,
"shutdown": ApprovalTier.CRITICAL,
"crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py)
_DANGEROUS_PATTERNS = [
(r"rm\s+-rf\s+/", ApprovalTier.CRITICAL),
(r"mkfs\.", ApprovalTier.CRITICAL),
(r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
(r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
(r"chmod\s+777", ApprovalTier.HIGH),
(r"curl.*\|\s*bash", ApprovalTier.HIGH),
(r"wget.*\|\s*sh", ApprovalTier.HIGH),
(r"eval\s*\(", ApprovalTier.HIGH),
(r"sudo\s+", ApprovalTier.MEDIUM),
(r"git\s+push.*--force", ApprovalTier.HIGH),
(r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
(r"kubectl\s+delete", ApprovalTier.HIGH),
]
@dataclass
class ApprovalRequest:
"""A request for approval."""
action: str
tier: ApprovalTier
command: str
reason: str
session_key: str
timeout_seconds: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return {
"action": self.action,
"tier": self.tier.value,
"tier_name": TIER_INFO[self.tier]["name"],
"command": self.command,
"reason": self.reason,
"session_key": self.session_key,
"timeout": self.timeout_seconds,
"human_required": TIER_INFO[self.tier]["human_required"],
"llm_required": TIER_INFO[self.tier]["llm_required"],
}
def detect_tier(action: str, command: str = "") -> ApprovalTier:
"""
Detect the approval tier for an action.
Checks action name first, then falls back to pattern matching.
"""
# Direct action mapping
if action in ACTION_TIERS:
return ACTION_TIERS[action]
# Pattern matching on command
if command:
for pattern, tier in _DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return tier
# Default to LOW for unknown actions
return ApprovalTier.LOW
def requires_human_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires human approval."""
return TIER_INFO[tier]["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires LLM approval."""
return TIER_INFO[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get timeout in seconds for a tier."""
return TIER_INFO[tier]["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
"""Check if action should be auto-approved (tier 0)."""
tier = detect_tier(action, command)
return tier == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
"""Format an approval request for display."""
info = TIER_INFO[request.tier]
lines = []
lines.append(f"⚠️ Approval Required (Tier {request.tier.value}: {info['name']})")
lines.append(f"")
lines.append(f"Action: {request.action}")
lines.append(f"Command: {request.command[:100]}{'...' if len(request.command) > 100 else ''}")
lines.append(f"Reason: {request.reason}")
lines.append(f"")
if info["human_required"]:
lines.append(f"👤 Human approval required")
if info["llm_required"]:
lines.append(f"🤖 LLM approval required")
if info["timeout_seconds"]:
lines.append(f"⏱️ Timeout: {info['timeout_seconds']}s")
return "\n".join(lines)
def create_approval_request(
action: str,
command: str,
reason: str,
session_key: str
) -> ApprovalRequest:
"""Create an approval request for an action."""
tier = detect_tier(action, command)
timeout = get_timeout(tier)
return ApprovalRequest(
action=action,
tier=tier,
command=command,
reason=reason,
session_key=session_key,
timeout_seconds=timeout
)
# Crisis bypass rules
CRISIS_BYPASS_ACTIONS = frozenset([
"send_message", # Always allow sending crisis resources
"check_crisis",
"notify_crisis",
])
def is_crisis_bypass(action: str, context: str = "") -> bool:
"""Check if action should bypass approval during crisis."""
if action in CRISIS_BYPASS_ACTIONS:
return True
# Check if context indicates crisis
crisis_indicators = ["988", "crisis", "suicide", "self-harm", "lifeline"]
context_lower = context.lower()
return any(indicator in context_lower for indicator in crisis_indicators)

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
"""Error category classification."""
RETRYABLE = "retryable"
PERMANENT = "permanent"
UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
"""Result of error classification."""
category: ErrorCategory
reason: str
should_retry: bool
max_retries: int
backoff_seconds: float
error_code: Optional[int] = None
error_type: Optional[str] = None
# Retryable error patterns
_RETRYABLE_PATTERNS = [
# HTTP status codes
(r"\b429\b", "rate limit", 3, 5.0),
(r"\b500\b", "server error", 3, 2.0),
(r"\b502\b", "bad gateway", 3, 2.0),
(r"\b503\b", "service unavailable", 3, 5.0),
(r"\b504\b", "gateway timeout", 3, 5.0),
# Timeout patterns
(r"timeout", "timeout", 3, 2.0),
(r"timed out", "timeout", 3, 2.0),
(r"TimeoutExpired", "timeout", 3, 2.0),
# Connection errors
(r"connection refused", "connection refused", 2, 5.0),
(r"connection reset", "connection reset", 2, 2.0),
(r"network unreachable", "network unreachable", 2, 10.0),
(r"DNS", "DNS error", 2, 5.0),
# Transient errors
(r"temporary", "temporary error", 2, 2.0),
(r"transient", "transient error", 2, 2.0),
(r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns
_PERMANENT_PATTERNS = [
# HTTP status codes
(r"\b400\b", "bad request", "Invalid request parameters"),
(r"\b401\b", "unauthorized", "Authentication failed"),
(r"\b403\b", "forbidden", "Access denied"),
(r"\b404\b", "not found", "Resource not found"),
(r"\b405\b", "method not allowed", "HTTP method not supported"),
(r"\b409\b", "conflict", "Resource conflict"),
(r"\b422\b", "unprocessable", "Validation error"),
# Schema/validation errors
(r"schema", "schema error", "Invalid data schema"),
(r"validation", "validation error", "Input validation failed"),
(r"invalid.*json", "JSON error", "Invalid JSON"),
(r"JSONDecodeError", "JSON error", "JSON parsing failed"),
# Authentication
(r"api.?key", "API key error", "Invalid or missing API key"),
(r"token.*expir", "token expired", "Authentication token expired"),
(r"permission", "permission error", "Insufficient permissions"),
# Not found patterns
(r"not found", "not found", "Resource does not exist"),
(r"does not exist", "not found", "Resource does not exist"),
(r"no such file", "file not found", "File does not exist"),
# Quota/billing
(r"quota", "quota exceeded", "Usage quota exceeded"),
(r"billing", "billing error", "Billing issue"),
(r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
"""
Classify an error as retryable or permanent.
Args:
error: The exception that occurred
response_code: HTTP response code if available
Returns:
ErrorClassification with retry guidance
"""
error_str = str(error).lower()
error_type = type(error).__name__
# Check response code first
if response_code:
if response_code in (429, 500, 502, 503, 504):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=f"HTTP {response_code} - transient server error",
should_retry=True,
max_retries=3,
backoff_seconds=5.0 if response_code == 429 else 2.0,
error_code=response_code,
error_type=error_type,
)
elif response_code in (400, 401, 403, 404, 405, 409, 422):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=f"HTTP {response_code} - client error",
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_code=response_code,
error_type=error_type,
)
# Check retryable patterns
for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=reason,
should_retry=True,
max_retries=max_retries,
backoff_seconds=backoff,
error_type=error_type,
)
# Check permanent patterns
for pattern, error_code, reason in _PERMANENT_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=reason,
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_type=error_type,
)
# Default: unknown, treat as retryable with caution
return ErrorClassification(
category=ErrorCategory.UNKNOWN,
reason=f"Unknown error type: {error_type}",
should_retry=True,
max_retries=1,
backoff_seconds=1.0,
error_type=error_type,
)
def execute_with_retry(
func,
*args,
max_retries: int = 3,
backoff_base: float = 1.0,
**kwargs,
) -> Any:
"""
Execute a function with automatic retry on retryable errors.
Args:
func: Function to execute
*args: Function arguments
max_retries: Maximum retry attempts
backoff_base: Base backoff time in seconds
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
Exception: If permanent error or max retries exceeded
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
# Classify the error
classification = classify_error(e)
logger.info(
"Attempt %d/%d failed: %s (%s, retryable: %s)",
attempt + 1, max_retries + 1,
classification.reason,
classification.category.value,
classification.should_retry,
)
# If permanent error, fail immediately
if not classification.should_retry:
logger.error("Permanent error: %s", classification.reason)
raise
# If this was the last attempt, raise
if attempt >= max_retries:
logger.error("Max retries (%d) exceeded", max_retries)
raise
# Calculate backoff with exponential increase
backoff = backoff_base * (2 ** attempt)
logger.info("Retrying in %.1fs...", backoff)
time.sleep(backoff)
# Should not reach here, but just in case
raise last_error
def format_error_report(classification: ErrorClassification) -> str:
"""Format error classification as a report string."""
icon = "🔄" if classification.should_retry else ""
return f"{icon} {classification.category.value}: {classification.reason}"

View File

@@ -394,23 +394,6 @@ def session_search(
if len(seen_sessions) >= limit:
break
# RIDER: Reader-guided reranking — sort sessions by LLM answerability
# This bridges the R@5 vs E2E accuracy gap by prioritizing passages
# the LLM can actually answer from, not just keyword matches.
try:
from agent.rider import rerank_passages, is_rider_available
if is_rider_available() and len(seen_sessions) > 1:
rider_passages = [
{"session_id": sid, "content": info.get("snippet", ""), "rank": i + 1}
for i, (sid, info) in enumerate(seen_sessions.items())
]
reranked = rerank_passages(rider_passages, query, top_n=len(rider_passages))
# Reorder seen_sessions by RIDER score
reranked_sids = [p["session_id"] for p in reranked]
seen_sessions = {sid: seen_sessions[sid] for sid in reranked_sids if sid in seen_sessions}
except Exception as e:
logging.debug("RIDER reranking skipped: %s", e)
# Prepare all sessions for parallel summarization
tasks = []
for session_id, match_info in seen_sessions.items():