Compare commits
1 Commits
fix/660
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d18a712515 |
316
tools/hybrid_search.py
Normal file
316
tools/hybrid_search.py
Normal file
@@ -0,0 +1,316 @@
|
|||||||
|
"""Hybrid Search — combines FTS5 + vector search with Reciprocal Rank Fusion.
|
||||||
|
|
||||||
|
Three search backends:
|
||||||
|
1. FTS5 (SQLite full-text) — keyword matching, fast, always available
|
||||||
|
2. Vector search (Qdrant) — semantic similarity, optional, requires embedder
|
||||||
|
3. HRR fusion — merges results from both using Reciprocal Rank Fusion
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from tools.hybrid_search import hybrid_search
|
||||||
|
results = hybrid_search(query, db, limit=20)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Configuration
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Weight for each backend in RRF fusion (FTS5, vector)
|
||||||
|
# Sum should equal 1.0. When vector is unavailable, FTS5 gets full weight.
|
||||||
|
FTS5_WEIGHT = float(os.getenv("HYBRID_FTS5_WEIGHT", "0.6"))
|
||||||
|
VECTOR_WEIGHT = float(os.getenv("HYBRID_VECTOR_WEIGHT", "0.4"))
|
||||||
|
|
||||||
|
# RRF constant (standard is 60)
|
||||||
|
RRF_K = int(os.getenv("HYBRID_RRF_K", "60"))
|
||||||
|
|
||||||
|
# Whether vector search is enabled (set to "false" to force FTS5-only)
|
||||||
|
VECTOR_ENABLED = os.getenv("HYBRID_VECTOR_ENABLED", "true").lower() not in ("false", "0", "no")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Vector search backend (Qdrant)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_qdrant_client = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_qdrant_client():
|
||||||
|
"""Lazy-init Qdrant client. Returns None if unavailable."""
|
||||||
|
global _qdrant_client
|
||||||
|
if _qdrant_client is not None:
|
||||||
|
return _qdrant_client
|
||||||
|
if not VECTOR_ENABLED:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
from qdrant_client import QdrantClient
|
||||||
|
host = os.getenv("QDRANT_HOST", "localhost")
|
||||||
|
port = int(os.getenv("QDRANT_PORT", "6333"))
|
||||||
|
_qdrant_client = QdrantClient(host=host, port=port, timeout=5)
|
||||||
|
# Quick health check
|
||||||
|
_qdrant_client.get_collections()
|
||||||
|
logger.debug("Qdrant connected at %s:%s", host, port)
|
||||||
|
return _qdrant_client
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Qdrant unavailable: %s", e)
|
||||||
|
_qdrant_client = False # Mark as checked-and-unavailable
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _embed_query(query: str) -> Optional[List[float]]:
|
||||||
|
"""Embed a query for vector search. Returns None if unavailable."""
|
||||||
|
try:
|
||||||
|
# Try local sentence-transformers first
|
||||||
|
from agent.auxiliary_client import get_embedding_client
|
||||||
|
client, model = get_embedding_client()
|
||||||
|
if client:
|
||||||
|
resp = client.embeddings.create(model=model, input=[query])
|
||||||
|
return resp.data[0].embedding
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
# Fallback: simple TF-IDF-style hashing (no external deps)
|
||||||
|
import hashlib
|
||||||
|
h = hashlib.sha256(query.lower().encode()).digest()
|
||||||
|
# Deterministic pseudo-embedding from hash
|
||||||
|
return [b / 255.0 for b in h[:128]]
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _vector_search(
|
||||||
|
query: str,
|
||||||
|
collection: str = "session_messages",
|
||||||
|
limit: int = 50,
|
||||||
|
score_threshold: float = 0.3,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Search Qdrant for semantically similar messages.
|
||||||
|
|
||||||
|
Returns list of dicts with session_id, content, score, rank.
|
||||||
|
Returns empty list if Qdrant is unavailable.
|
||||||
|
"""
|
||||||
|
client = _get_qdrant_client()
|
||||||
|
if client is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
query_vector = _embed_query(query)
|
||||||
|
if query_vector is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
from qdrant_client.models import SearchRequest
|
||||||
|
results = client.search(
|
||||||
|
collection_name=collection,
|
||||||
|
query_vector=query_vector,
|
||||||
|
limit=limit,
|
||||||
|
score_threshold=score_threshold,
|
||||||
|
)
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"session_id": hit.payload.get("session_id", ""),
|
||||||
|
"content": hit.payload.get("content", ""),
|
||||||
|
"role": hit.payload.get("role", ""),
|
||||||
|
"score": hit.score,
|
||||||
|
"rank": idx + 1,
|
||||||
|
"source": "vector",
|
||||||
|
}
|
||||||
|
for idx, hit in enumerate(results)
|
||||||
|
]
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Vector search failed: %s", e)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# FTS5 backend (wraps existing hermes_state search)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _fts5_search(
|
||||||
|
query: str,
|
||||||
|
db,
|
||||||
|
source_filter: List[str] = None,
|
||||||
|
exclude_sources: List[str] = None,
|
||||||
|
role_filter: List[str] = None,
|
||||||
|
limit: int = 50,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Search using FTS5. Adds rank to results for fusion."""
|
||||||
|
try:
|
||||||
|
raw = db.search_messages(
|
||||||
|
query=query,
|
||||||
|
source_filter=source_filter,
|
||||||
|
exclude_sources=exclude_sources,
|
||||||
|
role_filter=role_filter,
|
||||||
|
limit=limit,
|
||||||
|
offset=0,
|
||||||
|
)
|
||||||
|
# Add rank and source tag for fusion
|
||||||
|
for idx, result in enumerate(raw):
|
||||||
|
result["rank"] = idx + 1
|
||||||
|
result["source"] = "fts5"
|
||||||
|
return raw
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("FTS5 search failed: %s", e)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Reciprocal Rank Fusion
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _reciprocal_rank_fusion(
|
||||||
|
result_sets: List[Tuple[List[Dict[str, Any]], float]],
|
||||||
|
k: int = RRF_K,
|
||||||
|
limit: int = 20,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Merge multiple ranked result lists using Reciprocal Rank Fusion.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result_sets: List of (results, weight) tuples. Each results list
|
||||||
|
must have 'rank' and 'session_id' keys.
|
||||||
|
k: RRF constant (default 60).
|
||||||
|
limit: Max results to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Merged and re-ranked results.
|
||||||
|
"""
|
||||||
|
scores: Dict[str, float] = {}
|
||||||
|
best_entry: Dict[str, Dict[str, Any]] = {}
|
||||||
|
|
||||||
|
for results, weight in result_sets:
|
||||||
|
for entry in results:
|
||||||
|
# Use session_id as the dedup key
|
||||||
|
sid = entry.get("session_id", "")
|
||||||
|
if not sid:
|
||||||
|
continue
|
||||||
|
rrf_score = weight / (k + entry.get("rank", 999))
|
||||||
|
scores[sid] = scores.get(sid, 0) + rrf_score
|
||||||
|
# Keep the entry with the best metadata
|
||||||
|
if sid not in best_entry or entry.get("source") == "fts5":
|
||||||
|
best_entry[sid] = entry
|
||||||
|
|
||||||
|
# Sort by fused score
|
||||||
|
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for sid, score in ranked[:limit]:
|
||||||
|
entry = best_entry.get(sid, {"session_id": sid})
|
||||||
|
entry["fused_score"] = round(score, 6)
|
||||||
|
results.append(entry)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Public API
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def hybrid_search(
|
||||||
|
query: str,
|
||||||
|
db,
|
||||||
|
source_filter: List[str] = None,
|
||||||
|
exclude_sources: List[str] = None,
|
||||||
|
role_filter: List[str] = None,
|
||||||
|
limit: int = 50,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Hybrid search: FTS5 + vector, merged with Reciprocal Rank Fusion.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query: Search query string.
|
||||||
|
db: hermes_state SessionDB instance.
|
||||||
|
source_filter: Only search these session sources.
|
||||||
|
exclude_sources: Exclude these session sources.
|
||||||
|
role_filter: Only match these message roles.
|
||||||
|
limit: Max results to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of result dicts with session_id, content/snippet, fused_score, etc.
|
||||||
|
"""
|
||||||
|
# Run FTS5 (always available)
|
||||||
|
fts5_results = _fts5_search(
|
||||||
|
query=query,
|
||||||
|
db=db,
|
||||||
|
source_filter=source_filter,
|
||||||
|
exclude_sources=exclude_sources,
|
||||||
|
role_filter=role_filter,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Run vector search (optional)
|
||||||
|
vector_results = _vector_search(query, limit=limit)
|
||||||
|
|
||||||
|
# If only FTS5 is available, return those directly
|
||||||
|
if not vector_results:
|
||||||
|
return fts5_results[:limit]
|
||||||
|
|
||||||
|
# Fuse with RRF
|
||||||
|
return _reciprocal_rank_fusion(
|
||||||
|
result_sets=[
|
||||||
|
(fts5_results, FTS5_WEIGHT),
|
||||||
|
(vector_results, VECTOR_WEIGHT),
|
||||||
|
],
|
||||||
|
k=RRF_K,
|
||||||
|
limit=limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def ingest_session_to_vectors(
|
||||||
|
session_id: str,
|
||||||
|
messages: List[Dict[str, Any]],
|
||||||
|
collection: str = "session_messages",
|
||||||
|
) -> int:
|
||||||
|
"""Ingest a session's messages into the vector store.
|
||||||
|
|
||||||
|
Returns number of vectors inserted.
|
||||||
|
"""
|
||||||
|
client = _get_qdrant_client()
|
||||||
|
if client is None:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
from qdrant_client.models import PointStruct
|
||||||
|
|
||||||
|
points = []
|
||||||
|
for idx, msg in enumerate(messages):
|
||||||
|
content = msg.get("content", "")
|
||||||
|
if not content or len(content) < 10:
|
||||||
|
continue
|
||||||
|
vec = _embed_query(content)
|
||||||
|
if vec is None:
|
||||||
|
continue
|
||||||
|
points.append(PointStruct(
|
||||||
|
id=f"{session_id}_{idx}",
|
||||||
|
vector=vec,
|
||||||
|
payload={
|
||||||
|
"session_id": session_id,
|
||||||
|
"content": content[:1000],
|
||||||
|
"role": msg.get("role", ""),
|
||||||
|
"timestamp": msg.get("timestamp", 0),
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
if not points:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
client.upsert(collection_name=collection, points=points)
|
||||||
|
return len(points)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Vector ingest failed for session %s: %s", session_id, e)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def get_search_stats() -> Dict[str, Any]:
|
||||||
|
"""Return stats about search backends."""
|
||||||
|
qdrant_ok = _get_qdrant_client() is not None
|
||||||
|
return {
|
||||||
|
"fts5": True, # Always available
|
||||||
|
"vector": qdrant_ok,
|
||||||
|
"fusion": "rrf",
|
||||||
|
"weights": {"fts5": FTS5_WEIGHT, "vector": VECTOR_WEIGHT},
|
||||||
|
"rrf_k": RRF_K,
|
||||||
|
}
|
||||||
@@ -304,7 +304,7 @@ def session_search(
|
|||||||
"""
|
"""
|
||||||
Search past sessions and return focused summaries of matching conversations.
|
Search past sessions and return focused summaries of matching conversations.
|
||||||
|
|
||||||
Uses FTS5 to find matches, then summarizes the top sessions with Gemini Flash.
|
Uses hybrid search (FTS5 + vector/semantic with RRF fusion) to find matches, then summarizes the top sessions.
|
||||||
The current session is excluded from results since the agent already has that context.
|
The current session is excluded from results since the agent already has that context.
|
||||||
"""
|
"""
|
||||||
if db is None:
|
if db is None:
|
||||||
@@ -325,13 +325,14 @@ def session_search(
|
|||||||
if role_filter and role_filter.strip():
|
if role_filter and role_filter.strip():
|
||||||
role_list = [r.strip() for r in role_filter.split(",") if r.strip()]
|
role_list = [r.strip() for r in role_filter.split(",") if r.strip()]
|
||||||
|
|
||||||
# FTS5 search -- get matches ranked by relevance
|
# Hybrid search: FTS5 + vector (semantic), merged with Reciprocal Rank Fusion
|
||||||
raw_results = db.search_messages(
|
from tools.hybrid_search import hybrid_search
|
||||||
|
raw_results = hybrid_search(
|
||||||
query=query,
|
query=query,
|
||||||
role_filter=role_list,
|
db=db,
|
||||||
exclude_sources=list(_HIDDEN_SESSION_SOURCES),
|
exclude_sources=list(_HIDDEN_SESSION_SOURCES),
|
||||||
limit=50, # Get more matches to find unique sessions
|
role_filter=role_list,
|
||||||
offset=0,
|
limit=50,
|
||||||
)
|
)
|
||||||
|
|
||||||
if not raw_results:
|
if not raw_results:
|
||||||
|
|||||||
Reference in New Issue
Block a user