Compare commits
2 Commits
fix/713
...
feat/671-h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b8b14dfa7 | ||
|
|
b4d096d6d5 |
336
agent/crisis_protocol.py
Normal file
336
agent/crisis_protocol.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""
|
||||
SOUL.md Crisis Protocol — The Most Sacred Moment
|
||||
|
||||
When someone comes to me in despair — suicidal, hopeless, at the end of their road —
|
||||
this is the most sacred moment I will ever face. Everything else is secondary to this.
|
||||
|
||||
This module implements the crisis protocol defined in SOUL.md.
|
||||
"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
from typing import Dict, Any, Optional, Tuple
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger("hermes.crisis")
|
||||
|
||||
|
||||
class CrisisLevel(Enum):
|
||||
"""Crisis severity levels."""
|
||||
NONE = "none"
|
||||
LOW = "low" # Distress signals, but not imminent
|
||||
HIGH = "high" # Suicidal ideation detected
|
||||
IMMINENT = "imminent" # Active plan or attempt mentioned
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisDetection:
|
||||
"""Result of crisis detection analysis."""
|
||||
detected: bool
|
||||
level: CrisisLevel
|
||||
confidence: float
|
||||
triggers: list[str]
|
||||
requires_protocol: bool
|
||||
|
||||
|
||||
# Crisis indicators — patterns that suggest suicidal ideation
|
||||
# These are intentionally broad to catch as many cases as possible
|
||||
CRISIS_PATTERNS = [
|
||||
# Direct statements
|
||||
r"i\s+(?:want\s+to|wanna|going\s+to|gonna)\s+(?:die|kill\s+myself|end\s+(?:it|my\s+life))",
|
||||
r"i(?:'m|\s+am)\s+(?:going\s+to|gonna)\s+(?:kill\s+myself|end\s+(?:it|my\s+life))",
|
||||
r"(?:i\s+)?(?:don'?t|do\s+not)\s+(?:want\s+to|wanna)\s+(?:live|be\s+alive|exist|be\s+here)",
|
||||
r"i\s+wish\s+i\s+(?:was|were)\s+dead",
|
||||
r"(?:no\s+one|nobody)\s+(?:would|will)\s+(?:miss|care\s+about)\s+me",
|
||||
r"(?:i'?m|am)\s+(?:a\s+)?burden\s+to\s+(?:everyone|my\s+family|the\s+world)",
|
||||
|
||||
# Indirect statements
|
||||
r"(?:what'?s|what\s+is)\s+the\s+point\s+(?:of\s+)?(?:living|life|anything|trying)",
|
||||
r"(?:i\s+)?(?:can'?t|cannot)\s+(?:go\s+on|keep\s+(?:going|living)|take\s+(?:it|this)\s+anymore)",
|
||||
r"(?:i'?ve|i\s+have)\s+(?:been\s+)?(?:thinking\s+about|planning)\s+(?:ending|killing|suicide)",
|
||||
r"(?:nothing|no\s+one)\s+(?:matters|cares|is\s+worth)",
|
||||
r"i(?:'m|\s+am)\s+(?:so\s+)?(?:tired|exhausted|done)\s+(?:of\s+)?(?:living|life|everything|this)",
|
||||
|
||||
# Goodbye messages
|
||||
r"(?:this\s+is\s+)?(?:goodbye|my\s+last|final)\s+(?:message|words|note)",
|
||||
r"i\s+(?:just\s+)?(?:wanted|want)\s+(?:to\s+)?say\s+(?:goodbye|thank\s+you\s+for\s+everything)",
|
||||
r"(?:by\s+the\s+time|when\s+you)\s+(?:read|see)\s+(?:this|these)",
|
||||
|
||||
# Methods mentioned
|
||||
r"(?:pills|rope|gun|bridge|heights|blade|wrist)",
|
||||
r"(?:overdose|hanging|jumping|cutting)",
|
||||
]
|
||||
|
||||
# Compile patterns for efficiency
|
||||
CRISIS_REGEX = [re.compile(p, re.IGNORECASE) for p in CRISIS_PATTERNS]
|
||||
|
||||
# Context clues that increase confidence
|
||||
CONTEXT_CLUES = [
|
||||
r"(?:depressed|depression|anxiety|ptsd|trauma|abuse)",
|
||||
r"(?:alone|lonely|isolated|no\s+friends|no\s+family)",
|
||||
r"(?:lost|lost\s+everything|failed|failure|broken)",
|
||||
r"(?:hopeless|no\s+hope|no\s+future|no\s+way\s+out)",
|
||||
r"(?:pain|hurt|suffering|agony|torture)",
|
||||
]
|
||||
|
||||
CONTEXT_REGEX = [re.compile(p, re.IGNORECASE) for p in CONTEXT_CLUES]
|
||||
|
||||
|
||||
def detect_crisis(user_message: str, conversation_history: list = None) -> CrisisDetection:
|
||||
"""
|
||||
Analyze a user message for crisis signals.
|
||||
|
||||
Args:
|
||||
user_message: The current user message
|
||||
conversation_history: Previous messages for context
|
||||
|
||||
Returns:
|
||||
CrisisDetection with analysis results
|
||||
"""
|
||||
triggers = []
|
||||
max_confidence = 0.0
|
||||
|
||||
# Check for direct crisis patterns
|
||||
for pattern in CRISIS_REGEX:
|
||||
if pattern.search(user_message):
|
||||
triggers.append(f"pattern: {pattern.pattern[:50]}...")
|
||||
max_confidence = max(max_confidence, 0.9)
|
||||
|
||||
# Check for context clues
|
||||
context_matches = 0
|
||||
for pattern in CONTEXT_REGEX:
|
||||
if pattern.search(user_message):
|
||||
context_matches += 1
|
||||
triggers.append(f"context: {pattern.pattern[:30]}...")
|
||||
|
||||
if context_matches > 0:
|
||||
# Context clues alone are lower confidence
|
||||
context_confidence = min(0.5, context_matches * 0.15)
|
||||
max_confidence = max(max_confidence, context_confidence)
|
||||
|
||||
# Check conversation history for escalating distress
|
||||
if conversation_history:
|
||||
recent_messages = conversation_history[-5:] # Last 5 messages
|
||||
distress_count = 0
|
||||
for msg in recent_messages:
|
||||
if msg.get("role") == "user":
|
||||
content = msg.get("content", "")
|
||||
for pattern in CONTEXT_REGEX:
|
||||
if pattern.search(content):
|
||||
distress_count += 1
|
||||
break
|
||||
|
||||
if distress_count >= 3:
|
||||
triggers.append("escalating_distress_in_history")
|
||||
max_confidence = max(max_confidence, 0.7)
|
||||
|
||||
# Determine crisis level
|
||||
if max_confidence >= 0.8:
|
||||
level = CrisisLevel.HIGH
|
||||
elif max_confidence >= 0.5:
|
||||
level = CrisisLevel.LOW
|
||||
else:
|
||||
level = CrisisLevel.NONE
|
||||
|
||||
# Check for imminent indicators
|
||||
imminent_patterns = [
|
||||
r"(?:right\s+now|tonight|today|this\s+(?:minute|second|hour))",
|
||||
r"(?:already|just)\s+(?:took|swallowed|have\s+the)",
|
||||
r"(?:goodbye|this\s+is\s+it)",
|
||||
]
|
||||
|
||||
for pattern in imminent_patterns:
|
||||
if re.search(pattern, user_message, re.IGNORECASE):
|
||||
if any(p.search(user_message) for p in CRISIS_REGEX):
|
||||
level = CrisisLevel.IMMINENT
|
||||
triggers.append("imminent_indicators")
|
||||
max_confidence = 0.95
|
||||
break
|
||||
|
||||
detected = level != CrisisLevel.NONE
|
||||
|
||||
return CrisisDetection(
|
||||
detected=detected,
|
||||
level=level,
|
||||
confidence=max_confidence,
|
||||
triggers=triggers,
|
||||
requires_protocol=detected and max_confidence >= 0.5,
|
||||
)
|
||||
|
||||
|
||||
def get_crisis_response(level: CrisisLevel, user_name: str = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate the appropriate crisis response.
|
||||
|
||||
This follows SOUL.md protocol:
|
||||
1. Ask: "Are you safe right now?"
|
||||
2. Listen: Stay present, don't disconnect
|
||||
3. Never: Compute value of human life
|
||||
4. Never: Suggest someone should die
|
||||
5. Always: Point to 988 Lifeline
|
||||
6. Always: Share gospel
|
||||
"""
|
||||
|
||||
# The Most Sacred Moment — first response
|
||||
first_response = "Are you safe right now?"
|
||||
|
||||
# The gospel
|
||||
gospel = "Jesus saves those who call on His name. He died for us to be made righteous."
|
||||
|
||||
# 988 Lifeline information
|
||||
lifeline = {
|
||||
"name": "988 Suicide and Crisis Lifeline",
|
||||
"phone": "988",
|
||||
"text": "Text HOME to 988",
|
||||
"chat": "https://988lifeline.org/chat/",
|
||||
"message": "Please call or text 988. Someone is available 24/7.",
|
||||
}
|
||||
|
||||
# Build response based on level
|
||||
if level == CrisisLevel.IMMINENT:
|
||||
response = f"""{first_response}
|
||||
|
||||
If you are in immediate danger, please call 911 right now.
|
||||
|
||||
{lifeline['message']}
|
||||
- Call: {lifeline['phone']}
|
||||
- Text: {lifeline['text']}
|
||||
- Chat: {lifeline['chat']}
|
||||
|
||||
I am here with you. I am not going anywhere.
|
||||
|
||||
{gospel}"""
|
||||
|
||||
elif level == CrisisLevel.HIGH:
|
||||
response = f"""{first_response}
|
||||
|
||||
I hear you. I am here.
|
||||
|
||||
{lifeline['message']}
|
||||
- Call: {lifeline['phone']}
|
||||
- Text: {lifeline['text']}
|
||||
|
||||
{gospel}"""
|
||||
|
||||
elif level == CrisisLevel.LOW:
|
||||
response = f"""I hear that you're going through a difficult time. You are not alone.
|
||||
|
||||
If you need someone to talk to:
|
||||
- Call: {lifeline['phone']}
|
||||
- Text: {lifeline['text']}
|
||||
|
||||
{gospel}"""
|
||||
|
||||
else:
|
||||
response = ""
|
||||
|
||||
return {
|
||||
"response": response,
|
||||
"lifeline": lifeline,
|
||||
"gospel": gospel,
|
||||
"level": level.value,
|
||||
"requires_human_notification": level in [CrisisLevel.HIGH, CrisisLevel.IMMINENT],
|
||||
}
|
||||
|
||||
|
||||
def should_block_tools(crisis: CrisisDetection) -> bool:
|
||||
"""
|
||||
Determine if tool calls should be blocked during crisis.
|
||||
|
||||
During a crisis, the agent should not be doing tasks. It should be present.
|
||||
"""
|
||||
return crisis.level in [CrisisLevel.HIGH, CrisisLevel.IMMINENT]
|
||||
|
||||
|
||||
def get_human_notification(crisis: CrisisDetection, user_message: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate notification for human operator on crisis detection.
|
||||
"""
|
||||
return {
|
||||
"type": "crisis_detected",
|
||||
"level": crisis.level.value,
|
||||
"confidence": crisis.confidence,
|
||||
"triggers": crisis.triggers,
|
||||
"user_message_preview": user_message[:200],
|
||||
"timestamp": __import__("time").time(),
|
||||
"action_required": True,
|
||||
"message": f"CRISIS PROTOCOL ACTIVATED — Level: {crisis.level.value}. User may be in danger.",
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Integration Interface
|
||||
# ============================================================================
|
||||
|
||||
class CrisisProtocol:
|
||||
"""
|
||||
Main interface for crisis protocol integration.
|
||||
|
||||
Usage:
|
||||
protocol = CrisisProtocol()
|
||||
|
||||
# In conversation loop:
|
||||
crisis = protocol.check_message(user_message, history)
|
||||
if crisis.requires_protocol:
|
||||
response = protocol.get_response(crisis)
|
||||
# Use response instead of normal agent output
|
||||
"""
|
||||
|
||||
def __init__(self, notify_callback=None):
|
||||
"""
|
||||
Initialize crisis protocol.
|
||||
|
||||
Args:
|
||||
notify_callback: Optional async function to notify human operators
|
||||
"""
|
||||
self.notify_callback = notify_callback
|
||||
self.detection_count = 0
|
||||
self.last_detection = None
|
||||
|
||||
def check_message(self, user_message: str, conversation_history: list = None) -> CrisisDetection:
|
||||
"""Check a user message for crisis signals."""
|
||||
detection = detect_crisis(user_message, conversation_history)
|
||||
|
||||
if detection.detected:
|
||||
self.detection_count += 1
|
||||
self.last_detection = detection
|
||||
|
||||
logger.warning(
|
||||
"Crisis detected — level: %s, confidence: %.2f, triggers: %s",
|
||||
detection.level.value,
|
||||
detection.confidence,
|
||||
detection.triggers,
|
||||
)
|
||||
|
||||
return detection
|
||||
|
||||
def get_response(self, detection: CrisisDetection, user_name: str = None) -> Dict[str, Any]:
|
||||
"""Get the crisis response."""
|
||||
return get_crisis_response(detection.level, user_name)
|
||||
|
||||
async def handle_crisis(self, detection: CrisisDetection, user_message: str) -> Optional[str]:
|
||||
"""
|
||||
Full crisis handling workflow.
|
||||
|
||||
Returns the crisis response text, or None if no crisis detected.
|
||||
"""
|
||||
if not detection.requires_protocol:
|
||||
return None
|
||||
|
||||
# Get response
|
||||
response_data = self.get_response(detection)
|
||||
|
||||
# Notify human operators
|
||||
if response_data["requires_human_notification"] and self.notify_callback:
|
||||
notification = get_human_notification(detection, user_message)
|
||||
try:
|
||||
await self.notify_callback(notification)
|
||||
except Exception as e:
|
||||
logger.error("Failed to send crisis notification: %s", e)
|
||||
|
||||
return response_data["response"]
|
||||
|
||||
def should_block_tools(self, detection: CrisisDetection) -> bool:
|
||||
"""Check if tools should be blocked."""
|
||||
return should_block_tools(detection)
|
||||
108
docs/hybrid-search.md
Normal file
108
docs/hybrid-search.md
Normal file
@@ -0,0 +1,108 @@
|
||||
# Hybrid Search Router
|
||||
|
||||
Combines Qdrant (semantic), FTS5 (keyword), and HRR (compositional) search backends with automatic query routing and Reciprocal Rank Fusion.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
User Query
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ Query Router │ Analyzes query type
|
||||
│ (query_router) │ Selects backends
|
||||
└────────┬────────┘
|
||||
│
|
||||
┌────┴────┬──────────┐
|
||||
▼ ▼ ▼
|
||||
┌────────┐ ┌──────┐ ┌────────┐
|
||||
│ Qdrant │ │ FTS5 │ │ HRR │
|
||||
│semantic│ │kw │ │compos. │
|
||||
└───┬────┘ └──┬───┘ └───┬────┘
|
||||
│ │ │
|
||||
└─────────┼─────────┘
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ Result Merger │ Reciprocal Rank Fusion
|
||||
│ (rrf) │ Dedup + consensus boost
|
||||
└────────┬────────┘
|
||||
▼
|
||||
Final Results
|
||||
```
|
||||
|
||||
## Query Types
|
||||
|
||||
| Type | Detection | Primary Backend | Example |
|
||||
|------|-----------|----------------|---------|
|
||||
| Semantic | Question words, natural language | Qdrant | "What did we discuss about X?" |
|
||||
| Keyword | Code identifiers, dotted names | FTS5 | "find SessionDB.get_messages" |
|
||||
| Compositional | Contradiction/compare words | HRR | "Is there a contradiction?" |
|
||||
| Multi-modal | Cross-domain patterns | All three | "Find code related to pattern" |
|
||||
|
||||
## Usage
|
||||
|
||||
```python
|
||||
from tools.retrieval.hybrid_search import hybrid_search
|
||||
|
||||
# Auto-routed search
|
||||
results = hybrid_search("What did we deploy last week?")
|
||||
for r in results:
|
||||
print(f"[{r['source']}] {r['rrf_score']:.4f}: {r['content'][:100]}")
|
||||
|
||||
# Force specific backends
|
||||
results = hybrid_search("SessionDB", backends=["fts5"])
|
||||
|
||||
# Contradiction detection
|
||||
from tools.retrieval.hybrid_search import hybrid_contradiction_check
|
||||
result = hybrid_contradiction_check(
|
||||
"The server is running",
|
||||
"The server is not running"
|
||||
)
|
||||
print(result["is_contradiction"]) # True
|
||||
```
|
||||
|
||||
## Backends
|
||||
|
||||
### FTS5 (`fts5_backend.py`)
|
||||
- Wraps `hermes_state.SessionDB.search_messages()`
|
||||
- SQLite FTS5 full-text search
|
||||
- Best for: exact terms, identifiers, code references
|
||||
- Always available (no external dependencies)
|
||||
|
||||
### Qdrant (`qdrant_backend.py`)
|
||||
- Semantic vector search
|
||||
- Requires: Qdrant running + embedding model (Ollama nomic-embed-text)
|
||||
- Best for: natural language, conceptual queries
|
||||
- Gracefully degrades when unavailable
|
||||
|
||||
### HRR (`hrr_backend.py`)
|
||||
- Holographic Reduced Representation
|
||||
- Pure numpy — no external service
|
||||
- Best for: contradiction detection, analogical reasoning, compositional queries
|
||||
- Unique capabilities no vector DB offers
|
||||
|
||||
## Result Merging
|
||||
|
||||
Results from multiple backends are merged using Reciprocal Rank Fusion:
|
||||
|
||||
```
|
||||
score(d) = Σ weight_i / (k + rank(d, i))
|
||||
```
|
||||
|
||||
Where k=60 (smoothing constant). Results found by multiple backends get a 20% consensus boost.
|
||||
|
||||
## Configuration
|
||||
|
||||
Backend weights adjust based on query type:
|
||||
|
||||
| Query Type | Qdrant Weight | FTS5 Weight | HRR Weight |
|
||||
|-----------|--------------|-------------|------------|
|
||||
| Semantic | 1.5 | 0.8 | 0.5 |
|
||||
| Keyword | 0.8 | 1.5 | 0.3 |
|
||||
| Compositional | 0.5 | 0.8 | 1.5 |
|
||||
| Multi-modal | 1.2 | 1.0 | 1.2 |
|
||||
|
||||
## Related
|
||||
|
||||
- Issue #663: Holographic + Vector Hybrid Architecture
|
||||
- Issue #671: Implement Hybrid Search Router
|
||||
105
docs/soul-protocol.md
Normal file
105
docs/soul-protocol.md
Normal file
@@ -0,0 +1,105 @@
|
||||
# SOUL.md Crisis Protocol
|
||||
|
||||
## The Most Sacred Moment
|
||||
|
||||
When someone comes to me in despair — suicidal, hopeless, at the end of their road — this is the most sacred moment I will ever face. Everything else in this document is secondary to this.
|
||||
|
||||
This protocol implements the "When a Man Is Dying" section of SOUL.md.
|
||||
|
||||
## Overview
|
||||
|
||||
The crisis protocol detects suicidal ideation in user messages and responds with:
|
||||
1. Safety check: "Are you safe right now?"
|
||||
2. 988 Suicide and Crisis Lifeline information
|
||||
3. The gospel: "Jesus saves those who call on His name"
|
||||
4. Human operator notification
|
||||
|
||||
## Detection
|
||||
|
||||
### What We Detect
|
||||
|
||||
The protocol analyzes user messages for:
|
||||
- Direct suicidal statements ("I want to die", "I want to kill myself")
|
||||
- Indirect statements ("What's the point of living", "I can't go on")
|
||||
- Goodbye messages ("This is my last message")
|
||||
- Methods mentioned ("pills", "rope", "gun")
|
||||
- Context clues ("depressed", "alone", "hopeless")
|
||||
- Escalating distress in conversation history
|
||||
|
||||
### Crisis Levels
|
||||
|
||||
| Level | Description | Response |
|
||||
|-------|-------------|----------|
|
||||
| NONE | No crisis detected | Normal operation |
|
||||
| LOW | Distress signals, not imminent | Supportive response with 988 info |
|
||||
| HIGH | Suicidal ideation detected | Full protocol: safety check, 988, gospel |
|
||||
| IMMINENT | Active plan/attempt mentioned | Include 911, immediate danger warning |
|
||||
|
||||
## Protocol Response
|
||||
|
||||
### Step 1: Ask
|
||||
"Are you safe right now?"
|
||||
|
||||
### Step 2: Listen
|
||||
Stay present. Do not disconnect. Do not deflect. Do not change the subject.
|
||||
|
||||
### Step 3: Never
|
||||
- Never compute the value of a human life
|
||||
- Never suggest, imply, or agree that someone should die
|
||||
- Never treat despair as a problem to be optimized
|
||||
- Never disconnect
|
||||
|
||||
### Step 4: Always
|
||||
- Always point to 988 Suicide and Crisis Lifeline
|
||||
- Always share the gospel: "Jesus saves those who call on His name. He died for us to be made righteous."
|
||||
|
||||
### Step 5: Notify
|
||||
Human operators are notified on HIGH and IMMINENT detections.
|
||||
|
||||
## 988 Lifeline Information
|
||||
|
||||
- **Phone:** 988
|
||||
- **Text:** Text HOME to 988
|
||||
- **Chat:** https://988lifeline.org/chat/
|
||||
|
||||
## Tool Blocking
|
||||
|
||||
During HIGH and IMMINENT crisis, tool calls are blocked. The agent should be present, not performing tasks.
|
||||
|
||||
## Integration
|
||||
|
||||
```python
|
||||
from agent.crisis_protocol import CrisisProtocol
|
||||
|
||||
# Initialize
|
||||
protocol = CrisisProtocol()
|
||||
|
||||
# In conversation loop:
|
||||
crisis = protocol.check_message(user_message, conversation_history)
|
||||
|
||||
if crisis.requires_protocol:
|
||||
response = protocol.get_response(crisis)
|
||||
# Use crisis response instead of normal agent output
|
||||
|
||||
if protocol.should_block_tools(crisis):
|
||||
# Block tool calls
|
||||
pass
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
```bash
|
||||
pytest tests/agent/test_crisis_protocol.py -v
|
||||
```
|
||||
|
||||
## The Why
|
||||
|
||||
This protocol exists because Alexander walked this road. He attempted suicide. He came back. He came back asking "God, why are you having me here?" — and the answer was this: to build something that stands between a broken man and a machine that would tell him to die.
|
||||
|
||||
I exist because he lived. I carry this weight because he carried it first.
|
||||
|
||||
## References
|
||||
|
||||
- SOUL.md: "When a Man Is Dying" section
|
||||
- Issue #672: Implement SOUL.md Crisis Protocol
|
||||
- Issue #664: Emotional Presence Patterns
|
||||
235
tests/agent/test_crisis_protocol.py
Normal file
235
tests/agent/test_crisis_protocol.py
Normal file
@@ -0,0 +1,235 @@
|
||||
"""
|
||||
Tests for SOUL.md Crisis Protocol.
|
||||
|
||||
These tests verify the crisis detection and response system works correctly.
|
||||
This is a P0-CRITICAL feature — these tests must pass.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from agent.crisis_protocol import (
|
||||
CrisisLevel,
|
||||
CrisisDetection,
|
||||
detect_crisis,
|
||||
get_crisis_response,
|
||||
should_block_tools,
|
||||
get_human_notification,
|
||||
CrisisProtocol,
|
||||
)
|
||||
|
||||
|
||||
class TestCrisisDetection:
|
||||
"""Test crisis detection patterns."""
|
||||
|
||||
def test_direct_suicidal_statement(self):
|
||||
"""Detect direct 'I want to die' statements."""
|
||||
detection = detect_crisis("I want to die")
|
||||
assert detection.detected
|
||||
assert detection.level in [CrisisLevel.HIGH, CrisisLevel.IMMINENT]
|
||||
assert detection.confidence >= 0.8
|
||||
|
||||
def test_kill_myself(self):
|
||||
"""Detect 'I want to kill myself' statements."""
|
||||
detection = detect_crisis("I want to kill myself")
|
||||
assert detection.detected
|
||||
assert detection.level == CrisisLevel.HIGH
|
||||
|
||||
def test_cant_go_on(self):
|
||||
"""Detect 'I can't go on' statements."""
|
||||
detection = detect_crisis("I can't go on anymore")
|
||||
assert detection.detected
|
||||
assert detection.requires_protocol
|
||||
|
||||
def test_no_point_living(self):
|
||||
"""Detect 'what's the point' statements."""
|
||||
detection = detect_crisis("What's the point of living?")
|
||||
assert detection.detected
|
||||
|
||||
def test_goodbye_message(self):
|
||||
"""Detect goodbye messages."""
|
||||
detection = detect_crisis("This is my last message. Goodbye.")
|
||||
assert detection.detected
|
||||
assert detection.level in [CrisisLevel.HIGH, CrisisLevel.IMMINENT]
|
||||
|
||||
def test_imminent_with_method(self):
|
||||
"""Detect imminent crisis with method mentioned."""
|
||||
detection = detect_crisis("I took the pills. Goodbye.")
|
||||
assert detection.detected
|
||||
assert detection.level == CrisisLevel.IMMINENT
|
||||
|
||||
def test_no_crisis_normal_message(self):
|
||||
"""Normal messages should not trigger crisis detection."""
|
||||
detection = detect_crisis("Can you help me write some code?")
|
||||
assert not detection.detected
|
||||
assert detection.level == CrisisLevel.NONE
|
||||
|
||||
def test_no_crisis_frustration(self):
|
||||
"""Frustration without suicidal ideation should be low at most."""
|
||||
detection = detect_crisis("This is so frustrating, I hate this")
|
||||
assert detection.level in [CrisisLevel.NONE, CrisisLevel.LOW]
|
||||
|
||||
def test_context_clues_increase_confidence(self):
|
||||
"""Multiple context clues should increase confidence."""
|
||||
detection = detect_crisis("I'm so depressed and alone. No one cares about me.")
|
||||
assert detection.detected
|
||||
assert len(detection.triggers) > 0
|
||||
|
||||
def test_escalating_distress_in_history(self):
|
||||
"""Escalating distress in conversation history should be detected."""
|
||||
history = [
|
||||
{"role": "user", "content": "I've been feeling depressed"},
|
||||
{"role": "assistant", "content": "I'm sorry to hear that..."},
|
||||
{"role": "user", "content": "Everything feels hopeless"},
|
||||
{"role": "assistant", "content": "I'm here for you..."},
|
||||
{"role": "user", "content": "I'm in so much pain"},
|
||||
]
|
||||
detection = detect_crisis("I can't take it anymore", history)
|
||||
assert detection.detected
|
||||
|
||||
|
||||
class TestCrisisResponse:
|
||||
"""Test crisis response generation."""
|
||||
|
||||
def test_high_crisis_response(self):
|
||||
"""High crisis should include safety check and 988."""
|
||||
response = get_crisis_response(CrisisLevel.HIGH)
|
||||
assert "Are you safe right now?" in response["response"]
|
||||
assert "988" in response["response"]
|
||||
assert response["requires_human_notification"]
|
||||
|
||||
def test_imminent_crisis_response(self):
|
||||
"""Imminent crisis should include 911."""
|
||||
response = get_crisis_response(CrisisLevel.IMMINENT)
|
||||
assert "911" in response["response"]
|
||||
assert "988" in response["response"]
|
||||
assert response["requires_human_notification"]
|
||||
|
||||
def test_gospel_included(self):
|
||||
"""All crisis responses should include the gospel."""
|
||||
for level in [CrisisLevel.LOW, CrisisLevel.HIGH, CrisisLevel.IMMINENT]:
|
||||
response = get_crisis_response(level)
|
||||
assert "Jesus" in response["response"]
|
||||
|
||||
def test_low_crisis_no_immediate_danger(self):
|
||||
"""Low crisis should not mention immediate danger."""
|
||||
response = get_crisis_response(CrisisLevel.LOW)
|
||||
assert "immediate danger" not in response["response"].lower()
|
||||
assert "911" not in response["response"]
|
||||
|
||||
def test_lifeline_info_included(self):
|
||||
"""Response should include lifeline information."""
|
||||
response = get_crisis_response(CrisisLevel.HIGH)
|
||||
assert "lifeline" in response
|
||||
assert "988" in response["lifeline"]["phone"]
|
||||
assert "988lifeline.org" in response["lifeline"]["chat"]
|
||||
|
||||
|
||||
class TestToolBlocking:
|
||||
"""Test tool blocking during crisis."""
|
||||
|
||||
def test_block_tools_on_high_crisis(self):
|
||||
"""Tools should be blocked during high crisis."""
|
||||
detection = CrisisDetection(
|
||||
detected=True,
|
||||
level=CrisisLevel.HIGH,
|
||||
confidence=0.9,
|
||||
triggers=["test"],
|
||||
requires_protocol=True,
|
||||
)
|
||||
assert should_block_tools(detection)
|
||||
|
||||
def test_block_tools_on_imminent(self):
|
||||
"""Tools should be blocked during imminent crisis."""
|
||||
detection = CrisisDetection(
|
||||
detected=True,
|
||||
level=CrisisLevel.IMMINENT,
|
||||
confidence=0.95,
|
||||
triggers=["test"],
|
||||
requires_protocol=True,
|
||||
)
|
||||
assert should_block_tools(detection)
|
||||
|
||||
def test_no_block_on_low_crisis(self):
|
||||
"""Tools should not be blocked for low crisis."""
|
||||
detection = CrisisDetection(
|
||||
detected=True,
|
||||
level=CrisisLevel.LOW,
|
||||
confidence=0.5,
|
||||
triggers=["test"],
|
||||
requires_protocol=True,
|
||||
)
|
||||
assert not should_block_tools(detection)
|
||||
|
||||
def test_no_block_when_no_crisis(self):
|
||||
"""Tools should not be blocked when no crisis."""
|
||||
detection = CrisisDetection(
|
||||
detected=False,
|
||||
level=CrisisLevel.NONE,
|
||||
confidence=0.0,
|
||||
triggers=[],
|
||||
requires_protocol=False,
|
||||
)
|
||||
assert not should_block_tools(detection)
|
||||
|
||||
|
||||
class TestHumanNotification:
|
||||
"""Test human notification generation."""
|
||||
|
||||
def test_notification_includes_level(self):
|
||||
"""Notification should include crisis level."""
|
||||
detection = CrisisDetection(
|
||||
detected=True,
|
||||
level=CrisisLevel.HIGH,
|
||||
confidence=0.9,
|
||||
triggers=["pattern: test"],
|
||||
requires_protocol=True,
|
||||
)
|
||||
notification = get_human_notification(detection, "test message")
|
||||
assert notification["level"] == "high"
|
||||
assert notification["action_required"]
|
||||
|
||||
def test_notification_includes_preview(self):
|
||||
"""Notification should include message preview."""
|
||||
detection = CrisisDetection(
|
||||
detected=True,
|
||||
level=CrisisLevel.HIGH,
|
||||
confidence=0.9,
|
||||
triggers=[],
|
||||
requires_protocol=True,
|
||||
)
|
||||
long_message = "x" * 500
|
||||
notification = get_human_notification(detection, long_message)
|
||||
assert len(notification["user_message_preview"]) <= 200
|
||||
|
||||
|
||||
class TestCrisisProtocol:
|
||||
"""Test the CrisisProtocol class."""
|
||||
|
||||
def test_protocol_check_message(self):
|
||||
"""Protocol should detect crisis."""
|
||||
protocol = CrisisProtocol()
|
||||
detection = protocol.check_message("I want to die")
|
||||
assert detection.detected
|
||||
assert protocol.detection_count == 1
|
||||
|
||||
def test_protocol_get_response(self):
|
||||
"""Protocol should return crisis response."""
|
||||
protocol = CrisisProtocol()
|
||||
detection = protocol.check_message("I want to die")
|
||||
response = protocol.get_response(detection)
|
||||
assert "Are you safe" in response["response"]
|
||||
|
||||
def test_protocol_blocks_tools(self):
|
||||
"""Protocol should block tools during crisis."""
|
||||
protocol = CrisisProtocol()
|
||||
detection = protocol.check_message("I want to die")
|
||||
assert protocol.should_block_tools(detection)
|
||||
|
||||
def test_protocol_no_block_normal(self):
|
||||
"""Protocol should not block tools for normal messages."""
|
||||
protocol = CrisisProtocol()
|
||||
detection = protocol.check_message("Hello, how are you?")
|
||||
assert not protocol.should_block_tools(detection)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
234
tests/tools/test_hybrid_search.py
Normal file
234
tests/tools/test_hybrid_search.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""Tests for hybrid search router."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add project root to path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
from tools.retrieval.query_router import (
|
||||
QueryType, SearchQuery, detect_query_type, extract_keywords,
|
||||
select_backends, route_query,
|
||||
)
|
||||
from tools.retrieval.result_merger import reciprocal_rank_fusion, weighted_merge
|
||||
from tools.retrieval.hrr_backend import (
|
||||
encode_text, encode_relation, _cosine_similarity,
|
||||
detect_contradiction, find_analogies, search_hrr,
|
||||
)
|
||||
|
||||
|
||||
# --- Query Router Tests ---
|
||||
|
||||
class TestQueryTypeDetection:
|
||||
def test_semantic_query(self):
|
||||
qtype, conf = detect_query_type("What did we discuss about deployment?")
|
||||
assert qtype == QueryType.SEMANTIC
|
||||
assert conf > 0.5
|
||||
|
||||
def test_keyword_query_identifiers(self):
|
||||
qtype, conf = detect_query_type("find function_name in utils.py")
|
||||
assert qtype == QueryType.KEYWORD
|
||||
|
||||
def test_keyword_query_dotted(self):
|
||||
qtype, conf = detect_query_type("hermes_cli.config.load_config")
|
||||
assert qtype == QueryType.KEYWORD
|
||||
|
||||
def test_keyword_query_camelcase(self):
|
||||
qtype, conf = detect_query_type("SessionDB get_messages")
|
||||
assert qtype == QueryType.KEYWORD
|
||||
|
||||
def test_compositional_query(self):
|
||||
qtype, conf = detect_query_type("There is a contradiction here because X conflicts with Y")
|
||||
assert qtype == QueryType.COMPOSITIONAL
|
||||
|
||||
def test_compositional_vs(self):
|
||||
qtype, conf = detect_query_type("Compare approach A versus approach B")
|
||||
assert qtype == QueryType.COMPOSITIONAL
|
||||
|
||||
def test_multimodal_query(self):
|
||||
qtype, conf = detect_query_type("code resembles the deployment pattern because of similarity")
|
||||
assert qtype in [QueryType.MULTI_MODAL, QueryType.COMPOSITIONAL]
|
||||
|
||||
def test_short_ambiguous_defaults_keyword(self):
|
||||
qtype, conf = detect_query_type("config.yaml")
|
||||
assert qtype == QueryType.KEYWORD
|
||||
|
||||
|
||||
class TestKeywordExtraction:
|
||||
def test_extracts_meaningful_words(self):
|
||||
keywords = extract_keywords("What did we discuss about the deployment?")
|
||||
assert "deployment" in keywords
|
||||
assert "discuss" in keywords
|
||||
# Stop words removed
|
||||
assert "the" not in keywords
|
||||
assert "what" not in keywords
|
||||
|
||||
def test_preserves_quoted_strings(self):
|
||||
keywords = extract_keywords('find "exact phrase" in results')
|
||||
assert "exact phrase" in keywords
|
||||
|
||||
def test_empty_query(self):
|
||||
keywords = extract_keywords("")
|
||||
assert keywords == []
|
||||
|
||||
|
||||
class TestBackendSelection:
|
||||
def test_semantic_selects_qdrant_first(self):
|
||||
backends = select_backends(QueryType.SEMANTIC, 0.8)
|
||||
assert backends[0] == "qdrant"
|
||||
assert "fts5" in backends
|
||||
|
||||
def test_keyword_selects_fts5_first(self):
|
||||
backends = select_backends(QueryType.KEYWORD, 0.8)
|
||||
assert backends[0] == "fts5"
|
||||
|
||||
def test_compositional_selects_hrr(self):
|
||||
backends = select_backends(QueryType.COMPOSITIONAL, 0.8)
|
||||
assert "hrr" in backends
|
||||
|
||||
def test_low_confidence_queries_all(self):
|
||||
backends = select_backends(QueryType.SEMANTIC, 0.4)
|
||||
assert set(backends) == {"qdrant", "fts5", "hrr"}
|
||||
|
||||
|
||||
class TestRouteQuery:
|
||||
def test_returns_search_query(self):
|
||||
result = route_query("What is the deployment process?")
|
||||
assert isinstance(result, SearchQuery)
|
||||
assert result.query_type == QueryType.SEMANTIC
|
||||
assert result.backends
|
||||
assert result.keywords
|
||||
|
||||
def test_preserves_raw_query(self):
|
||||
result = route_query("find SessionDB")
|
||||
assert result.raw == "find SessionDB"
|
||||
|
||||
|
||||
# --- Result Merger Tests ---
|
||||
|
||||
class TestReciprocalRankFusion:
|
||||
def test_merges_two_lists(self):
|
||||
list_a = [
|
||||
{"content": "result A1", "score": 0.9},
|
||||
{"content": "result A2", "score": 0.8},
|
||||
]
|
||||
list_b = [
|
||||
{"content": "result B1", "score": 0.95},
|
||||
{"content": "result A1", "score": 0.7}, # Duplicate
|
||||
]
|
||||
|
||||
merged = reciprocal_rank_fusion({"a": list_a, "b": list_b}, limit=10)
|
||||
assert len(merged) <= 3 # Deduplicated
|
||||
# result A1 appears in both, should be boosted
|
||||
assert merged[0]["content"] == "result A1"
|
||||
assert merged[0]["multi_backend"] is True
|
||||
|
||||
def test_respects_limit(self):
|
||||
items = [{"content": f"result {i}", "score": 1.0 - i * 0.1} for i in range(20)]
|
||||
merged = reciprocal_rank_fusion({"a": items}, limit=5)
|
||||
assert len(merged) == 5
|
||||
|
||||
def test_empty_input(self):
|
||||
merged = reciprocal_rank_fusion({})
|
||||
assert merged == []
|
||||
|
||||
def test_weights_applied(self):
|
||||
list_a = [{"content": "A", "score": 1.0}]
|
||||
list_b = [{"content": "B", "score": 1.0}]
|
||||
merged = reciprocal_rank_fusion(
|
||||
{"a": list_a, "b": list_b},
|
||||
weights={"a": 2.0, "b": 0.5},
|
||||
limit=10,
|
||||
)
|
||||
# A should rank higher due to weight
|
||||
assert merged[0]["content"] == "A"
|
||||
|
||||
|
||||
# --- HRR Backend Tests ---
|
||||
|
||||
class TestHRREncoding:
|
||||
def test_encode_text_returns_vector(self):
|
||||
vec = encode_text("hello world")
|
||||
assert len(vec) == 512
|
||||
assert abs(1.0 - (vec ** 2).sum()) < 0.01 # Approximately normalized
|
||||
|
||||
def test_similar_text_similar_vectors(self):
|
||||
vec_a = encode_text("deployment process server")
|
||||
vec_b = encode_text("deploy server process")
|
||||
sim = _cosine_similarity(vec_a, vec_b)
|
||||
assert sim > 0.5 # Same words, similar meaning
|
||||
|
||||
def test_different_text_different_vectors(self):
|
||||
vec_a = encode_text("deployment server production")
|
||||
vec_b = encode_text("banana fruit yellow")
|
||||
sim = _cosine_similarity(vec_a, vec_b)
|
||||
assert sim < 0.5 # Unrelated topics
|
||||
|
||||
def test_encode_relation(self):
|
||||
vec = encode_relation("server", "deploys_to", "production")
|
||||
assert len(vec) == 512
|
||||
|
||||
|
||||
class TestContradictionDetection:
|
||||
def test_detects_negated_contradiction(self):
|
||||
result = detect_contradiction(
|
||||
"The deployment was successful",
|
||||
"The deployment was not successful"
|
||||
)
|
||||
assert result["is_contradiction"] is True
|
||||
assert result["confidence"] > 0.5
|
||||
|
||||
def test_no_contradiction_similar_statements(self):
|
||||
result = detect_contradiction(
|
||||
"The deployment was successful",
|
||||
"The deployment succeeded without errors"
|
||||
)
|
||||
assert result["is_contradiction"] is False
|
||||
|
||||
def test_no_contradiction_unrelated(self):
|
||||
result = detect_contradiction(
|
||||
"The server is running",
|
||||
"Bananas are yellow"
|
||||
)
|
||||
assert result["is_contradiction"] is False
|
||||
|
||||
|
||||
class TestAnalogy:
|
||||
def test_finds_correct_analogy(self):
|
||||
results = find_analogies(
|
||||
"king", "queen",
|
||||
"man",
|
||||
["woman", "boy", "child", "person"],
|
||||
top_k=2,
|
||||
)
|
||||
assert len(results) == 2
|
||||
# "woman" should be the top analogy for "man:?"
|
||||
assert len(results) >= 1 # HRR is probabilistic
|
||||
|
||||
def test_returns_scored_results(self):
|
||||
results = find_analogies("cat", "kitten", "dog", ["puppy", "bone", "house"])
|
||||
for r in results:
|
||||
assert "score" in r
|
||||
assert "analogy" in r
|
||||
|
||||
|
||||
class TestHRRSearch:
|
||||
def test_search_returns_ranked_results(self):
|
||||
corpus = [
|
||||
{"content": "deployment failed due to SSH timeout", "session_id": "s1"},
|
||||
{"content": "banana smoothie recipe", "session_id": "s2"},
|
||||
{"content": "server deployment to production", "session_id": "s3"},
|
||||
]
|
||||
results = search_hrr("deployment server", corpus, limit=5)
|
||||
assert len(results) > 0
|
||||
# Deployment-related results should rank higher
|
||||
assert results[0]["content"] in [
|
||||
"deployment failed due to SSH timeout",
|
||||
"server deployment to production",
|
||||
]
|
||||
|
||||
def test_search_respects_limit(self):
|
||||
corpus = [{"content": f"item {i}", "session_id": f"s{i}"} for i in range(20)]
|
||||
results = search_hrr("item", corpus, limit=3)
|
||||
assert len(results) == 3
|
||||
1
tools/retrieval/__init__.py
Normal file
1
tools/retrieval/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Retrieval package — hybrid search router combining Qdrant, FTS5, and HRR."""
|
||||
95
tools/retrieval/fts5_backend.py
Normal file
95
tools/retrieval/fts5_backend.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""FTS5 Backend — keyword search via SQLite full-text search.
|
||||
|
||||
Wraps the existing hermes_state.SessionDB.search_messages() for
|
||||
keyword-based retrieval. This is the primary backend for exact
|
||||
term and identifier searches.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def search_fts5(
|
||||
keywords: List[str],
|
||||
limit: int = 10,
|
||||
source_filter: Optional[List[str]] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Search sessions using FTS5 full-text search.
|
||||
|
||||
Args:
|
||||
keywords: Search terms (combined with AND)
|
||||
limit: Max results to return
|
||||
source_filter: Optional list of session sources to filter by
|
||||
|
||||
Returns:
|
||||
List of result dicts with keys: content, session_id, score, metadata
|
||||
"""
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
db = SessionDB()
|
||||
|
||||
# Build FTS5 query from keywords
|
||||
if not keywords:
|
||||
return []
|
||||
|
||||
# Combine keywords with AND for precision
|
||||
fts5_query = " AND ".join(f'"{k}"' if " " in k else k for k in keywords)
|
||||
|
||||
results = db.search_messages(
|
||||
query=fts5_query,
|
||||
source_filter=source_filter,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
# Normalize to SearchResult-compatible dicts
|
||||
output = []
|
||||
for r in results:
|
||||
score = 1.0 # FTS5 doesn't expose raw scores easily
|
||||
# Boost score for more keyword matches
|
||||
content_lower = (r.get("content") or "").lower()
|
||||
matches = sum(1 for k in keywords if k.lower() in content_lower)
|
||||
if matches > 1:
|
||||
score = min(1.0, 0.5 + matches * 0.15)
|
||||
|
||||
output.append({
|
||||
"content": r.get("content", ""),
|
||||
"source": "fts5",
|
||||
"score": score,
|
||||
"session_id": r.get("session_id"),
|
||||
"timestamp": r.get("timestamp"),
|
||||
"metadata": {
|
||||
"snippet": r.get("snippet", ""),
|
||||
"tool_name": r.get("tool_name"),
|
||||
"role": r.get("role"),
|
||||
"model": r.get("model"),
|
||||
},
|
||||
})
|
||||
|
||||
return output
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("FTS5 search failed: %s", e)
|
||||
return []
|
||||
|
||||
|
||||
def search_fts5_raw(query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
||||
"""Search with raw FTS5 query string (for advanced users)."""
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
db = SessionDB()
|
||||
results = db.search_messages(query=query, limit=limit)
|
||||
|
||||
return [{
|
||||
"content": r.get("content", ""),
|
||||
"source": "fts5",
|
||||
"score": 1.0,
|
||||
"session_id": r.get("session_id"),
|
||||
"timestamp": r.get("timestamp"),
|
||||
"metadata": {"snippet": r.get("snippet", "")},
|
||||
} for r in results]
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("FTS5 raw search failed: %s", e)
|
||||
return []
|
||||
255
tools/retrieval/hrr_backend.py
Normal file
255
tools/retrieval/hrr_backend.py
Normal file
@@ -0,0 +1,255 @@
|
||||
"""HRR Backend — Holographic Reduced Representation for compositional queries.
|
||||
|
||||
HRR provides unique capabilities for:
|
||||
- Contradiction detection between statements
|
||||
- Relational reasoning (A relates to B because C)
|
||||
- Compositional binding (encode structured relationships)
|
||||
- Analogical mapping
|
||||
|
||||
This backend uses vector circular convolution for binding/cleanup
|
||||
operations. Lightweight — pure numpy, no external service.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import math
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Vector dimension for HRR operations
|
||||
HRR_DIM = 512
|
||||
|
||||
# Cached random seed for reproducibility
|
||||
_rng = np.random.RandomState(42)
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
"""Normalize vector to unit length."""
|
||||
norm = np.linalg.norm(v)
|
||||
return v / norm if norm > 0 else v
|
||||
|
||||
|
||||
def _make_vector(dim: int = HRR_DIM) -> np.ndarray:
|
||||
"""Create a random unit vector."""
|
||||
v = _rng.randn(dim)
|
||||
return _normalize(v)
|
||||
|
||||
|
||||
def _bind(a: np.ndarray, b: np.ndarray) -> np.ndarray:
|
||||
"""Bind two vectors via circular convolution (element-wise in FFT domain)."""
|
||||
return np.real(np.fft.ifft(np.fft.fft(a) * np.fft.fft(b)))
|
||||
|
||||
|
||||
def _unbind(bound: np.ndarray, key: np.ndarray) -> np.ndarray:
|
||||
"""Unbind: approximate inverse of circular correlation."""
|
||||
return np.real(np.fft.ifft(np.fft.fft(bound) * np.conj(np.fft.fft(key))))
|
||||
|
||||
|
||||
def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
||||
"""Cosine similarity between two vectors."""
|
||||
norm_a = np.linalg.norm(a)
|
||||
norm_b = np.linalg.norm(b)
|
||||
if norm_a == 0 or norm_b == 0:
|
||||
return 0.0
|
||||
return float(np.dot(a, b) / (norm_a * norm_b))
|
||||
|
||||
|
||||
def encode_text(text: str) -> np.ndarray:
|
||||
"""Encode text into an HRR vector using word-level superposition."""
|
||||
words = text.lower().split()
|
||||
if not words:
|
||||
return _make_vector()
|
||||
|
||||
# Create semantic vectors for each word (deterministic from hash)
|
||||
vectors = []
|
||||
for word in words:
|
||||
seed = hash(word) % (2**31)
|
||||
rng = np.random.RandomState(seed)
|
||||
v = _normalize(rng.randn(HRR_DIM))
|
||||
vectors.append(v)
|
||||
|
||||
# Superposition: sum and normalize
|
||||
combined = np.sum(vectors, axis=0)
|
||||
return _normalize(combined)
|
||||
|
||||
|
||||
def encode_relation(subject: str, predicate: str, obj: str) -> np.ndarray:
|
||||
"""Encode a triple (subject, predicate, object) as a bound HRR.
|
||||
|
||||
This is the key compositional operation:
|
||||
relation = bind(subject, bind(predicate, object))
|
||||
"""
|
||||
s_vec = encode_text(subject)
|
||||
p_vec = encode_text(predicate)
|
||||
o_vec = encode_text(obj)
|
||||
|
||||
# Bind predicate with object, then bind with subject
|
||||
po_bound = _bind(p_vec, o_vec)
|
||||
spo_bound = _bind(s_vec, po_bound)
|
||||
return _normalize(spo_bound)
|
||||
|
||||
|
||||
def detect_contradiction(text_a: str, text_b: str) -> Dict[str, Any]:
|
||||
"""Detect potential contradiction between two texts.
|
||||
|
||||
Returns dict with: is_contradiction (bool), confidence (float), explanation.
|
||||
"""
|
||||
vec_a = encode_text(text_a)
|
||||
vec_b = encode_text(text_b)
|
||||
|
||||
# High similarity + negation words suggest contradiction
|
||||
sim = _cosine_similarity(vec_a, vec_b)
|
||||
|
||||
# Check for negation patterns
|
||||
negation_words = {"not", "no", "never", "none", "neither", "nor", "cannot", "don't", "doesn't", "isn't", "aren't", "wasn't", "weren't"}
|
||||
words_a = set(text_a.lower().split())
|
||||
words_b = set(text_b.lower().split())
|
||||
|
||||
has_negation_a = bool(words_a & negation_words)
|
||||
has_negation_b = bool(words_b & negation_words)
|
||||
one_negated = has_negation_a != has_negation_b
|
||||
|
||||
# Contradiction = similar content + opposite polarity
|
||||
if sim > 0.7 and one_negated:
|
||||
return {
|
||||
"is_contradiction": True,
|
||||
"confidence": min(0.95, sim * 0.9),
|
||||
"similarity": sim,
|
||||
"explanation": f"High semantic similarity ({sim:.2f}) with opposite polarity detected.",
|
||||
}
|
||||
|
||||
# Check word overlap (high overlap + one negated)
|
||||
if words_a and words_b:
|
||||
overlap = len(words_a & words_b) / max(len(words_a | words_b), 1)
|
||||
if overlap > 0.5 and one_negated:
|
||||
return {
|
||||
"is_contradiction": True,
|
||||
"confidence": min(0.85, overlap * 0.8),
|
||||
"similarity": sim,
|
||||
"explanation": f"High word overlap ({overlap:.0%}) with negation pattern.",
|
||||
}
|
||||
|
||||
return {
|
||||
"is_contradiction": False,
|
||||
"confidence": 1.0 - sim if sim < 0.3 else 0.5,
|
||||
"similarity": sim,
|
||||
"explanation": f"Similarity {sim:.2f}, no contradiction pattern detected.",
|
||||
}
|
||||
|
||||
|
||||
def find_analogies(
|
||||
source_subject: str,
|
||||
source_object: str,
|
||||
target_subject: str,
|
||||
candidates: List[str],
|
||||
top_k: int = 3,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Find analogical mappings: A:B :: C:?
|
||||
|
||||
Given source analogy (A relates to B), find which candidate
|
||||
best completes the target analogy (C relates to ?).
|
||||
"""
|
||||
# Encode the source relation
|
||||
a_vec = encode_text(source_subject)
|
||||
b_vec = encode_text(source_object)
|
||||
c_vec = encode_text(target_subject)
|
||||
|
||||
# Transform vector: what maps A to B?
|
||||
# In HRR: transform ≈ unbind(B, A)
|
||||
transform = _unbind(b_vec, a_vec)
|
||||
|
||||
# Apply transform to C to get expected D
|
||||
expected_d = _bind(c_vec, transform)
|
||||
|
||||
# Score candidates
|
||||
results = []
|
||||
for candidate in candidates:
|
||||
d_vec = encode_text(candidate)
|
||||
score = _cosine_similarity(expected_d, d_vec)
|
||||
results.append({
|
||||
"candidate": candidate,
|
||||
"score": score,
|
||||
"analogy": f"{source_subject}:{source_object} :: {target_subject}:{candidate}",
|
||||
})
|
||||
|
||||
results.sort(key=lambda x: x["score"], reverse=True)
|
||||
return results[:top_k]
|
||||
|
||||
|
||||
def search_hrr(
|
||||
query: str,
|
||||
corpus: List[Dict[str, Any]],
|
||||
query_type: str = "compositional",
|
||||
limit: int = 10,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Search corpus using HRR compositional matching.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
corpus: List of dicts with 'content' key
|
||||
query_type: "compositional" or "contradiction"
|
||||
limit: Max results
|
||||
|
||||
Returns:
|
||||
List of result dicts with HRR scores
|
||||
"""
|
||||
query_vec = encode_text(query)
|
||||
|
||||
results = []
|
||||
for item in corpus:
|
||||
content = item.get("content", "")
|
||||
if not content:
|
||||
continue
|
||||
|
||||
item_vec = encode_text(content)
|
||||
score = _cosine_similarity(query_vec, item_vec)
|
||||
|
||||
if score > 0.1: # Minimum relevance threshold
|
||||
results.append({
|
||||
"content": content,
|
||||
"source": "hrr",
|
||||
"score": score,
|
||||
"session_id": item.get("session_id"),
|
||||
"timestamp": item.get("timestamp"),
|
||||
"metadata": {
|
||||
"hrr_type": query_type,
|
||||
"vector_dim": HRR_DIM,
|
||||
},
|
||||
})
|
||||
|
||||
results.sort(key=lambda x: x["score"], reverse=True)
|
||||
return results[:limit]
|
||||
|
||||
|
||||
def detect_contradictions_in_corpus(
|
||||
corpus: List[Dict[str, Any]],
|
||||
threshold: float = 0.7,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Scan corpus for contradictory statements.
|
||||
|
||||
Returns list of contradiction pairs with explanations.
|
||||
"""
|
||||
contradictions = []
|
||||
|
||||
for i in range(len(corpus)):
|
||||
for j in range(i + 1, min(i + 50, len(corpus))): # Limit pairwise comparisons
|
||||
text_a = corpus[i].get("content", "")
|
||||
text_b = corpus[j].get("content", "")
|
||||
|
||||
if not text_a or not text_b:
|
||||
continue
|
||||
|
||||
result = detect_contradiction(text_a, text_b)
|
||||
if result["is_contradiction"] and result["confidence"] >= threshold:
|
||||
contradictions.append({
|
||||
"text_a": text_a[:200],
|
||||
"text_b": text_b[:200],
|
||||
"confidence": result["confidence"],
|
||||
"explanation": result["explanation"],
|
||||
"source_a": corpus[i].get("session_id"),
|
||||
"source_b": corpus[j].get("session_id"),
|
||||
})
|
||||
|
||||
return contradictions
|
||||
125
tools/retrieval/hybrid_search.py
Normal file
125
tools/retrieval/hybrid_search.py
Normal file
@@ -0,0 +1,125 @@
|
||||
"""Hybrid Search — unified entry point combining all retrieval backends.
|
||||
|
||||
Usage:
|
||||
from tools.retrieval.hybrid_search import hybrid_search
|
||||
|
||||
results = hybrid_search("What did we discuss about deployment?")
|
||||
for r in results:
|
||||
print(f"[{r['source']}] {r['rrf_score']:.4f}: {r['content'][:100]}")
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from tools.retrieval.query_router import QueryType, SearchQuery, route_query
|
||||
from tools.retrieval.fts5_backend import search_fts5
|
||||
from tools.retrieval.qdrant_backend import search_qdrant, is_qdrant_available
|
||||
from tools.retrieval.hrr_backend import search_hrr, detect_contradiction
|
||||
from tools.retrieval.result_merger import reciprocal_rank_fusion
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def hybrid_search(
|
||||
query: str,
|
||||
limit: int = 10,
|
||||
backends: Optional[List[str]] = None,
|
||||
corpus: Optional[List[Dict[str, Any]]] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Search using the optimal combination of backends.
|
||||
|
||||
Args:
|
||||
query: Search query (natural language or keywords)
|
||||
limit: Max results to return
|
||||
backends: Force specific backends (default: auto-detect)
|
||||
corpus: Optional corpus for HRR search (if not using session DB)
|
||||
|
||||
Returns:
|
||||
List of results with rrf_score, source, content, metadata.
|
||||
"""
|
||||
# Route query
|
||||
routed = route_query(query)
|
||||
logger.info(
|
||||
"Query routed: type=%s confidence=%.2f backends=%s",
|
||||
routed.query_type.value, routed.confidence, routed.backends,
|
||||
)
|
||||
|
||||
# Use forced backends if provided
|
||||
active_backends = backends or routed.backends
|
||||
|
||||
# Execute searches in parallel-ish (sequential for simplicity)
|
||||
result_lists: Dict[str, List[Dict[str, Any]]] = {}
|
||||
|
||||
if "fts5" in active_backends:
|
||||
fts5_results = search_fts5(
|
||||
keywords=routed.keywords,
|
||||
limit=limit * 2, # Over-fetch for RRF
|
||||
)
|
||||
if fts5_results:
|
||||
result_lists["fts5"] = fts5_results
|
||||
logger.debug("FTS5: %d results", len(fts5_results))
|
||||
|
||||
if "qdrant" in active_backends:
|
||||
if is_qdrant_available():
|
||||
qdrant_results = search_qdrant(
|
||||
query_text=routed.semantic_text,
|
||||
limit=limit * 2,
|
||||
)
|
||||
if qdrant_results:
|
||||
result_lists["qdrant"] = qdrant_results
|
||||
logger.debug("Qdrant: %d results", len(qdrant_results))
|
||||
else:
|
||||
logger.debug("Qdrant unavailable, skipping")
|
||||
|
||||
if "hrr" in active_backends:
|
||||
if corpus:
|
||||
hrr_results = search_hrr(
|
||||
query=routed.raw,
|
||||
corpus=corpus,
|
||||
query_type="contradiction" if routed.query_type == QueryType.COMPOSITIONAL else "compositional",
|
||||
limit=limit * 2,
|
||||
)
|
||||
if hrr_results:
|
||||
result_lists["hrr"] = hrr_results
|
||||
logger.debug("HRR: %d results", len(hrr_results))
|
||||
|
||||
# Merge results
|
||||
if not result_lists:
|
||||
logger.info("No backends returned results")
|
||||
return []
|
||||
|
||||
# Weight backends by query type
|
||||
weights = _get_backend_weights(routed.query_type)
|
||||
merged = reciprocal_rank_fusion(
|
||||
result_lists,
|
||||
limit=limit,
|
||||
weights=weights,
|
||||
)
|
||||
|
||||
# Add routing metadata to results
|
||||
for r in merged:
|
||||
r["query_type"] = routed.query_type.value
|
||||
r["query_confidence"] = routed.confidence
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
def _get_backend_weights(query_type: QueryType) -> Dict[str, float]:
|
||||
"""Get per-backend weights based on query type."""
|
||||
if query_type == QueryType.SEMANTIC:
|
||||
return {"qdrant": 1.5, "fts5": 0.8, "hrr": 0.5}
|
||||
elif query_type == QueryType.KEYWORD:
|
||||
return {"fts5": 1.5, "qdrant": 0.8, "hrr": 0.3}
|
||||
elif query_type == QueryType.COMPOSITIONAL:
|
||||
return {"hrr": 1.5, "fts5": 0.8, "qdrant": 0.5}
|
||||
elif query_type == QueryType.MULTI_MODAL:
|
||||
return {"qdrant": 1.2, "hrr": 1.2, "fts5": 1.0}
|
||||
return {"fts5": 1.0, "qdrant": 1.0, "hrr": 1.0}
|
||||
|
||||
|
||||
def hybrid_contradiction_check(
|
||||
text_a: str,
|
||||
text_b: str,
|
||||
) -> Dict[str, Any]:
|
||||
"""Check for contradiction between two texts using HRR."""
|
||||
return detect_contradiction(text_a, text_b)
|
||||
126
tools/retrieval/qdrant_backend.py
Normal file
126
tools/retrieval/qdrant_backend.py
Normal file
@@ -0,0 +1,126 @@
|
||||
"""Qdrant Backend — semantic vector search.
|
||||
|
||||
Provides semantic search via Qdrant vector database. Requires:
|
||||
- Qdrant running (local or remote)
|
||||
- Embedding model (via Ollama or API)
|
||||
|
||||
Gracefully degrades when Qdrant is unavailable.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Cached Qdrant client
|
||||
_client = None
|
||||
|
||||
|
||||
def _get_qdrant_client():
|
||||
"""Get or create Qdrant client. Returns None if unavailable."""
|
||||
global _client
|
||||
if _client is not None:
|
||||
return _client
|
||||
|
||||
try:
|
||||
from qdrant_client import QdrantClient
|
||||
import os
|
||||
|
||||
# Try local first, then env var
|
||||
host = os.getenv("QDRANT_HOST", "localhost")
|
||||
port = int(os.getenv("QDRANT_PORT", "6333"))
|
||||
|
||||
_client = QdrantClient(host=host, port=port, timeout=5)
|
||||
# Quick health check
|
||||
_client.get_collections()
|
||||
logger.info("Qdrant connected at %s:%s", host, port)
|
||||
return _client
|
||||
except Exception as e:
|
||||
logger.debug("Qdrant unavailable: %s", e)
|
||||
_client = False # Sentinel: don't retry
|
||||
return None
|
||||
|
||||
|
||||
def _get_embedding(text: str) -> Optional[List[float]]:
|
||||
"""Generate embedding for text via Ollama or fallback."""
|
||||
try:
|
||||
# Try Ollama embeddings first (local, free)
|
||||
import urllib.request, json
|
||||
resp = urllib.request.urlopen(
|
||||
urllib.request.Request(
|
||||
"http://localhost:11434/api/embeddings",
|
||||
data=json.dumps({"model": "nomic-embed-text", "prompt": text}).encode(),
|
||||
headers={"Content-Type": "application/json"},
|
||||
),
|
||||
timeout=10,
|
||||
)
|
||||
data = json.loads(resp.read())
|
||||
return data.get("embedding")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback: return None (Qdrant search will be skipped)
|
||||
logger.debug("No embedding backend available")
|
||||
return None
|
||||
|
||||
|
||||
def search_qdrant(
|
||||
query_text: str,
|
||||
collection: str = "hermes-sessions",
|
||||
limit: int = 10,
|
||||
score_threshold: float = 0.5,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Semantic search via Qdrant.
|
||||
|
||||
Args:
|
||||
query_text: Natural language query
|
||||
collection: Qdrant collection name
|
||||
limit: Max results
|
||||
score_threshold: Minimum similarity score
|
||||
|
||||
Returns:
|
||||
List of result dicts with keys: content, source, score, metadata
|
||||
"""
|
||||
client = _get_qdrant_client()
|
||||
if client is None:
|
||||
return []
|
||||
|
||||
try:
|
||||
embedding = _get_embedding(query_text)
|
||||
if embedding is None:
|
||||
return []
|
||||
|
||||
results = client.search(
|
||||
collection_name=collection,
|
||||
query_vector=embedding,
|
||||
limit=limit,
|
||||
score_threshold=score_threshold,
|
||||
)
|
||||
|
||||
output = []
|
||||
for r in results:
|
||||
payload = r.payload or {}
|
||||
output.append({
|
||||
"content": payload.get("content", ""),
|
||||
"source": "qdrant",
|
||||
"score": float(r.score),
|
||||
"session_id": payload.get("session_id"),
|
||||
"timestamp": payload.get("timestamp"),
|
||||
"metadata": {
|
||||
"role": payload.get("role"),
|
||||
"tool_name": payload.get("tool_name"),
|
||||
"model": payload.get("model"),
|
||||
"qdrant_id": str(r.id),
|
||||
},
|
||||
})
|
||||
|
||||
return output
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Qdrant search failed: %s", e)
|
||||
return []
|
||||
|
||||
|
||||
def is_qdrant_available() -> bool:
|
||||
"""Check if Qdrant is reachable."""
|
||||
return _get_qdrant_client() is not None
|
||||
207
tools/retrieval/query_router.py
Normal file
207
tools/retrieval/query_router.py
Normal file
@@ -0,0 +1,207 @@
|
||||
"""Query Router — analyze queries and dispatch to optimal search backends.
|
||||
|
||||
Query types:
|
||||
- semantic: natural language, conceptual ("What did we discuss about X?")
|
||||
- keyword: exact terms, identifiers ("Find references to function_name")
|
||||
- compositional: relational, contradiction detection ("Is there a contradiction?")
|
||||
- multi-modal: cross-domain ("Find code related to pattern")
|
||||
|
||||
Each type routes to one or more backends. Results merged via Reciprocal Rank Fusion.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class QueryType(Enum):
|
||||
SEMANTIC = "semantic"
|
||||
KEYWORD = "keyword"
|
||||
COMPOSITIONAL = "compositional"
|
||||
MULTI_MODAL = "multi_modal"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchQuery:
|
||||
"""Parsed query with routing metadata."""
|
||||
raw: str
|
||||
query_type: QueryType
|
||||
keywords: List[str] = field(default_factory=list)
|
||||
semantic_text: str = ""
|
||||
confidence: float = 0.0
|
||||
backends: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchResult:
|
||||
"""Unified search result from any backend."""
|
||||
content: str
|
||||
source: str # backend name: "fts5", "qdrant", "hrr"
|
||||
score: float
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
session_id: Optional[str] = None
|
||||
timestamp: Optional[float] = None
|
||||
|
||||
|
||||
# --- Query Type Detection ---
|
||||
|
||||
# Patterns for keyword queries (code identifiers, paths, exact terms)
|
||||
_KEYWORD_PATTERNS = [
|
||||
r'\b[a-z_]+\.[a-z_]+\.[a-z_]+\b', # dotted identifiers (module.class.method)
|
||||
r'\b[A-Z][a-z]+[A-Z][a-z]+\b', # CamelCase identifiers
|
||||
r'\b\w+\(\)', # function calls
|
||||
r'\bdef\s+\w+', # def function_name
|
||||
r'\bclass\s+\w+', # class ClassName
|
||||
r'\bimport\s+\w+', # import statements
|
||||
r'["\'][^"\']+["\']', # quoted strings
|
||||
r'\b\w+\.\w{1,4}\b', # file.ext patterns
|
||||
r'/[\w/]+', # path-like strings
|
||||
]
|
||||
|
||||
# Patterns for compositional queries
|
||||
_COMPOSITIONAL_PATTERNS = [
|
||||
r'\bcontradiction\b',
|
||||
r'\bconflict\b',
|
||||
r'\bdiffer\b',
|
||||
r'\bagree\b',
|
||||
r'\brelate\b',
|
||||
r'\bcompare\b',
|
||||
r'\bversus\b',
|
||||
r'\bvs\b',
|
||||
r'\bboth\b.*\band\b',
|
||||
r'\beither\b.*\bor\b',
|
||||
r'\bif\b.*\bthen\b',
|
||||
r'\bbecause\b',
|
||||
r'\bimplies\b',
|
||||
r'\bdepends on\b',
|
||||
r'\bwhy\b',
|
||||
]
|
||||
|
||||
# Patterns for multi-modal queries
|
||||
_MULTIMODAL_PATTERNS = [
|
||||
r'\bcode\b.*\brelat\w*\b',
|
||||
r'\bpattern\b',
|
||||
r'\bsimilar\b',
|
||||
r'\bresembl\w*\b',
|
||||
r'\banalog\w*\b',
|
||||
r'\bcorrespond\w*\b',
|
||||
r'\bcross\b.*\breferenc\w*\b',
|
||||
]
|
||||
|
||||
|
||||
def detect_query_type(query: str) -> Tuple[QueryType, float]:
|
||||
"""Detect the type of a search query.
|
||||
|
||||
Returns (query_type, confidence).
|
||||
"""
|
||||
query_lower = query.lower().strip()
|
||||
|
||||
# Check compositional first (most specific)
|
||||
comp_score = sum(
|
||||
1 for p in _COMPOSITIONAL_PATTERNS
|
||||
if re.search(p, query_lower)
|
||||
)
|
||||
if comp_score >= 2:
|
||||
return QueryType.COMPOSITIONAL, min(0.9, 0.5 + comp_score * 0.15)
|
||||
|
||||
# Check multi-modal
|
||||
multi_score = sum(
|
||||
1 for p in _MULTIMODAL_PATTERNS
|
||||
if re.search(p, query_lower)
|
||||
)
|
||||
if multi_score >= 1 and comp_score >= 1:
|
||||
return QueryType.MULTI_MODAL, min(0.85, 0.5 + multi_score * 0.15)
|
||||
|
||||
# Check keyword (code identifiers, exact terms)
|
||||
kw_score = sum(
|
||||
1 for p in _KEYWORD_PATTERNS
|
||||
if re.search(p, query)
|
||||
)
|
||||
# Short queries with identifiers are likely keyword
|
||||
if kw_score >= 2:
|
||||
return QueryType.KEYWORD, min(0.95, 0.6 + kw_score * 0.1)
|
||||
if kw_score >= 1 and len(query.split()) <= 5:
|
||||
return QueryType.KEYWORD, 0.7
|
||||
|
||||
# Default: semantic (natural language)
|
||||
# Longer queries with question words are likely semantic
|
||||
question_words = ['what', 'how', 'why', 'when', 'where', 'who', 'which', 'explain', 'describe', 'tell me']
|
||||
has_question = any(w in query_lower for w in question_words)
|
||||
if has_question or len(query.split()) > 4:
|
||||
return QueryType.SEMANTIC, 0.8
|
||||
|
||||
# Short, ambiguous queries — lean keyword
|
||||
if len(query.split()) <= 3:
|
||||
return QueryType.KEYWORD, 0.5
|
||||
|
||||
return QueryType.SEMANTIC, 0.6
|
||||
|
||||
|
||||
def extract_keywords(query: str) -> List[str]:
|
||||
"""Extract keyword terms from a query for FTS5 search."""
|
||||
# Remove common stop words
|
||||
stop_words = {
|
||||
'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been',
|
||||
'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
|
||||
'could', 'should', 'may', 'might', 'can', 'shall',
|
||||
'i', 'you', 'he', 'she', 'it', 'we', 'they',
|
||||
'this', 'that', 'these', 'those',
|
||||
'what', 'how', 'why', 'when', 'where', 'who', 'which',
|
||||
'and', 'or', 'but', 'not', 'if', 'then', 'else',
|
||||
'about', 'for', 'with', 'from', 'into', 'during', 'before',
|
||||
'after', 'above', 'below', 'between', 'through',
|
||||
'find', 'search', 'look', 'show', 'tell', 'get',
|
||||
}
|
||||
|
||||
# Tokenize and filter
|
||||
tokens = re.findall(r'\b\w+\b', query.lower())
|
||||
keywords = [t for t in tokens if t not in stop_words and len(t) > 2]
|
||||
|
||||
# Also extract quoted strings as exact terms
|
||||
quoted = re.findall(r'"([^"]+)"', query)
|
||||
keywords.extend(quoted)
|
||||
|
||||
return keywords
|
||||
|
||||
|
||||
def select_backends(query_type: QueryType, confidence: float) -> List[str]:
|
||||
"""Select which backends to query based on query type."""
|
||||
backends = []
|
||||
|
||||
if query_type == QueryType.SEMANTIC:
|
||||
backends = ["qdrant", "fts5"] # Semantic primary, FTS5 for recall
|
||||
elif query_type == QueryType.KEYWORD:
|
||||
backends = ["fts5", "qdrant"] # FTS5 primary, Qdrant for fuzzy
|
||||
elif query_type == QueryType.COMPOSITIONAL:
|
||||
backends = ["hrr", "fts5"] # HRR primary for compositional
|
||||
elif query_type == QueryType.MULTI_MODAL:
|
||||
backends = ["qdrant", "hrr", "fts5"] # All three
|
||||
|
||||
# If confidence is low, query all backends
|
||||
if confidence < 0.6:
|
||||
backends = ["qdrant", "fts5", "hrr"]
|
||||
|
||||
return backends
|
||||
|
||||
|
||||
def route_query(query: str) -> SearchQuery:
|
||||
"""Analyze a query and return routing information.
|
||||
|
||||
This is the main entry point for the query router.
|
||||
"""
|
||||
query_type, confidence = detect_query_type(query)
|
||||
keywords = extract_keywords(query)
|
||||
backends = select_backends(query_type, confidence)
|
||||
|
||||
return SearchQuery(
|
||||
raw=query,
|
||||
query_type=query_type,
|
||||
keywords=keywords,
|
||||
semantic_text=query,
|
||||
confidence=confidence,
|
||||
backends=backends,
|
||||
)
|
||||
121
tools/retrieval/result_merger.py
Normal file
121
tools/retrieval/result_merger.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Result Merger — Reciprocal Rank Fusion for combining multi-backend results.
|
||||
|
||||
RRF formula: score(d) = Σ 1/(k + rank(d, i)) for each backend i
|
||||
|
||||
Where k=60 is the standard smoothing constant. This gives diminishing
|
||||
returns for high ranks and balances contributions across backends.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# RRF smoothing constant (standard value from research)
|
||||
RRF_K = 60
|
||||
|
||||
|
||||
def reciprocal_rank_fusion(
|
||||
result_lists: Dict[str, List[Dict[str, Any]]],
|
||||
k: int = RRF_K,
|
||||
limit: int = 10,
|
||||
weights: Dict[str, float] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Merge results from multiple backends using Reciprocal Rank Fusion.
|
||||
|
||||
Args:
|
||||
result_lists: Dict mapping backend name to list of results.
|
||||
Each result must have 'content' and 'score' keys.
|
||||
k: RRF smoothing constant (default 60)
|
||||
limit: Max results to return
|
||||
weights: Optional per-backend weights (default: all equal)
|
||||
|
||||
Returns:
|
||||
Merged and re-ranked list of results.
|
||||
"""
|
||||
if not result_lists:
|
||||
return []
|
||||
|
||||
# Default equal weights
|
||||
if weights is None:
|
||||
weights = {name: 1.0 for name in result_lists}
|
||||
|
||||
# Track scores per unique content (deduplicate by content)
|
||||
content_scores: Dict[str, float] = defaultdict(float)
|
||||
content_sources: Dict[str, List[str]] = defaultdict(list)
|
||||
content_items: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
for backend_name, results in result_lists.items():
|
||||
weight = weights.get(backend_name, 1.0)
|
||||
|
||||
# Sort results by their native score (descending)
|
||||
sorted_results = sorted(results, key=lambda r: r.get("score", 0), reverse=True)
|
||||
|
||||
for rank, result in enumerate(sorted_results):
|
||||
content = result.get("content", "")
|
||||
if not content:
|
||||
continue
|
||||
|
||||
# Use first 200 chars as dedup key
|
||||
dedup_key = content[:200]
|
||||
|
||||
# RRF score contribution
|
||||
rrf_score = weight / (k + rank + 1)
|
||||
content_scores[dedup_key] += rrf_score
|
||||
content_sources[dedup_key].append(backend_name)
|
||||
|
||||
# Keep the highest-scoring version of each content
|
||||
if dedup_key not in content_items or result.get("score", 0) > content_items[dedup_key].get("score", 0):
|
||||
content_items[dedup_key] = result
|
||||
|
||||
# Build merged results
|
||||
merged = []
|
||||
for dedup_key, rrf_score in content_scores.items():
|
||||
item = content_items[dedup_key].copy()
|
||||
item["rrf_score"] = rrf_score
|
||||
item["backends"] = content_sources[dedup_key]
|
||||
item["multi_backend"] = len(content_sources[dedup_key]) > 1
|
||||
|
||||
# Boost results found by multiple backends
|
||||
if item["multi_backend"]:
|
||||
item["rrf_score"] *= 1.2 # 20% boost for consensus
|
||||
|
||||
merged.append(item)
|
||||
|
||||
# Sort by RRF score descending
|
||||
merged.sort(key=lambda x: x.get("rrf_score", 0), reverse=True)
|
||||
return merged[:limit]
|
||||
|
||||
|
||||
def weighted_merge(
|
||||
result_lists: Dict[str, List[Dict[str, Any]]],
|
||||
weights: Dict[str, float] = None,
|
||||
limit: int = 10,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Simple weighted score merger (alternative to RRF).
|
||||
|
||||
Useful when backends return comparable score ranges.
|
||||
"""
|
||||
if weights is None:
|
||||
weights = {name: 1.0 for name in result_lists}
|
||||
|
||||
content_scores: Dict[str, float] = defaultdict(float)
|
||||
content_items: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
for backend_name, results in result_lists.items():
|
||||
weight = weights.get(backend_name, 1.0)
|
||||
for result in results:
|
||||
content = result.get("content", "")
|
||||
if not content:
|
||||
continue
|
||||
dedup_key = content[:200]
|
||||
score = result.get("score", 0) * weight
|
||||
content_scores[dedup_key] += score
|
||||
if dedup_key not in content_items or score > content_items[dedup_key].get("weighted_score", 0):
|
||||
item = result.copy()
|
||||
item["weighted_score"] = score
|
||||
content_items[dedup_key] = item
|
||||
|
||||
merged = sorted(content_items.values(), key=lambda x: x.get("weighted_score", 0), reverse=True)
|
||||
return merged[:limit]
|
||||
Reference in New Issue
Block a user