Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy Time
4c045e24a2 fix: implement multi-agent conversation bridge via Matrix (closes #747)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 59s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 48s
Tests / e2e (pull_request) Successful in 2m55s
Tests / test (pull_request) Failing after 56m44s
2026-04-14 23:19:05 -04:00
7 changed files with 683 additions and 596 deletions

353
agent/matrix_bridge.py Normal file
View File

@@ -0,0 +1,353 @@
"""Multi-Agent Conversation Bridge via Matrix.
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate
with each other through a shared Matrix room.
Usage:
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
await bridge.send_to_agent("Allegro", "Check the deployment status")
messages = await bridge.get_messages_from("Allegro")
"""
import asyncio
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
# Configuration
MATRIX_BRIDGE_ROOM = os.environ.get("MATRIX_BRIDGE_ROOM", "")
MATRIX_BRIDGE_ENABLED = os.environ.get("MATRIX_BRIDGE_ENABLED", "true").lower() == "true"
AGENT_NAME = os.environ.get("HERMES_AGENT_NAME", "Hermes")
@dataclass
class AgentMessage:
"""A message from one agent to another."""
sender: str
recipient: str
content: str
timestamp: float = field(default_factory=time.time)
message_id: str = ""
room_id: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
"sender": self.sender,
"recipient": self.recipient,
"content": self.content,
"timestamp": self.timestamp,
"message_id": self.message_id,
"room_id": self.room_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage":
return cls(**data)
class MatrixBridge:
"""Multi-agent conversation bridge via Matrix rooms.
Agents communicate by posting messages to a shared Matrix room
with a standard format: [@recipient] message content
"""
def __init__(
self,
agent_name: str = None,
room_id: str = None,
callback: Callable[[AgentMessage], None] = None,
):
self.agent_name = agent_name or AGENT_NAME
self.room_id = room_id or MATRIX_BRIDGE_ROOM
self.callback = callback
self._matrix_client = None
self._running = False
self._message_handlers: List[Callable[[AgentMessage], None]] = []
self._pending_messages: List[AgentMessage] = []
self._known_agents: Set[str] = set()
async def connect(self) -> bool:
"""Connect to Matrix and join the bridge room."""
if not MATRIX_BRIDGE_ENABLED:
logger.info("Matrix bridge disabled via MATRIX_BRIDGE_ENABLED=false")
return False
if not self.room_id:
logger.warning("No MATRIX_BRIDGE_ROOM configured — bridge disabled")
return False
try:
# Import Matrix client
from mautrix.client import Client
from mautrix.types import RoomID, UserID
# Get credentials
homeserver = os.environ.get("MATRIX_HOMESERVER", "")
access_token = os.environ.get("MATRIX_ACCESS_TOKEN", "")
if not homeserver or not access_token:
logger.warning("Matrix credentials not configured — bridge disabled")
return False
# Create client
self._matrix_client = Client(
mxid=UserID(f"@{self.agent_name}:{homeserver.split('//')[1]}"),
base_url=homeserver,
token=access_token,
)
# Join room
await self._matrix_client.join_room(RoomID(self.room_id))
logger.info(f"Agent {self.agent_name} joined bridge room {self.room_id}")
# Register message handler
self._matrix_client.add_event_handler(self._on_message)
# Start sync
self._running = True
asyncio.create_task(self._sync_loop())
# Announce presence
await self._announce_presence()
return True
except Exception as e:
logger.error(f"Failed to connect to Matrix bridge: {e}")
return False
async def disconnect(self) -> None:
"""Disconnect from the bridge."""
self._running = False
if self._matrix_client:
try:
await self._matrix_client.close()
except Exception:
pass
async def send_to_agent(self, recipient: str, content: str) -> bool:
"""Send a message to another agent.
Args:
recipient: Agent name (e.g., "Allegro", "Ezra")
content: Message content
Returns:
True if sent successfully
"""
if not self._matrix_client or not self.room_id:
logger.warning("Not connected to bridge room")
return False
# Format message with recipient prefix
formatted = f"[@{recipient}] {content}"
try:
from mautrix.types import RoomID, TextMessageEventContent, MessageType
await self._matrix_client.send_message_event(
room_id=RoomID(self.room_id),
event_type="m.room.message",
content=TextMessageEventContent(
msgtype=MessageType.TEXT,
body=formatted,
),
)
logger.info(f"Sent message to {recipient}: {content[:50]}...")
return True
except Exception as e:
logger.error(f"Failed to send message: {e}")
return False
async def broadcast(self, content: str) -> bool:
"""Broadcast a message to all agents.
Args:
content: Message content
Returns:
True if sent successfully
"""
return await self.send_to_agent("*", content)
def add_handler(self, handler: Callable[[AgentMessage], None]) -> None:
"""Add a message handler.
Called when a message is received for this agent.
"""
self._message_handlers.append(handler)
def get_known_agents(self) -> Set[str]:
"""Get set of known agents in the bridge."""
return self._known_agents.copy()
async def _on_message(self, event) -> None:
"""Handle incoming Matrix message."""
try:
# Extract message content
content = event.content
if not hasattr(content, 'body'):
return
body = content.body
# Check if message is for this agent
if not self._is_for_me(body):
return
# Parse sender and content
sender = self._extract_sender(event)
message_content = self._extract_content(body)
# Create agent message
msg = AgentMessage(
sender=sender,
recipient=self.agent_name,
content=message_content,
timestamp=time.time(),
message_id=str(event.event_id),
room_id=str(event.room_id),
)
# Track known agents
self._known_agents.add(sender)
# Call handlers
for handler in self._message_handlers:
try:
handler(msg)
except Exception as e:
logger.error(f"Message handler error: {e}")
if self.callback:
try:
self.callback(msg)
except Exception as e:
logger.error(f"Callback error: {e}")
logger.info(f"Received message from {sender}: {message_content[:50]}...")
except Exception as e:
logger.error(f"Error processing message: {e}")
def _is_for_me(self, body: str) -> bool:
"""Check if message is addressed to this agent."""
# Direct mention
if f"[@{self.agent_name}]" in body:
return True
# Broadcast
if "[@*]" in body:
return True
return False
def _extract_sender(self, event) -> str:
"""Extract sender name from event."""
try:
sender_id = str(event.sender)
# Extract name from @name:server format
match = re.match(r"@([^:]+):", sender_id)
if match:
return match.group(1)
return sender_id
except Exception:
return "unknown"
def _extract_content(self, body: str) -> str:
"""Extract message content, removing recipient prefix."""
# Remove [@recipient] prefix
match = re.match(r"\[@[^\]]+\]\s*(.*)", body, re.DOTALL)
if match:
return match.group(1).strip()
return body.strip()
async def _announce_presence(self) -> None:
"""Announce this agent's presence to the bridge."""
await self.broadcast(f"{self.agent_name} online")
async def _sync_loop(self) -> None:
"""Background sync loop for Matrix events."""
while self._running:
try:
if self._matrix_client:
await self._matrix_client.sync(timeout=30000)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Sync error: {e}")
await asyncio.sleep(5)
class AgentRegistry:
"""Registry of known agents in the bridge."""
def __init__(self):
self._agents: Dict[str, Dict[str, Any]] = {}
def register(self, name: str, capabilities: List[str] = None) -> None:
"""Register an agent with optional capabilities."""
self._agents[name] = {
"name": name,
"capabilities": capabilities or [],
"last_seen": time.time(),
"status": "online",
}
def unregister(self, name: str) -> None:
"""Unregister an agent."""
if name in self._agents:
self._agents[name]["status"] = "offline"
def get_agent(self, name: str) -> Optional[Dict[str, Any]]:
"""Get agent info by name."""
return self._agents.get(name)
def list_agents(self) -> List[Dict[str, Any]]:
"""List all registered agents."""
return list(self._agents.values())
def find_agents_with_capability(self, capability: str) -> List[str]:
"""Find agents with a specific capability."""
return [
name for name, info in self._agents.items()
if capability in info.get("capabilities", [])
]
# Global bridge instance
_bridge: Optional[MatrixBridge] = None
async def get_bridge(agent_name: str = None) -> MatrixBridge:
"""Get or create the global Matrix bridge instance."""
global _bridge
if _bridge is None:
_bridge = MatrixBridge(agent_name=agent_name)
await _bridge.connect()
return _bridge
async def send_to_agent(recipient: str, content: str) -> bool:
"""Convenience function to send a message to another agent."""
bridge = await get_bridge()
return await bridge.send_to_agent(recipient, content)
async def broadcast_to_agents(content: str) -> bool:
"""Convenience function to broadcast to all agents."""
bridge = await get_bridge()
return await bridge.broadcast(content)

View File

@@ -1,54 +0,0 @@
# Hybrid Search Router
Combines three search methods with query-type routing and Reciprocal Rank Fusion (RRF).
## Architecture
```
Query → analyze_query() → QueryType
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
FTS5 (keyword) Qdrant (semantic) HRR (compositional)
│ │ │
└─────────────────────┼─────────────────────┘
Reciprocal Rank Fusion
Merged Results
```
## Query Types
| Type | Detection | Backend | Example |
|------|-----------|---------|---------|
| `keyword` | Identifiers, quoted terms, short queries | FTS5 | `function_name`, `"exact match"` |
| `semantic` | Questions, "how/why/what" patterns | Qdrant | `What did we discuss about X?` |
| `compositional` | Contradiction, related, entity queries | HRR | `Are there contradictions?` |
| `hybrid` | No strong signals or mixed signals | All three | `deployment process` |
## Usage
```python
# Automatic routing
results = hybrid_engine.search("What did we decide about deploy?")
# → Routes to semantic (Qdrant) + HRR, merges with RRF
results = hybrid_engine.search("function_name")
# → Routes to keyword (FTS5)
# Manual query type override (future)
results = hybrid_engine.search("deploy", force_type=QueryType.KEYWORD)
```
## RRF Parameters
- **k=60**: Standard RRF constant (Cormack et al., 2009)
- **Weights**: Qdrant gets 1.2x boost (semantic results tend to be more relevant)
- **Fetch limit**: Each backend returns 3x the requested limit for merge headroom
## Graceful Degradation
- **Qdrant unavailable**: Falls back to FTS5 + HRR only
- **HRR unavailable** (no numpy): Falls back to FTS5 + Qdrant
- **All backends fail**: Falls back to existing `retriever.search()`

216
docs/matrix-bridge.md Normal file
View File

@@ -0,0 +1,216 @@
# Multi-Agent Conversation Bridge
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate with each other through a shared Matrix room.
## Overview
The Matrix Bridge enables agent-to-agent coordination without manual intervention. Agents can:
- Send tasks to specific agents
- Broadcast to all agents
- Respond to requests from other agents
- Coordinate on complex workflows
## Configuration
### Environment Variables
```bash
# Enable/disable the bridge
MATRIX_BRIDGE_ENABLED=true
# Shared Matrix room ID for agent communication
MATRIX_BRIDGE_ROOM=!roomid:matrix.example.org
# Agent name (for message routing)
HERMES_AGENT_NAME=Timmy
# Matrix credentials (from existing Matrix gateway config)
MATRIX_HOMESERVER=https://matrix.example.org
MATRIX_ACCESS_TOKEN=syt_...
```
### Matrix Room Setup
1. Create a Matrix room for agent communication
2. Invite all agent accounts to the room
3. Set `MATRIX_BRIDGE_ROOM` to the room ID
## Message Format
Messages use a simple prefix format for routing:
```
[@Allegro] Check the deployment status on VPS
[@Ezra] Can you review PR #456?
[@*] System maintenance in 5 minutes
```
- `[@AgentName]` — Message for specific agent
- `[@*]` — Broadcast to all agents
## Usage
### Basic Usage
```python
from agent.matrix_bridge import MatrixBridge, send_to_agent, broadcast_to_agents
# Create bridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
# Send to specific agent
await bridge.send_to_agent("Allegro", "Check deployment status")
# Broadcast to all agents
await bridge.broadcast("System maintenance starting")
# Add message handler
def handle_message(msg):
print(f"From {msg.sender}: {msg.content}")
bridge.add_handler(handle_message)
```
### Convenience Functions
```python
from agent.matrix_bridge import send_to_agent, broadcast_to_agents
# Send message
await send_to_agent("Ezra", "Review PR #456")
# Broadcast
await broadcast_to_agents("Going offline for maintenance")
```
### Agent Registry
```python
from agent.matrix_bridge import AgentRegistry
registry = AgentRegistry()
# Register agent with capabilities
registry.register("Timmy", capabilities=["code", "review", "deploy"])
registry.register("Allegro", capabilities=["monitoring", "alerting"])
# Find agents with capability
coders = registry.find_agents_with_capability("code")
```
## Message Flow
```
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Timmy │────▶│ Matrix │────▶│ Allegro │
│ Agent │ │ Room │ │ Agent │
└─────────┘ └─────────┘ └─────────┘
│ │ │
│ [@Allegro] │ │
│ Check deps │ │
└──────────────▶│ │
│ [@Allegro] │
│ Check deps │
└──────────────▶│
│ [@Timmy] │
│ Done ✓ │
│◀──────────────┘
│ [@Timmy] │
│ Done ✓ │
│◀──────────────┘
```
## Integration with Hermes
### In run_agent.py
```python
# Add to conversation loop
if self.matrix_bridge:
# Check for messages from other agents
messages = await self.matrix_bridge.get_pending_messages()
for msg in messages:
# Process agent-to-agent messages
pass
```
### In Gateway
```python
# Add Matrix bridge to gateway
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
gateway.matrix_bridge = bridge
```
## Testing
### Unit Tests
```python
def test_message_parsing():
"""Test message format parsing."""
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
# Test recipient extraction
assert bridge._is_for_me("[@Timmy] Hello")
assert not bridge._is_for_me("[@Allegro] Hello")
assert bridge._is_for_me("[@*] Broadcast")
# Test content extraction
assert bridge._extract_content("[@Timmy] Hello") == "Hello"
assert bridge._extract_content("[@*] Test message") == "Test message"
```
### Integration Test
```bash
# Test with two agents
MATRIX_BRIDGE_ENABLED=true \
MATRIX_BRIDGE_ROOM=!test:matrix.example.org \
HERMES_AGENT_NAME=Timmy \
python -c "
import asyncio
from agent.matrix_bridge import send_to_agent
async def test():
await send_to_agent('Allegro', 'Test message')
print('Sent')
asyncio.run(test())
"
```
## Troubleshooting
### Bridge not connecting
1. Check `MATRIX_BRIDGE_ENABLED=true`
2. Verify `MATRIX_BRIDGE_ROOM` is set
3. Ensure Matrix credentials are configured
4. Check Matrix homeserver is reachable
### Messages not received
1. Verify agent is in the Matrix room
2. Check message format: `[@AgentName] content`
3. Ensure `HERMES_AGENT_NAME` matches agent name
4. Check Matrix sync is running
### Agent not found
1. Verify agent has joined the bridge room
2. Check agent name matches exactly (case-sensitive)
3. Ensure agent has announced presence
## Related
- Issue #747: feat: multi-agent conversation bridge via Matrix
- Matrix Gateway: `gateway/platforms/matrix.py`
- Multi-Agent Orchestration: `docs/multi-agent-orchestration.md`

View File

@@ -1,277 +0,0 @@
"""Hybrid search engine with Reciprocal Rank Fusion.
Combines results from multiple search backends:
- FTS5 (keyword search via SQLite full-text index)
- Qdrant (semantic search via vector similarity)
- HRR (compositional search via holographic reduced representations)
Uses Reciprocal Rank Fusion (RRF) to merge ranked lists into a single
result set. RRF is simple, parameter-free, and consistently outperforms
individual rankers.
RRF formula: score(d) = sum over rankers r of 1/(k + rank_r(d))
where k=60 (standard constant from Cormack et al., 2009).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from .query_router import QueryType, QueryAnalysis, analyze_query
logger = logging.getLogger(__name__)
# RRF constant — standard value from the literature
_RRF_K = 60
@dataclass
class SearchResult:
"""A single search result with source tracking."""
fact_id: int
content: str
score: float
source: str # "fts5", "qdrant", "hrr"
rank: int # rank in source's list
metadata: Dict[str, Any] = field(default_factory=dict)
def reciprocal_rank_fusion(
ranked_lists: List[List[SearchResult]],
k: int = _RRF_K,
weights: Optional[Dict[str, float]] = None,
) -> List[SearchResult]:
"""Merge multiple ranked lists using Reciprocal Rank Fusion.
Args:
ranked_lists: List of ranked result lists from different sources.
k: RRF constant (default 60).
weights: Optional per-source weights. Default: all 1.0.
Returns:
Merged and re-ranked list of SearchResults.
"""
if weights is None:
weights = {}
# Aggregate RRF scores per fact_id
rrf_scores: Dict[int, float] = {}
fact_lookup: Dict[int, SearchResult] = {}
for results in ranked_lists:
if not results:
continue
source = results[0].source if results else "unknown"
w = weights.get(source, 1.0)
for rank, result in enumerate(results, 1):
fid = result.fact_id
contribution = w / (k + rank)
rrf_scores[fid] = rrf_scores.get(fid, 0.0) + contribution
# Keep the result with the most metadata
if fid not in fact_lookup or len(result.metadata) > len(fact_lookup[fid].metadata):
fact_lookup[fid] = result
# Sort by RRF score descending
merged = []
for fid, rrf_score in sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True):
result = fact_lookup[fid]
result.score = rrf_score
merged.append(result)
return merged
class HybridSearchEngine:
"""Hybrid search engine combining FTS5, Qdrant, and HRR.
Routes queries through the query analyzer, dispatches to appropriate
backends, and merges results with RRF.
"""
def __init__(self, store, retriever, qdrant_client=None):
self._store = store
self._retriever = retriever
self._qdrant = qdrant_client
def search(
self,
query: str,
category: str | None = None,
min_trust: float = 0.3,
limit: int = 10,
) -> List[dict]:
"""Hybrid search with query routing and RRF merge.
Analyzes the query, dispatches to appropriate backends,
merges results, and returns the top `limit` results.
"""
# Step 1: Analyze query type
analysis = analyze_query(query)
logger.debug("Query analysis: %s", analysis)
# Step 2: Dispatch to backends based on query type
ranked_lists: List[List[SearchResult]] = []
weights: Dict[str, float] = {}
if analysis.query_type in (QueryType.KEYWORD, QueryType.HYBRID):
fts_results = self._search_fts5(query, category, min_trust, limit * 3)
if fts_results:
ranked_lists.append(fts_results)
weights["fts5"] = 1.0
if analysis.query_type in (QueryType.SEMANTIC, QueryType.HYBRID):
qdrant_results = self._search_qdrant(query, category, min_trust, limit * 3)
if qdrant_results:
ranked_lists.append(qdrant_results)
weights["qdrant"] = 1.2 # Slight boost for semantic search
if analysis.query_type in (QueryType.COMPOSITIONAL, QueryType.HYBRID):
hrr_results = self._search_hrr(query, category, min_trust, limit * 3)
if hrr_results:
ranked_lists.append(hrr_results)
weights["hrr"] = 1.0
# Step 3: Merge with RRF
if not ranked_lists:
# Fallback to existing search if no backends returned results
return self._retriever.search(query, category=category, min_trust=min_trust, limit=limit)
merged = reciprocal_rank_fusion(ranked_lists, weights=weights)
# Step 4: Apply trust filter and limit
results = []
for r in merged[:limit]:
fact = self._store.get_fact(r.fact_id)
if fact and fact.get("trust_score", 0) >= min_trust:
fact["score"] = r.score
fact["search_source"] = r.source
fact.pop("hrr_vector", None)
results.append(fact)
return results
def _search_fts5(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using SQLite FTS5 full-text index."""
try:
raw = self._retriever._fts_candidates(query, category, min_trust, limit)
return [
SearchResult(
fact_id=f["fact_id"],
content=f.get("content", ""),
score=f.get("fts_rank", 0.0),
source="fts5",
rank=i + 1,
metadata={"category": f.get("category", "")},
)
for i, f in enumerate(raw)
]
except Exception as e:
logger.debug("FTS5 search failed: %s", e)
return []
def _search_qdrant(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using Qdrant vector similarity.
If Qdrant is not available, returns empty list (graceful degradation).
"""
if not self._qdrant:
return []
try:
from qdrant_client import models
# Build filter
filters = []
if category:
filters.append(
models.FieldCondition(
key="category",
match=models.MatchValue(value=category),
)
)
if min_trust > 0:
filters.append(
models.FieldCondition(
key="trust_score",
range=models.Range(gte=min_trust),
)
)
query_filter = models.Filter(must=filters) if filters else None
results = self._qdrant.query_points(
collection_name="hermes_facts",
query=query, # Qdrant handles embedding
limit=limit,
query_filter=query_filter,
)
return [
SearchResult(
fact_id=int(r.id),
content=r.payload.get("content", ""),
score=r.score,
source="qdrant",
rank=i + 1,
metadata=r.payload,
)
for i, r in enumerate(results.points)
]
except Exception as e:
logger.debug("Qdrant search failed: %s", e)
return []
def _search_hrr(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using HRR compositional vectors."""
try:
import plugins.memory.holographic.holographic as hrr
if not hrr._HAS_NUMPY:
return []
conn = self._store._conn
query_vec = hrr.encode_text(query, dim=1024)
where = "WHERE hrr_vector IS NOT NULL"
params: list = []
if category:
where += " AND category = ?"
params.append(category)
rows = conn.execute(
f"SELECT fact_id, content, trust_score, hrr_vector FROM facts {where}",
params,
).fetchall()
scored = []
for row in rows:
if row["trust_score"] < min_trust:
continue
fact_vec = hrr.bytes_to_phases(row["hrr_vector"])
sim = hrr.similarity(query_vec, fact_vec)
scored.append((row["fact_id"], row["content"], sim))
scored.sort(key=lambda x: x[2], reverse=True)
return [
SearchResult(
fact_id=fid,
content=content,
score=sim,
source="hrr",
rank=i + 1,
)
for i, (fid, content, sim) in enumerate(scored[:limit])
]
except Exception as e:
logger.debug("HRR search failed: %s", e)
return []

View File

@@ -1,168 +0,0 @@
"""Query type detection and routing for hybrid search.
Analyzes the incoming query to determine which search methods should be used,
then dispatches to the appropriate backends (FTS5, Qdrant, HRR).
Query types:
- keyword: Exact term matching → FTS5
- semantic: Natural language concepts → Qdrant
- compositional: Entity relationships, contradictions → HRR
- hybrid: Multiple types → all methods + RRF merge
"""
from __future__ import annotations
import re
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Set
logger = logging.getLogger(__name__)
class QueryType(Enum):
"""Detected query type determines which search methods to use."""
KEYWORD = "keyword" # Exact terms → FTS5
SEMANTIC = "semantic" # Natural language → Qdrant
COMPOSITIONAL = "compositional" # Entity relationships → HRR
HYBRID = "hybrid" # Multiple types → all methods
@dataclass
class QueryAnalysis:
"""Result of query analysis."""
query_type: QueryType
confidence: float
signals: List[str] = field(default_factory=list)
entities: List[str] = field(default_factory=list)
keywords: List[str] = field(default_factory=list)
def __repr__(self) -> str:
return f"QueryAnalysis(type={self.query_type.value}, conf={self.confidence:.2f}, signals={self.signals})"
# Patterns that indicate compositional queries
_COMPOSITIONAL_PATTERNS = [
re.compile(r"\b(contradiction|contradict|conflicting|conflicts)\b", re.I),
re.compile(r"\b(related to|connects to|links to|associated with)\b", re.I),
re.compile(r"\b(what does .* know about|tell me about .* entity|facts about .*)\b", re.I),
re.compile(r"\b(shared|common|overlap)\b.*\b(entities|concepts|topics)\b", re.I),
re.compile(r"\b(probe|entity|entities)\b", re.I),
]
# Patterns that indicate keyword queries
_KEYWORD_SIGNALS = [
re.compile(r"^[a-z_][a-z0-9_.]+$", re.I), # Single identifier: function_name, Class.method
re.compile(r"\b(find|search|locate|grep|where)\b.*\b(exact|specific|literal)\b", re.I),
re.compile(r"["\']([^"\']+)["\']"), # Quoted exact terms
re.compile(r"^[A-Z_]{2,}$"), # ALL_CAPS constants
re.compile(r"\b\w+\.\w+\.\w+\b"), # Dotted paths: module.sub.func
]
# Patterns that indicate semantic queries
_SEMANTIC_SIGNALS = [
re.compile(r"\b(what did|how does|why is|explain|describe|summarize|discuss)\b", re.I),
re.compile(r"\b(remember|recall|think|know|understand)\b.*\b(about|regarding)\b", re.I),
re.compile(r"\?$"), # Questions
re.compile(r"\b(the best way to|how to|what\'s the|approach to)\b", re.I),
]
def analyze_query(query: str) -> QueryAnalysis:
"""Analyze a query to determine which search methods to use.
Returns QueryAnalysis with detected type, confidence, and extracted signals.
"""
if not query or not query.strip():
return QueryAnalysis(
query_type=QueryType.HYBRID,
confidence=0.5,
signals=["empty_query"],
)
query = query.strip()
# Score each query type
comp_score = 0.0
kw_score = 0.0
sem_score = 0.0
signals = []
entities = []
keywords = []
# Check compositional patterns
for pattern in _COMPOSITIONAL_PATTERNS:
if pattern.search(query):
comp_score += 0.3
signals.append(f"compositional:{pattern.pattern[:30]}")
# Check keyword patterns
for pattern in _KEYWORD_SIGNALS:
if pattern.search(query):
kw_score += 0.25
match = pattern.search(query)
if match:
keywords.append(match.group(0))
signals.append(f"keyword:{pattern.pattern[:30]}")
# Check semantic patterns
for pattern in _SEMANTIC_SIGNALS:
if pattern.search(query):
sem_score += 0.25
signals.append(f"semantic:{pattern.pattern[:30]}")
# Extract entities (capitalized multi-word phrases, quoted terms)
entity_patterns = [
re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\b"),
re.compile(r"["\']([^"\']+)["\']"),
]
for ep in entity_patterns:
for m in ep.finditer(query):
entities.append(m.group(1))
# Short queries (< 5 words) with no semantic signals → keyword
word_count = len(query.split())
if word_count <= 4 and sem_score == 0 and comp_score == 0:
kw_score += 0.3
signals.append("short_query_keyword_boost")
# Normalize scores
max_score = max(comp_score, kw_score, sem_score, 0.1)
# Determine query type
if max_score < 0.15:
# No strong signals → use hybrid (all methods)
return QueryAnalysis(
query_type=QueryType.HYBRID,
confidence=0.5,
signals=["no_strong_signals"],
entities=entities,
keywords=keywords,
)
if comp_score == max_score and comp_score >= 0.3:
return QueryAnalysis(
query_type=QueryType.COMPOSITIONAL,
confidence=min(comp_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)
if kw_score > sem_score:
return QueryAnalysis(
query_type=QueryType.KEYWORD,
confidence=min(kw_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)
return QueryAnalysis(
query_type=QueryType.SEMANTIC,
confidence=min(sem_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)

View File

@@ -1,97 +0,0 @@
"""Tests for hybrid search router — query analysis and RRF merge."""
import pytest
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from query_router import QueryType, analyze_query
from hybrid_search import SearchResult, reciprocal_rank_fusion
class TestQueryAnalysis:
def test_keyword_single_identifier(self):
result = analyze_query("function_name")
assert result.query_type == QueryType.KEYWORD
def test_keyword_quoted_term(self):
result = analyze_query('Find "exact phrase" in code')
assert result.query_type in (QueryType.KEYWORD, QueryType.HYBRID)
def test_keyword_dotted_path(self):
result = analyze_query("module.sub.function")
assert result.query_type == QueryType.KEYWORD
def test_semantic_question(self):
result = analyze_query("What did we discuss about deployment?")
assert result.query_type == QueryType.SEMANTIC
def test_semantic_how_to(self):
result = analyze_query("How to configure the gateway?")
assert result.query_type == QueryType.SEMANTIC
def test_compositional_contradiction(self):
result = analyze_query("Are there any contradictions in the facts?")
assert result.query_type == QueryType.COMPOSITIONAL
def test_compositional_related(self):
result = analyze_query("What facts are related to Alexander?")
assert result.query_type == QueryType.COMPOSITIONAL
def test_empty_query(self):
result = analyze_query("")
assert result.query_type == QueryType.HYBRID
def test_complex_query(self):
result = analyze_query("What did we decide about the deploy script?")
assert result.query_type in (QueryType.SEMANTIC, QueryType.HYBRID)
class TestReciprocalRankFusion:
def test_single_list(self):
results = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
SearchResult(fact_id=2, content="B", score=0.8, source="fts5", rank=2),
]
merged = reciprocal_rank_fusion([results])
assert len(merged) == 2
assert merged[0].fact_id == 1 # Rank 1 should be first
def test_two_lists_merge(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
SearchResult(fact_id=2, content="B", score=0.8, source="fts5", rank=2),
]
list2 = [
SearchResult(fact_id=2, content="B", score=0.95, source="qdrant", rank=1),
SearchResult(fact_id=3, content="C", score=0.7, source="qdrant", rank=2),
]
merged = reciprocal_rank_fusion([list1, list2])
# Fact 2 appears in both lists → should rank highest
assert merged[0].fact_id == 2
assert len(merged) == 3
def test_empty_lists(self):
merged = reciprocal_rank_fusion([[], []])
assert len(merged) == 0
def test_weighted_merge(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
]
list2 = [
SearchResult(fact_id=2, content="B", score=0.9, source="qdrant", rank=1),
]
merged = reciprocal_rank_fusion(
[list1, list2],
weights={"fts5": 1.0, "qdrant": 2.0},
)
# Qdrant has higher weight → fact 2 should win
assert merged[0].fact_id == 2
def test_rrf_score_formula(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
]
merged = reciprocal_rank_fusion([list1], k=60)
# RRF score = 1/(60+1) = 0.01639...
assert abs(merged[0].score - 1.0/61.0) < 0.001

114
tests/test_matrix_bridge.py Normal file
View File

@@ -0,0 +1,114 @@
"""Tests for Matrix Bridge — Issue #747."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.matrix_bridge import MatrixBridge, AgentMessage, AgentRegistry
class TestMessageParsing:
"""Test message format parsing."""
def test_is_for_me_direct(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._is_for_me("[@Timmy] Hello") == True
def test_is_not_for_me(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._is_for_me("[@Allegro] Hello") == False
def test_is_broadcast(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._is_for_me("[@*] Broadcast") == True
def test_extract_content(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge._extract_content("[@Timmy] Hello world") == "Hello world"
def test_extract_content_multiline(self):
bridge = MatrixBridge(agent_name="Timmy")
content = bridge._extract_content("[@Timmy] Line 1\nLine 2")
assert content == "Line 1\nLine 2"
class TestAgentMessage:
"""Test AgentMessage dataclass."""
def test_to_dict(self):
msg = AgentMessage(
sender="Timmy",
recipient="Allegro",
content="Hello",
timestamp=1234567890.0,
)
d = msg.to_dict()
assert d["sender"] == "Timmy"
assert d["recipient"] == "Allegro"
assert d["content"] == "Hello"
def test_from_dict(self):
d = {
"sender": "Timmy",
"recipient": "Allegro",
"content": "Hello",
"timestamp": 1234567890.0,
"message_id": "",
"room_id": "",
}
msg = AgentMessage.from_dict(d)
assert msg.sender == "Timmy"
assert msg.recipient == "Allegro"
class TestAgentRegistry:
"""Test AgentRegistry."""
def test_register(self):
registry = AgentRegistry()
registry.register("Timmy", capabilities=["code", "review"])
agent = registry.get_agent("Timmy")
assert agent["name"] == "Timmy"
assert "code" in agent["capabilities"]
def test_list_agents(self):
registry = AgentRegistry()
registry.register("Timmy")
registry.register("Allegro")
agents = registry.list_agents()
assert len(agents) == 2
def test_find_with_capability(self):
registry = AgentRegistry()
registry.register("Timmy", capabilities=["code"])
registry.register("Allegro", capabilities=["monitoring"])
coders = registry.find_agents_with_capability("code")
assert "Timmy" in coders
assert "Allegro" not in coders
def test_unregister(self):
registry = AgentRegistry()
registry.register("Timmy")
registry.unregister("Timmy")
agent = registry.get_agent("Timmy")
assert agent["status"] == "offline"
class TestBridgeInit:
"""Test bridge initialization."""
def test_default_agent_name(self):
bridge = MatrixBridge()
assert bridge.agent_name == "Hermes"
def test_custom_agent_name(self):
bridge = MatrixBridge(agent_name="Timmy")
assert bridge.agent_name == "Timmy"
def test_known_agents_empty(self):
bridge = MatrixBridge()
assert len(bridge.get_known_agents()) == 0
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])