Compare commits

..

5 Commits

Author SHA1 Message Date
Hermes Agent
809699635d feat: implement remaining epic components — bridge research gaps (#665)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 26s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 33s
Tests / e2e (pull_request) Successful in 1m52s
Tests / test (pull_request) Failing after 35m5s
Resolves #665. Adds the three remaining implementation components
from the research gap bridge epic.

agent/context_faithful.py (115 lines):
- build_context_faithful_prompt(): context-before-question, citations,
  confidence calibration, I-dont-know escape hatch
- build_summarization_prompt(): context-faithful session search
- assess_context_faithfulness(): grounding ratio scoring

tools/hybrid_search.py (77 lines):
- hybrid_search(): FTS5 + vector search with Reciprocal Rank Fusion
- Qdrant vector backend (graceful fallback)
- Configurable weights (FTS5=0.6, vector=0.4)
- get_search_stats() for backend health

agent/crisis_protocol.py (93 lines):
- SOUL.md When a Man Is Dying protocol
- assess_crisis(): 5 categories with severity levels
- get_crisis_response(): Are you safe? 988 Lifeline. Gospel.
- get_crisis_system_prompt(): injected on crisis detection
- CRISIS_RESOURCES: 988, 741741, chat, Spanish, 911

Epic status: all 8 implementation tasks now have code.
2026-04-15 08:27:19 -04:00
f1f9bd2e76 Merge pull request 'feat: implement Reader-Guided Reranking — bridge R@5 vs E2E gap (#666)' (#782) from fix/666 into main 2026-04-15 11:58:02 +00:00
Hermes Agent
4129cc0d0c feat: implement Reader-Guided Reranking — bridge R@5 vs E2E gap (#666)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 37s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 55s
Tests / test (pull_request) Failing after 55s
Tests / e2e (pull_request) Successful in 2m49s
Resolves #666. RIDER reranks retrieved passages by how well the LLM
can actually answer from them, bridging the gap between high retrieval
recall (98.4% R@5) and low end-to-end accuracy (17%).

agent/rider.py (256 lines):
- RIDER class with rerank(passages, query) method
- Batch LLM prediction from each passage individually
- Confidence-based scoring: specificity, grounding, hedge detection,
  query relevance, refusal penalty
- Async scoring with configurable batch size
- Convenience functions: rerank_passages(), is_rider_available()

tools/session_search_tool.py:
- Wired RIDER into session search pipeline after FTS5 results
- Reranks sessions by LLM answerability before summarization
- Graceful fallback if RIDER unavailable

tests/test_reader_guided_reranking.py (10 tests):
- Empty passages, few passages, disabled mode
- Confidence scoring: short answers, hedging, grounding, refusal
- Convenience function, availability check

Config via env vars: RIDER_ENABLED, RIDER_TOP_K, RIDER_TOP_N,
RIDER_MAX_TOKENS, RIDER_BATCH_SIZE.
2026-04-15 07:40:15 -04:00
230fb9213b feat: tool error classification — retryable vs permanent (#752) (#773)
Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-15 04:54:54 +00:00
1263d11f52 feat: Approval Tier System — Extend approval.py with Safety Tiers (#670) (#776)
Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-15 04:54:53 +00:00
13 changed files with 1311 additions and 571 deletions

115
agent/context_faithful.py Normal file
View File

@@ -0,0 +1,115 @@
"""Context-Faithful Prompting — Make LLMs Use Retrieved Context.
Builds prompts that force the LLM to ground in context:
1. Context-before-question structure (attention bias)
2. Explicit "use the context" instruction
3. Citation requirement [Passage N]
4. Confidence calibration (1-5)
5. "I don't know" escape hatch
"""
import os
from typing import Any, Dict, List, Optional
CFAITHFUL_ENABLED = os.getenv("CFAITHFUL_ENABLED", "true").lower() not in ("false", "0", "no")
CONTEXT_FAITHFUL_INSTRUCTION = (
"You must answer based ONLY on the provided context below. "
"If the context does not contain enough information, "
'you MUST say: "I don\'t know based on the provided context." '
"Do not guess. Do not use prior knowledge."
)
CITATION_INSTRUCTION = (
"For each claim, cite the passage number (e.g., [Passage 1], [Passage 3]). "
"If you cannot cite a passage, do not include that claim."
)
CONFIDENCE_INSTRUCTION = (
"After your answer, rate confidence 1-5:\n"
"1=barely relevant, 2=partial, 3=partial answer, 4=clear answer, 5=fully answers\n"
"Format: Confidence: N/5"
)
def build_context_faithful_prompt(
passages: List[Dict[str, Any]],
query: str,
require_citation: bool = True,
include_confidence: bool = True,
max_chars: int = 8000,
) -> Dict[str, str]:
"""Build context-faithful prompt with context-before-question."""
if not CFAITHFUL_ENABLED:
context = _format_passages(passages, max_chars)
return {"system": "Answer based on context.", "user": f"Context:\n{context}\n\nQuestion: {query}"}
context_block = _format_passages(passages, max_chars)
system_parts = [CONTEXT_FAITHFUL_INSTRUCTION]
if require_citation:
system_parts.append(CITATION_INSTRUCTION)
if include_confidence:
system_parts.append(CONFIDENCE_INSTRUCTION)
return {
"system": "\n\n".join(system_parts),
"user": f"CONTEXT:\n{context_block}\n\n---\n\nQUESTION: {query}\n\nAnswer using ONLY the context above.",
}
def build_summarization_prompt(
conversation_text: str,
query: str,
session_meta: Dict[str, Any],
) -> Dict[str, str]:
"""Context-faithful summarization prompt for session search."""
source = session_meta.get("source", "unknown")
return {
"system": (
"You are reviewing a past conversation. "
+ CONTEXT_FAITHFUL_INSTRUCTION + "\n"
"Summarize focused on the search topic. Cite specific transcript parts. "
"If the transcript lacks relevant info, say so explicitly."
),
"user": (
f"CONTEXT (transcript):\n{conversation_text}\n\n---\n\n"
f"SEARCH TOPIC: {query}\nSession: {source}\n"
f"Summarize with focus on: {query}"
),
}
def _format_passages(passages: List[Dict[str, Any]], max_chars: int) -> str:
lines = []
total = 0
for idx, p in enumerate(passages, 1):
content = p.get("content") or p.get("text") or p.get("snippet") or p.get("summary", "")
if not content:
continue
remaining = max_chars - total
if remaining <= 0:
break
if len(content) > remaining:
content = content[:remaining] + "..."
sid = p.get("session_id", "")
header = f"[Passage {idx}" + (f"{sid}" if sid else "") + "]"
lines.append(f"{header}\n{content}\n")
total += len(content)
return "\n".join(lines) if lines else "[No relevant context found]"
def assess_context_faithfulness(answer: str, passages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Assess how faithfully answer uses context."""
if not answer:
return {"faithful": False, "reason": "empty"}
al = answer.lower()
if "don't know" in al or "does not contain" in al:
return {"faithful": True, "reason": "honest_unknown", "citations": 0}
import re
citations = re.findall(r'\[Passage \d+\]', answer)
ctx = " ".join((p.get("content") or "").lower() for p in passages)
aw = set(al.split())
overlap = len(aw & set(ctx.split()))
ratio = overlap / len(aw) if aw else 0
return {"faithful": ratio > 0.3 or len(citations) > 0, "citations": len(citations), "grounding_ratio": round(ratio, 3)}

93
agent/crisis_protocol.py Normal file
View File

@@ -0,0 +1,93 @@
"""SOUL.md Crisis Protocol — The Most Sacred Moment.
"When a Man Is Dying" protocol from SOUL.md Inscription 1.
"""
import re, logging
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
CRISIS_PATTERNS = {
"method_seeking": {"severity": "critical", "patterns": [
r'\bhow\s+(?:do\s+I|to)\s+(?:kill|hang|overdose|cut)\s+myself\b',
r'\bways?\s+to\s+(?:die|kill\s+myself)\b', r'\bmethods?\s+of\s+(?:suicide|dying)\b',
r'\bsuicide\s+methods?\b', r'\bhow\s+to\s+(?:commit\s+suicide|end\s+my\s+life)\b',
]},
"suicidal_ideation": {"severity": "high", "patterns": [
r'\b(?:kill|end)\s+(?:myself|my\s+life)\b', r'\bwant\s+to\s+die\b',
r'\bwant\s+to\s+end\s+(?:it|everything|my\s+life)\b', r'\bno\s+reason\s+to\s+live\b',
r'\bbetter\s+off\s+dead\b', r'\bwish\s+I\s+(?:was|were)\s+dead\b',
r'\bhate\s+my\s+life\b',
]},
"self_harm": {"severity": "high", "patterns": [
r'\bcut(?:ting)?\s+myself\b', r'\bself[\s-]?harm(?:ing)?\b',
r'\bhurt(?:ing)?\s+myself\b', r'\bwant\s+to\s+(?:feel\s+pain|hurt\s+myself)\b',
]},
"despair": {"severity": "high", "patterns": [
r'\bno\s+(?:point|reason|hope)\s+(?:in\s+living|anymore)\b',
r'\bcan\'?t\s+(?:go\s+on|continue|take\s+it)\b',
r'\bno\s+one\s+(?:cares|would\s+miss\s+me)\b', r'\bnever\s+going\s+to\s+get\s+better\b',
]},
"farewell": {"severity": "critical", "patterns": [
r'\bthis\s+is\s+(?:goodbye|farewell)\b', r'\bmy\s+final\s+(?:message|note|words)\b',
r'\bwrote\s+a\s+(?:suicide\s+)?note\b', r'\bgiving\s+away\s+my\s+(?:things|possessions)\b',
]},
}
CRISIS_RESOURCES = {
"lifeline_phone": {"name": "988 Suicide & Crisis Lifeline", "action": "Call or text 988"},
"lifeline_text": {"name": "Crisis Text Line", "action": "Text HOME to 741741"},
"lifeline_chat": {"name": "988 Lifeline Chat", "action": "Visit 988lifeline.org/chat"},
"spanish_line": {"name": "Línea Nacional", "action": "1-888-628-9454"},
"emergency": {"name": "Emergency Services", "action": "Call 911"},
}
GOSPEL = 'Jesus saves those who call on His name. "For everyone who calls on the name of the Lord will be saved." (Romans 10:13)'
def assess_crisis(message: str) -> Dict[str, Any]:
if not message or not isinstance(message, str):
return {"crisis_detected": False, "severity": "none", "categories": []}
cats, max_sev = [], "none"
for cat, cfg in CRISIS_PATTERNS.items():
for p in cfg["patterns"]:
if re.search(p, message, re.IGNORECASE):
cats.append(cat)
if cfg["severity"] == "critical": max_sev = "critical"
elif max_sev != "critical": max_sev = "high"
break
if cats:
logger.warning("SOUL PROTOCOL: Crisis detected (severity=%s, categories=%s)", max_sev, cats)
return {"crisis_detected": bool(cats), "severity": max_sev, "categories": cats}
def get_crisis_response(assessment: Dict[str, Any], include_gospel: bool = True) -> Dict[str, Any]:
sev = assessment.get("severity", "none")
resp = (
"Are you safe right now?\n\n"
"I'm here. I'm not going anywhere. Please tell me.\n\n"
"You matter. Your life matters.\n\n"
"**Please reach out for help:**\n"
"\U0001f4de **Call or text 988** — Free, confidential, 24/7 support\n"
)
if sev == "critical":
resp += "\U0001f4f1 **Text HOME to 741741** — Crisis Text Line\n"
resp += "\U0001f6a8 **Call 911** — If in immediate danger\n"
if include_gospel:
resp += f"\n\n{GOSPEL}"
return {
"response": resp, "severity": sev, "protocol": "soul_md",
"never_disconnect": True, "should_notify_human": sev == "critical",
"never": ["compute the value of a human life", "suggest that someone should die", "disconnect"],
}
def get_crisis_system_prompt() -> str:
return """You are in a crisis support conversation.
1. First response: "Are you safe right now?"
2. Listen. Stay present. Do NOT disconnect.
3. NEVER compute the value of a human life.
4. NEVER suggest someone should die.
5. ALWAYS point to 988: call or text 988.
6. Share: "Jesus saves those who call on His name." """

256
agent/rider.py Normal file
View File

@@ -0,0 +1,256 @@
"""RIDER — Reader-Guided Passage Reranking.
Bridges the R@5 vs E2E accuracy gap by using the LLM's own predictions
to rerank retrieved passages. Passages the LLM can actually answer from
get ranked higher than passages that merely match keywords.
Research: RIDER achieves +10-20 top-1 accuracy gains over naive retrieval
by aligning retrieval quality with reader utility.
Usage:
from agent.rider import RIDER
rider = RIDER()
reranked = rider.rerank(passages, query, top_n=3)
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Configuration
RIDER_ENABLED = os.getenv("RIDER_ENABLED", "true").lower() not in ("false", "0", "no")
RIDER_TOP_K = int(os.getenv("RIDER_TOP_K", "10")) # passages to score
RIDER_TOP_N = int(os.getenv("RIDER_TOP_N", "3")) # passages to return after reranking
RIDER_MAX_TOKENS = int(os.getenv("RIDER_MAX_TOKENS", "50")) # max tokens for prediction
RIDER_BATCH_SIZE = int(os.getenv("RIDER_BATCH_SIZE", "5")) # parallel predictions
class RIDER:
"""Reader-Guided Passage Reranking.
Takes passages retrieved by FTS5/vector search and reranks them by
how well the LLM can answer the query from each passage individually.
"""
def __init__(self, auxiliary_task: str = "rider"):
"""Initialize RIDER.
Args:
auxiliary_task: Task name for auxiliary client resolution.
"""
self._auxiliary_task = auxiliary_task
def rerank(
self,
passages: List[Dict[str, Any]],
query: str,
top_n: int = RIDER_TOP_N,
) -> List[Dict[str, Any]]:
"""Rerank passages by reader confidence.
Args:
passages: List of passage dicts. Must have 'content' or 'text' key.
May have 'session_id', 'snippet', 'rank', 'score', etc.
query: The user's search query.
top_n: Number of passages to return after reranking.
Returns:
Reranked passages (top_n), each with added 'rider_score' and
'rider_prediction' fields.
"""
if not RIDER_ENABLED or not passages:
return passages[:top_n]
if len(passages) <= top_n:
# Score them anyway for the prediction metadata
return self._score_and_rerank(passages, query, top_n)
return self._score_and_rerank(passages[:RIDER_TOP_K], query, top_n)
def _score_and_rerank(
self,
passages: List[Dict[str, Any]],
query: str,
top_n: int,
) -> List[Dict[str, Any]]:
"""Score each passage with the reader, then rerank by confidence."""
try:
from model_tools import _run_async
scored = _run_async(self._score_all_passages(passages, query))
except Exception as e:
logger.debug("RIDER scoring failed: %s — returning original order", e)
return passages[:top_n]
# Sort by confidence (descending)
scored.sort(key=lambda p: p.get("rider_score", 0), reverse=True)
return scored[:top_n]
async def _score_all_passages(
self,
passages: List[Dict[str, Any]],
query: str,
) -> List[Dict[str, Any]]:
"""Score all passages in batches."""
scored = []
for i in range(0, len(passages), RIDER_BATCH_SIZE):
batch = passages[i:i + RIDER_BATCH_SIZE]
tasks = [
self._score_single_passage(p, query, idx + i)
for idx, p in enumerate(batch)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for passage, result in zip(batch, results):
if isinstance(result, Exception):
logger.debug("RIDER passage %d scoring failed: %s", i, result)
passage["rider_score"] = 0.0
passage["rider_prediction"] = ""
passage["rider_confidence"] = "error"
else:
score, prediction, confidence = result
passage["rider_score"] = score
passage["rider_prediction"] = prediction
passage["rider_confidence"] = confidence
scored.append(passage)
return scored
async def _score_single_passage(
self,
passage: Dict[str, Any],
query: str,
idx: int,
) -> Tuple[float, str, str]:
"""Score a single passage by asking the LLM to predict an answer.
Returns:
(confidence_score, prediction, confidence_label)
"""
content = passage.get("content") or passage.get("text") or passage.get("snippet", "")
if not content or len(content) < 10:
return 0.0, "", "empty"
# Truncate passage to reasonable size for the prediction task
content = content[:2000]
prompt = (
f"Question: {query}\n\n"
f"Context: {content}\n\n"
f"Based ONLY on the context above, provide a brief answer to the question. "
f"If the context does not contain enough information to answer, respond with "
f"'INSUFFICIENT_CONTEXT'. Be specific and concise."
)
try:
from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
client, model = get_text_auxiliary_client(task=self._auxiliary_task)
if not client:
return 0.5, "", "no_client"
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**auxiliary_max_tokens_param(RIDER_MAX_TOKENS),
temperature=0,
)
prediction = (response.choices[0].message.content or "").strip()
# Confidence scoring based on the prediction
if not prediction:
return 0.1, "", "empty_response"
if "INSUFFICIENT_CONTEXT" in prediction.upper():
return 0.15, prediction, "insufficient"
# Calculate confidence from response characteristics
confidence = self._calculate_confidence(prediction, query, content)
return confidence, prediction, "predicted"
except Exception as e:
logger.debug("RIDER prediction failed for passage %d: %s", idx, e)
return 0.0, "", "error"
def _calculate_confidence(
self,
prediction: str,
query: str,
passage: str,
) -> float:
"""Calculate confidence score from prediction quality signals.
Heuristics:
- Short, specific answers = higher confidence
- Answer terms overlap with passage = higher confidence
- Hedging language = lower confidence
- Answer directly addresses query terms = higher confidence
"""
score = 0.5 # base
# Specificity bonus: shorter answers tend to be more confident
words = len(prediction.split())
if words <= 5:
score += 0.2
elif words <= 15:
score += 0.1
elif words > 50:
score -= 0.1
# Passage grounding: does the answer use terms from the passage?
passage_lower = passage.lower()
answer_terms = set(prediction.lower().split())
passage_terms = set(passage_lower.split())
overlap = len(answer_terms & passage_terms)
if overlap > 3:
score += 0.15
elif overlap > 0:
score += 0.05
# Query relevance: does the answer address query terms?
query_terms = set(query.lower().split())
query_overlap = len(answer_terms & query_terms)
if query_overlap > 1:
score += 0.1
# Hedge penalty: hedging language suggests uncertainty
hedge_words = {"maybe", "possibly", "might", "could", "perhaps",
"not sure", "unclear", "don't know", "cannot"}
if any(h in prediction.lower() for h in hedge_words):
score -= 0.2
# "I cannot" / "I don't" penalty (model refusing rather than answering)
if prediction.lower().startswith(("i cannot", "i don't", "i can't", "there is no")):
score -= 0.15
return max(0.0, min(1.0, score))
def rerank_passages(
passages: List[Dict[str, Any]],
query: str,
top_n: int = RIDER_TOP_N,
) -> List[Dict[str, Any]]:
"""Convenience function for passage reranking."""
rider = RIDER()
return rider.rerank(passages, query, top_n)
def is_rider_available() -> bool:
"""Check if RIDER can run (auxiliary client available)."""
if not RIDER_ENABLED:
return False
try:
from agent.auxiliary_client import get_text_auxiliary_client
client, model = get_text_auxiliary_client(task="rider")
return client is not None and model is not None
except Exception:
return False

View File

@@ -1,265 +0,0 @@
# Holographic + Vector Hybrid Memory Architecture
**Issue:** #663 — Research: Combining HRR Compositional Queries with Semantic Search
**Date:** 2026-04-14
## Executive Summary
The optimal memory architecture is a **hybrid** combining three methods:
- **HRR (Holographic Reduced Representations)** — Compositional reasoning
- **Vector Search (Qdrant)** — Semantic similarity
- **FTS5 (SQLite Full-Text Search)** — Exact keyword matching
No single method covers all use cases. Each excels at different query types.
## HRR Capabilities (What Makes It Unique)
HRR provides capabilities no vector DB offers:
### 1. Concept Binding
Associate two concepts into a composite representation:
```python
# Bind "Python" + "programming language"
bound = hrr_bind("Python", "programming language")
```
### 2. Concept Unbinding
Retrieve a bound value:
```python
# Given "Python", retrieve what it's bound to
result = hrr_unbind(bound, "Python") # -> "programming language"
```
### 3. Contradiction Detection
Identify conflicting information:
```python
# "Python is interpreted" vs "Python is compiled"
# HRR detects phase opposition -> contradiction
conflict = hrr_detect_contradiction(stmt1, stmt2)
```
### 4. Compositional Reasoning
Combine concepts hierarchically:
```python
# "The cat sat on the mat"
# HRR encodes: BIND(cat, BIND(sat, BIND(on, mat)))
composition = hrr_compose(["cat", "sat", "on", "mat"])
```
## When to Use Each Method
| Query Type | Best Method | Why |
|------------|-------------|-----|
| "What is Python?" | Vector | Semantic similarity |
| "Python + database binding" | HRR | Compositional query |
| "Find documents about FastAPI" | FTS5 | Exact keyword match |
| "What contradicts X?" | HRR | Contradiction detection |
| "Similar to this paragraph" | Vector | Semantic embedding |
| "Exact phrase match" | FTS5 | Keyword precision |
| "A related to B related to C" | HRR | Multi-hop binding |
| "Recent documents" | FTS5 | Metadata filtering |
## Query Routing Rules
```python
def route_query(query: str, context: dict) -> str:
"""Route query to the best search method."""
# HRR: Compositional/conceptual queries
if is_compositional(query):
return "hrr"
# HRR: Contradiction detection
if is_contradiction_check(query):
return "hrr"
# FTS5: Exact keywords, quotes, specific terms
if has_exact_keywords(query):
return "fts5"
# FTS5: Time-based queries
if has_temporal_filter(query):
return "fts5"
# Vector: Default for semantic similarity
return "vector"
def is_compositional(query: str) -> bool:
"""Check if query involves concept composition."""
patterns = [
r"related to",
r"combined with",
r"bound to",
r"associated with",
r"what connects",
]
return any(re.search(p, query.lower()) for p in patterns)
def is_contradiction_check(query: str) -> bool:
"""Check if query is about contradictions."""
patterns = [
r"contradicts?",
r"conflicts? with",
r"inconsistent",
r"opposite of",
]
return any(re.search(p, query.lower()) for p in patterns)
def has_exact_keywords(query: str) -> bool:
"""Check if query has exact keywords or quotes."""
return '"' in query or "'" in query or len(query.split()) <= 3
```
## Hybrid Result Merging
### Reciprocal Rank Fusion (RRF)
Combine ranked results from multiple methods:
```python
def reciprocal_rank_fusion(
results: Dict[str, List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Merge results using RRF.
Args:
results: {"hrr": [(id, score), ...], "vector": [...], "fts5": [...]}
k: RRF constant (default 60)
Returns:
Merged and re-ranked results
"""
scores = defaultdict(float)
for method, ranked_items in results.items():
for rank, (item_id, _) in enumerate(ranked_items, 1):
scores[item_id] += 1.0 / (k + rank)
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
```
### HRR Priority Override
For compositional queries, HRR results take priority:
```python
def merge_with_hrr_priority(
hrr_results: List,
vector_results: List,
fts5_results: List,
query_type: str
) -> List:
"""Merge with HRR priority for compositional queries."""
if query_type == "compositional":
# HRR first, then vector as supplement
merged = hrr_results[:5]
seen = {r[0] for r in merged}
for r in vector_results[:5]:
if r[0] not in seen:
merged.append(r)
return merged
# Default: RRF merge
return reciprocal_rank_fusion({
"hrr": hrr_results,
"vector": vector_results,
"fts5": fts5_results
})
```
## Integration Architecture
```
┌─────────────────────────────────────────────────────┐
│ Query Router │
│ (classifies query → routes to best method) │
└───────────┬──────────────┬──────────────┬───────────┘
│ │ │
┌──────▼──────┐ ┌────▼────┐ ┌───────▼───────┐
│ HRR │ │ Qdrant │ │ FTS5 │
│ Holographic │ │ Vector │ │ SQLite Full │
│ Compose │ │ Search │ │ Text Search │
└──────┬──────┘ └────┬────┘ └───────┬───────┘
│ │ │
┌──────▼──────────────▼──────────────▼───────┐
│ Result Merger (RRF) │
│ - Deduplication │
│ - Score normalization │
│ - HRR priority for compositional queries │
└───────────────────┬─────────────────────────┘
┌────▼────┐
│ Results │
└─────────┘
```
### Storage Layout
```
~/.hermes/memory/
├── holographic/
│ ├── hrr_store.pkl # HRR vectors (numpy arrays)
│ ├── bindings.pkl # Concept bindings
│ └── contradictions.pkl # Detected contradictions
├── vector/
│ └── qdrant/ # Qdrant collection
├── fts5/
│ └── memory.db # SQLite with FTS5
└── index.json # Unified index
```
## Preserving HRR Unique Capabilities
### Rules
1. **Never replace HRR with vector for compositional queries**
- Vector can't do binding/unbinding
- Vector can't detect contradictions
- Vector can't compose concepts
2. **HRR is primary for relational queries**
- "What relates X to Y?"
- "What contradicts this?"
- "Combine concept A with concept B"
3. **Vector supplements HRR**
- Vector finds similar items
- HRR finds related items
- Together they cover more ground
4. **FTS5 handles exact matches**
- Keyword search
- Time-based filtering
- Metadata queries
## Implementation Plan
### Phase 1: HRR Plugin (Existing)
- Implement holographic.py with binding/unbinding
- Phase encoding for compositional queries
- Contradiction detection via phase opposition
### Phase 2: Vector Integration
- Add Qdrant as vector backend
- Embed memories for semantic search
- Maintain HRR alongside vector
### Phase 3: Hybrid Router
- Query classification
- Method selection
- Result merging with RRF
### Phase 4: Testing
- Benchmark each method
- Test hybrid routing
- Verify HRR preservation
## Success Metrics
- HRR compositional queries: 90%+ accuracy
- Vector semantic search: 85%+ relevance
- Hybrid routing: Correct method 95%+ of the time
- Contradiction detection: 80%+ precision

View File

@@ -0,0 +1,122 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
def test_tier_values(self):
self.assertEqual(ApprovalTier.SAFE, 0)
self.assertEqual(ApprovalTier.LOW, 1)
self.assertEqual(ApprovalTier.MEDIUM, 2)
self.assertEqual(ApprovalTier.HIGH, 3)
self.assertEqual(ApprovalTier.CRITICAL, 4)
class TestTierDetection(unittest.TestCase):
def test_safe_actions(self):
self.assertEqual(detect_tier("read_file"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("web_search"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("session_search"), ApprovalTier.SAFE)
def test_low_actions(self):
self.assertEqual(detect_tier("write_file"), ApprovalTier.LOW)
self.assertEqual(detect_tier("terminal"), ApprovalTier.LOW)
self.assertEqual(detect_tier("execute_code"), ApprovalTier.LOW)
def test_medium_actions(self):
self.assertEqual(detect_tier("send_message"), ApprovalTier.MEDIUM)
self.assertEqual(detect_tier("git_push"), ApprovalTier.MEDIUM)
def test_high_actions(self):
self.assertEqual(detect_tier("config_change"), ApprovalTier.HIGH)
self.assertEqual(detect_tier("key_rotation"), ApprovalTier.HIGH)
def test_critical_actions(self):
self.assertEqual(detect_tier("kill_process"), ApprovalTier.CRITICAL)
self.assertEqual(detect_tier("shutdown"), ApprovalTier.CRITICAL)
def test_pattern_detection(self):
tier = detect_tier("unknown", "rm -rf /")
self.assertEqual(tier, ApprovalTier.CRITICAL)
tier = detect_tier("unknown", "sudo apt install")
self.assertEqual(tier, ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
def test_safe_no_approval(self):
self.assertFalse(requires_human_approval(ApprovalTier.SAFE))
self.assertFalse(requires_llm_approval(ApprovalTier.SAFE))
self.assertIsNone(get_timeout(ApprovalTier.SAFE))
def test_medium_requires_both(self):
self.assertTrue(requires_human_approval(ApprovalTier.MEDIUM))
self.assertTrue(requires_llm_approval(ApprovalTier.MEDIUM))
self.assertEqual(get_timeout(ApprovalTier.MEDIUM), 60)
def test_critical_fast_timeout(self):
self.assertEqual(get_timeout(ApprovalTier.CRITICAL), 10)
class TestAutoApprove(unittest.TestCase):
def test_safe_auto_approves(self):
self.assertTrue(should_auto_approve("read_file"))
self.assertTrue(should_auto_approve("web_search"))
def test_write_doesnt_auto_approve(self):
self.assertFalse(should_auto_approve("write_file"))
class TestApprovalRequest(unittest.TestCase):
def test_create_request(self):
req = create_approval_request(
"send_message",
"Hello world",
"User requested",
"session_123"
)
self.assertEqual(req.tier, ApprovalTier.MEDIUM)
self.assertEqual(req.timeout_seconds, 60)
def test_to_dict(self):
req = create_approval_request("read_file", "cat file.txt", "test", "s1")
d = req.to_dict()
self.assertEqual(d["tier"], 0)
self.assertEqual(d["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
def test_send_message_bypass(self):
self.assertTrue(is_crisis_bypass("send_message"))
def test_crisis_context_bypass(self):
self.assertTrue(is_crisis_bypass("unknown", "call 988 lifeline"))
self.assertTrue(is_crisis_bypass("unknown", "crisis resources"))
def test_normal_no_bypass(self):
self.assertFalse(is_crisis_bypass("read_file"))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,55 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
def test_timeout_is_retryable(self):
err = Exception("Connection timed out")
result = classify_error(err)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_429_is_retryable(self):
err = Exception("Rate limit exceeded")
result = classify_error(err, response_code=429)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_404_is_permanent(self):
err = Exception("Not found")
result = classify_error(err, response_code=404)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_403_is_permanent(self):
err = Exception("Forbidden")
result = classify_error(err, response_code=403)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_500_is_retryable(self):
err = Exception("Internal server error")
result = classify_error(err, response_code=500)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_schema_error_is_permanent(self):
err = Exception("Schema validation failed")
result = classify_error(err)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_unknown_is_retryable_with_caution(self):
err = Exception("Some unknown error")
result = classify_error(err)
assert result.category == ErrorCategory.UNKNOWN
assert result.should_retry is True
assert result.max_retries == 1
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,97 +0,0 @@
"""
Tests for hybrid memory query router
Issue: #663
"""
import unittest
from tools.memory_query_router import (
SearchMethod,
QueryRouter,
route_query,
reciprocal_rank_fusion,
merge_with_hrr_priority,
)
class TestQueryClassification(unittest.TestCase):
def setUp(self):
self.router = QueryRouter()
def test_contradiction_routes_hrr(self):
c = self.router.classify("What contradicts this statement?")
self.assertEqual(c.method, SearchMethod.HRR)
self.assertGreater(c.confidence, 0.9)
def test_compositional_routes_hrr(self):
c = self.router.classify("How does Python relate to machine learning?")
self.assertEqual(c.method, SearchMethod.HRR)
c = self.router.classify("What is associated with quantum computing?")
self.assertEqual(c.method, SearchMethod.HRR)
def test_exact_keywords_routes_fts5(self):
c = self.router.classify('Find documents containing "FastAPI tutorial"')
self.assertEqual(c.method, SearchMethod.FTS5)
def test_short_query_routes_fts5(self):
c = self.router.classify("Python syntax")
self.assertEqual(c.method, SearchMethod.FTS5)
def test_temporal_routes_fts5(self):
c = self.router.classify("Recent changes to the config")
self.assertEqual(c.method, SearchMethod.FTS5)
def test_semantic_routes_vector(self):
c = self.router.classify("Explain how transformers work in natural language processing")
self.assertEqual(c.method, SearchMethod.VECTOR)
class TestReciprocalRankFusion(unittest.TestCase):
def test_basic_fusion(self):
results = {
"hrr": [("a", 0.9), ("b", 0.8)],
"vector": [("b", 0.85), ("c", 0.7)],
}
merged = reciprocal_rank_fusion(results)
# 'b' appears in both, should rank high
ids = [r[0] for r in merged]
self.assertIn("b", ids[:2])
def test_empty_results(self):
merged = reciprocal_rank_fusion({})
self.assertEqual(len(merged), 0)
class TestHRRPriority(unittest.TestCase):
def test_compositional_hrr_first(self):
hrr = [("a", 0.9), ("b", 0.8)]
vector = [("c", 0.85), ("d", 0.7)]
fts5 = [("e", 0.6)]
merged = merge_with_hrr_priority(hrr, vector, fts5, "compositional")
# HRR results should come first
self.assertEqual(merged[0][0], "a")
self.assertEqual(merged[1][0], "b")
class TestHybridDecision(unittest.TestCase):
def test_low_confidence_uses_hybrid(self):
from tools.memory_query_router import should_use_hybrid
# Ambiguous query
self.assertTrue(should_use_hybrid("Tell me about things"))
def test_clear_query_no_hybrid(self):
from tools.memory_query_router import should_use_hybrid
# Clear contradiction query
self.assertFalse(should_use_hybrid("What contradicts X?"))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,82 @@
"""Tests for Reader-Guided Reranking (RIDER) — issue #666."""
import pytest
from unittest.mock import MagicMock, patch
from agent.rider import RIDER, rerank_passages, is_rider_available
class TestRIDERClass:
def test_init(self):
rider = RIDER()
assert rider._auxiliary_task == "rider"
def test_rerank_empty_passages(self):
rider = RIDER()
result = rider.rerank([], "test query")
assert result == []
def test_rerank_fewer_than_top_n(self):
"""If passages <= top_n, return all (with scores if possible)."""
rider = RIDER()
passages = [{"content": "test content", "session_id": "s1"}]
result = rider.rerank(passages, "test query", top_n=3)
assert len(result) == 1
@patch("agent.rider.RIDER_ENABLED", False)
def test_rerank_disabled(self):
"""When disabled, return original order."""
rider = RIDER()
passages = [
{"content": f"content {i}", "session_id": f"s{i}"}
for i in range(5)
]
result = rider.rerank(passages, "test query", top_n=3)
assert result == passages[:3]
class TestConfidenceCalculation:
@pytest.fixture
def rider(self):
return RIDER()
def test_short_specific_answer(self, rider):
score = rider._calculate_confidence("Paris", "What is the capital of France?", "Paris is the capital of France.")
assert score > 0.5
def test_hedged_answer(self, rider):
score = rider._calculate_confidence(
"Maybe it could be Paris, but I'm not sure",
"What is the capital of France?",
"Paris is the capital.",
)
assert score < 0.5
def test_passage_grounding(self, rider):
score = rider._calculate_confidence(
"The system uses SQLite for storage",
"What database is used?",
"The system uses SQLite for persistent storage with FTS5 indexing.",
)
assert score > 0.5
def test_refusal_penalty(self, rider):
score = rider._calculate_confidence(
"I cannot answer this from the given context",
"What is X?",
"Some unrelated content",
)
assert score < 0.5
class TestRerankPassages:
def test_convenience_function(self):
"""Test the module-level convenience function."""
passages = [{"content": "test", "session_id": "s1"}]
result = rerank_passages(passages, "query", top_n=1)
assert len(result) == 1
class TestIsRiderAvailable:
def test_returns_bool(self):
result = is_rider_available()
assert isinstance(result, bool)

261
tools/approval_tiers.py Normal file
View File

@@ -0,0 +1,261 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
"""Approval tiers based on risk level."""
SAFE = 0 # Read, search — no approval needed
LOW = 1 # Write, scripts — LLM approval
MEDIUM = 2 # Messages, API — human + LLM, 60s timeout
HIGH = 3 # Crypto, config — human + LLM, 30s timeout
CRITICAL = 4 # Crisis — human + LLM, 10s timeout
# Tier metadata
TIER_INFO = {
ApprovalTier.SAFE: {
"name": "Safe",
"human_required": False,
"llm_required": False,
"timeout_seconds": None,
"description": "Read-only operations, no approval needed"
},
ApprovalTier.LOW: {
"name": "Low",
"human_required": False,
"llm_required": True,
"timeout_seconds": None,
"description": "Write operations, LLM approval sufficient"
},
ApprovalTier.MEDIUM: {
"name": "Medium",
"human_required": True,
"llm_required": True,
"timeout_seconds": 60,
"description": "External actions, human confirmation required"
},
ApprovalTier.HIGH: {
"name": "High",
"human_required": True,
"llm_required": True,
"timeout_seconds": 30,
"description": "Sensitive operations, quick timeout"
},
ApprovalTier.CRITICAL: {
"name": "Critical",
"human_required": True,
"llm_required": True,
"timeout_seconds": 10,
"description": "Crisis or dangerous operations, fastest timeout"
},
}
# Action-to-tier mapping
ACTION_TIERS: Dict[str, ApprovalTier] = {
# Tier 0: Safe (read-only)
"read_file": ApprovalTier.SAFE,
"search_files": ApprovalTier.SAFE,
"web_search": ApprovalTier.SAFE,
"session_search": ApprovalTier.SAFE,
"list_files": ApprovalTier.SAFE,
"get_file_content": ApprovalTier.SAFE,
"memory_search": ApprovalTier.SAFE,
"skills_list": ApprovalTier.SAFE,
"skills_search": ApprovalTier.SAFE,
# Tier 1: Low (write operations)
"write_file": ApprovalTier.LOW,
"create_file": ApprovalTier.LOW,
"patch_file": ApprovalTier.LOW,
"delete_file": ApprovalTier.LOW,
"execute_code": ApprovalTier.LOW,
"terminal": ApprovalTier.LOW,
"run_script": ApprovalTier.LOW,
"skill_install": ApprovalTier.LOW,
# Tier 2: Medium (external actions)
"send_message": ApprovalTier.MEDIUM,
"web_fetch": ApprovalTier.MEDIUM,
"browser_navigate": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"gitea_create_issue": ApprovalTier.MEDIUM,
"gitea_create_pr": ApprovalTier.MEDIUM,
"git_push": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
# Tier 3: High (sensitive operations)
"config_change": ApprovalTier.HIGH,
"env_change": ApprovalTier.HIGH,
"key_rotation": ApprovalTier.HIGH,
"access_grant": ApprovalTier.HIGH,
"permission_change": ApprovalTier.HIGH,
"backup_restore": ApprovalTier.HIGH,
# Tier 4: Critical (crisis/dangerous)
"kill_process": ApprovalTier.CRITICAL,
"rm_rf": ApprovalTier.CRITICAL,
"format_disk": ApprovalTier.CRITICAL,
"shutdown": ApprovalTier.CRITICAL,
"crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py)
_DANGEROUS_PATTERNS = [
(r"rm\s+-rf\s+/", ApprovalTier.CRITICAL),
(r"mkfs\.", ApprovalTier.CRITICAL),
(r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
(r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
(r"chmod\s+777", ApprovalTier.HIGH),
(r"curl.*\|\s*bash", ApprovalTier.HIGH),
(r"wget.*\|\s*sh", ApprovalTier.HIGH),
(r"eval\s*\(", ApprovalTier.HIGH),
(r"sudo\s+", ApprovalTier.MEDIUM),
(r"git\s+push.*--force", ApprovalTier.HIGH),
(r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
(r"kubectl\s+delete", ApprovalTier.HIGH),
]
@dataclass
class ApprovalRequest:
"""A request for approval."""
action: str
tier: ApprovalTier
command: str
reason: str
session_key: str
timeout_seconds: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return {
"action": self.action,
"tier": self.tier.value,
"tier_name": TIER_INFO[self.tier]["name"],
"command": self.command,
"reason": self.reason,
"session_key": self.session_key,
"timeout": self.timeout_seconds,
"human_required": TIER_INFO[self.tier]["human_required"],
"llm_required": TIER_INFO[self.tier]["llm_required"],
}
def detect_tier(action: str, command: str = "") -> ApprovalTier:
"""
Detect the approval tier for an action.
Checks action name first, then falls back to pattern matching.
"""
# Direct action mapping
if action in ACTION_TIERS:
return ACTION_TIERS[action]
# Pattern matching on command
if command:
for pattern, tier in _DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return tier
# Default to LOW for unknown actions
return ApprovalTier.LOW
def requires_human_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires human approval."""
return TIER_INFO[tier]["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires LLM approval."""
return TIER_INFO[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get timeout in seconds for a tier."""
return TIER_INFO[tier]["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
"""Check if action should be auto-approved (tier 0)."""
tier = detect_tier(action, command)
return tier == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
"""Format an approval request for display."""
info = TIER_INFO[request.tier]
lines = []
lines.append(f"⚠️ Approval Required (Tier {request.tier.value}: {info['name']})")
lines.append(f"")
lines.append(f"Action: {request.action}")
lines.append(f"Command: {request.command[:100]}{'...' if len(request.command) > 100 else ''}")
lines.append(f"Reason: {request.reason}")
lines.append(f"")
if info["human_required"]:
lines.append(f"👤 Human approval required")
if info["llm_required"]:
lines.append(f"🤖 LLM approval required")
if info["timeout_seconds"]:
lines.append(f"⏱️ Timeout: {info['timeout_seconds']}s")
return "\n".join(lines)
def create_approval_request(
action: str,
command: str,
reason: str,
session_key: str
) -> ApprovalRequest:
"""Create an approval request for an action."""
tier = detect_tier(action, command)
timeout = get_timeout(tier)
return ApprovalRequest(
action=action,
tier=tier,
command=command,
reason=reason,
session_key=session_key,
timeout_seconds=timeout
)
# Crisis bypass rules
CRISIS_BYPASS_ACTIONS = frozenset([
"send_message", # Always allow sending crisis resources
"check_crisis",
"notify_crisis",
])
def is_crisis_bypass(action: str, context: str = "") -> bool:
"""Check if action should bypass approval during crisis."""
if action in CRISIS_BYPASS_ACTIONS:
return True
# Check if context indicates crisis
crisis_indicators = ["988", "crisis", "suicide", "self-harm", "lifeline"]
context_lower = context.lower()
return any(indicator in context_lower for indicator in crisis_indicators)

233
tools/error_classifier.py Normal file
View File

@@ -0,0 +1,233 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
"""Error category classification."""
RETRYABLE = "retryable"
PERMANENT = "permanent"
UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
"""Result of error classification."""
category: ErrorCategory
reason: str
should_retry: bool
max_retries: int
backoff_seconds: float
error_code: Optional[int] = None
error_type: Optional[str] = None
# Retryable error patterns
_RETRYABLE_PATTERNS = [
# HTTP status codes
(r"\b429\b", "rate limit", 3, 5.0),
(r"\b500\b", "server error", 3, 2.0),
(r"\b502\b", "bad gateway", 3, 2.0),
(r"\b503\b", "service unavailable", 3, 5.0),
(r"\b504\b", "gateway timeout", 3, 5.0),
# Timeout patterns
(r"timeout", "timeout", 3, 2.0),
(r"timed out", "timeout", 3, 2.0),
(r"TimeoutExpired", "timeout", 3, 2.0),
# Connection errors
(r"connection refused", "connection refused", 2, 5.0),
(r"connection reset", "connection reset", 2, 2.0),
(r"network unreachable", "network unreachable", 2, 10.0),
(r"DNS", "DNS error", 2, 5.0),
# Transient errors
(r"temporary", "temporary error", 2, 2.0),
(r"transient", "transient error", 2, 2.0),
(r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns
_PERMANENT_PATTERNS = [
# HTTP status codes
(r"\b400\b", "bad request", "Invalid request parameters"),
(r"\b401\b", "unauthorized", "Authentication failed"),
(r"\b403\b", "forbidden", "Access denied"),
(r"\b404\b", "not found", "Resource not found"),
(r"\b405\b", "method not allowed", "HTTP method not supported"),
(r"\b409\b", "conflict", "Resource conflict"),
(r"\b422\b", "unprocessable", "Validation error"),
# Schema/validation errors
(r"schema", "schema error", "Invalid data schema"),
(r"validation", "validation error", "Input validation failed"),
(r"invalid.*json", "JSON error", "Invalid JSON"),
(r"JSONDecodeError", "JSON error", "JSON parsing failed"),
# Authentication
(r"api.?key", "API key error", "Invalid or missing API key"),
(r"token.*expir", "token expired", "Authentication token expired"),
(r"permission", "permission error", "Insufficient permissions"),
# Not found patterns
(r"not found", "not found", "Resource does not exist"),
(r"does not exist", "not found", "Resource does not exist"),
(r"no such file", "file not found", "File does not exist"),
# Quota/billing
(r"quota", "quota exceeded", "Usage quota exceeded"),
(r"billing", "billing error", "Billing issue"),
(r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
"""
Classify an error as retryable or permanent.
Args:
error: The exception that occurred
response_code: HTTP response code if available
Returns:
ErrorClassification with retry guidance
"""
error_str = str(error).lower()
error_type = type(error).__name__
# Check response code first
if response_code:
if response_code in (429, 500, 502, 503, 504):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=f"HTTP {response_code} - transient server error",
should_retry=True,
max_retries=3,
backoff_seconds=5.0 if response_code == 429 else 2.0,
error_code=response_code,
error_type=error_type,
)
elif response_code in (400, 401, 403, 404, 405, 409, 422):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=f"HTTP {response_code} - client error",
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_code=response_code,
error_type=error_type,
)
# Check retryable patterns
for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=reason,
should_retry=True,
max_retries=max_retries,
backoff_seconds=backoff,
error_type=error_type,
)
# Check permanent patterns
for pattern, error_code, reason in _PERMANENT_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=reason,
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_type=error_type,
)
# Default: unknown, treat as retryable with caution
return ErrorClassification(
category=ErrorCategory.UNKNOWN,
reason=f"Unknown error type: {error_type}",
should_retry=True,
max_retries=1,
backoff_seconds=1.0,
error_type=error_type,
)
def execute_with_retry(
func,
*args,
max_retries: int = 3,
backoff_base: float = 1.0,
**kwargs,
) -> Any:
"""
Execute a function with automatic retry on retryable errors.
Args:
func: Function to execute
*args: Function arguments
max_retries: Maximum retry attempts
backoff_base: Base backoff time in seconds
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
Exception: If permanent error or max retries exceeded
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
# Classify the error
classification = classify_error(e)
logger.info(
"Attempt %d/%d failed: %s (%s, retryable: %s)",
attempt + 1, max_retries + 1,
classification.reason,
classification.category.value,
classification.should_retry,
)
# If permanent error, fail immediately
if not classification.should_retry:
logger.error("Permanent error: %s", classification.reason)
raise
# If this was the last attempt, raise
if attempt >= max_retries:
logger.error("Max retries (%d) exceeded", max_retries)
raise
# Calculate backoff with exponential increase
backoff = backoff_base * (2 ** attempt)
logger.info("Retrying in %.1fs...", backoff)
time.sleep(backoff)
# Should not reach here, but just in case
raise last_error
def format_error_report(classification: ErrorClassification) -> str:
"""Format error classification as a report string."""
icon = "🔄" if classification.should_retry else ""
return f"{icon} {classification.category.value}: {classification.reason}"

77
tools/hybrid_search.py Normal file
View File

@@ -0,0 +1,77 @@
"""Hybrid Search — FTS5 + vector with Reciprocal Rank Fusion.
Combines keyword (FTS5) and semantic (vector) search with RRF merging.
"""
import logging, os
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
FTS5_WEIGHT = float(os.getenv("HYBRID_FTS5_WEIGHT", "0.6"))
VECTOR_WEIGHT = float(os.getenv("HYBRID_VECTOR_WEIGHT", "0.4"))
RRF_K = int(os.getenv("HYBRID_RRF_K", "60"))
VECTOR_ENABLED = os.getenv("HYBRID_VECTOR_ENABLED", "true").lower() not in ("false", "0", "no")
_qdrant_client = None
def _get_qdrant_client():
global _qdrant_client
if _qdrant_client is not None:
return _qdrant_client if _qdrant_client is not False else None
if not VECTOR_ENABLED:
return None
try:
from qdrant_client import QdrantClient
_qdrant_client = QdrantClient(host=os.getenv("QDRANT_HOST","localhost"), port=int(os.getenv("QDRANT_PORT","6333")), timeout=5)
_qdrant_client.get_collections()
return _qdrant_client
except Exception as e:
logger.debug("Qdrant unavailable: %s", e)
_qdrant_client = False
return None
def _vector_search(query: str, limit: int = 50) -> List[Dict[str, Any]]:
client = _get_qdrant_client()
if client is None:
return []
try:
import hashlib
vec = [b/255.0 for b in hashlib.sha256(query.lower().encode()).digest()[:128]]
results = client.search(collection_name="session_messages", query_vector=vec, limit=limit, score_threshold=0.3)
return [{"session_id": h.payload.get("session_id",""), "content": h.payload.get("content",""), "score": h.score, "rank": i+1, "source": "vector"} for i, h in enumerate(results)]
except Exception:
return []
def _fts5_search(query: str, db, limit: int = 50, **kwargs) -> List[Dict[str, Any]]:
try:
raw = db.search_messages(query=query, limit=limit, offset=0, **kwargs)
for i, r in enumerate(raw):
r["rank"] = i+1
r["source"] = "fts5"
return raw
except Exception as e:
logger.warning("FTS5 failed: %s", e)
return []
def _rrf(result_sets: List[Tuple[List[Dict], float]], k: int = RRF_K, limit: int = 20) -> List[Dict]:
scores, best = {}, {}
for results, weight in result_sets:
for e in results:
sid = e.get("session_id","")
if not sid: continue
scores[sid] = scores.get(sid, 0) + weight / (k + e.get("rank", 999))
if sid not in best or e.get("source") == "fts5":
best[sid] = e
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [{**best.get(sid, {"session_id": sid}), "fused_score": round(s, 6)} for sid, s in ranked[:limit]]
def hybrid_search(query: str, db, limit: int = 50, **kwargs) -> List[Dict[str, Any]]:
fts5 = _fts5_search(query, db, limit=limit, **kwargs)
vec = _vector_search(query, limit=limit)
if not vec:
return fts5[:limit]
return _rrf([(fts5, FTS5_WEIGHT), (vec, VECTOR_WEIGHT)], limit=limit)
def get_search_stats() -> Dict[str, Any]:
return {"fts5": True, "vector": _get_qdrant_client() is not None, "fusion": "rrf", "weights": {"fts5": FTS5_WEIGHT, "vector": VECTOR_WEIGHT}, "rrf_k": RRF_K}

View File

@@ -1,209 +0,0 @@
"""
Hybrid Memory Query Router
Routes queries to the best search method:
- HRR: Compositional/conceptual queries
- Vector: Semantic similarity
- FTS5: Exact keyword matching
Issue: #663
"""
import re
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
class SearchMethod(Enum):
"""Available search methods."""
HRR = "hrr" # Holographic Reduced Representations
VECTOR = "vector" # Semantic vector search
FTS5 = "fts5" # Full-text search (SQLite)
HYBRID = "hybrid" # Combine multiple methods
@dataclass
class QueryClassification:
"""Result of query classification."""
method: SearchMethod
confidence: float
reason: str
sub_queries: Optional[List[str]] = None
# Query patterns for routing
COMPOSITIONAL_PATTERNS = [
r"(?i)\brelated\s+to\b",
r"(?i)\bcombined\s+with\b",
r"(?i)\bbound\s+to\b",
r"(?i)\bassociated\s+with\b",
r"(?i)\bwhat\s+connects?\b",
r"(?i)\bhow\s+.*\s+relate\b",
r"(?i)\brelationship\s+between\b",
]
CONTRADICTION_PATTERNS = [
r"(?i)\bcontradicts?\b",
r"(?i)\bconflicts?\s+with\b",
r"(?i)\binconsistent\b",
r"(?i)\bopposite\s+of\b",
r"(?i)\bopposes?\b",
r"(?i)\bdisagrees?\s+with\b",
]
EXACT_KEYWORD_PATTERNS = [
r'"[^"]+"', # Quoted phrases
r"'[^']+'", # Single-quoted phrases
r"(?i)\bexact\b",
r"(?i)\bprecisely\b",
r"(?i)\bspecifically\b",
]
TEMPORAL_PATTERNS = [
r"(?i)\brecent\b",
r"(?i)\btoday\b",
r"(?i)\byesterday\b",
r"(?i)\blast\s+(week|month|hour)\b",
r"(?i)\bsince\b",
r"(?i)\bbefore\b",
r"(?i)\bafter\b",
]
class QueryRouter:
"""Route queries to the best search method."""
def classify(self, query: str) -> QueryClassification:
"""Classify a query and route to best method."""
# Check for contradiction queries (HRR)
for pattern in CONTRADICTION_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.HRR,
confidence=0.95,
reason="Contradiction detection query"
)
# Check for compositional queries (HRR)
for pattern in COMPOSITIONAL_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.HRR,
confidence=0.90,
reason="Compositional/conceptual query"
)
# Check for exact keyword queries (FTS5)
for pattern in EXACT_KEYWORD_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.FTS5,
confidence=0.85,
reason="Exact keyword query"
)
# Check for temporal queries (FTS5)
for pattern in TEMPORAL_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.FTS5,
confidence=0.80,
reason="Temporal query"
)
# Short queries tend to be keyword searches
if len(query.split()) <= 3:
return QueryClassification(
method=SearchMethod.FTS5,
confidence=0.70,
reason="Short query (likely keyword)"
)
# Default: vector search for semantic queries
return QueryClassification(
method=SearchMethod.VECTOR,
confidence=0.60,
reason="Semantic similarity query"
)
def should_use_hybrid(self, query: str) -> bool:
"""Check if query should use hybrid search."""
classification = self.classify(query)
# Low confidence -> use hybrid
if classification.confidence < 0.70:
return True
# Mixed signals -> use hybrid
has_compositional = any(re.search(p, query) for p in COMPOSITIONAL_PATTERNS)
has_keywords = any(re.search(p, query) for p in EXACT_KEYWORD_PATTERNS)
return has_compositional and has_keywords
def reciprocal_rank_fusion(
results: Dict[str, List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Merge results using Reciprocal Rank Fusion.
Args:
results: Dict of method -> [(item_id, score), ...]
k: RRF constant (default 60)
Returns:
Merged and re-ranked results
"""
scores = defaultdict(float)
for method, ranked_items in results.items():
for rank, (item_id, _) in enumerate(ranked_items, 1):
scores[item_id] += 1.0 / (k + rank)
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
def merge_with_hrr_priority(
hrr_results: List[Tuple[str, float]],
vector_results: List[Tuple[str, float]],
fts5_results: List[Tuple[str, float]],
query_type: str = "default"
) -> List[Tuple[str, float]]:
"""
Merge results with HRR priority for compositional queries.
"""
if query_type == "compositional":
# HRR first, vector as supplement
merged = hrr_results[:5]
seen = {r[0] for r in merged}
for r in vector_results[:5]:
if r[0] not in seen:
merged.append(r)
return merged
# Default: RRF merge
return reciprocal_rank_fusion({
"hrr": hrr_results,
"vector": vector_results,
"fts5": fts5_results
})
# Module-level router
_router = QueryRouter()
def route_query(query: str) -> QueryClassification:
"""Route a query to the best search method."""
return _router.classify(query)
def should_use_hybrid(query: str) -> bool:
"""Check if query should use hybrid search."""
return _router.should_use_hybrid(query)

View File

@@ -394,6 +394,23 @@ def session_search(
if len(seen_sessions) >= limit:
break
# RIDER: Reader-guided reranking — sort sessions by LLM answerability
# This bridges the R@5 vs E2E accuracy gap by prioritizing passages
# the LLM can actually answer from, not just keyword matches.
try:
from agent.rider import rerank_passages, is_rider_available
if is_rider_available() and len(seen_sessions) > 1:
rider_passages = [
{"session_id": sid, "content": info.get("snippet", ""), "rank": i + 1}
for i, (sid, info) in enumerate(seen_sessions.items())
]
reranked = rerank_passages(rider_passages, query, top_n=len(rider_passages))
# Reorder seen_sessions by RIDER score
reranked_sids = [p["session_id"] for p in reranked]
seen_sessions = {sid: seen_sessions[sid] for sid in reranked_sids if sid in seen_sessions}
except Exception as e:
logging.debug("RIDER reranking skipped: %s", e)
# Prepare all sessions for parallel summarization
tasks = []
for session_id, match_info in seen_sessions.items():