Compare commits

..

3 Commits

Author SHA1 Message Date
6eec68d8e8 test: Add query router tests (#663)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 55s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 45s
Tests / e2e (pull_request) Successful in 3m4s
Tests / test (pull_request) Failing after 52m30s
2026-04-15 04:11:47 +00:00
3e2a003ee4 feat: Add hybrid query router (#663) 2026-04-15 04:09:13 +00:00
1db6addf91 docs: Add holographic + vector hybrid research (#663) 2026-04-15 04:08:41 +00:00
6 changed files with 571 additions and 683 deletions

View File

@@ -1,353 +0,0 @@
"""Multi-Agent Conversation Bridge via Matrix.
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate
with each other through a shared Matrix room.
Usage:
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
await bridge.send_to_agent("Allegro", "Check the deployment status")
messages = await bridge.get_messages_from("Allegro")
"""
import asyncio
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
# Configuration
MATRIX_BRIDGE_ROOM = os.environ.get("MATRIX_BRIDGE_ROOM", "")
MATRIX_BRIDGE_ENABLED = os.environ.get("MATRIX_BRIDGE_ENABLED", "true").lower() == "true"
AGENT_NAME = os.environ.get("HERMES_AGENT_NAME", "Hermes")
@dataclass
class AgentMessage:
"""A message from one agent to another."""
sender: str
recipient: str
content: str
timestamp: float = field(default_factory=time.time)
message_id: str = ""
room_id: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
"sender": self.sender,
"recipient": self.recipient,
"content": self.content,
"timestamp": self.timestamp,
"message_id": self.message_id,
"room_id": self.room_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage":
return cls(**data)
class MatrixBridge:
"""Multi-agent conversation bridge via Matrix rooms.
Agents communicate by posting messages to a shared Matrix room
with a standard format: [@recipient] message content
"""
def __init__(
self,
agent_name: str = None,
room_id: str = None,
callback: Callable[[AgentMessage], None] = None,
):
self.agent_name = agent_name or AGENT_NAME
self.room_id = room_id or MATRIX_BRIDGE_ROOM
self.callback = callback
self._matrix_client = None
self._running = False
self._message_handlers: List[Callable[[AgentMessage], None]] = []
self._pending_messages: List[AgentMessage] = []
self._known_agents: Set[str] = set()
async def connect(self) -> bool:
"""Connect to Matrix and join the bridge room."""
if not MATRIX_BRIDGE_ENABLED:
logger.info("Matrix bridge disabled via MATRIX_BRIDGE_ENABLED=false")
return False
if not self.room_id:
logger.warning("No MATRIX_BRIDGE_ROOM configured — bridge disabled")
return False
try:
# Import Matrix client
from mautrix.client import Client
from mautrix.types import RoomID, UserID
# Get credentials
homeserver = os.environ.get("MATRIX_HOMESERVER", "")
access_token = os.environ.get("MATRIX_ACCESS_TOKEN", "")
if not homeserver or not access_token:
logger.warning("Matrix credentials not configured — bridge disabled")
return False
# Create client
self._matrix_client = Client(
mxid=UserID(f"@{self.agent_name}:{homeserver.split('//')[1]}"),
base_url=homeserver,
token=access_token,
)
# Join room
await self._matrix_client.join_room(RoomID(self.room_id))
logger.info(f"Agent {self.agent_name} joined bridge room {self.room_id}")
# Register message handler
self._matrix_client.add_event_handler(self._on_message)
# Start sync
self._running = True
asyncio.create_task(self._sync_loop())
# Announce presence
await self._announce_presence()
return True
except Exception as e:
logger.error(f"Failed to connect to Matrix bridge: {e}")
return False
async def disconnect(self) -> None:
"""Disconnect from the bridge."""
self._running = False
if self._matrix_client:
try:
await self._matrix_client.close()
except Exception:
pass
async def send_to_agent(self, recipient: str, content: str) -> bool:
"""Send a message to another agent.
Args:
recipient: Agent name (e.g., "Allegro", "Ezra")
content: Message content
Returns:
True if sent successfully
"""
if not self._matrix_client or not self.room_id:
logger.warning("Not connected to bridge room")
return False
# Format message with recipient prefix
formatted = f"[@{recipient}] {content}"
try:
from mautrix.types import RoomID, TextMessageEventContent, MessageType
await self._matrix_client.send_message_event(
room_id=RoomID(self.room_id),
event_type="m.room.message",
content=TextMessageEventContent(
msgtype=MessageType.TEXT,
body=formatted,
),
)
logger.info(f"Sent message to {recipient}: {content[:50]}...")
return True
except Exception as e:
logger.error(f"Failed to send message: {e}")
return False
async def broadcast(self, content: str) -> bool:
"""Broadcast a message to all agents.
Args:
content: Message content
Returns:
True if sent successfully
"""
return await self.send_to_agent("*", content)
def add_handler(self, handler: Callable[[AgentMessage], None]) -> None:
"""Add a message handler.
Called when a message is received for this agent.
"""
self._message_handlers.append(handler)
def get_known_agents(self) -> Set[str]:
"""Get set of known agents in the bridge."""
return self._known_agents.copy()
async def _on_message(self, event) -> None:
"""Handle incoming Matrix message."""
try:
# Extract message content
content = event.content
if not hasattr(content, 'body'):
return
body = content.body
# Check if message is for this agent
if not self._is_for_me(body):
return
# Parse sender and content
sender = self._extract_sender(event)
message_content = self._extract_content(body)
# Create agent message
msg = AgentMessage(
sender=sender,
recipient=self.agent_name,
content=message_content,
timestamp=time.time(),
message_id=str(event.event_id),
room_id=str(event.room_id),
)
# Track known agents
self._known_agents.add(sender)
# Call handlers
for handler in self._message_handlers:
try:
handler(msg)
except Exception as e:
logger.error(f"Message handler error: {e}")
if self.callback:
try:
self.callback(msg)
except Exception as e:
logger.error(f"Callback error: {e}")
logger.info(f"Received message from {sender}: {message_content[:50]}...")
except Exception as e:
logger.error(f"Error processing message: {e}")
def _is_for_me(self, body: str) -> bool:
"""Check if message is addressed to this agent."""
# Direct mention
if f"[@{self.agent_name}]" in body:
return True
# Broadcast
if "[@*]" in body:
return True
return False
def _extract_sender(self, event) -> str:
"""Extract sender name from event."""
try:
sender_id = str(event.sender)
# Extract name from @name:server format
match = re.match(r"@([^:]+):", sender_id)
if match:
return match.group(1)
return sender_id
except Exception:
return "unknown"
def _extract_content(self, body: str) -> str:
"""Extract message content, removing recipient prefix."""
# Remove [@recipient] prefix
match = re.match(r"\[@[^\]]+\]\s*(.*)", body, re.DOTALL)
if match:
return match.group(1).strip()
return body.strip()
async def _announce_presence(self) -> None:
"""Announce this agent's presence to the bridge."""
await self.broadcast(f"{self.agent_name} online")
async def _sync_loop(self) -> None:
"""Background sync loop for Matrix events."""
while self._running:
try:
if self._matrix_client:
await self._matrix_client.sync(timeout=30000)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Sync error: {e}")
await asyncio.sleep(5)
class AgentRegistry:
"""Registry of known agents in the bridge."""
def __init__(self):
self._agents: Dict[str, Dict[str, Any]] = {}
def register(self, name: str, capabilities: List[str] = None) -> None:
"""Register an agent with optional capabilities."""
self._agents[name] = {
"name": name,
"capabilities": capabilities or [],
"last_seen": time.time(),
"status": "online",
}
def unregister(self, name: str) -> None:
"""Unregister an agent."""
if name in self._agents:
self._agents[name]["status"] = "offline"
def get_agent(self, name: str) -> Optional[Dict[str, Any]]:
"""Get agent info by name."""
return self._agents.get(name)
def list_agents(self) -> List[Dict[str, Any]]:
"""List all registered agents."""
return list(self._agents.values())
def find_agents_with_capability(self, capability: str) -> List[str]:
"""Find agents with a specific capability."""
return [
name for name, info in self._agents.items()
if capability in info.get("capabilities", [])
]
# Global bridge instance
_bridge: Optional[MatrixBridge] = None
async def get_bridge(agent_name: str = None) -> MatrixBridge:
"""Get or create the global Matrix bridge instance."""
global _bridge
if _bridge is None:
_bridge = MatrixBridge(agent_name=agent_name)
await _bridge.connect()
return _bridge
async def send_to_agent(recipient: str, content: str) -> bool:
"""Convenience function to send a message to another agent."""
bridge = await get_bridge()
return await bridge.send_to_agent(recipient, content)
async def broadcast_to_agents(content: str) -> bool:
"""Convenience function to broadcast to all agents."""
bridge = await get_bridge()
return await bridge.broadcast(content)

View File

@@ -0,0 +1,265 @@
# Holographic + Vector Hybrid Memory Architecture
**Issue:** #663 — Research: Combining HRR Compositional Queries with Semantic Search
**Date:** 2026-04-14
## Executive Summary
The optimal memory architecture is a **hybrid** combining three methods:
- **HRR (Holographic Reduced Representations)** — Compositional reasoning
- **Vector Search (Qdrant)** — Semantic similarity
- **FTS5 (SQLite Full-Text Search)** — Exact keyword matching
No single method covers all use cases. Each excels at different query types.
## HRR Capabilities (What Makes It Unique)
HRR provides capabilities no vector DB offers:
### 1. Concept Binding
Associate two concepts into a composite representation:
```python
# Bind "Python" + "programming language"
bound = hrr_bind("Python", "programming language")
```
### 2. Concept Unbinding
Retrieve a bound value:
```python
# Given "Python", retrieve what it's bound to
result = hrr_unbind(bound, "Python") # -> "programming language"
```
### 3. Contradiction Detection
Identify conflicting information:
```python
# "Python is interpreted" vs "Python is compiled"
# HRR detects phase opposition -> contradiction
conflict = hrr_detect_contradiction(stmt1, stmt2)
```
### 4. Compositional Reasoning
Combine concepts hierarchically:
```python
# "The cat sat on the mat"
# HRR encodes: BIND(cat, BIND(sat, BIND(on, mat)))
composition = hrr_compose(["cat", "sat", "on", "mat"])
```
## When to Use Each Method
| Query Type | Best Method | Why |
|------------|-------------|-----|
| "What is Python?" | Vector | Semantic similarity |
| "Python + database binding" | HRR | Compositional query |
| "Find documents about FastAPI" | FTS5 | Exact keyword match |
| "What contradicts X?" | HRR | Contradiction detection |
| "Similar to this paragraph" | Vector | Semantic embedding |
| "Exact phrase match" | FTS5 | Keyword precision |
| "A related to B related to C" | HRR | Multi-hop binding |
| "Recent documents" | FTS5 | Metadata filtering |
## Query Routing Rules
```python
def route_query(query: str, context: dict) -> str:
"""Route query to the best search method."""
# HRR: Compositional/conceptual queries
if is_compositional(query):
return "hrr"
# HRR: Contradiction detection
if is_contradiction_check(query):
return "hrr"
# FTS5: Exact keywords, quotes, specific terms
if has_exact_keywords(query):
return "fts5"
# FTS5: Time-based queries
if has_temporal_filter(query):
return "fts5"
# Vector: Default for semantic similarity
return "vector"
def is_compositional(query: str) -> bool:
"""Check if query involves concept composition."""
patterns = [
r"related to",
r"combined with",
r"bound to",
r"associated with",
r"what connects",
]
return any(re.search(p, query.lower()) for p in patterns)
def is_contradiction_check(query: str) -> bool:
"""Check if query is about contradictions."""
patterns = [
r"contradicts?",
r"conflicts? with",
r"inconsistent",
r"opposite of",
]
return any(re.search(p, query.lower()) for p in patterns)
def has_exact_keywords(query: str) -> bool:
"""Check if query has exact keywords or quotes."""
return '"' in query or "'" in query or len(query.split()) <= 3
```
## Hybrid Result Merging
### Reciprocal Rank Fusion (RRF)
Combine ranked results from multiple methods:
```python
def reciprocal_rank_fusion(
results: Dict[str, List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Merge results using RRF.
Args:
results: {"hrr": [(id, score), ...], "vector": [...], "fts5": [...]}
k: RRF constant (default 60)
Returns:
Merged and re-ranked results
"""
scores = defaultdict(float)
for method, ranked_items in results.items():
for rank, (item_id, _) in enumerate(ranked_items, 1):
scores[item_id] += 1.0 / (k + rank)
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
```
### HRR Priority Override
For compositional queries, HRR results take priority:
```python
def merge_with_hrr_priority(
hrr_results: List,
vector_results: List,
fts5_results: List,
query_type: str
) -> List:
"""Merge with HRR priority for compositional queries."""
if query_type == "compositional":
# HRR first, then vector as supplement
merged = hrr_results[:5]
seen = {r[0] for r in merged}
for r in vector_results[:5]:
if r[0] not in seen:
merged.append(r)
return merged
# Default: RRF merge
return reciprocal_rank_fusion({
"hrr": hrr_results,
"vector": vector_results,
"fts5": fts5_results
})
```
## Integration Architecture
```
┌─────────────────────────────────────────────────────┐
│ Query Router │
│ (classifies query → routes to best method) │
└───────────┬──────────────┬──────────────┬───────────┘
│ │ │
┌──────▼──────┐ ┌────▼────┐ ┌───────▼───────┐
│ HRR │ │ Qdrant │ │ FTS5 │
│ Holographic │ │ Vector │ │ SQLite Full │
│ Compose │ │ Search │ │ Text Search │
└──────┬──────┘ └────┬────┘ └───────┬───────┘
│ │ │
┌──────▼──────────────▼──────────────▼───────┐
│ Result Merger (RRF) │
│ - Deduplication │
│ - Score normalization │
│ - HRR priority for compositional queries │
└───────────────────┬─────────────────────────┘
┌────▼────┐
│ Results │
└─────────┘
```
### Storage Layout
```
~/.hermes/memory/
├── holographic/
│ ├── hrr_store.pkl # HRR vectors (numpy arrays)
│ ├── bindings.pkl # Concept bindings
│ └── contradictions.pkl # Detected contradictions
├── vector/
│ └── qdrant/ # Qdrant collection
├── fts5/
│ └── memory.db # SQLite with FTS5
└── index.json # Unified index
```
## Preserving HRR Unique Capabilities
### Rules
1. **Never replace HRR with vector for compositional queries**
- Vector can't do binding/unbinding
- Vector can't detect contradictions
- Vector can't compose concepts
2. **HRR is primary for relational queries**
- "What relates X to Y?"
- "What contradicts this?"
- "Combine concept A with concept B"
3. **Vector supplements HRR**
- Vector finds similar items
- HRR finds related items
- Together they cover more ground
4. **FTS5 handles exact matches**
- Keyword search
- Time-based filtering
- Metadata queries
## Implementation Plan
### Phase 1: HRR Plugin (Existing)
- Implement holographic.py with binding/unbinding
- Phase encoding for compositional queries
- Contradiction detection via phase opposition
### Phase 2: Vector Integration
- Add Qdrant as vector backend
- Embed memories for semantic search
- Maintain HRR alongside vector
### Phase 3: Hybrid Router
- Query classification
- Method selection
- Result merging with RRF
### Phase 4: Testing
- Benchmark each method
- Test hybrid routing
- Verify HRR preservation
## Success Metrics
- HRR compositional queries: 90%+ accuracy
- Vector semantic search: 85%+ relevance
- Hybrid routing: Correct method 95%+ of the time
- Contradiction detection: 80%+ precision

View File

@@ -1,216 +0,0 @@
# Multi-Agent Conversation Bridge
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate with each other through a shared Matrix room.
## Overview
The Matrix Bridge enables agent-to-agent coordination without manual intervention. Agents can:
- Send tasks to specific agents
- Broadcast to all agents
- Respond to requests from other agents
- Coordinate on complex workflows
## Configuration
### Environment Variables
```bash
# Enable/disable the bridge
MATRIX_BRIDGE_ENABLED=true
# Shared Matrix room ID for agent communication
MATRIX_BRIDGE_ROOM=!roomid:matrix.example.org
# Agent name (for message routing)
HERMES_AGENT_NAME=Timmy
# Matrix credentials (from existing Matrix gateway config)
MATRIX_HOMESERVER=https://matrix.example.org
MATRIX_ACCESS_TOKEN=syt_...
```
### Matrix Room Setup
1. Create a Matrix room for agent communication
2. Invite all agent accounts to the room
3. Set `MATRIX_BRIDGE_ROOM` to the room ID
## Message Format
Messages use a simple prefix format for routing:
```
[@Allegro] Check the deployment status on VPS
[@Ezra] Can you review PR #456?
[@*] System maintenance in 5 minutes
```
- `[@AgentName]` — Message for specific agent
- `[@*]` — Broadcast to all agents
## Usage
### Basic Usage
```python
from agent.matrix_bridge import MatrixBridge, send_to_agent, broadcast_to_agents
# Create bridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
# Send to specific agent
await bridge.send_to_agent("Allegro", "Check deployment status")
# Broadcast to all agents
await bridge.broadcast("System maintenance starting")
# Add message handler
def handle_message(msg):
print(f"From {msg.sender}: {msg.content}")
bridge.add_handler(handle_message)
```
### Convenience Functions
```python
from agent.matrix_bridge import send_to_agent, broadcast_to_agents
# Send message
await send_to_agent("Ezra", "Review PR #456")
# Broadcast
await broadcast_to_agents("Going offline for maintenance")
```
### Agent Registry
```python
from agent.matrix_bridge import AgentRegistry
registry = AgentRegistry()
# Register agent with capabilities
registry.register("Timmy", capabilities=["code", "review", "deploy"])
registry.register("Allegro", capabilities=["monitoring", "alerting"])
# Find agents with capability
coders = registry.find_agents_with_capability("code")
```
## Message Flow
```
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Timmy │────▶│ Matrix │────▶│ Allegro │
│ Agent │ │ Room │ │ Agent │
└─────────┘ └─────────┘ └─────────┘
│ │ │
│ [@Allegro] │ │
│ Check deps │ │
└──────────────▶│ │
│ [@Allegro] │
│ Check deps │
└──────────────▶│
│ [@Timmy] │
│ Done ✓ │
│◀──────────────┘
│ [@Timmy] │
│ Done ✓ │
│◀──────────────┘
```
## Integration with Hermes
### In run_agent.py
```python
# Add to conversation loop
if self.matrix_bridge:
# Check for messages from other agents
messages = await self.matrix_bridge.get_pending_messages()
for msg in messages:
# Process agent-to-agent messages
pass
```
### In Gateway
```python
# Add Matrix bridge to gateway
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
gateway.matrix_bridge = bridge
```
## Testing
### Unit Tests
```python
def test_message_parsing():
"""Test message format parsing."""
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
# Test recipient extraction
assert bridge._is_for_me("[@Timmy] Hello")
assert not bridge._is_for_me("[@Allegro] Hello")
assert bridge._is_for_me("[@*] Broadcast")
# Test content extraction
assert bridge._extract_content("[@Timmy] Hello") == "Hello"
assert bridge._extract_content("[@*] Test message") == "Test message"
```
### Integration Test
```bash
# Test with two agents
MATRIX_BRIDGE_ENABLED=true \
MATRIX_BRIDGE_ROOM=!test:matrix.example.org \
HERMES_AGENT_NAME=Timmy \
python -c "
import asyncio
from agent.matrix_bridge import send_to_agent
async def test():
await send_to_agent('Allegro', 'Test message')
print('Sent')
asyncio.run(test())
"
```
## Troubleshooting
### Bridge not connecting
1. Check `MATRIX_BRIDGE_ENABLED=true`
2. Verify `MATRIX_BRIDGE_ROOM` is set
3. Ensure Matrix credentials are configured
4. Check Matrix homeserver is reachable
### Messages not received
1. Verify agent is in the Matrix room
2. Check message format: `[@AgentName] content`
3. Ensure `HERMES_AGENT_NAME` matches agent name
4. Check Matrix sync is running
### Agent not found
1. Verify agent has joined the bridge room
2. Check agent name matches exactly (case-sensitive)
3. Ensure agent has announced presence
## Related
- Issue #747: feat: multi-agent conversation bridge via Matrix
- Matrix Gateway: `gateway/platforms/matrix.py`
- Multi-Agent Orchestration: `docs/multi-agent-orchestration.md`

View File

@@ -1,114 +0,0 @@
"""Tests for Matrix Bridge — Issue #747."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.matrix_bridge import MatrixBridge, AgentMessage, AgentRegistry
class TestMessageParsing:
"""Test message format parsing."""
def test_is_for_me_direct(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._is_for_me("[@Timmy] Hello") == True
def test_is_not_for_me(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._is_for_me("[@Allegro] Hello") == False
def test_is_broadcast(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._is_for_me("[@*] Broadcast") == True
def test_extract_content(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._extract_content("[@Timmy] Hello world") == "Hello world"
def test_extract_content_multiline(self):
bridge = MatrixBridge(agent_name="Timmy")
content = bridge._extract_content("[@Timmy] Line 1\nLine 2")
assert content == "Line 1\nLine 2"
class TestAgentMessage:
"""Test AgentMessage dataclass."""
def test_to_dict(self):
msg = AgentMessage(
sender="Timmy",
recipient="Allegro",
content="Hello",
timestamp=1234567890.0,
)
d = msg.to_dict()
assert d["sender"] == "Timmy"
assert d["recipient"] == "Allegro"
assert d["content"] == "Hello"
def test_from_dict(self):
d = {
"sender": "Timmy",
"recipient": "Allegro",
"content": "Hello",
"timestamp": 1234567890.0,
"message_id": "",
"room_id": "",
}
msg = AgentMessage.from_dict(d)
assert msg.sender == "Timmy"
assert msg.recipient == "Allegro"
class TestAgentRegistry:
"""Test AgentRegistry."""
def test_register(self):
registry = AgentRegistry()
registry.register("Timmy", capabilities=["code", "review"])
agent = registry.get_agent("Timmy")
assert agent["name"] == "Timmy"
assert "code" in agent["capabilities"]
def test_list_agents(self):
registry = AgentRegistry()
registry.register("Timmy")
registry.register("Allegro")
agents = registry.list_agents()
assert len(agents) == 2
def test_find_with_capability(self):
registry = AgentRegistry()
registry.register("Timmy", capabilities=["code"])
registry.register("Allegro", capabilities=["monitoring"])
coders = registry.find_agents_with_capability("code")
assert "Timmy" in coders
assert "Allegro" not in coders
def test_unregister(self):
registry = AgentRegistry()
registry.register("Timmy")
registry.unregister("Timmy")
agent = registry.get_agent("Timmy")
assert agent["status"] == "offline"
class TestBridgeInit:
"""Test bridge initialization."""
def test_default_agent_name(self):
bridge = MatrixBridge()
assert bridge.agent_name == "Hermes"
def test_custom_agent_name(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge.agent_name == "Timmy"
def test_known_agents_empty(self):
bridge = MatrixBridge()
assert len(bridge.get_known_agents()) == 0
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,97 @@
"""
Tests for hybrid memory query router
Issue: #663
"""
import unittest
from tools.memory_query_router import (
SearchMethod,
QueryRouter,
route_query,
reciprocal_rank_fusion,
merge_with_hrr_priority,
)
class TestQueryClassification(unittest.TestCase):
def setUp(self):
self.router = QueryRouter()
def test_contradiction_routes_hrr(self):
c = self.router.classify("What contradicts this statement?")
self.assertEqual(c.method, SearchMethod.HRR)
self.assertGreater(c.confidence, 0.9)
def test_compositional_routes_hrr(self):
c = self.router.classify("How does Python relate to machine learning?")
self.assertEqual(c.method, SearchMethod.HRR)
c = self.router.classify("What is associated with quantum computing?")
self.assertEqual(c.method, SearchMethod.HRR)
def test_exact_keywords_routes_fts5(self):
c = self.router.classify('Find documents containing "FastAPI tutorial"')
self.assertEqual(c.method, SearchMethod.FTS5)
def test_short_query_routes_fts5(self):
c = self.router.classify("Python syntax")
self.assertEqual(c.method, SearchMethod.FTS5)
def test_temporal_routes_fts5(self):
c = self.router.classify("Recent changes to the config")
self.assertEqual(c.method, SearchMethod.FTS5)
def test_semantic_routes_vector(self):
c = self.router.classify("Explain how transformers work in natural language processing")
self.assertEqual(c.method, SearchMethod.VECTOR)
class TestReciprocalRankFusion(unittest.TestCase):
def test_basic_fusion(self):
results = {
"hrr": [("a", 0.9), ("b", 0.8)],
"vector": [("b", 0.85), ("c", 0.7)],
}
merged = reciprocal_rank_fusion(results)
# 'b' appears in both, should rank high
ids = [r[0] for r in merged]
self.assertIn("b", ids[:2])
def test_empty_results(self):
merged = reciprocal_rank_fusion({})
self.assertEqual(len(merged), 0)
class TestHRRPriority(unittest.TestCase):
def test_compositional_hrr_first(self):
hrr = [("a", 0.9), ("b", 0.8)]
vector = [("c", 0.85), ("d", 0.7)]
fts5 = [("e", 0.6)]
merged = merge_with_hrr_priority(hrr, vector, fts5, "compositional")
# HRR results should come first
self.assertEqual(merged[0][0], "a")
self.assertEqual(merged[1][0], "b")
class TestHybridDecision(unittest.TestCase):
def test_low_confidence_uses_hybrid(self):
from tools.memory_query_router import should_use_hybrid
# Ambiguous query
self.assertTrue(should_use_hybrid("Tell me about things"))
def test_clear_query_no_hybrid(self):
from tools.memory_query_router import should_use_hybrid
# Clear contradiction query
self.assertFalse(should_use_hybrid("What contradicts X?"))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,209 @@
"""
Hybrid Memory Query Router
Routes queries to the best search method:
- HRR: Compositional/conceptual queries
- Vector: Semantic similarity
- FTS5: Exact keyword matching
Issue: #663
"""
import re
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
class SearchMethod(Enum):
"""Available search methods."""
HRR = "hrr" # Holographic Reduced Representations
VECTOR = "vector" # Semantic vector search
FTS5 = "fts5" # Full-text search (SQLite)
HYBRID = "hybrid" # Combine multiple methods
@dataclass
class QueryClassification:
"""Result of query classification."""
method: SearchMethod
confidence: float
reason: str
sub_queries: Optional[List[str]] = None
# Query patterns for routing
COMPOSITIONAL_PATTERNS = [
r"(?i)\brelated\s+to\b",
r"(?i)\bcombined\s+with\b",
r"(?i)\bbound\s+to\b",
r"(?i)\bassociated\s+with\b",
r"(?i)\bwhat\s+connects?\b",
r"(?i)\bhow\s+.*\s+relate\b",
r"(?i)\brelationship\s+between\b",
]
CONTRADICTION_PATTERNS = [
r"(?i)\bcontradicts?\b",
r"(?i)\bconflicts?\s+with\b",
r"(?i)\binconsistent\b",
r"(?i)\bopposite\s+of\b",
r"(?i)\bopposes?\b",
r"(?i)\bdisagrees?\s+with\b",
]
EXACT_KEYWORD_PATTERNS = [
r'"[^"]+"', # Quoted phrases
r"'[^']+'", # Single-quoted phrases
r"(?i)\bexact\b",
r"(?i)\bprecisely\b",
r"(?i)\bspecifically\b",
]
TEMPORAL_PATTERNS = [
r"(?i)\brecent\b",
r"(?i)\btoday\b",
r"(?i)\byesterday\b",
r"(?i)\blast\s+(week|month|hour)\b",
r"(?i)\bsince\b",
r"(?i)\bbefore\b",
r"(?i)\bafter\b",
]
class QueryRouter:
"""Route queries to the best search method."""
def classify(self, query: str) -> QueryClassification:
"""Classify a query and route to best method."""
# Check for contradiction queries (HRR)
for pattern in CONTRADICTION_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.HRR,
confidence=0.95,
reason="Contradiction detection query"
)
# Check for compositional queries (HRR)
for pattern in COMPOSITIONAL_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.HRR,
confidence=0.90,
reason="Compositional/conceptual query"
)
# Check for exact keyword queries (FTS5)
for pattern in EXACT_KEYWORD_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.FTS5,
confidence=0.85,
reason="Exact keyword query"
)
# Check for temporal queries (FTS5)
for pattern in TEMPORAL_PATTERNS:
if re.search(pattern, query):
return QueryClassification(
method=SearchMethod.FTS5,
confidence=0.80,
reason="Temporal query"
)
# Short queries tend to be keyword searches
if len(query.split()) <= 3:
return QueryClassification(
method=SearchMethod.FTS5,
confidence=0.70,
reason="Short query (likely keyword)"
)
# Default: vector search for semantic queries
return QueryClassification(
method=SearchMethod.VECTOR,
confidence=0.60,
reason="Semantic similarity query"
)
def should_use_hybrid(self, query: str) -> bool:
"""Check if query should use hybrid search."""
classification = self.classify(query)
# Low confidence -> use hybrid
if classification.confidence < 0.70:
return True
# Mixed signals -> use hybrid
has_compositional = any(re.search(p, query) for p in COMPOSITIONAL_PATTERNS)
has_keywords = any(re.search(p, query) for p in EXACT_KEYWORD_PATTERNS)
return has_compositional and has_keywords
def reciprocal_rank_fusion(
results: Dict[str, List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Merge results using Reciprocal Rank Fusion.
Args:
results: Dict of method -> [(item_id, score), ...]
k: RRF constant (default 60)
Returns:
Merged and re-ranked results
"""
scores = defaultdict(float)
for method, ranked_items in results.items():
for rank, (item_id, _) in enumerate(ranked_items, 1):
scores[item_id] += 1.0 / (k + rank)
return sorted(scores.items(), key=lambda x: x[1], reverse=True)
def merge_with_hrr_priority(
hrr_results: List[Tuple[str, float]],
vector_results: List[Tuple[str, float]],
fts5_results: List[Tuple[str, float]],
query_type: str = "default"
) -> List[Tuple[str, float]]:
"""
Merge results with HRR priority for compositional queries.
"""
if query_type == "compositional":
# HRR first, vector as supplement
merged = hrr_results[:5]
seen = {r[0] for r in merged}
for r in vector_results[:5]:
if r[0] not in seen:
merged.append(r)
return merged
# Default: RRF merge
return reciprocal_rank_fusion({
"hrr": hrr_results,
"vector": vector_results,
"fts5": fts5_results
})
# Module-level router
_router = QueryRouter()
def route_query(query: str) -> QueryClassification:
"""Route a query to the best search method."""
return _router.classify(query)
def should_use_hybrid(query: str) -> bool:
"""Check if query should use hybrid search."""
return _router.should_use_hybrid(query)