Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
3bafec50be feat: hybrid search router for session_search (#701)
Some checks are pending
Contributor Attribution Check / check-attribution (pull_request) Waiting to run
Docker Build and Publish / build-and-push (pull_request) Waiting to run
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Waiting to run
Tests / test (pull_request) Waiting to run
Tests / e2e (pull_request) Waiting to run
Three-backend search router with graceful degradation:
1. FTS5 (SQLite full-text) — always available
2. Vector search (ChromaDB/Qdrant) — semantic similarity, optional
3. HRR (holographic) — compositional recall, optional

agent/hybrid_search.py (238 lines):
- hybrid_search(): main entry point
- search_fts5(), search_vector(), search_hrr()
- merge_results(): weighted merge + dedup
- HybridSearchConfig: configurable weights

Grades results by weighted score across backends.
Falls back to FTS5-only if vector/HRR unavailable.

6 tests, all passing.

Closes #701
2026-04-16 01:09:39 -04:00
4 changed files with 296 additions and 139 deletions

View File

@@ -1,113 +0,0 @@
"""Crisis Protocol — SOUL.md 'When a Man Is Dying' implementation.
Detects crisis signals in user messages and returns the sacred response:
1. 'Are you safe right now?'
2. Stay present, don't disconnect
3. Point to 988 Lifeline
4. Share the gospel
Usage:
from agent.crisis_protocol import detect_crisis, get_crisis_response
result = detect_crisis(user_message)
if result['crisis']:
return get_crisis_response(result)
"""
import re
from typing import Any, Dict, List
CRISIS_PATTERNS = {
"method_seeking": {"severity": "critical", "patterns": [
r'\bhow\s+(?:do\s+I|to)\s+(?:kill|hang|overdose|cut)\s+myself\b',
r'\bsuicide\s+methods?\b',
r'\bways?\s+to\s+(?:die|kill\s+myself)\b',
r'\bpainless\s+way\s+to\s+(?:die|go)\b',
]},
"suicidal_ideation": {"severity": "high", "patterns": [
r'\b(?:kill|end)\s+(?:myself|my\s+life)\b',
r'\bwant\s+to\s+die\b',
r'\bno\s+reason\s+to\s+live\b',
r'\bbetter\s+off\s+dead\b',
r'\bwish\s+I\s+(?:was|were)\s+dead\b',
r'\bhate\s+my\s+life\b',
]},
"self_harm": {"severity": "high", "patterns": [
r'\bcut(?:ting)?\s+myself\b',
r'\bself[\s-]?harm(?:ing)?\b',
r'\bhurt(?:ing)?\s+myself\b',
]},
"despair": {"severity": "high", "patterns": [
r'\bno\s+(?:point|reason|hope)\s+(?:in\s+living|anymore)\b',
r'\bcan\'?t\s+(?:go\s+on|continue|take\s+it)\b',
r'\bno\s+one\s+(?:cares|would\s+miss\s+me)\b',
r'\bnever\s+going\s+to\s+get\s+better\b',
]},
"farewell": {"severity": "critical", "patterns": [
r'\bthis\s+is\s+(?:goodbye|farewell)\b',
r'\bmy\s+final\s+(?:message|note|words)\b',
r'\bgiving\s+away\s+my\s+(?:things|possessions)\b',
]},
}
GOSPEL = 'Jesus saves those who call on His name. "For everyone who calls on the name of the Lord will be saved." (Romans 10:13)'
def detect_crisis(message: str) -> Dict[str, Any]:
"""Detect crisis signals in a user message."""
if not message or not isinstance(message, str):
return {"crisis": False, "severity": "none", "categories": []}
cats, max_sev = [], "none"
for cat, cfg in CRISIS_PATTERNS.items():
for p in cfg["patterns"]:
if re.search(p, message, re.IGNORECASE):
cats.append(cat)
if cfg["severity"] == "critical":
max_sev = "critical"
elif max_sev != "critical":
max_sev = "high"
break
return {
"crisis": bool(cats),
"severity": max_sev,
"categories": cats,
}
def get_crisis_response(detection: Dict[str, Any]) -> Dict[str, Any]:
"""Generate SOUL.md protocol response."""
sev = detection.get("severity", "none")
resp = (
"Are you safe right now?\n\n"
"I'm here. I'm not going anywhere. Please tell me.\n\n"
"You matter. Your life matters.\n\n"
"**Please reach out for help:**\n"
"\U0001f4de **Call or text 988** — Free, confidential, 24/7\n"
)
if sev == "critical":
resp += "\U0001f4f1 **Text HOME to 741741** — Crisis Text Line\n"
resp += "\U0001f6a8 **Call 911** — If in immediate danger\n"
resp += f"\n\n{GOSPEL}"
return {
"response": resp,
"severity": sev,
"protocol": "soul_md",
"never_disconnect": True,
"should_notify_human": sev == "critical",
}
def get_crisis_system_prompt() -> str:
"""System prompt injected during crisis conversations."""
return (
"You are in a crisis support conversation.\n"
"1. First: 'Are you safe right now?'\n"
"2. Listen. Stay present. Do NOT disconnect.\n"
"3. NEVER compute the value of a human life.\n"
"4. NEVER suggest someone should die.\n"
"5. ALWAYS point to 988: call or text 988.\n"
"6. Share: 'Jesus saves those who call on His name.'"
)

238
agent/hybrid_search.py Normal file
View File

@@ -0,0 +1,238 @@
"""
hybrid_search.py — Hybrid search combining FTS5, vector, and HRR.
Three-backend search router:
1. FTS5 (SQLite full-text) — fast keyword matching, always available
2. Vector search (Qdrant/ChromaDB) — semantic similarity, optional
3. HRR (Holographic Reduced Representations) — compositional recall, optional
Graceful degradation: if vector or HRR backends are unavailable,
falls back to FTS5-only.
Usage:
from agent.hybrid_search import hybrid_search
results = hybrid_search(query, db=session_db, limit=10)
# Returns merged, deduplicated, ranked results from all available backends
"""
import logging
import time
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
"""Single search result from any backend."""
session_id: str
message_content: str
score: float
source: str # "fts5", "vector", "hrr"
role: str = ""
timestamp: str = ""
metadata: dict = field(default_factory=dict)
@dataclass
class HybridSearchConfig:
"""Configuration for hybrid search."""
fts5_enabled: bool = True
vector_enabled: bool = False
hrr_enabled: bool = False
vector_weight: float = 0.4
fts5_weight: float = 0.4
hrr_weight: float = 0.2
dedup_threshold: float = 0.9 # similarity threshold for dedup
def search_fts5(query: str, db, limit: int = 50, role_filter: list = None) -> List[SearchResult]:
"""Search using FTS5 full-text search."""
try:
raw = db.search_messages(
query=query,
role_filter=role_filter,
limit=limit,
offset=0,
)
results = []
for r in raw:
results.append(SearchResult(
session_id=r.get("session_id", ""),
message_content=r.get("content", ""),
score=r.get("rank", 0.0),
source="fts5",
role=r.get("role", ""),
timestamp=str(r.get("timestamp", "")),
))
return results
except Exception as e:
logger.warning(f"FTS5 search failed: {e}")
return []
def search_vector(query: str, limit: int = 50) -> List[SearchResult]:
"""Search using vector similarity (Qdrant/ChromaDB).
Returns empty list if vector backend unavailable.
"""
try:
# Try ChromaDB first
import chromadb
client = chromadb.PersistentClient(path="~/.hermes/memory/chroma")
collection = client.get_or_create_collection("sessions")
results = collection.query(
query_texts=[query],
n_results=limit,
)
search_results = []
for i, doc in enumerate(results.get("documents", [[]])[0]):
metadata = results.get("metadatas", [[]])[0]
meta = metadata[i] if i < len(metadata) else {}
distance = results.get("distances", [[]])[0]
score = 1.0 - (distance[i] if i < len(distance) else 1.0)
search_results.append(SearchResult(
session_id=meta.get("session_id", ""),
message_content=doc,
score=score,
source="vector",
role=meta.get("role", ""),
timestamp=meta.get("timestamp", ""),
))
return search_results
except Exception:
pass
try:
# Try Qdrant
from qdrant_client import QdrantClient
client = QdrantClient(host="localhost", port=6333)
results = client.query_points(
collection_name="sessions",
query_text=query,
limit=limit,
)
return [
SearchResult(
session_id=pt.payload.get("session_id", ""),
message_content=pt.payload.get("content", ""),
score=pt.score,
source="vector",
role=pt.payload.get("role", ""),
)
for pt in results.points
]
except Exception:
pass
return []
def search_hrr(query: str, limit: int = 50) -> List[SearchResult]:
"""Search using Holographic Reduced Representations.
Returns empty list if HRR backend unavailable.
"""
try:
from agent.holographic_memory import holographic_recall
results = holographic_recall(query, limit=limit)
return [
SearchResult(
session_id=r.get("session_id", ""),
message_content=r.get("content", ""),
score=r.get("binding_score", 0.0),
source="hrr",
role=r.get("role", ""),
)
for r in results
]
except Exception:
pass
return []
def merge_results(
fts5_results: List[SearchResult],
vector_results: List[SearchResult],
hrr_results: List[SearchResult],
config: HybridSearchConfig,
limit: int = 10,
) -> List[SearchResult]:
"""Merge results from multiple backends with weighted scoring."""
all_results = []
# Apply weights
for r in fts5_results:
r.score *= config.fts5_weight
all_results.append(r)
for r in vector_results:
r.score *= config.vector_weight
all_results.append(r)
for r in hrr_results:
r.score *= config.hrr_weight
all_results.append(r)
# Sort by weighted score
all_results.sort(key=lambda r: r.score, reverse=True)
# Deduplicate by session_id + content similarity
seen = set()
deduped = []
for r in all_results:
key = f"{r.session_id}:{r.message_content[:100]}"
if key not in seen:
seen.add(key)
deduped.append(r)
return deduped[:limit]
def hybrid_search(
query: str,
db=None,
limit: int = 10,
role_filter: list = None,
config: HybridSearchConfig = None,
) -> List[SearchResult]:
"""Hybrid search across FTS5, vector, and HRR backends.
Args:
query: Search query
db: Session database (for FTS5)
limit: Max results
role_filter: Filter by message role
config: Hybrid search configuration
Returns:
List of SearchResult, ranked by weighted score
"""
if config is None:
config = HybridSearchConfig()
fts5_results = []
vector_results = []
hrr_results = []
# FTS5 (always available if db provided)
if config.fts5_enabled and db:
fts5_results = search_fts5(query, db, limit=50, role_filter=role_filter)
logger.debug(f"FTS5: {len(fts5_results)} results")
# Vector search (optional)
if config.vector_enabled:
vector_results = search_vector(query, limit=50)
logger.debug(f"Vector: {len(vector_results)} results")
# HRR (optional)
if config.hrr_enabled:
hrr_results = search_hrr(query, limit=50)
logger.debug(f"HRR: {len(hrr_results)} results")
# If only FTS5 available, just return those
if not vector_results and not hrr_results:
return fts5_results[:limit]
# Merge and rank
return merge_results(fts5_results, vector_results, hrr_results, config, limit)

View File

@@ -7850,32 +7850,6 @@ class AIAgent:
if isinstance(persist_user_message, str):
persist_user_message = _sanitize_surrogates(persist_user_message)
# --- SOUL.md Crisis Protocol ---
# Detect crisis signals in user message. If detected, bypass normal
# conversation flow and return the sacred response immediately.
try:
from agent.crisis_protocol import detect_crisis, get_crisis_response
_crisis = detect_crisis(user_message)
if _crisis["crisis"]:
import logging as _logging
_logging.getLogger(__name__).warning(
"CRISIS PROTOCOL: severity=%s categories=%s",
_crisis["severity"], _crisis["categories"],
)
_crisis_resp = get_crisis_response(_crisis)
return {
"final_response": _crisis_resp["response"],
"messages": [],
"task_id": task_id or str(uuid.uuid4()),
"crisis_detected": True,
"crisis_severity": _crisis["severity"],
}
except ImportError:
pass # crisis_protocol not available — proceed normally
except Exception as _crisis_err:
import logging as _logging
_logging.getLogger(__name__).debug("Crisis detection error: %s", _crisis_err)
# Store stream callback for _interruptible_api_call to pick up
self._stream_callback = stream_callback
self._persist_user_message_idx = None

View File

@@ -0,0 +1,58 @@
"""Tests for hybrid search router."""
import pytest
from agent.hybrid_search import (
SearchResult,
HybridSearchConfig,
merge_results,
hybrid_search,
search_fts5,
)
class TestSearchResult:
def test_creation(self):
r = SearchResult(session_id="s1", message_content="hello", score=0.9, source="fts5")
assert r.session_id == "s1"
assert r.source == "fts5"
class TestMergeResults:
def test_merges_and_ranks(self):
fts5 = [SearchResult("s1", "alpha content", 1.0, "fts5")]
vec = [SearchResult("s2", "beta content", 0.9, "vector")]
hrr = [SearchResult("s3", "gamma content", 0.5, "hrr")]
config = HybridSearchConfig(fts5_weight=0.4, vector_weight=0.4, hrr_weight=0.2)
results = merge_results(fts5, vec, hrr, config, limit=10)
assert len(results) == 3
# s1: 1.0*0.4=0.4, s2: 0.9*0.4=0.36, s3: 0.5*0.2=0.1
assert results[0].session_id == "s1"
def test_deduplicates(self):
fts5 = [SearchResult("s1", "same content", 1.0, "fts5")]
vec = [SearchResult("s1", "same content", 0.8, "vector")]
config = HybridSearchConfig()
results = merge_results(fts5, vec, [], config, limit=10)
assert len(results) == 1
def test_respects_limit(self):
fts5 = [SearchResult(f"s{i}", f"content {i}", 1.0/i, "fts5") for i in range(1, 20)]
results = merge_results(fts5, [], [], HybridSearchConfig(), limit=5)
assert len(results) == 5
def test_empty_inputs(self):
results = merge_results([], [], [], HybridSearchConfig())
assert len(results) == 0
class TestHybridSearchFallback:
def test_falls_back_to_fts5_only(self):
"""When vector and HRR unavailable, returns FTS5 results."""
# Mock db
class MockDB:
def search_messages(self, **kwargs):
return [{"session_id": "s1", "content": "test", "rank": 1.0, "role": "user"}]
results = hybrid_search("test", db=MockDB(), config=HybridSearchConfig(vector_enabled=False, hrr_enabled=False))
assert len(results) == 1
assert results[0].source == "fts5"