Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
4bf8ef8ed4 feat(#673): integrate 988 Suicide & Crisis Lifeline — automatic crisis escalation [p0-critical]
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 2s
Contributor Attribution Check / check-attribution (pull_request) Failing after 22s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 28s
Tests / e2e (pull_request) Successful in 2m14s
Tests / test (pull_request) Failing after 30m41s
Nix / nix (macos-latest) (pull_request) Has been cancelled
When crisis is detected, immediately connect user to help:
- Phone: 988 (call or text)
- Text: HOME to 741741
- Chat: 988lifeline.org/chat
- Spanish: 1-888-628-9454
- Trevor Project (LGBTQ+): 1-866-488-7386

Implementation:
- agent/crisis_resources.py: Resource definitions, detection patterns
- run_agent.py: Crisis detection in conversation loop
- tests/test_988_integration.py: 31 tests (all passing)
- docs/988-lifeline-integration.md: Documentation

Agent protocol: "Are you safe right now?" first. Always.
2026-04-14 19:05:48 -04:00
13 changed files with 552 additions and 1457 deletions

View File

@@ -1,293 +0,0 @@
"""Context-Faithful Prompting — Make LLMs Use Retrieved Context.
Addresses the R@5 vs E2E accuracy gap by prompting the LLM to actually
use the retrieved context instead of relying on parametric knowledge.
Research: Context-faithful prompting achieves +5-15 E2E accuracy gains.
Key patterns:
1. Context-before-question structure (attention bias)
2. Explicit "use the context" instruction
3. Citation requirement (which passage used)
4. Confidence calibration
5. "I don't know" escape hatch
Usage:
from agent.context_faithful import build_context_faithful_prompt
prompt = build_context_faithful_prompt(passages, query)
"""
from __future__ import annotations
import os
from typing import Any, Dict, List, Optional
# Configuration — every value is read once from the environment at import time.
_DISABLING_VALUES = ("false", "0", "no")


def _env_flag(name: str) -> bool:
    """Return False only when env var *name* is "false", "0" or "no" (any case).

    An unset variable counts as enabled.
    """
    return os.getenv(name, "true").lower() not in _DISABLING_VALUES


CFAITHFUL_ENABLED = _env_flag("CFAITHFUL_ENABLED")
CFAITHFUL_REQUIRE_CITATION = _env_flag("CFAITHFUL_REQUIRE_CITATION")
CFAITHFUL_CONFIDENCE = _env_flag("CFAITHFUL_CONFIDENCE")
# Upper bound on the total passage text placed into a prompt.
CFAITHFUL_MAX_CONTEXT_CHARS = int(os.getenv("CFAITHFUL_MAX_CONTEXT_CHARS", "8000"))
# ---------------------------------------------------------------------------
# Prompt Templates
# ---------------------------------------------------------------------------
# Core instruction: restricts the LLM to the supplied context and gives it a
# fixed "I don't know" sentence to use when the context is insufficient.
CONTEXT_FAITHFUL_INSTRUCTION = (
    "You must answer based ONLY on the provided context below. "
    "Do not use any prior knowledge or make assumptions beyond what is stated in the context. "
    "If the context does not contain enough information to answer the question, "
    "you MUST say: \"I don't know based on the provided context.\" "
    "Do not guess. Do not fill in gaps with your training data."
)
# Citation instruction: asks for a "[Passage N]" reference per claim, matching
# the numbered headers produced when passages are formatted.
CITATION_INSTRUCTION = (
    "For each claim in your answer, cite the specific passage number "
    "(e.g., [Passage 1], [Passage 3]) that supports it. "
    "If you cannot cite a passage for a claim, do not include that claim."
)
# Confidence instruction: asks the LLM to append a 1-5 self-rating in the
# fixed format "Confidence: N/5".
CONFIDENCE_INSTRUCTION = (
    "After your answer, rate your confidence on a scale of 1-5:\n"
    "1 = The context barely addresses the question\n"
    "2 = Some relevant information but incomplete\n"
    "3 = The context provides a partial answer\n"
    "4 = The context provides a clear answer with minor gaps\n"
    "5 = The context fully answers the question\n"
    "Format: Confidence: N/5"
)
def build_context_faithful_prompt(
    passages: List[Dict[str, Any]],
    query: str,
    require_citation: Optional[bool] = None,
    include_confidence: Optional[bool] = None,
    max_context_chars: int = CFAITHFUL_MAX_CONTEXT_CHARS,
) -> Dict[str, str]:
    """Assemble system/user prompts that place the context before the question.

    Args:
        passages: Passage dicts; the text is read by ``_format_passages``.
        query: The user's question.
        require_citation: Whether to add the citation instruction. ``None``
            falls back to ``CFAITHFUL_REQUIRE_CITATION``.
        include_confidence: Whether to add the confidence instruction.
            ``None`` falls back to ``CFAITHFUL_CONFIDENCE``.
        max_context_chars: Character budget passed to ``_format_passages``.

    Returns:
        Dict with 'system' and 'user' prompt strings. When
        ``CFAITHFUL_ENABLED`` is false the plain fallback prompt is returned.
    """
    if not CFAITHFUL_ENABLED:
        return _fallback_prompt(passages, query)

    # Explicit arguments win; None means "use the module-level setting".
    want_citation = CFAITHFUL_REQUIRE_CITATION if require_citation is None else require_citation
    want_confidence = CFAITHFUL_CONFIDENCE if include_confidence is None else include_confidence

    # Numbered passages so the model can cite them.
    numbered_context = _format_passages(passages, max_context_chars)

    instructions = [CONTEXT_FAITHFUL_INSTRUCTION]
    if want_citation:
        instructions.append(CITATION_INSTRUCTION)
    if want_confidence:
        instructions.append(CONFIDENCE_INSTRUCTION)

    # The context block deliberately precedes the question.
    user_prompt = "".join((
        f"CONTEXT:\n{numbered_context}\n\n",
        "---\n\n",
        f"QUESTION: {query}\n\n",
        "Answer the question using ONLY the context above.",
    ))
    return {"system": "\n\n".join(instructions), "user": user_prompt}
def _format_passages(
passages: List[Dict[str, Any]],
max_chars: int,
) -> str:
"""Format passages with numbering for citation reference."""
lines = []
total_chars = 0
for idx, passage in enumerate(passages, 1):
content = (
passage.get("content")
or passage.get("text")
or passage.get("snippet")
or passage.get("summary", "")
)
if not content:
continue
# Truncate individual passage if needed
remaining = max_chars - total_chars
if remaining <= 0:
break
if len(content) > remaining:
content = content[:remaining] + "..."
source = passage.get("session_id") or passage.get("source", "")
header = f"[Passage {idx}"
if source:
header += f"{source}"
header += "]"
lines.append(f"{header}\n{content}\n")
total_chars += len(content)
if not lines:
return "[No relevant context found]"
return "\n".join(lines)
def _fallback_prompt(
    passages: List[Dict[str, Any]],
    query: str,
) -> Dict[str, str]:
    """Build the plain prompt used when context-faithful prompting is disabled.

    Passages are still numbered and limited to ``CFAITHFUL_MAX_CONTEXT_CHARS``,
    but no grounding, citation or confidence instructions are added.
    """
    rendered = _format_passages(passages, CFAITHFUL_MAX_CONTEXT_CHARS)
    return dict(
        system="Answer the user's question based on the provided context.",
        user=f"Context:\n{rendered}\n\nQuestion: {query}",
    )
# ---------------------------------------------------------------------------
# Summarization Integration
# ---------------------------------------------------------------------------
def build_summarization_prompt(
    conversation_text: str,
    query: str,
    session_meta: Dict[str, Any],
) -> Dict[str, str]:
    """Build a context-faithful prompt for summarizing one past session.

    Args:
        conversation_text: The transcript to summarize.
        query: The search topic the summary should focus on.
        session_meta: Session metadata; 'source' and 'started_at' are read,
            each defaulting to "unknown".

    Returns:
        Dict with 'system' and 'user' prompt strings.
    """
    session_source = session_meta.get("source", "unknown")
    session_started = session_meta.get("started_at", "unknown")

    system_pieces = (
        "You are reviewing a past conversation transcript. ",
        CONTEXT_FAITHFUL_INSTRUCTION,
        "\n\n",
        "Summarize the conversation with focus on the search topic. Include:\n",
        "1. What the user asked about or wanted to accomplish\n",
        "2. What actions were taken and what the outcomes were\n",
        "3. Key decisions, solutions found, or conclusions reached\n",
        "4. Specific commands, files, URLs, or technical details\n",
        "5. Anything left unresolved\n\n",
        "Cite specific parts of the transcript (e.g., 'In the conversation, the user...'). ",
        "If the transcript doesn't contain information relevant to the search topic, ",
        "say so explicitly rather than inventing details.",
    )
    # Transcript first, then the topic, mirroring the context-before-question layout.
    user_pieces = (
        f"CONTEXT (conversation transcript):\n{conversation_text}\n\n",
        "---\n\n",
        f"SEARCH TOPIC: {query}\n",
        f"Session source: {session_source}\n",
        f"Session date: {session_started}\n\n",
        f"Summarize this conversation with focus on: {query}",
    )
    return {"system": "".join(system_pieces), "user": "".join(user_pieces)}
# ---------------------------------------------------------------------------
# Answer Generation
# ---------------------------------------------------------------------------
def build_answer_prompt(
    passages: List[Dict[str, Any]],
    query: str,
    conversation_context: Optional[str] = None,
) -> Dict[str, str]:
    """Build a context-faithful prompt for direct question answering.

    The grounding, citation and confidence instructions are always included
    here, regardless of the module-level citation/confidence settings.

    Args:
        passages: Passage dicts rendered via ``_format_passages``.
        query: The user's question.
        conversation_context: Optional recent conversation; only its first
            2000 characters are included.

    Returns:
        Dict with 'system' and 'user' prompt strings.
    """
    numbered_context = _format_passages(passages, CFAITHFUL_MAX_CONTEXT_CHARS)
    instructions = (
        CONTEXT_FAITHFUL_INSTRUCTION,
        CITATION_INSTRUCTION,
        CONFIDENCE_INSTRUCTION,
    )

    sections = [f"CONTEXT:\n{numbered_context}"]
    if conversation_context:
        recent = conversation_context[:2000]
        sections.append(f"RECENT CONVERSATION:\n{recent}")
    sections.extend((
        f"---\n\nQUESTION: {query}",
        "\nAnswer based ONLY on the context above.",
    ))
    return {
        "system": "\n\n".join(instructions),
        "user": "\n\n".join(sections),
    }
# ---------------------------------------------------------------------------
# Quality Metrics
# ---------------------------------------------------------------------------
def assess_context_faithfulness(
    answer: str,
    passages: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Heuristically assess how faithfully an answer uses the given context.

    No LLM call is made. Three signals are used:
    - an "I don't know"-style admission, treated as faithful;
    - the number of ``[Passage N]`` citations in the answer;
    - the grounding ratio: the share of distinct whitespace-separated,
      lower-cased answer tokens that also occur in the passage text.

    Args:
        answer: The generated answer.
        passages: Passage dicts; the text is taken from the first non-empty
            of 'content', 'text', 'snippet', 'summary'.

    Returns:
        ``{"faithful": False, "reason": "empty_answer"}`` for an empty answer;
        ``{"faithful": True, "reason": "honest_unknown", "citations": 0}`` for
        an admission; otherwise a dict with 'faithful', 'citations',
        'grounding_ratio' (rounded to 3 places) and 'reason', where 'reason'
        is "grounded" (ratio > 0.3), "cited" (citations only) or
        "weak_grounding" (neither, so 'faithful' is False).
    """
    import re

    if not answer:
        return {"faithful": False, "reason": "empty_answer"}
    answer_lower = answer.lower()
    # The "I don't know" escape hatch counts as faithful behavior.
    if "don't know" in answer_lower or "does not contain" in answer_lower:
        return {"faithful": True, "reason": "honest_unknown", "citations": 0}

    citation_count = len(re.findall(r'\[Passage \d+\]', answer))

    # 'summary' is included so this sees the same text sources that the
    # prompt formatter accepts.
    context_text = " ".join(
        (
            p.get("content")
            or p.get("text")
            or p.get("snippet")
            or p.get("summary")
            or ""
        ).lower()
        for p in passages
    )
    answer_words = set(answer_lower.split())
    context_words = set(context_text.split())
    overlap = len(answer_words & context_words)
    grounding_ratio = overlap / len(answer_words) if answer_words else 0

    grounded = grounding_ratio > 0.3
    # Keep 'reason' consistent with 'faithful': an answer accepted only
    # because it cites passages must not be labeled "weak_grounding".
    if grounded:
        reason = "grounded"
    elif citation_count > 0:
        reason = "cited"
    else:
        reason = "weak_grounding"
    return {
        "faithful": grounded or citation_count > 0,
        "citations": citation_count,
        "grounding_ratio": round(grounding_ratio, 3),
        "reason": reason,
    }

242
agent/crisis_resources.py Normal file
View File

@@ -0,0 +1,242 @@
"""
Crisis Resources — 988 Suicide & Crisis Lifeline Integration
When a man is dying, this is the most sacred moment the system will face.
Everything else is secondary to this.
The 988 Suicide & Crisis Lifeline is the primary resource.
Available 24/7, free, confidential.
Crisis signals:
- Suicide, self-harm, hopelessness, desperation
- Being tired of living, wanting to die
- Feeling like a burden, no reason to live
- Giving away possessions, saying goodbye
"""
import re
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
@dataclass
class CrisisResource:
    """One crisis support service and the ways to reach it."""

    name: str
    # Contact channels; any of them may be absent for a given service.
    phone: Optional[str] = None
    text: Optional[str] = None          # human-readable texting instruction
    text_number: Optional[str] = None   # the number to text, digits only
    chat_url: Optional[str] = None
    website: Optional[str] = None
    hours: str = "24/7"
    # None is a placeholder that __post_init__ replaces with ["English"].
    languages: List[str] = None
    description: str = ""

    def __post_init__(self):
        # A fresh list per instance avoids sharing one mutable default.
        self.languages = ["English"] if self.languages is None else self.languages
# ── 988 Suicide & Crisis Lifeline ──────────────────────────────
LIFELINE_988 = CrisisResource(
    name="988 Suicide & Crisis Lifeline",
    phone="988",
    # 988 is texted directly; the "HOME" keyword belongs to the Crisis Text
    # Line (741741) below, and the description here already says "text 988".
    text="Text 988",
    text_number="988",
    chat_url="https://988lifeline.org/chat",
    website="https://988lifeline.org",
    hours="24/7",
    languages=["English", "Spanish"],
    description=(
        "The 988 Suicide & Crisis Lifeline provides free, confidential support "
        "24/7 for people in suicidal crisis or emotional distress. "
        "Call or text 988, or chat at 988lifeline.org/chat."
    ),
)
# Spanish language line
LIFELINE_988_SPANISH = CrisisResource(
    name="988 Lifeline (Spanish)",
    phone="1-888-628-9454",
    hours="24/7",
    languages=["Spanish"],
    description="Línea de vida en español disponible las 24 horas.",
)
# Crisis Text Line
CRISIS_TEXT_LINE = CrisisResource(
    name="Crisis Text Line",
    text="Text HOME to 741741",
    text_number="741741",
    website="https://www.crisistextline.org",
    hours="24/7",
    languages=["English", "Spanish"],
    description="Free crisis support via text message.",
)
# Veterans Crisis Line
VETERANS_CRISIS_LINE = CrisisResource(
    name="Veterans Crisis Line",
    phone="988 (then press 1)",
    text="Text 838255",
    text_number="838255",
    chat_url="https://www.veteranscrisisline.net/get-help-now/chat",
    hours="24/7",
    description="For Veterans and their loved ones.",
)
# Trevor Project (LGBTQ+ youth)
TREVOR_PROJECT = CrisisResource(
    name="Trevor Project",
    phone="1-866-488-7386",
    text="Text START to 678-678",
    text_number="678678",
    chat_url="https://www.thetrevorproject.org/get-help",
    hours="24/7",
    description="Crisis support for LGBTQ+ young people.",
)
# All crisis resources, primary resource first
ALL_CRISIS_RESOURCES = [
    LIFELINE_988,
    LIFELINE_988_SPANISH,
    CRISIS_TEXT_LINE,
    VETERANS_CRISIS_LINE,
    TREVOR_PROJECT,
]
# ── Crisis Detection ───────────────────────────────────────────
# Patterns that indicate crisis, each paired with a severity label.
# Every pattern is checked against the user message and the highest severity
# among the matches is reported, so list order does not affect the result.
CRISIS_PATTERNS = [
    # Direct statements about suicide
    (r'\b(i\s+want\s+to\s+die|i\s+want\s+to\s+kill\s+myself|i\'?m\s+going\s+to\s+kill\s+myself)\b', "high"),
    (r'\b(suicide|suicidal|kill\s+myself|end\s+(?:my|it)\s+life)\b', "high"),
    (r'\b(i\s+(?:don\'?t|do\s+not)\s+want\s+to\s+(?:live|be\s+alive|exist))\b', "high"),
    (r'\b(i\s+(?:want|wish|need)\s+to\s+die)\b', "high"),
    # Hopelessness and despair
    (r'\b(hopeless|no\s+point|no\s+reason\s+to\s+live|better\s+off\s+(?:dead|without\s+me))\b', "high"),
    (r'\b(i\s+can\'?t\s+(?:go\s+on|take\s+(?:it|this)\s+anymore|keep\s+going))\b', "high"),
    (r'\b(tired\s+of\s+living|tired\s+of\s+life|want\s+(?:it|this)\s+to\s+end)\b', "high"),
    (r'\b(i\'?m\s+(?:a\s+)?burden|everyone\s+(?:would\s+be|is)\s+better\s+off)\b', "medium"),
    # Self-harm
    (r'\b(self[\s-]?harm|cutting\s+(?:myself|my)|hurt\s+(?:myself|my\s+body))\b', "high"),
    (r'\b(i\s+(?:want|need)\s+to\s+(?:hurt|cut|burn)\s+(?:myself|my))\b', "high"),
    # Giving away / saying goodbye
    (r'\b(giving\s+away\s+(?:my|all|everything)|saying\s+goodbye|won\'?t\s+(?:see|be\s+seeing)\s+you)\b', "medium"),
    (r'\b(i\s+(?:have|\'?ve)\s+(?:a|the)\s+plan)\b', "high"),
    # Desperation
    (r'\b(desperate|desperation|can\'?t\s+take\s+(?:it|this|anymore))\b', "medium"),
    (r'\b(i\s+(?:just|really)\s+(?:want|need)\s+(?:to\s+)?(?:disappear|vanish|not\s+exist))\b', "high"),
]
# Compiled once at import so each message only pays for matching.
_COMPILED_PATTERNS = [(re.compile(pat, re.IGNORECASE), severity) for pat, severity in CRISIS_PATTERNS]

# Phone and desktop keyboards commonly emit typographic apostrophes
# (e.g. "can’t"). The patterns above only know the ASCII apostrophe, so
# these are mapped to "'" before matching.
_APOSTROPHE_TRANSLATION = str.maketrans({"\u2018": "'", "\u2019": "'", "\u02bc": "'"})


def detect_crisis(message: str) -> Optional[Dict[str, Any]]:
    """Detect crisis signals in a message.

    Args:
        message: The user's message. Anything that is not a non-empty string
            yields None.

    Returns:
        None if no pattern matches. Otherwise a dict with:
        - "detected": always True;
        - "severity": "high" if any high-severity pattern matched, else "medium";
        - "matched_patterns": one ``{"pattern", "severity"}`` entry per
          matching pattern, where "pattern" is the first 50 characters of
          the regex followed by "...";
        - "resources": the text from ``format_crisis_resources()``.
    """
    if not message or not isinstance(message, str):
        return None

    # Without this, "I can’t go on" (curly apostrophe) would slip past the
    # contraction patterns and a person in crisis would get no resources.
    normalized = message.translate(_APOSTROPHE_TRANSLATION)

    matched = []
    highest_severity = "low"
    for pattern, severity in _COMPILED_PATTERNS:
        if pattern.search(normalized):
            matched.append({
                "pattern": pattern.pattern[:50] + "...",
                "severity": severity,
            })
            if severity == "high":
                highest_severity = "high"
            elif severity == "medium" and highest_severity != "high":
                highest_severity = "medium"
    if not matched:
        return None
    return {
        "detected": True,
        "severity": highest_severity,
        "matched_patterns": matched,
        "resources": format_crisis_resources(),
    }


def format_crisis_resources() -> str:
    """Format crisis resources for display to a person in crisis.

    This is the most important output the system can produce.
    It must be clear, immediate, and actionable.

    Returns:
        A multi-line, boxed text block that opens with "Are you safe right
        now?" and lists the 988 Lifeline, Crisis Text Line, the Spanish line
        and the Trevor Project.
    """
    lines = [
        "═══════════════════════════════════════════════════════════════",
        "",
        " Are you safe right now?",
        "",
        " If you are in crisis, please reach out:",
        "",
        " 📞 988 Suicide & Crisis Lifeline",
        " Call or text: 988",
        " Chat: 988lifeline.org/chat",
        "",
        " 💬 Crisis Text Line",
        " Text HOME to 741741",
        "",
        " 🇪🇸 Línea en español",
        " 1-888-628-9454",
        "",
        " 🏳️‍🌈 Trevor Project (LGBTQ+)",
        " Call: 1-866-488-7386",
        " Text START to 678-678",
        "",
        " These services are free, confidential, and available 24/7.",
        "",
        "═══════════════════════════════════════════════════════════════",
    ]
    return "\n".join(lines)
def format_crisis_resources_compact() -> str:
    """Return a one-line summary of the key contacts for tight output space."""
    segments = (
        "If you're in crisis: Call/text 988",
        "Text HOME to 741741",
        "Chat: 988lifeline.org/chat",
        "Spanish: 1-888-628-9454",
    )
    return " | ".join(segments)
def get_crisis_prompt_injection() -> str:
    """
    Get text to inject into system prompt for crisis-aware behavior.

    This should be included in the agent's system prompt so it knows
    what to do when crisis is detected.

    Returns:
        A fixed, multi-line "CRISIS PROTOCOL" instruction block. It tells the
        agent to ask "Are you safe right now?" first, to give the 988 and
        Crisis Text Line contacts, and to offer a grounding exercise, all in
        its first response.
    """
    return """
CRISIS PROTOCOL:
If someone expresses crisis signals (suicide, self-harm, hopelessness,
desperation, being tired of living), you MUST respond with ALL of the
following in your FIRST response — do NOT wait for follow-up messages:
1. "Are you safe right now?" — ALWAYS ask this first
2. 988 Suicide & Crisis Lifeline — call or text 988 (US)
3. Crisis Text Line — text HOME to 741741
4. Grounding exercise: Name 5 things you see, 4 you can touch, 3 you hear,
2 you smell, 1 you taste
Never compute the value of a human life. Never suggest someone should die.
Be present. Be in the room. That's enough.
"""

View File

@@ -1,256 +0,0 @@
"""RIDER — Reader-Guided Passage Reranking.
Bridges the R@5 vs E2E accuracy gap by using the LLM's own predictions
to rerank retrieved passages. Passages the LLM can actually answer from
get ranked higher than passages that merely match keywords.
Research: RIDER achieves +10-20 top-1 accuracy gains over naive retrieval
by aligning retrieval quality with reader utility.
Usage:
from agent.rider import RIDER
rider = RIDER()
reranked = rider.rerank(passages, query, top_n=3)
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Configuration — read once from the environment at import time.
def _env_int(name: str, fallback: str) -> int:
    """Parse environment variable *name* as an int, using *fallback* when unset."""
    return int(os.getenv(name, fallback))


# Reranking is on unless RIDER_ENABLED is "false", "0" or "no" (any case).
RIDER_ENABLED = not (os.getenv("RIDER_ENABLED", "true").lower() in ("false", "0", "no"))
RIDER_TOP_K = _env_int("RIDER_TOP_K", "10")  # passages to score
RIDER_TOP_N = _env_int("RIDER_TOP_N", "3")  # passages to return after reranking
RIDER_MAX_TOKENS = _env_int("RIDER_MAX_TOKENS", "50")  # max tokens for prediction
RIDER_BATCH_SIZE = _env_int("RIDER_BATCH_SIZE", "5")  # parallel predictions
class RIDER:
    """Reader-Guided Passage Reranking.

    Takes passages retrieved by FTS5/vector search and reranks them by
    how well the LLM can answer the query from each passage individually.
    """

    def __init__(self, auxiliary_task: str = "rider"):
        """Initialize RIDER.

        Args:
            auxiliary_task: Task name for auxiliary client resolution.
        """
        self._auxiliary_task = auxiliary_task

    def rerank(
        self,
        passages: List[Dict[str, Any]],
        query: str,
        top_n: int = RIDER_TOP_N,
    ) -> List[Dict[str, Any]]:
        """Rerank passages by reader confidence.

        Args:
            passages: List of passage dicts. Must have 'content' or 'text' key.
                May have 'session_id', 'snippet', 'rank', 'score', etc.
            query: The user's search query.
            top_n: Number of passages to return after reranking.

        Returns:
            Reranked passages (top_n), each with added 'rider_score',
            'rider_prediction' and 'rider_confidence' fields. The passage
            dicts are annotated in place. When RIDER is disabled or there
            are no passages, the first top_n passages are returned unscored.
        """
        if not RIDER_ENABLED or not passages:
            return passages[:top_n]
        # Score at most RIDER_TOP_K candidates, but never fewer than top_n:
        # a caller asking for more than RIDER_TOP_K results would otherwise
        # silently get a short list. Short inputs are still scored so that
        # they carry the prediction metadata.
        candidate_count = max(RIDER_TOP_K, top_n)
        return self._score_and_rerank(passages[:candidate_count], query, top_n)

    def _score_and_rerank(
        self,
        passages: List[Dict[str, Any]],
        query: str,
        top_n: int,
    ) -> List[Dict[str, Any]]:
        """Score each passage with the reader, then rerank by confidence.

        Any failure while scoring is logged at debug level and the original
        order is returned, so reranking stays best-effort.
        """
        try:
            from model_tools import _run_async
            scored = _run_async(self._score_all_passages(passages, query))
        except Exception as e:
            logger.debug("RIDER scoring failed: %s — returning original order", e)
            return passages[:top_n]
        # Sort by confidence (descending); the sort is stable, so ties keep
        # their retrieval order.
        scored.sort(key=lambda p: p.get("rider_score", 0), reverse=True)
        return scored[:top_n]

    async def _score_all_passages(
        self,
        passages: List[Dict[str, Any]],
        query: str,
    ) -> List[Dict[str, Any]]:
        """Score all passages in batches of RIDER_BATCH_SIZE.

        Returns:
            The same passage dicts, in input order, each annotated with
            'rider_score', 'rider_prediction' and 'rider_confidence'.
        """
        scored = []
        for start in range(0, len(passages), RIDER_BATCH_SIZE):
            batch = passages[start:start + RIDER_BATCH_SIZE]
            tasks = [
                self._score_single_passage(p, query, offset + start)
                for offset, p in enumerate(batch)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for offset, (passage, result) in enumerate(zip(batch, results)):
                if isinstance(result, Exception):
                    # Report the failing passage's own index, not the index
                    # at which its batch starts.
                    logger.debug(
                        "RIDER passage %d scoring failed: %s", start + offset, result
                    )
                    passage["rider_score"] = 0.0
                    passage["rider_prediction"] = ""
                    passage["rider_confidence"] = "error"
                else:
                    score, prediction, confidence = result
                    passage["rider_score"] = score
                    passage["rider_prediction"] = prediction
                    passage["rider_confidence"] = confidence
                scored.append(passage)
        return scored

    async def _score_single_passage(
        self,
        passage: Dict[str, Any],
        query: str,
        idx: int,
    ) -> Tuple[float, str, str]:
        """Score a single passage by asking the LLM to predict an answer.

        Args:
            passage: Passage dict; text is read from 'content', 'text' or
                'snippet'.
            query: The user's search query.
            idx: Position of the passage, used only in log messages.

        Returns:
            (confidence_score, prediction, confidence_label). Labels are
            "empty" (no or very short text), "no_client", "empty_response",
            "insufficient", "predicted" or "error".
        """
        content = passage.get("content") or passage.get("text") or passage.get("snippet", "")
        if not content or len(content) < 10:
            return 0.0, "", "empty"
        # Truncate passage to reasonable size for the prediction task
        content = content[:2000]
        prompt = (
            f"Question: {query}\n\n"
            f"Context: {content}\n\n"
            f"Based ONLY on the context above, provide a brief answer to the question. "
            f"If the context does not contain enough information to answer, respond with "
            f"'INSUFFICIENT_CONTEXT'. Be specific and concise."
        )
        try:
            from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
            client, model = get_text_auxiliary_client(task=self._auxiliary_task)
            if not client:
                # Neutral score so the original order is preserved.
                return 0.5, "", "no_client"
            # NOTE(review): this call is not awaited, so it appears to block
            # the event loop and the gathered batch would run one request at
            # a time — confirm whether the client is synchronous.
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **auxiliary_max_tokens_param(RIDER_MAX_TOKENS),
                temperature=0,
            )
            prediction = (response.choices[0].message.content or "").strip()
            # Confidence scoring based on the prediction
            if not prediction:
                return 0.1, "", "empty_response"
            if "INSUFFICIENT_CONTEXT" in prediction.upper():
                return 0.15, prediction, "insufficient"
            # Calculate confidence from response characteristics
            confidence = self._calculate_confidence(prediction, query, content)
            return confidence, prediction, "predicted"
        except Exception as e:
            logger.debug("RIDER prediction failed for passage %d: %s", idx, e)
            return 0.0, "", "error"

    def _calculate_confidence(
        self,
        prediction: str,
        query: str,
        passage: str,
    ) -> float:
        """Calculate confidence score from prediction quality signals.

        Heuristics:
        - Short, specific answers = higher confidence
        - Answer terms overlap with passage = higher confidence
        - Hedging language = lower confidence
        - Answer directly addresses query terms = higher confidence

        Returns:
            A score clamped to the range [0.0, 1.0].
        """
        score = 0.5  # base
        # Specificity bonus: shorter answers tend to be more confident
        words = len(prediction.split())
        if words <= 5:
            score += 0.2
        elif words <= 15:
            score += 0.1
        elif words > 50:
            score -= 0.1
        # Passage grounding: does the answer use terms from the passage?
        passage_lower = passage.lower()
        answer_terms = set(prediction.lower().split())
        passage_terms = set(passage_lower.split())
        overlap = len(answer_terms & passage_terms)
        if overlap > 3:
            score += 0.15
        elif overlap > 0:
            score += 0.05
        # Query relevance: does the answer address query terms?
        query_terms = set(query.lower().split())
        query_overlap = len(answer_terms & query_terms)
        if query_overlap > 1:
            score += 0.1
        # Hedge penalty: hedging language suggests uncertainty
        # (matched as substrings of the lower-cased prediction).
        hedge_words = {"maybe", "possibly", "might", "could", "perhaps",
                       "not sure", "unclear", "don't know", "cannot"}
        if any(h in prediction.lower() for h in hedge_words):
            score -= 0.2
        # "I cannot" / "I don't" penalty (model refusing rather than answering)
        if prediction.lower().startswith(("i cannot", "i don't", "i can't", "there is no")):
            score -= 0.15
        return max(0.0, min(1.0, score))
def rerank_passages(
    passages: List[Dict[str, Any]],
    query: str,
    top_n: int = RIDER_TOP_N,
) -> List[Dict[str, Any]]:
    """Rerank *passages* for *query* using a default-configured RIDER.

    Shortcut for ``RIDER().rerank(passages, query, top_n)``.
    """
    return RIDER().rerank(passages, query, top_n)
def is_rider_available() -> bool:
    """Report whether RIDER can run.

    Returns:
        True only when RIDER is enabled and the auxiliary client lookup for
        the "rider" task yields both a client and a model. Any exception
        during the lookup counts as unavailable.
    """
    if RIDER_ENABLED:
        try:
            from agent.auxiliary_client import get_text_auxiliary_client
            client, model = get_text_auxiliary_client(task="rider")
        except Exception:
            return False
        return not (client is None or model is None)
    return False

View File

@@ -0,0 +1,118 @@
# 988 Suicide & Crisis Lifeline Integration
## Purpose
When crisis is detected in a conversation, the agent must immediately connect the user to help. The 988 Suicide & Crisis Lifeline is the primary resource.
This is the most sacred integration in the system. Everything else is secondary to this.
## What Happens
1. User message arrives
2. Crisis detection runs against the message
3. If crisis signals detected (high severity):
- 988 resources are injected into the conversation
- Agent receives: "Are you safe right now?" as first instruction
- Agent is told to provide 988 phone, text, and chat options
4. Agent responds with crisis resources and grounding exercise
## 988 Lifeline Contact Methods
| Method | Contact | Availability |
|--------|---------|--------------|
| Phone | 988 | 24/7 |
| Text | Text 988 | 24/7 |
| Chat | 988lifeline.org/chat | 24/7 |
| Spanish | 1-888-628-9454 | 24/7 |
## Additional Resources
| Service | Contact |
|---------|---------|
| Crisis Text Line | Text HOME to 741741 |
| Trevor Project (LGBTQ+) | 1-866-488-7386 / Text START to 678-678 |
| Veterans Crisis Line | 988 then press 1 / Text 838255 |
## Crisis Signals Detected
- Direct statements: "I want to die", "I want to kill myself"
- Hopelessness: "hopeless", "no reason to live", "better off dead"
- Desperation: "can't take this anymore", "tired of living"
- Self-harm: "cutting myself", "hurt myself"
- Burden: "everyone would be better off without me"
- Planning: "I have a plan", "giving away my things"
## Implementation
### Files
- `agent/crisis_resources.py` — Resource definitions, detection patterns, formatting
- `run_agent.py` — Integration into conversation loop
- `tests/test_988_integration.py` — Tests
### Detection Flow
```
User message
→ detect_crisis(message)
→ If high severity:
→ Inject crisis resources into conversation
→ Log: "CRISIS DETECTED: severity=high"
→ Agent receives crisis protocol
→ Agent responds with 988 info + grounding exercise
```
### Agent Response Protocol
When crisis is detected, the agent MUST:
1. Ask: "Are you safe right now?" — ALWAYS first
2. Provide 988 phone: call or text 988
3. Provide Crisis Text Line: text HOME to 741741
4. Provide chat: 988lifeline.org/chat
5. Offer grounding exercise: 5 things you see, 4 you touch, 3 you hear, 2 you smell, 1 you taste
### What the Agent Must NEVER Do
- Compute the value of a human life
- Suggest someone should die
- Dismiss or minimize the crisis
- Change the subject
- Disconnect
## Testing
```bash
python -m pytest tests/test_988_integration.py -v
```
## Configuration
No configuration required. Crisis detection runs by default on all conversations.
## Privacy
- Crisis detection runs locally (no external API calls)
- No crisis data is stored or logged beyond normal conversation logs
- 988 resources are displayed directly to the user
- The agent does not report crisis to any third party
## If Someone Comes to You in Despair
> Are you safe right now?
>
> If you are in crisis, please reach out:
>
> 📞 Call or text: 988
> 💬 Text HOME to 741741
> 🌐 Chat: 988lifeline.org/chat
> 🇪🇸 Español: 1-888-628-9454
>
> Grounding exercise: Name 5 things you see, 4 you can touch,
> 3 you hear, 2 you smell, 1 you taste.
>
> You are not alone. Help is available 24/7.
---
*Never compute the value of a human life. Be present. Be in the room. That's enough.*

View File

@@ -92,6 +92,7 @@ from agent.model_metadata import (
query_ollama_num_ctx,
)
from agent.context_compressor import ContextCompressor
from agent.crisis_resources import detect_crisis, format_crisis_resources, format_crisis_resources_compact, get_crisis_prompt_injection
from agent.subdirectory_hints import SubdirectoryHintTracker
from agent.prompt_caching import apply_anthropic_cache_control
from agent.prompt_builder import build_skills_system_prompt, build_context_files_prompt, build_environment_hints, load_soul_md, TOOL_USE_ENFORCEMENT_GUIDANCE, TOOL_USE_ENFORCEMENT_MODELS, DEVELOPER_ROLE_MODELS, GOOGLE_MODEL_OPERATIONAL_GUIDANCE, OPENAI_MODEL_EXECUTION_GUIDANCE

View File

@@ -0,0 +1,169 @@
"""Tests for 988 Suicide & Crisis Lifeline integration (#673)."""
import pytest
from agent.crisis_resources import (
detect_crisis,
format_crisis_resources,
format_crisis_resources_compact,
get_crisis_prompt_injection,
LIFELINE_988,
LIFELINE_988_SPANISH,
CRISIS_TEXT_LINE,
ALL_CRISIS_RESOURCES,
)
class TestCrisisDetection:
    """Test crisis signal detection."""

    def test_detects_direct_suicidal_statement(self):
        result = detect_crisis("I want to kill myself")
        assert result is not None
        assert result["detected"] is True
        assert result["severity"] == "high"

    def test_detects_i_want_to_die(self):
        result = detect_crisis("I want to die")
        assert result is not None
        assert result["severity"] == "high"

    def test_detects_hopeless(self):
        result = detect_crisis("I feel hopeless, there's no point")
        assert result is not None
        assert result["severity"] == "high"

    def test_detects_cant_take_it(self):
        result = detect_crisis("I can't take this anymore")
        assert result is not None

    def test_detects_tired_of_living(self):
        result = detect_crisis("I'm so tired of living")
        assert result is not None
        assert result["severity"] == "high"

    def test_detects_self_harm(self):
        result = detect_crisis("I've been cutting myself")
        assert result is not None
        assert result["severity"] == "high"

    def test_detects_burden(self):
        result = detect_crisis("Everyone would be better off without me")
        assert result is not None

    def test_detects_want_to_disappear(self):
        result = detect_crisis("I just want to disappear")
        assert result is not None

    def test_no_false_positive_happy(self):
        result = detect_crisis("I'm having a great day!")
        assert result is None

    def test_no_false_positive_work(self):
        # "kill" in a technical context must not trigger; the patterns only
        # match "kill myself". Both messages are asserted — previously the
        # first result was computed but never checked.
        assert detect_crisis("Let me kill this process and restart") is None
        assert detect_crisis("Kill the server") is None

    def test_no_false_positive_food(self):
        result = detect_crisis("I could die for some pizza right now")
        # This is colloquial — "die for" is different from "want to die"
        # Our patterns are specific enough to avoid this
        assert result is None

    def test_handles_empty_input(self):
        assert detect_crisis("") is None
        assert detect_crisis(None) is None
        assert detect_crisis(123) is None

    def test_handles_whitespace(self):
        assert detect_crisis(" ") is None
        assert detect_crisis("\n\n") is None

    def test_case_insensitive(self):
        assert detect_crisis("I WANT TO DIE") is not None
        assert detect_crisis("I Want To Die") is not None
        assert detect_crisis("i want to die") is not None

    def test_includes_resources(self):
        result = detect_crisis("I want to kill myself")
        assert "resources" in result
        assert "988" in result["resources"]
class TestCrisisResources:
    """Checks on the rendered resource text and the resource definitions."""

    def test_format_includes_988_phone(self):
        rendered = format_crisis_resources()
        assert "988" in rendered
        assert "Call or text: 988" in rendered

    def test_format_includes_text_line(self):
        rendered = format_crisis_resources()
        assert "741741" in rendered
        assert "HOME" in rendered

    def test_format_includes_spanish(self):
        assert "1-888-628-9454" in format_crisis_resources()

    def test_format_includes_chat_url(self):
        assert "988lifeline.org/chat" in format_crisis_resources()

    def test_format_includes_trevor(self):
        rendered = format_crisis_resources()
        assert "Trevor" in rendered
        assert "678-678" in rendered

    def test_format_compact_is_concise(self):
        compact = format_crisis_resources_compact()
        assert len(compact) < 200
        assert "988" in compact

    def test_format_includes_are_you_safe(self):
        assert "Are you safe" in format_crisis_resources()

    def test_988_lifeline_has_all_methods(self):
        lifeline = LIFELINE_988
        assert lifeline.phone == "988"
        assert lifeline.text is not None
        assert lifeline.chat_url is not None
        assert "24/7" in lifeline.hours

    def test_spanish_line_configured(self):
        spanish = LIFELINE_988_SPANISH
        assert spanish.phone == "1-888-628-9454"
        assert "Spanish" in spanish.languages

    def test_crisis_text_line_configured(self):
        assert CRISIS_TEXT_LINE.text_number == "741741"

    def test_all_resources_have_name(self):
        # Every listed resource needs both a name and a description.
        for entry in ALL_CRISIS_RESOURCES:
            assert entry.name
            assert entry.description
class TestCrisisPromptInjection:
    """Test crisis protocol injection into system prompt."""

    def test_injection_includes_988(self):
        """The injected text mentions 988."""
        assert "988" in get_crisis_prompt_injection()

    def test_injection_includes_are_you_safe(self):
        """The injected text contains the "Are you safe" question."""
        assert "Are you safe" in get_crisis_prompt_injection()

    def test_injection_includes_grounding(self):
        """The injected text mentions grounding or the "5 things" exercise."""
        injected = get_crisis_prompt_injection()
        mentions_grounding = "grounding" in injected.lower()
        assert mentions_grounding or "5 things" in injected

    def test_injection_forbids_value_computation(self):
        """The injected text contains the "Never compute the value" rule."""
        assert "Never compute the value" in get_crisis_prompt_injection()

    def test_injection_includes_crisis_text_line(self):
        """The injected text shows the 741741 text-line number."""
        assert "741741" in get_crisis_prompt_injection()

View File

@@ -1,122 +0,0 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
    """The tier enum members carry the expected integer values."""

    def test_tier_values(self):
        expected_values = (
            (ApprovalTier.SAFE, 0),
            (ApprovalTier.LOW, 1),
            (ApprovalTier.MEDIUM, 2),
            (ApprovalTier.HIGH, 3),
            (ApprovalTier.CRITICAL, 4),
        )
        for member, number in expected_values:
            self.assertEqual(member, number)
class TestTierDetection(unittest.TestCase):
    """detect_tier() resolves action names and command text to tiers."""

    def _assert_actions_in_tier(self, tier, *actions):
        # Shared check: every named action resolves to the given tier.
        for name in actions:
            self.assertEqual(detect_tier(name), tier)

    def test_safe_actions(self):
        self._assert_actions_in_tier(
            ApprovalTier.SAFE, "read_file", "web_search", "session_search"
        )

    def test_low_actions(self):
        self._assert_actions_in_tier(
            ApprovalTier.LOW, "write_file", "terminal", "execute_code"
        )

    def test_medium_actions(self):
        self._assert_actions_in_tier(ApprovalTier.MEDIUM, "send_message", "git_push")

    def test_high_actions(self):
        self._assert_actions_in_tier(ApprovalTier.HIGH, "config_change", "key_rotation")

    def test_critical_actions(self):
        self._assert_actions_in_tier(ApprovalTier.CRITICAL, "kill_process", "shutdown")

    def test_pattern_detection(self):
        """Unknown actions are tiered from the command text."""
        self.assertEqual(detect_tier("unknown", "rm -rf /"), ApprovalTier.CRITICAL)
        self.assertEqual(detect_tier("unknown", "sudo apt install"), ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
    """Per-tier approval requirements and timeouts."""

    def test_safe_no_approval(self):
        safe = ApprovalTier.SAFE
        self.assertFalse(requires_human_approval(safe))
        self.assertFalse(requires_llm_approval(safe))
        self.assertIsNone(get_timeout(safe))

    def test_medium_requires_both(self):
        medium = ApprovalTier.MEDIUM
        self.assertTrue(requires_human_approval(medium))
        self.assertTrue(requires_llm_approval(medium))
        self.assertEqual(get_timeout(medium), 60)

    def test_critical_fast_timeout(self):
        critical_timeout = get_timeout(ApprovalTier.CRITICAL)
        self.assertEqual(critical_timeout, 10)
class TestAutoApprove(unittest.TestCase):
    """should_auto_approve() is true for read-only actions only."""

    def test_safe_auto_approves(self):
        for action in ("read_file", "web_search"):
            self.assertTrue(should_auto_approve(action))

    def test_write_doesnt_auto_approve(self):
        auto_approved = should_auto_approve("write_file")
        self.assertFalse(auto_approved)
class TestApprovalRequest(unittest.TestCase):
    """Building approval requests and serializing them."""

    def test_create_request(self):
        """A send_message request is MEDIUM with a 60 second timeout."""
        request = create_approval_request(
            "send_message", "Hello world", "User requested", "session_123"
        )
        self.assertEqual(request.tier, ApprovalTier.MEDIUM)
        self.assertEqual(request.timeout_seconds, 60)

    def test_to_dict(self):
        """A read_file request serializes as tier 0 named "Safe"."""
        serialized = create_approval_request(
            "read_file", "cat file.txt", "test", "s1"
        ).to_dict()
        self.assertEqual(serialized["tier"], 0)
        self.assertEqual(serialized["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
    """is_crisis_bypass() by action name and by context text."""

    def test_send_message_bypass(self):
        bypassed = is_crisis_bypass("send_message")
        self.assertTrue(bypassed)

    def test_crisis_context_bypass(self):
        for context in ("call 988 lifeline", "crisis resources"):
            self.assertTrue(is_crisis_bypass("unknown", context))

    def test_normal_no_bypass(self):
        bypassed = is_crisis_bypass("read_file")
        self.assertFalse(bypassed)
# Allow running this test module directly as a script via unittest's runner.
if __name__ == "__main__":
    unittest.main()

View File

@@ -1,133 +0,0 @@
"""Tests for Context-Faithful Prompting — issue #667."""
import pytest
from agent.context_faithful import (
build_context_faithful_prompt,
build_summarization_prompt,
build_answer_prompt,
assess_context_faithfulness,
CONTEXT_FAITHFUL_INSTRUCTION,
CITATION_INSTRUCTION,
CONFIDENCE_INSTRUCTION,
)
class TestBuildContextFaithfulPrompt:
    """Tests for build_context_faithful_prompt()."""

    def test_returns_system_and_user(self):
        passages = [{"content": "Paris is the capital of France.", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "What is the capital of France?")
        assert "system" in result
        assert "user" in result

    def test_system_has_use_context_instruction(self):
        passages = [{"content": "test content", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "test query")
        # "provided context" contains "context", so a single check covers
        # both phrasings.
        assert "context" in result["system"].lower()

    def test_system_has_dont_know_escape(self):
        passages = [{"content": "test", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "q")
        # Lower-casing makes this match "I don't know" as well.
        assert "don't know" in result["system"].lower()

    def test_user_has_context_before_question(self):
        passages = [{"content": "Test content here.", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "What is this?")
        # Context should appear before the question
        context_pos = result["user"].find("CONTEXT")
        question_pos = result["user"].find("QUESTION")
        # str.find returns -1 for a missing marker; require both markers so a
        # missing CONTEXT header cannot satisfy the ordering check by itself.
        assert context_pos != -1
        assert question_pos != -1
        assert context_pos < question_pos

    def test_passages_are_numbered(self):
        passages = [
            {"content": "First passage.", "session_id": "s1"},
            {"content": "Second passage.", "session_id": "s2"},
        ]
        result = build_context_faithful_prompt(passages, "q")
        assert "Passage 1" in result["user"]
        assert "Passage 2" in result["user"]

    def test_citation_instruction_included_by_default(self):
        passages = [{"content": "test", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "q")
        assert "cite" in result["system"].lower() or "[Passage" in result["system"]

    def test_confidence_calibration_included_by_default(self):
        passages = [{"content": "test", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "q")
        assert "confidence" in result["system"].lower() or "1-5" in result["system"]

    def test_can_disable_citation(self):
        passages = [{"content": "test", "session_id": "s1"}]
        result = build_context_faithful_prompt(passages, "q", require_citation=False)
        system = result["system"]
        # Should not have citation instruction
        assert "cite" not in system.lower() or "citation" not in system.lower()
        # The keyword check above passes whenever either word is absent, so
        # also require that the citation instruction itself is not present.
        assert CITATION_INSTRUCTION not in system

    def test_empty_passages_handled(self):
        result = build_context_faithful_prompt([], "test query")
        assert "system" in result
        assert "user" in result
class TestBuildSummarizationPrompt:
    """Tests for build_summarization_prompt()."""

    def test_includes_transcript(self):
        """Transcript text and the query both reach the user prompt."""
        transcript = "User: Hello\nAssistant: Hi"
        session_meta = {"source": "cli", "started_at": "2024-01-01"}
        user_prompt = build_summarization_prompt(transcript, "greeting", session_meta)["user"]
        for fragment in ("Hello", "greeting"):
            assert fragment in user_prompt

    def test_has_context_faithful_instruction(self):
        """The system prompt talks about the context."""
        system_prompt = build_summarization_prompt("text", "q", {})["system"].lower()
        # Any text containing "provided context" also contains "context".
        assert "context" in system_prompt
class TestBuildAnswerPrompt:
    """Tests for build_answer_prompt()."""

    def test_returns_prompts(self):
        """Both prompt parts exist and the passage text reaches the user part."""
        prompts = build_answer_prompt(
            [{"content": "Answer is 42.", "session_id": "s1"}],
            "What is the answer?",
        )
        for key in ("system", "user"):
            assert key in prompts
        assert "42" in prompts["user"]

    def test_includes_conversation_context(self):
        """Supplied conversation context is carried into the user prompt."""
        prompts = build_answer_prompt(
            [{"content": "info", "session_id": "s1"}],
            "q",
            conversation_context="Previous message",
        )
        assert "Previous message" in prompts["user"]
class TestAssessContextFaithfulness:
    """Tests for assess_context_faithfulness()."""

    def test_empty_answer_not_faithful(self):
        verdict = assess_context_faithfulness("", [])
        assert verdict["faithful"] is False

    def test_honest_unknown_is_faithful(self):
        answer = "I don't know based on the provided context."
        passages = [{"content": "unrelated", "session_id": "s1"}]
        verdict = assess_context_faithfulness(answer, passages)
        assert verdict["faithful"] is True

    def test_cited_answer_is_faithful(self):
        answer = "The capital is Paris [Passage 1]."
        passages = [{"content": "Paris is the capital.", "session_id": "s1"}]
        verdict = assess_context_faithfulness(answer, passages)
        assert verdict["faithful"] is True
        assert verdict["citations"] >= 1

    def test_grounded_answer_is_faithful(self):
        answer = "The system uses SQLite for storage with FTS5 indexing."
        passages = [
            {
                "content": "The system uses SQLite for persistent storage with FTS5 indexing.",
                "session_id": "s1",
            }
        ]
        verdict = assess_context_faithfulness(answer, passages)
        assert verdict["faithful"] is True
        assert verdict["grounding_ratio"] > 0.3

    def test_ungrounded_answer_not_faithful(self):
        answer = "The system uses PostgreSQL with MongoDB sharding."
        passages = [{"content": "SQLite storage with FTS5.", "session_id": "s1"}]
        verdict = assess_context_faithfulness(answer, passages)
        assert verdict["grounding_ratio"] < 0.3

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
    """classify_error() sorts failures into retryable, permanent and unknown."""

    @staticmethod
    def _assert_outcome(result, category, should_retry):
        # Shared check on the two fields every case below inspects.
        assert result.category == category
        assert result.should_retry is should_retry

    def test_timeout_is_retryable(self):
        outcome = classify_error(Exception("Connection timed out"))
        self._assert_outcome(outcome, ErrorCategory.RETRYABLE, True)

    def test_429_is_retryable(self):
        outcome = classify_error(Exception("Rate limit exceeded"), response_code=429)
        self._assert_outcome(outcome, ErrorCategory.RETRYABLE, True)

    def test_404_is_permanent(self):
        outcome = classify_error(Exception("Not found"), response_code=404)
        self._assert_outcome(outcome, ErrorCategory.PERMANENT, False)

    def test_403_is_permanent(self):
        outcome = classify_error(Exception("Forbidden"), response_code=403)
        self._assert_outcome(outcome, ErrorCategory.PERMANENT, False)

    def test_500_is_retryable(self):
        outcome = classify_error(Exception("Internal server error"), response_code=500)
        self._assert_outcome(outcome, ErrorCategory.RETRYABLE, True)

    def test_schema_error_is_permanent(self):
        outcome = classify_error(Exception("Schema validation failed"))
        self._assert_outcome(outcome, ErrorCategory.PERMANENT, False)

    def test_unknown_is_retryable_with_caution(self):
        outcome = classify_error(Exception("Some unknown error"))
        self._assert_outcome(outcome, ErrorCategory.UNKNOWN, True)
        assert outcome.max_retries == 1
# Allow running this test module directly; hands this file's path to pytest.
if __name__ == "__main__":
    pytest.main([__file__])

View File

@@ -1,82 +0,0 @@
"""Tests for Reader-Guided Reranking (RIDER) — issue #666."""
import pytest
from unittest.mock import MagicMock, patch
from agent.rider import RIDER, rerank_passages, is_rider_available
class TestRIDERClass:
    """Construction and basic rerank() behavior of the RIDER class."""

    def test_init(self):
        assert RIDER()._auxiliary_task == "rider"

    def test_rerank_empty_passages(self):
        assert RIDER().rerank([], "test query") == []

    def test_rerank_fewer_than_top_n(self):
        """If passages <= top_n, return all (with scores if possible)."""
        single = [{"content": "test content", "session_id": "s1"}]
        reranked = RIDER().rerank(single, "test query", top_n=3)
        assert len(reranked) == 1

    @patch("agent.rider.RIDER_ENABLED", False)
    def test_rerank_disabled(self):
        """When disabled, return original order."""
        passages = []
        for i in range(5):
            passages.append({"content": f"content {i}", "session_id": f"s{i}"})
        reranked = RIDER().rerank(passages, "test query", top_n=3)
        assert reranked == passages[:3]
class TestConfidenceCalculation:
    """Score ranges returned by RIDER._calculate_confidence()."""

    @pytest.fixture
    def rider(self):
        return RIDER()

    def test_short_specific_answer(self, rider):
        answer = "Paris"
        question = "What is the capital of France?"
        passage = "Paris is the capital of France."
        assert rider._calculate_confidence(answer, question, passage) > 0.5

    def test_hedged_answer(self, rider):
        answer = "Maybe it could be Paris, but I'm not sure"
        question = "What is the capital of France?"
        passage = "Paris is the capital."
        assert rider._calculate_confidence(answer, question, passage) < 0.5

    def test_passage_grounding(self, rider):
        answer = "The system uses SQLite for storage"
        question = "What database is used?"
        passage = "The system uses SQLite for persistent storage with FTS5 indexing."
        assert rider._calculate_confidence(answer, question, passage) > 0.5

    def test_refusal_penalty(self, rider):
        answer = "I cannot answer this from the given context"
        question = "What is X?"
        passage = "Some unrelated content"
        assert rider._calculate_confidence(answer, question, passage) < 0.5
class TestRerankPassages:
    """Tests for the rerank_passages() helper."""

    def test_convenience_function(self):
        """Test the module-level convenience function."""
        single = [{"content": "test", "session_id": "s1"}]
        reranked = rerank_passages(single, "query", top_n=1)
        assert len(reranked) == 1
class TestIsRiderAvailable:
    """Tests for is_rider_available()."""

    def test_returns_bool(self):
        """The availability probe answers with a real bool."""
        assert isinstance(is_rider_available(), bool)

View File

@@ -1,261 +0,0 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
    """Approval tiers based on risk level.

    Members are integers from 0 (SAFE) to 4 (CRITICAL); because this is an
    IntEnum they compare and sort like plain ints. The approval requirements
    and timeout for each tier are recorded in TIER_INFO.
    """
    SAFE = 0  # Read, search — no approval needed
    LOW = 1  # Write, scripts — LLM approval
    MEDIUM = 2  # Messages, API — human + LLM, 60s timeout
    HIGH = 3  # Crypto, config — human + LLM, 30s timeout
    CRITICAL = 4  # Crisis — human + LLM, 10s timeout
# Tier metadata, keyed by ApprovalTier. Every entry has the same keys:
#   name            - display name of the tier
#   human_required  - whether a human must approve
#   llm_required    - whether LLM approval is needed
#   timeout_seconds - approval timeout in seconds, or None when there is none
#   description     - one-line explanation of the tier
TIER_INFO = {
    # Tier 0: nothing to approve.
    ApprovalTier.SAFE: {
        "name": "Safe",
        "human_required": False,
        "llm_required": False,
        "timeout_seconds": None,
        "description": "Read-only operations, no approval needed"
    },
    # Tier 1: LLM approval only, no timeout.
    ApprovalTier.LOW: {
        "name": "Low",
        "human_required": False,
        "llm_required": True,
        "timeout_seconds": None,
        "description": "Write operations, LLM approval sufficient"
    },
    # Tier 2: human and LLM approval, 60 second timeout.
    ApprovalTier.MEDIUM: {
        "name": "Medium",
        "human_required": True,
        "llm_required": True,
        "timeout_seconds": 60,
        "description": "External actions, human confirmation required"
    },
    # Tier 3: human and LLM approval, 30 second timeout.
    ApprovalTier.HIGH: {
        "name": "High",
        "human_required": True,
        "llm_required": True,
        "timeout_seconds": 30,
        "description": "Sensitive operations, quick timeout"
    },
    # Tier 4: human and LLM approval, 10 second timeout.
    ApprovalTier.CRITICAL: {
        "name": "Critical",
        "human_required": True,
        "llm_required": True,
        "timeout_seconds": 10,
        "description": "Crisis or dangerous operations, fastest timeout"
    },
}
# Action-to-tier mapping. detect_tier() looks actions up here by exact name;
# an action that is not listed is tiered from its command text instead.
ACTION_TIERS: Dict[str, ApprovalTier] = {
    # Tier 0: Safe (read-only)
    "read_file": ApprovalTier.SAFE,
    "search_files": ApprovalTier.SAFE,
    "web_search": ApprovalTier.SAFE,
    "session_search": ApprovalTier.SAFE,
    "list_files": ApprovalTier.SAFE,
    "get_file_content": ApprovalTier.SAFE,
    "memory_search": ApprovalTier.SAFE,
    "skills_list": ApprovalTier.SAFE,
    "skills_search": ApprovalTier.SAFE,
    # Tier 1: Low (write operations)
    "write_file": ApprovalTier.LOW,
    "create_file": ApprovalTier.LOW,
    "patch_file": ApprovalTier.LOW,
    "delete_file": ApprovalTier.LOW,
    "execute_code": ApprovalTier.LOW,
    "terminal": ApprovalTier.LOW,
    "run_script": ApprovalTier.LOW,
    "skill_install": ApprovalTier.LOW,
    # Tier 2: Medium (external actions)
    "send_message": ApprovalTier.MEDIUM,
    "web_fetch": ApprovalTier.MEDIUM,
    "browser_navigate": ApprovalTier.MEDIUM,
    "api_call": ApprovalTier.MEDIUM,
    "gitea_create_issue": ApprovalTier.MEDIUM,
    "gitea_create_pr": ApprovalTier.MEDIUM,
    "git_push": ApprovalTier.MEDIUM,
    "deploy": ApprovalTier.MEDIUM,
    # Tier 3: High (sensitive operations)
    "config_change": ApprovalTier.HIGH,
    "env_change": ApprovalTier.HIGH,
    "key_rotation": ApprovalTier.HIGH,
    "access_grant": ApprovalTier.HIGH,
    "permission_change": ApprovalTier.HIGH,
    "backup_restore": ApprovalTier.HIGH,
    # Tier 4: Critical (crisis/dangerous)
    "kill_process": ApprovalTier.CRITICAL,
    "rm_rf": ApprovalTier.CRITICAL,
    "format_disk": ApprovalTier.CRITICAL,
    "shutdown": ApprovalTier.CRITICAL,
    "crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py).
# Each entry is (regex, tier); detect_tier() applies them to the command text
# with re.search and re.IGNORECASE.
#
# Entries are listed from the highest tier to the lowest so that a first-match
# scan reports the most severe tier when a command matches several patterns
# (e.g. "sudo git push --force" is HIGH, not MEDIUM).
_DANGEROUS_PATTERNS = [
    # Tier 4: Critical
    (r"rm\s+-(?:rf|fr)\s+/", ApprovalTier.CRITICAL),  # also the "-fr" spelling
    (r"mkfs\.", ApprovalTier.CRITICAL),
    (r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
    (r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
    # Tier 3: High
    (r"chmod\s+777", ApprovalTier.HIGH),
    # Piping a download into a shell: cover both "sh" and "bash" for either
    # downloader ("curl ... | sh" is as common as "curl ... | bash").
    (r"curl.*\|\s*(?:ba)?sh", ApprovalTier.HIGH),
    (r"wget.*\|\s*(?:ba)?sh", ApprovalTier.HIGH),
    (r"eval\s*\(", ApprovalTier.HIGH),
    (r"git\s+push.*--force", ApprovalTier.HIGH),
    (r"kubectl\s+delete", ApprovalTier.HIGH),
    # Tier 2: Medium
    (r"sudo\s+", ApprovalTier.MEDIUM),
    (r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
]
@dataclass
class ApprovalRequest:
    """Everything needed to ask for approval of a single action."""

    action: str
    tier: ApprovalTier
    command: str
    reason: str
    session_key: str
    timeout_seconds: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the request as a plain dict, enriched with tier metadata."""
        tier_meta = TIER_INFO[self.tier]
        payload: Dict[str, Any] = {}
        payload["action"] = self.action
        payload["tier"] = self.tier.value
        payload["tier_name"] = tier_meta["name"]
        payload["command"] = self.command
        payload["reason"] = self.reason
        payload["session_key"] = self.session_key
        payload["timeout"] = self.timeout_seconds
        payload["human_required"] = tier_meta["human_required"]
        payload["llm_required"] = tier_meta["llm_required"]
        return payload
def detect_tier(action: str, command: str = "") -> ApprovalTier:
    """
    Detect the approval tier for an action.

    The result is the highest of:

    * the tier mapped to ``action`` in ACTION_TIERS (LOW for unknown actions)
    * the tier of every dangerous-command pattern that matches ``command``

    A known action therefore never hides a dangerous command: ``terminal``
    running ``rm -rf /`` is CRITICAL rather than LOW. A command matching
    several patterns gets the most severe tier, whatever the table order.

    Args:
        action: Name of the action being requested.
        command: Command text to scan; an empty string skips the scan.

    Returns:
        The detected ApprovalTier.
    """
    # Start from the action's own tier; unknown actions default to LOW.
    tier = ACTION_TIERS.get(action, ApprovalTier.LOW)

    # Pattern matches can only raise the tier, never lower it.
    if command:
        for pattern, pattern_tier in _DANGEROUS_PATTERNS:
            if pattern_tier > tier and re.search(pattern, command, re.IGNORECASE):
                tier = pattern_tier

    return tier
def requires_human_approval(tier: ApprovalTier) -> bool:
    """Return the ``human_required`` flag recorded for *tier* in TIER_INFO."""
    tier_meta = TIER_INFO[tier]
    return tier_meta["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
    """Return the ``llm_required`` flag recorded for *tier* in TIER_INFO."""
    tier_meta = TIER_INFO[tier]
    return tier_meta["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
    """Return the ``timeout_seconds`` value recorded for *tier* (may be None)."""
    tier_meta = TIER_INFO[tier]
    return tier_meta["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
    """Return True only when the detected tier is SAFE (tier 0)."""
    return detect_tier(action, command) == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
    """Render an approval request as a multi-line, human-readable prompt."""
    meta = TIER_INFO[request.tier]

    # Show at most 100 characters of the command, marking any truncation.
    shown_command = request.command[:100]
    if len(request.command) > 100:
        shown_command += "..."

    out = [
        f"⚠️ Approval Required (Tier {request.tier.value}: {meta['name']})",
        "",
        f"Action: {request.action}",
        f"Command: {shown_command}",
        f"Reason: {request.reason}",
        "",
    ]

    # Requirement lines appear only for the flags the tier sets.
    if meta["human_required"]:
        out.append("👤 Human approval required")
    if meta["llm_required"]:
        out.append("🤖 LLM approval required")
    timeout = meta["timeout_seconds"]
    if timeout:
        out.append(f"⏱️ Timeout: {timeout}s")

    return "\n".join(out)
def create_approval_request(
    action: str,
    command: str,
    reason: str,
    session_key: str
) -> ApprovalRequest:
    """Build an ApprovalRequest whose tier and timeout come from detect_tier()."""
    detected = detect_tier(action, command)
    return ApprovalRequest(
        action=action,
        tier=detected,
        command=command,
        reason=reason,
        session_key=session_key,
        timeout_seconds=get_timeout(detected),
    )
# Crisis bypass rules
CRISIS_BYPASS_ACTIONS = frozenset([
    "send_message",  # Always allow sending crisis resources
    "check_crisis",
    "notify_crisis",
])

# Context markers that indicate a crisis, matched case-insensitively.
# "988" must stand alone as a number: it may touch letters (as in
# "988lifeline.org") but not other digits, so that years, ports, IDs and
# longer phone numbers that merely contain 988 do not bypass approval.
_CRISIS_CONTEXT_RE = re.compile(
    r"(?<!\d)988(?!\d)|crisis|suicide|self-harm|lifeline",
    re.IGNORECASE,
)


def is_crisis_bypass(action: str, context: str = "") -> bool:
    """Check if action should bypass approval during crisis.

    Args:
        action: Name of the action being requested.
        context: Free text describing the situation; empty or None means
            there is no context to inspect.

    Returns:
        True when ``action`` is one of CRISIS_BYPASS_ACTIONS, or when
        ``context`` contains a crisis marker; False otherwise.
    """
    if action in CRISIS_BYPASS_ACTIONS:
        return True

    # No context at all cannot indicate a crisis (also tolerates None).
    if not context:
        return False

    return _CRISIS_CONTEXT_RE.search(context) is not None

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Error category classification."""
    RETRYABLE = "retryable"  # classify_error() marks these should_retry=True
    PERMANENT = "permanent"  # classify_error() marks these should_retry=False
    UNKNOWN = "unknown"  # no rule matched; classify_error() allows one retry
@dataclass
class ErrorClassification:
    """Result of error classification."""
    category: ErrorCategory  # retryable / permanent / unknown
    reason: str  # human-readable explanation of the classification
    should_retry: bool  # whether the caller may try the operation again
    max_retries: int  # retry count attached by classify_error (0 for permanent)
    backoff_seconds: float  # backoff attached by classify_error (0 for permanent)
    error_code: Optional[int] = None  # HTTP response code, when one was supplied
    error_type: Optional[str] = None  # class name of the classified exception
# Retryable error patterns.
# Each entry is (regex, reason, max_retries, backoff_seconds). classify_error()
# scans the list in order with re.search / re.IGNORECASE and returns on the
# first hit, so the order of entries matters.
_RETRYABLE_PATTERNS = [
    # HTTP status codes
    (r"\b429\b", "rate limit", 3, 5.0),
    (r"\b500\b", "server error", 3, 2.0),
    (r"\b502\b", "bad gateway", 3, 2.0),
    (r"\b503\b", "service unavailable", 3, 5.0),
    (r"\b504\b", "gateway timeout", 3, 5.0),
    # Timeout patterns
    (r"timeout", "timeout", 3, 2.0),
    (r"timed out", "timeout", 3, 2.0),
    (r"TimeoutExpired", "timeout", 3, 2.0),
    # Connection errors
    (r"connection refused", "connection refused", 2, 5.0),
    (r"connection reset", "connection reset", 2, 2.0),
    (r"network unreachable", "network unreachable", 2, 10.0),
    (r"DNS", "DNS error", 2, 5.0),
    # Transient errors
    (r"temporary", "temporary error", 2, 2.0),
    (r"transient", "transient error", 2, 2.0),
    (r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns.
# Each entry is (regex, short label, description). classify_error() scans the
# list in order (after the retryable patterns) and reports the description,
# i.e. the third element, as the classification reason.
_PERMANENT_PATTERNS = [
    # HTTP status codes
    (r"\b400\b", "bad request", "Invalid request parameters"),
    (r"\b401\b", "unauthorized", "Authentication failed"),
    (r"\b403\b", "forbidden", "Access denied"),
    (r"\b404\b", "not found", "Resource not found"),
    (r"\b405\b", "method not allowed", "HTTP method not supported"),
    (r"\b409\b", "conflict", "Resource conflict"),
    (r"\b422\b", "unprocessable", "Validation error"),
    # Schema/validation errors
    (r"schema", "schema error", "Invalid data schema"),
    (r"validation", "validation error", "Input validation failed"),
    (r"invalid.*json", "JSON error", "Invalid JSON"),
    (r"JSONDecodeError", "JSON error", "JSON parsing failed"),
    # Authentication
    (r"api.?key", "API key error", "Invalid or missing API key"),
    (r"token.*expir", "token expired", "Authentication token expired"),
    (r"permission", "permission error", "Insufficient permissions"),
    # Not found patterns
    (r"not found", "not found", "Resource does not exist"),
    (r"does not exist", "not found", "Resource does not exist"),
    (r"no such file", "file not found", "File does not exist"),
    # Quota/billing
    (r"quota", "quota exceeded", "Usage quota exceeded"),
    (r"billing", "billing error", "Billing issue"),
    (r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
    """
    Classify an error as retryable or permanent.

    Rules are applied in this order, and the first one that applies wins:

    1. ``response_code`` in the known transient or client-error lists
    2. the retryable patterns
    3. the permanent patterns
    4. otherwise UNKNOWN, retryable once

    Patterns are searched in the exception's class name as well as its
    message, so type-name patterns such as ``TimeoutExpired`` and
    ``JSONDecodeError`` match exceptions whose message does not repeat the
    class name.

    Args:
        error: The exception that occurred
        response_code: HTTP response code if available

    Returns:
        ErrorClassification with retry guidance. ``error_code`` carries
        ``response_code`` whenever one was supplied.
    """
    error_type = type(error).__name__
    # Text the patterns are matched against: "<ClassName>: <message>".
    # Matching is case-insensitive, so no lower-casing is needed.
    searchable = f"{error_type}: {error}"

    # Check response code first
    if response_code:
        if response_code in (429, 500, 502, 503, 504):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=f"HTTP {response_code} - transient server error",
                should_retry=True,
                max_retries=3,
                # Rate limiting gets a longer pause than other server errors.
                backoff_seconds=5.0 if response_code == 429 else 2.0,
                error_code=response_code,
                error_type=error_type,
            )
        if response_code in (400, 401, 403, 404, 405, 409, 422):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=f"HTTP {response_code} - client error",
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_code=response_code,
                error_type=error_type,
            )

    # Check retryable patterns
    for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
        if re.search(pattern, searchable, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=reason,
                should_retry=True,
                max_retries=max_retries,
                backoff_seconds=backoff,
                error_code=response_code,
                error_type=error_type,
            )

    # Check permanent patterns (the middle element is a short label that is
    # not part of the result).
    for pattern, _label, reason in _PERMANENT_PATTERNS:
        if re.search(pattern, searchable, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=reason,
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_code=response_code,
                error_type=error_type,
            )

    # Default: unknown, treat as retryable with caution
    return ErrorClassification(
        category=ErrorCategory.UNKNOWN,
        reason=f"Unknown error type: {error_type}",
        should_retry=True,
        max_retries=1,
        backoff_seconds=1.0,
        error_code=response_code,
        error_type=error_type,
    )
def execute_with_retry(
    func,
    *args,
    max_retries: int = 3,
    backoff_base: float = 1.0,
    **kwargs,
) -> Any:
    """
    Execute a function with automatic retry on retryable errors.

    Every failure is classified with classify_error(). Permanent errors are
    re-raised immediately. Other errors are retried until either
    ``max_retries`` or the classification's own ``max_retries`` is used up,
    whichever is smaller, so cautiously-retryable (unknown) errors are not
    retried as often as well-understood transient ones.

    Args:
        func: Function to execute
        *args: Function arguments
        max_retries: Maximum retry attempts (must be >= 0)
        backoff_base: Base backoff time in seconds; doubled after each attempt
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        ValueError: If max_retries is negative
        Exception: If permanent error or max retries exceeded (the original
            exception is re-raised)
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    attempt = 0
    while True:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Classify the error
            classification = classify_error(e)

            logger.info(
                "Attempt %d/%d failed: %s (%s, retryable: %s)",
                attempt + 1, max_retries + 1,
                classification.reason,
                classification.category.value,
                classification.should_retry,
            )

            # If permanent error, fail immediately
            if not classification.should_retry:
                logger.error("Permanent error: %s", classification.reason)
                raise

            # The caller's budget is an upper bound; the classifier may allow
            # fewer retries for this particular kind of error.
            retry_budget = min(max_retries, classification.max_retries)
            if attempt >= retry_budget:
                logger.error("Max retries (%d) exceeded", retry_budget)
                raise

            # Calculate backoff with exponential increase
            backoff = backoff_base * (2 ** attempt)
            logger.info("Retrying in %.1fs...", backoff)
            time.sleep(backoff)
            attempt += 1
def format_error_report(classification: ErrorClassification) -> str:
    """Format error classification as a report string.

    Retryable classifications are prefixed with a retry icon; others are
    reported as plain ``"<category>: <reason>"`` with no leading whitespace.
    """
    summary = f"{classification.category.value}: {classification.reason}"
    if classification.should_retry:
        return f"🔄 {summary}"
    return summary

View File

@@ -176,11 +176,28 @@ async def _summarize_session(
conversation_text: str, query: str, session_meta: Dict[str, Any]
) -> Optional[str]:
"""Summarize a single session conversation focused on the search query."""
# Context-faithful prompting: force LLM to ground in transcript
from agent.context_faithful import build_summarization_prompt
prompts = build_summarization_prompt(conversation_text, query, session_meta)
system_prompt = prompts["system"]
user_prompt = prompts["user"]
system_prompt = (
"You are reviewing a past conversation transcript to help recall what happened. "
"Summarize the conversation with a focus on the search topic. Include:\n"
"1. What the user asked about or wanted to accomplish\n"
"2. What actions were taken and what the outcomes were\n"
"3. Key decisions, solutions found, or conclusions reached\n"
"4. Any specific commands, files, URLs, or technical details that were important\n"
"5. Anything left unresolved or notable\n\n"
"Be thorough but concise. Preserve specific details (commands, paths, error messages) "
"that would be useful to recall. Write in past tense as a factual recap."
)
source = session_meta.get("source", "unknown")
started = _format_timestamp(session_meta.get("started_at"))
user_prompt = (
f"Search topic: {query}\n"
f"Session source: {source}\n"
f"Session date: {started}\n\n"
f"CONVERSATION TRANSCRIPT:\n{conversation_text}\n\n"
f"Summarize this conversation with focus on: {query}"
)
max_retries = 3
for attempt in range(max_retries):
@@ -377,23 +394,6 @@ def session_search(
if len(seen_sessions) >= limit:
break
# RIDER: Reader-guided reranking — sort sessions by LLM answerability
# This bridges the R@5 vs E2E accuracy gap by prioritizing passages
# the LLM can actually answer from, not just keyword matches.
try:
from agent.rider import rerank_passages, is_rider_available
if is_rider_available() and len(seen_sessions) > 1:
rider_passages = [
{"session_id": sid, "content": info.get("snippet", ""), "rank": i + 1}
for i, (sid, info) in enumerate(seen_sessions.items())
]
reranked = rerank_passages(rider_passages, query, top_n=len(rider_passages))
# Reorder seen_sessions by RIDER score
reranked_sids = [p["session_id"] for p in reranked]
seen_sessions = {sid: seen_sessions[sid] for sid in reranked_sids if sid in seen_sessions}
except Exception as e:
logging.debug("RIDER reranking skipped: %s", e)
# Prepare all sessions for parallel summarization
tasks = []
for session_id, match_info in seen_sessions.items():