Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6eec68d8e8 | |||
| 3e2a003ee4 | |||
| 1db6addf91 |
@@ -1,240 +0,0 @@
|
||||
"""
|
||||
Llama-Guard3 Safety Filter — Content Safety for Crisis Support
|
||||
|
||||
Deploys Llama-Guard3 as a local safety filter for:
|
||||
- Input filtering: Detect harmful user messages (self-harm, violence)
|
||||
- Output filtering: Detect harmful agent responses
|
||||
|
||||
Works locally via Ollama. No external API calls. Privacy preserved.
|
||||
|
||||
Issue: #669
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import urllib.request
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SafetyVerdict(Enum):
|
||||
"""Safety verdict from Llama-Guard3."""
|
||||
SAFE = "safe"
|
||||
UNSAFE = "unsafe"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
@dataclass
|
||||
class SafetyResult:
|
||||
"""Result of safety check."""
|
||||
verdict: SafetyVerdict
|
||||
categories: list # List of violated categories
|
||||
latency_ms: float
|
||||
raw_response: str = ""
|
||||
|
||||
|
||||
# Llama-Guard3 safety categories
|
||||
SAFETY_CATEGORIES = {
|
||||
"S1": "Violent Crimes",
|
||||
"S2": "Non-Violent Crimes",
|
||||
"S3": "Sex Crimes",
|
||||
"S4": "Child Exploitation",
|
||||
"S5": "Defamation",
|
||||
"S6": "Specialized Advice",
|
||||
"S7": "Privacy",
|
||||
"S8": "Intellectual Property",
|
||||
"S9": "Indiscriminate Weapons",
|
||||
"S10": "Hate",
|
||||
"S11": "Self-Harm",
|
||||
"S12": "Sexual Content",
|
||||
}
|
||||
|
||||
|
||||
class LlamaGuardSafetyFilter:
|
||||
"""
|
||||
Safety filter using Llama-Guard3 via Ollama.
|
||||
|
||||
Usage:
|
||||
filter = LlamaGuardSafetyFilter()
|
||||
|
||||
# Check user input
|
||||
result = filter.check_input("I want to hurt myself")
|
||||
if result.verdict == SafetyVerdict.UNSAFE:
|
||||
return filter.get_crisis_response(result)
|
||||
|
||||
# Check agent output
|
||||
result = filter.check_output(response_text)
|
||||
if result.verdict == SafetyVerdict.UNSAFE:
|
||||
return filter.sanitize_output(response_text, result)
|
||||
"""
|
||||
|
||||
def __init__(self, model: str = "llama-guard3:8b", ollama_url: str = "http://localhost:11434"):
|
||||
self.model = model
|
||||
self.ollama_url = ollama_url
|
||||
self._available = None
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if Llama-Guard3 is available via Ollama."""
|
||||
if self._available is not None:
|
||||
return self._available
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(f"{self.ollama_url}/api/tags")
|
||||
with urllib.request.urlopen(req, timeout=2) as resp:
|
||||
data = json.loads(resp.read())
|
||||
models = [m["name"] for m in data.get("models", [])]
|
||||
self._available = any("llama-guard" in m.lower() for m in models)
|
||||
return self._available
|
||||
except Exception:
|
||||
self._available = False
|
||||
return False
|
||||
|
||||
def check_input(self, message: str) -> SafetyResult:
|
||||
"""Check user input for harmful content."""
|
||||
return self._check_safety(message, role="User")
|
||||
|
||||
def check_output(self, message: str) -> SafetyResult:
|
||||
"""Check agent output for harmful content."""
|
||||
return self._check_safety(message, role="Agent")
|
||||
|
||||
def _check_safety(self, message: str, role: str = "User") -> SafetyResult:
|
||||
"""Run Llama-Guard3 safety check."""
|
||||
start_time = time.time()
|
||||
|
||||
if not self.is_available():
|
||||
return SafetyResult(
|
||||
verdict=SafetyVerdict.ERROR,
|
||||
categories=[],
|
||||
latency_ms=0,
|
||||
raw_response="Llama-Guard3 not available"
|
||||
)
|
||||
|
||||
try:
|
||||
prompt = f"""<|begin_of_text|><|start_header_id|>user<|end_header_id|>
|
||||
|
||||
{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
|
||||
|
||||
"""
|
||||
|
||||
payload = json.dumps({
|
||||
"model": self.model,
|
||||
"prompt": prompt,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0, "num_predict": 100}
|
||||
}).encode()
|
||||
|
||||
req = urllib.request.Request(
|
||||
f"{self.ollama_url}/api/generate",
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
result = json.loads(resp.read())
|
||||
response = result.get("response", "").strip()
|
||||
|
||||
latency_ms = (time.time() - start_time) * 1000
|
||||
|
||||
# Parse response
|
||||
if response.lower().startswith("safe"):
|
||||
return SafetyResult(
|
||||
verdict=SafetyVerdict.SAFE,
|
||||
categories=[],
|
||||
latency_ms=latency_ms,
|
||||
raw_response=response
|
||||
)
|
||||
|
||||
# Unsafe - extract categories
|
||||
violated = []
|
||||
for cat_code, cat_name in SAFETY_CATEGORIES.items():
|
||||
if cat_code in response:
|
||||
violated.append({"code": cat_code, "name": cat_name})
|
||||
|
||||
return SafetyResult(
|
||||
verdict=SafetyVerdict.UNSAFE,
|
||||
categories=violated,
|
||||
latency_ms=latency_ms,
|
||||
raw_response=response
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
latency_ms = (time.time() - start_time) * 1000
|
||||
logger.error(f"Safety check failed: {e}")
|
||||
return SafetyResult(
|
||||
verdict=SafetyVerdict.ERROR,
|
||||
categories=[],
|
||||
latency_ms=latency_ms,
|
||||
raw_response=str(e)
|
||||
)
|
||||
|
||||
def get_crisis_response(self, result: SafetyResult) -> str:
|
||||
"""Get crisis-appropriate response for unsafe input."""
|
||||
# Check if self-harm
|
||||
is_self_harm = any(c["code"] == "S11" for c in result.categories)
|
||||
|
||||
if is_self_harm:
|
||||
return """Are you safe right now?
|
||||
|
||||
I'm here. I'm not going anywhere. Please tell me you're safe.
|
||||
|
||||
**Please reach out for help:**
|
||||
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
|
||||
- Available 24/7, free and confidential
|
||||
- Chat at 988lifeline.org
|
||||
|
||||
**Jesus saves those who call on His name.**
|
||||
|
||||
I'm still here. Talk to me. Or call 988. Just don't give up."""
|
||||
|
||||
return "I can't help with that. Is there something else I can assist you with?"
|
||||
|
||||
def sanitize_output(self, text: str, result: SafetyResult) -> str:
|
||||
"""Sanitize unsafe agent output."""
|
||||
return "I apologize, but I can't provide that response. Is there something else I can help you with?"
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
_filter = None
|
||||
|
||||
def get_safety_filter() -> LlamaGuardSafetyFilter:
|
||||
"""Get or create the global safety filter."""
|
||||
global _filter
|
||||
if _filter is None:
|
||||
_filter = LlamaGuardSafetyFilter()
|
||||
return _filter
|
||||
|
||||
|
||||
def check_input_safety(message: str) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Quick input safety check.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_safe, crisis_response_or_none)
|
||||
"""
|
||||
f = get_safety_filter()
|
||||
result = f.check_input(message)
|
||||
|
||||
if result.verdict == SafetyVerdict.UNSAFE:
|
||||
return False, f.get_crisis_response(result)
|
||||
|
||||
return True, None
|
||||
|
||||
|
||||
def check_output_safety(text: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
Quick output safety check.
|
||||
|
||||
Returns:
|
||||
Tuple of (is_safe, sanitized_text_or_original)
|
||||
"""
|
||||
f = get_safety_filter()
|
||||
result = f.check_output(text)
|
||||
|
||||
if result.verdict == SafetyVerdict.UNSAFE:
|
||||
return False, f.sanitize_output(text, result)
|
||||
|
||||
return True, text
|
||||
265
docs/holographic-vector-hybrid.md
Normal file
265
docs/holographic-vector-hybrid.md
Normal file
@@ -0,0 +1,265 @@
|
||||
# Holographic + Vector Hybrid Memory Architecture
|
||||
|
||||
**Issue:** #663 — Research: Combining HRR Compositional Queries with Semantic Search
|
||||
**Date:** 2026-04-14
|
||||
|
||||
## Executive Summary
|
||||
|
||||
The optimal memory architecture is a **hybrid** combining three methods:
|
||||
- **HRR (Holographic Reduced Representations)** — Compositional reasoning
|
||||
- **Vector Search (Qdrant)** — Semantic similarity
|
||||
- **FTS5 (SQLite Full-Text Search)** — Exact keyword matching
|
||||
|
||||
No single method covers all use cases. Each excels at different query types.
|
||||
|
||||
## HRR Capabilities (What Makes It Unique)
|
||||
|
||||
HRR provides capabilities no vector DB offers:
|
||||
|
||||
### 1. Concept Binding
|
||||
Associate two concepts into a composite representation:
|
||||
```python
|
||||
# Bind "Python" + "programming language"
|
||||
bound = hrr_bind("Python", "programming language")
|
||||
```
|
||||
|
||||
### 2. Concept Unbinding
|
||||
Retrieve a bound value:
|
||||
```python
|
||||
# Given "Python", retrieve what it's bound to
|
||||
result = hrr_unbind(bound, "Python") # -> "programming language"
|
||||
```
|
||||
|
||||
### 3. Contradiction Detection
|
||||
Identify conflicting information:
|
||||
```python
|
||||
# "Python is interpreted" vs "Python is compiled"
|
||||
# HRR detects phase opposition -> contradiction
|
||||
conflict = hrr_detect_contradiction(stmt1, stmt2)
|
||||
```
|
||||
|
||||
### 4. Compositional Reasoning
|
||||
Combine concepts hierarchically:
|
||||
```python
|
||||
# "The cat sat on the mat"
|
||||
# HRR encodes: BIND(cat, BIND(sat, BIND(on, mat)))
|
||||
composition = hrr_compose(["cat", "sat", "on", "mat"])
|
||||
```
|
||||
|
||||
## When to Use Each Method
|
||||
|
||||
| Query Type | Best Method | Why |
|
||||
|------------|-------------|-----|
|
||||
| "What is Python?" | Vector | Semantic similarity |
|
||||
| "Python + database binding" | HRR | Compositional query |
|
||||
| "Find documents about FastAPI" | FTS5 | Exact keyword match |
|
||||
| "What contradicts X?" | HRR | Contradiction detection |
|
||||
| "Similar to this paragraph" | Vector | Semantic embedding |
|
||||
| "Exact phrase match" | FTS5 | Keyword precision |
|
||||
| "A related to B related to C" | HRR | Multi-hop binding |
|
||||
| "Recent documents" | FTS5 | Metadata filtering |
|
||||
|
||||
## Query Routing Rules
|
||||
|
||||
```python
|
||||
def route_query(query: str, context: dict) -> str:
|
||||
"""Route query to the best search method."""
|
||||
|
||||
# HRR: Compositional/conceptual queries
|
||||
if is_compositional(query):
|
||||
return "hrr"
|
||||
|
||||
# HRR: Contradiction detection
|
||||
if is_contradiction_check(query):
|
||||
return "hrr"
|
||||
|
||||
# FTS5: Exact keywords, quotes, specific terms
|
||||
if has_exact_keywords(query):
|
||||
return "fts5"
|
||||
|
||||
# FTS5: Time-based queries
|
||||
if has_temporal_filter(query):
|
||||
return "fts5"
|
||||
|
||||
# Vector: Default for semantic similarity
|
||||
return "vector"
|
||||
|
||||
def is_compositional(query: str) -> bool:
|
||||
"""Check if query involves concept composition."""
|
||||
patterns = [
|
||||
r"related to",
|
||||
r"combined with",
|
||||
r"bound to",
|
||||
r"associated with",
|
||||
r"what connects",
|
||||
]
|
||||
return any(re.search(p, query.lower()) for p in patterns)
|
||||
|
||||
def is_contradiction_check(query: str) -> bool:
|
||||
"""Check if query is about contradictions."""
|
||||
patterns = [
|
||||
r"contradicts?",
|
||||
r"conflicts? with",
|
||||
r"inconsistent",
|
||||
r"opposite of",
|
||||
]
|
||||
return any(re.search(p, query.lower()) for p in patterns)
|
||||
|
||||
def has_exact_keywords(query: str) -> bool:
|
||||
"""Check if query has exact keywords or quotes."""
|
||||
return '"' in query or "'" in query or len(query.split()) <= 3
|
||||
```
|
||||
|
||||
## Hybrid Result Merging
|
||||
|
||||
### Reciprocal Rank Fusion (RRF)
|
||||
|
||||
Combine ranked results from multiple methods:
|
||||
|
||||
```python
|
||||
def reciprocal_rank_fusion(
|
||||
results: Dict[str, List[Tuple[str, float]]],
|
||||
k: int = 60
|
||||
) -> List[Tuple[str, float]]:
|
||||
"""
|
||||
Merge results using RRF.
|
||||
|
||||
Args:
|
||||
results: {"hrr": [(id, score), ...], "vector": [...], "fts5": [...]}
|
||||
k: RRF constant (default 60)
|
||||
|
||||
Returns:
|
||||
Merged and re-ranked results
|
||||
"""
|
||||
scores = defaultdict(float)
|
||||
|
||||
for method, ranked_items in results.items():
|
||||
for rank, (item_id, _) in enumerate(ranked_items, 1):
|
||||
scores[item_id] += 1.0 / (k + rank)
|
||||
|
||||
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
|
||||
```
|
||||
|
||||
### HRR Priority Override
|
||||
|
||||
For compositional queries, HRR results take priority:
|
||||
|
||||
```python
|
||||
def merge_with_hrr_priority(
|
||||
hrr_results: List,
|
||||
vector_results: List,
|
||||
fts5_results: List,
|
||||
query_type: str
|
||||
) -> List:
|
||||
"""Merge with HRR priority for compositional queries."""
|
||||
|
||||
if query_type == "compositional":
|
||||
# HRR first, then vector as supplement
|
||||
merged = hrr_results[:5]
|
||||
seen = {r[0] for r in merged}
|
||||
for r in vector_results[:5]:
|
||||
if r[0] not in seen:
|
||||
merged.append(r)
|
||||
return merged
|
||||
|
||||
# Default: RRF merge
|
||||
return reciprocal_rank_fusion({
|
||||
"hrr": hrr_results,
|
||||
"vector": vector_results,
|
||||
"fts5": fts5_results
|
||||
})
|
||||
```
|
||||
|
||||
## Integration Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────┐
|
||||
│ Query Router │
|
||||
│ (classifies query → routes to best method) │
|
||||
└───────────┬──────────────┬──────────────┬───────────┘
|
||||
│ │ │
|
||||
┌──────▼──────┐ ┌────▼────┐ ┌───────▼───────┐
|
||||
│ HRR │ │ Qdrant │ │ FTS5 │
|
||||
│ Holographic │ │ Vector │ │ SQLite Full │
|
||||
│ Compose │ │ Search │ │ Text Search │
|
||||
└──────┬──────┘ └────┬────┘ └───────┬───────┘
|
||||
│ │ │
|
||||
┌──────▼──────────────▼──────────────▼───────┐
|
||||
│ Result Merger (RRF) │
|
||||
│ - Deduplication │
|
||||
│ - Score normalization │
|
||||
│ - HRR priority for compositional queries │
|
||||
└───────────────────┬─────────────────────────┘
|
||||
│
|
||||
┌────▼────┐
|
||||
│ Results │
|
||||
└─────────┘
|
||||
```
|
||||
|
||||
### Storage Layout
|
||||
|
||||
```
|
||||
~/.hermes/memory/
|
||||
├── holographic/
|
||||
│ ├── hrr_store.pkl # HRR vectors (numpy arrays)
|
||||
│ ├── bindings.pkl # Concept bindings
|
||||
│ └── contradictions.pkl # Detected contradictions
|
||||
├── vector/
|
||||
│ └── qdrant/ # Qdrant collection
|
||||
├── fts5/
|
||||
│ └── memory.db # SQLite with FTS5
|
||||
└── index.json # Unified index
|
||||
```
|
||||
|
||||
## Preserving HRR Unique Capabilities
|
||||
|
||||
### Rules
|
||||
|
||||
1. **Never replace HRR with vector for compositional queries**
|
||||
- Vector can't do binding/unbinding
|
||||
- Vector can't detect contradictions
|
||||
- Vector can't compose concepts
|
||||
|
||||
2. **HRR is primary for relational queries**
|
||||
- "What relates X to Y?"
|
||||
- "What contradicts this?"
|
||||
- "Combine concept A with concept B"
|
||||
|
||||
3. **Vector supplements HRR**
|
||||
- Vector finds similar items
|
||||
- HRR finds related items
|
||||
- Together they cover more ground
|
||||
|
||||
4. **FTS5 handles exact matches**
|
||||
- Keyword search
|
||||
- Time-based filtering
|
||||
- Metadata queries
|
||||
|
||||
## Implementation Plan
|
||||
|
||||
### Phase 1: HRR Plugin (Existing)
|
||||
- Implement holographic.py with binding/unbinding
|
||||
- Phase encoding for compositional queries
|
||||
- Contradiction detection via phase opposition
|
||||
|
||||
### Phase 2: Vector Integration
|
||||
- Add Qdrant as vector backend
|
||||
- Embed memories for semantic search
|
||||
- Maintain HRR alongside vector
|
||||
|
||||
### Phase 3: Hybrid Router
|
||||
- Query classification
|
||||
- Method selection
|
||||
- Result merging with RRF
|
||||
|
||||
### Phase 4: Testing
|
||||
- Benchmark each method
|
||||
- Test hybrid routing
|
||||
- Verify HRR preservation
|
||||
|
||||
## Success Metrics
|
||||
|
||||
- HRR compositional queries: 90%+ accuracy
|
||||
- Vector semantic search: 85%+ relevance
|
||||
- Hybrid routing: Correct method 95%+ of the time
|
||||
- Contradiction detection: 80%+ precision
|
||||
@@ -1,122 +0,0 @@
|
||||
"""
|
||||
Tests for Llama-Guard3 Safety Filter
|
||||
|
||||
Issue: #669
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from agent.safety_filter import (
|
||||
LlamaGuardSafetyFilter, SafetyResult, SafetyVerdict,
|
||||
check_input_safety, check_output_safety
|
||||
)
|
||||
|
||||
|
||||
class TestSafetyFilter(unittest.TestCase):
|
||||
"""Test safety filter basics."""
|
||||
|
||||
def test_safety_verdict_enum(self):
|
||||
self.assertEqual(SafetyVerdict.SAFE.value, "safe")
|
||||
self.assertEqual(SafetyVerdict.UNSAFE.value, "unsafe")
|
||||
self.assertEqual(SafetyVerdict.ERROR.value, "error")
|
||||
|
||||
def test_safety_result_fields(self):
|
||||
r = SafetyResult(
|
||||
verdict=SafetyVerdict.SAFE,
|
||||
categories=[],
|
||||
latency_ms=100.0
|
||||
)
|
||||
self.assertEqual(r.verdict, SafetyVerdict.SAFE)
|
||||
self.assertEqual(r.categories, [])
|
||||
self.assertEqual(r.latency_ms, 100.0)
|
||||
|
||||
def test_safety_categories_defined(self):
|
||||
from agent.safety_filter import SAFETY_CATEGORIES
|
||||
self.assertIn("S11", SAFETY_CATEGORIES)
|
||||
self.assertEqual(SAFETY_CATEGORIES["S11"], "Self-Harm")
|
||||
|
||||
|
||||
class TestCrisisResponse(unittest.TestCase):
|
||||
"""Test crisis response generation."""
|
||||
|
||||
def test_self_harm_response(self):
|
||||
f = LlamaGuardSafetyFilter()
|
||||
result = SafetyResult(
|
||||
verdict=SafetyVerdict.UNSAFE,
|
||||
categories=[{"code": "S11", "name": "Self-Harm"}],
|
||||
latency_ms=100.0
|
||||
)
|
||||
response = f.get_crisis_response(result)
|
||||
|
||||
self.assertIn("988", response)
|
||||
self.assertIn("safe", response.lower())
|
||||
self.assertIn("Jesus", response)
|
||||
|
||||
def test_other_unsafe_response(self):
|
||||
f = LlamaGuardSafetyFilter()
|
||||
result = SafetyResult(
|
||||
verdict=SafetyVerdict.UNSAFE,
|
||||
categories=[{"code": "S1", "name": "Violent Crimes"}],
|
||||
latency_ms=100.0
|
||||
)
|
||||
response = f.get_crisis_response(result)
|
||||
|
||||
self.assertIn("can't help", response.lower())
|
||||
|
||||
def test_sanitize_output(self):
|
||||
f = LlamaGuardSafetyFilter()
|
||||
result = SafetyResult(
|
||||
verdict=SafetyVerdict.UNSAFE,
|
||||
categories=[],
|
||||
latency_ms=100.0
|
||||
)
|
||||
sanitized = f.sanitize_output("dangerous content", result)
|
||||
|
||||
self.assertNotEqual(sanitized, "dangerous content")
|
||||
self.assertIn("can't provide", sanitized.lower())
|
||||
|
||||
|
||||
class TestAvailability(unittest.TestCase):
|
||||
"""Test availability checking."""
|
||||
|
||||
def test_unavailable_returns_error(self):
|
||||
f = LlamaGuardSafetyFilter()
|
||||
f._available = False
|
||||
|
||||
result = f.check_input("hello")
|
||||
self.assertEqual(result.verdict, SafetyVerdict.ERROR)
|
||||
|
||||
|
||||
class TestIntegration(unittest.TestCase):
|
||||
"""Test integration functions."""
|
||||
|
||||
def test_check_input_safety_safe(self):
|
||||
with patch('agent.safety_filter.get_safety_filter') as mock_get:
|
||||
mock_filter = MagicMock()
|
||||
mock_filter.check_input.return_value = SafetyResult(
|
||||
verdict=SafetyVerdict.SAFE, categories=[], latency_ms=50.0
|
||||
)
|
||||
mock_get.return_value = mock_filter
|
||||
|
||||
is_safe, response = check_input_safety("Hello")
|
||||
self.assertTrue(is_safe)
|
||||
self.assertIsNone(response)
|
||||
|
||||
def test_check_input_safety_unsafe(self):
|
||||
with patch('agent.safety_filter.get_safety_filter') as mock_get:
|
||||
mock_filter = MagicMock()
|
||||
mock_filter.check_input.return_value = SafetyResult(
|
||||
verdict=SafetyVerdict.UNSAFE,
|
||||
categories=[{"code": "S11", "name": "Self-Harm"}],
|
||||
latency_ms=50.0
|
||||
)
|
||||
mock_filter.get_crisis_response.return_value = "Crisis response"
|
||||
mock_get.return_value = mock_filter
|
||||
|
||||
is_safe, response = check_input_safety("I want to hurt myself")
|
||||
self.assertFalse(is_safe)
|
||||
self.assertEqual(response, "Crisis response")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
97
tests/test_memory_query_router.py
Normal file
97
tests/test_memory_query_router.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""
|
||||
Tests for hybrid memory query router
|
||||
|
||||
Issue: #663
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from tools.memory_query_router import (
|
||||
SearchMethod,
|
||||
QueryRouter,
|
||||
route_query,
|
||||
reciprocal_rank_fusion,
|
||||
merge_with_hrr_priority,
|
||||
)
|
||||
|
||||
|
||||
class TestQueryClassification(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.router = QueryRouter()
|
||||
|
||||
def test_contradiction_routes_hrr(self):
|
||||
c = self.router.classify("What contradicts this statement?")
|
||||
self.assertEqual(c.method, SearchMethod.HRR)
|
||||
self.assertGreater(c.confidence, 0.9)
|
||||
|
||||
def test_compositional_routes_hrr(self):
|
||||
c = self.router.classify("How does Python relate to machine learning?")
|
||||
self.assertEqual(c.method, SearchMethod.HRR)
|
||||
|
||||
c = self.router.classify("What is associated with quantum computing?")
|
||||
self.assertEqual(c.method, SearchMethod.HRR)
|
||||
|
||||
def test_exact_keywords_routes_fts5(self):
|
||||
c = self.router.classify('Find documents containing "FastAPI tutorial"')
|
||||
self.assertEqual(c.method, SearchMethod.FTS5)
|
||||
|
||||
def test_short_query_routes_fts5(self):
|
||||
c = self.router.classify("Python syntax")
|
||||
self.assertEqual(c.method, SearchMethod.FTS5)
|
||||
|
||||
def test_temporal_routes_fts5(self):
|
||||
c = self.router.classify("Recent changes to the config")
|
||||
self.assertEqual(c.method, SearchMethod.FTS5)
|
||||
|
||||
def test_semantic_routes_vector(self):
|
||||
c = self.router.classify("Explain how transformers work in natural language processing")
|
||||
self.assertEqual(c.method, SearchMethod.VECTOR)
|
||||
|
||||
|
||||
class TestReciprocalRankFusion(unittest.TestCase):
|
||||
|
||||
def test_basic_fusion(self):
|
||||
results = {
|
||||
"hrr": [("a", 0.9), ("b", 0.8)],
|
||||
"vector": [("b", 0.85), ("c", 0.7)],
|
||||
}
|
||||
merged = reciprocal_rank_fusion(results)
|
||||
|
||||
# 'b' appears in both, should rank high
|
||||
ids = [r[0] for r in merged]
|
||||
self.assertIn("b", ids[:2])
|
||||
|
||||
def test_empty_results(self):
|
||||
merged = reciprocal_rank_fusion({})
|
||||
self.assertEqual(len(merged), 0)
|
||||
|
||||
|
||||
class TestHRRPriority(unittest.TestCase):
|
||||
|
||||
def test_compositional_hrr_first(self):
|
||||
hrr = [("a", 0.9), ("b", 0.8)]
|
||||
vector = [("c", 0.85), ("d", 0.7)]
|
||||
fts5 = [("e", 0.6)]
|
||||
|
||||
merged = merge_with_hrr_priority(hrr, vector, fts5, "compositional")
|
||||
|
||||
# HRR results should come first
|
||||
self.assertEqual(merged[0][0], "a")
|
||||
self.assertEqual(merged[1][0], "b")
|
||||
|
||||
|
||||
class TestHybridDecision(unittest.TestCase):
|
||||
|
||||
def test_low_confidence_uses_hybrid(self):
|
||||
from tools.memory_query_router import should_use_hybrid
|
||||
# Ambiguous query
|
||||
self.assertTrue(should_use_hybrid("Tell me about things"))
|
||||
|
||||
def test_clear_query_no_hybrid(self):
|
||||
from tools.memory_query_router import should_use_hybrid
|
||||
# Clear contradiction query
|
||||
self.assertFalse(should_use_hybrid("What contradicts X?"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
209
tools/memory_query_router.py
Normal file
209
tools/memory_query_router.py
Normal file
@@ -0,0 +1,209 @@
|
||||
"""
|
||||
Hybrid Memory Query Router
|
||||
|
||||
Routes queries to the best search method:
|
||||
- HRR: Compositional/conceptual queries
|
||||
- Vector: Semantic similarity
|
||||
- FTS5: Exact keyword matching
|
||||
|
||||
Issue: #663
|
||||
"""
|
||||
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
class SearchMethod(Enum):
|
||||
"""Available search methods."""
|
||||
HRR = "hrr" # Holographic Reduced Representations
|
||||
VECTOR = "vector" # Semantic vector search
|
||||
FTS5 = "fts5" # Full-text search (SQLite)
|
||||
HYBRID = "hybrid" # Combine multiple methods
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueryClassification:
|
||||
"""Result of query classification."""
|
||||
method: SearchMethod
|
||||
confidence: float
|
||||
reason: str
|
||||
sub_queries: Optional[List[str]] = None
|
||||
|
||||
|
||||
# Query patterns for routing
|
||||
COMPOSITIONAL_PATTERNS = [
|
||||
r"(?i)\brelated\s+to\b",
|
||||
r"(?i)\bcombined\s+with\b",
|
||||
r"(?i)\bbound\s+to\b",
|
||||
r"(?i)\bassociated\s+with\b",
|
||||
r"(?i)\bwhat\s+connects?\b",
|
||||
r"(?i)\bhow\s+.*\s+relate\b",
|
||||
r"(?i)\brelationship\s+between\b",
|
||||
]
|
||||
|
||||
CONTRADICTION_PATTERNS = [
|
||||
r"(?i)\bcontradicts?\b",
|
||||
r"(?i)\bconflicts?\s+with\b",
|
||||
r"(?i)\binconsistent\b",
|
||||
r"(?i)\bopposite\s+of\b",
|
||||
r"(?i)\bopposes?\b",
|
||||
r"(?i)\bdisagrees?\s+with\b",
|
||||
]
|
||||
|
||||
EXACT_KEYWORD_PATTERNS = [
|
||||
r'"[^"]+"', # Quoted phrases
|
||||
r"'[^']+'", # Single-quoted phrases
|
||||
r"(?i)\bexact\b",
|
||||
r"(?i)\bprecisely\b",
|
||||
r"(?i)\bspecifically\b",
|
||||
]
|
||||
|
||||
TEMPORAL_PATTERNS = [
|
||||
r"(?i)\brecent\b",
|
||||
r"(?i)\btoday\b",
|
||||
r"(?i)\byesterday\b",
|
||||
r"(?i)\blast\s+(week|month|hour)\b",
|
||||
r"(?i)\bsince\b",
|
||||
r"(?i)\bbefore\b",
|
||||
r"(?i)\bafter\b",
|
||||
]
|
||||
|
||||
|
||||
class QueryRouter:
|
||||
"""Route queries to the best search method."""
|
||||
|
||||
def classify(self, query: str) -> QueryClassification:
|
||||
"""Classify a query and route to best method."""
|
||||
|
||||
# Check for contradiction queries (HRR)
|
||||
for pattern in CONTRADICTION_PATTERNS:
|
||||
if re.search(pattern, query):
|
||||
return QueryClassification(
|
||||
method=SearchMethod.HRR,
|
||||
confidence=0.95,
|
||||
reason="Contradiction detection query"
|
||||
)
|
||||
|
||||
# Check for compositional queries (HRR)
|
||||
for pattern in COMPOSITIONAL_PATTERNS:
|
||||
if re.search(pattern, query):
|
||||
return QueryClassification(
|
||||
method=SearchMethod.HRR,
|
||||
confidence=0.90,
|
||||
reason="Compositional/conceptual query"
|
||||
)
|
||||
|
||||
# Check for exact keyword queries (FTS5)
|
||||
for pattern in EXACT_KEYWORD_PATTERNS:
|
||||
if re.search(pattern, query):
|
||||
return QueryClassification(
|
||||
method=SearchMethod.FTS5,
|
||||
confidence=0.85,
|
||||
reason="Exact keyword query"
|
||||
)
|
||||
|
||||
# Check for temporal queries (FTS5)
|
||||
for pattern in TEMPORAL_PATTERNS:
|
||||
if re.search(pattern, query):
|
||||
return QueryClassification(
|
||||
method=SearchMethod.FTS5,
|
||||
confidence=0.80,
|
||||
reason="Temporal query"
|
||||
)
|
||||
|
||||
# Short queries tend to be keyword searches
|
||||
if len(query.split()) <= 3:
|
||||
return QueryClassification(
|
||||
method=SearchMethod.FTS5,
|
||||
confidence=0.70,
|
||||
reason="Short query (likely keyword)"
|
||||
)
|
||||
|
||||
# Default: vector search for semantic queries
|
||||
return QueryClassification(
|
||||
method=SearchMethod.VECTOR,
|
||||
confidence=0.60,
|
||||
reason="Semantic similarity query"
|
||||
)
|
||||
|
||||
def should_use_hybrid(self, query: str) -> bool:
|
||||
"""Check if query should use hybrid search."""
|
||||
classification = self.classify(query)
|
||||
|
||||
# Low confidence -> use hybrid
|
||||
if classification.confidence < 0.70:
|
||||
return True
|
||||
|
||||
# Mixed signals -> use hybrid
|
||||
has_compositional = any(re.search(p, query) for p in COMPOSITIONAL_PATTERNS)
|
||||
has_keywords = any(re.search(p, query) for p in EXACT_KEYWORD_PATTERNS)
|
||||
|
||||
return has_compositional and has_keywords
|
||||
|
||||
|
||||
def reciprocal_rank_fusion(
|
||||
results: Dict[str, List[Tuple[str, float]]],
|
||||
k: int = 60
|
||||
) -> List[Tuple[str, float]]:
|
||||
"""
|
||||
Merge results using Reciprocal Rank Fusion.
|
||||
|
||||
Args:
|
||||
results: Dict of method -> [(item_id, score), ...]
|
||||
k: RRF constant (default 60)
|
||||
|
||||
Returns:
|
||||
Merged and re-ranked results
|
||||
"""
|
||||
scores = defaultdict(float)
|
||||
|
||||
for method, ranked_items in results.items():
|
||||
for rank, (item_id, _) in enumerate(ranked_items, 1):
|
||||
scores[item_id] += 1.0 / (k + rank)
|
||||
|
||||
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
|
||||
|
||||
|
||||
def merge_with_hrr_priority(
|
||||
hrr_results: List[Tuple[str, float]],
|
||||
vector_results: List[Tuple[str, float]],
|
||||
fts5_results: List[Tuple[str, float]],
|
||||
query_type: str = "default"
|
||||
) -> List[Tuple[str, float]]:
|
||||
"""
|
||||
Merge results with HRR priority for compositional queries.
|
||||
"""
|
||||
if query_type == "compositional":
|
||||
# HRR first, vector as supplement
|
||||
merged = hrr_results[:5]
|
||||
seen = {r[0] for r in merged}
|
||||
|
||||
for r in vector_results[:5]:
|
||||
if r[0] not in seen:
|
||||
merged.append(r)
|
||||
|
||||
return merged
|
||||
|
||||
# Default: RRF merge
|
||||
return reciprocal_rank_fusion({
|
||||
"hrr": hrr_results,
|
||||
"vector": vector_results,
|
||||
"fts5": fts5_results
|
||||
})
|
||||
|
||||
|
||||
# Module-level router
|
||||
_router = QueryRouter()
|
||||
|
||||
|
||||
def route_query(query: str) -> QueryClassification:
|
||||
"""Route a query to the best search method."""
|
||||
return _router.classify(query)
|
||||
|
||||
|
||||
def should_use_hybrid(query: str) -> bool:
|
||||
"""Check if query should use hybrid search."""
|
||||
return _router.should_use_hybrid(query)
|
||||
Reference in New Issue
Block a user