Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e63cdaf16f |
@@ -1,238 +0,0 @@
|
||||
"""
|
||||
hybrid_search.py — Hybrid search combining FTS5, vector, and HRR.
|
||||
|
||||
Three-backend search router:
|
||||
1. FTS5 (SQLite full-text) — fast keyword matching, always available
|
||||
2. Vector search (Qdrant/ChromaDB) — semantic similarity, optional
|
||||
3. HRR (Holographic Reduced Representations) — compositional recall, optional
|
||||
|
||||
Graceful degradation: if vector or HRR backends are unavailable,
|
||||
falls back to FTS5-only.
|
||||
|
||||
Usage:
|
||||
from agent.hybrid_search import hybrid_search
|
||||
|
||||
results = hybrid_search(query, db=session_db, limit=10)
|
||||
# Returns merged, deduplicated, ranked results from all available backends
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Dict, Any, Optional, Callable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class SearchResult:
    """Single search result from any backend.

    Instances are produced by the backend-specific search functions; `score`
    is in the backend's native scale until merge_results() applies weights.
    """
    # Session the matched message belongs to.
    session_id: str
    # Text of the matched message.
    message_content: str
    # Relevance score; scale differs per backend (FTS5 rank, 1-distance, HRR binding).
    score: float
    source: str  # "fts5", "vector", "hrr"
    # Message role (e.g. "user"/"assistant"); empty when the backend has none.
    role: str = ""
    # Stringified message timestamp; empty when unavailable.
    timestamp: str = ""
    # Backend-specific extras; not read by this module's own code.
    metadata: dict = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
class HybridSearchConfig:
    """Configuration for hybrid search.

    The per-backend weights rescale each backend's native scores before
    merging; they are not required to sum to 1.0.
    """
    fts5_enabled: bool = True  # FTS5 runs whenever a db is supplied
    vector_enabled: bool = False  # opt-in: requires ChromaDB or Qdrant
    hrr_enabled: bool = False  # opt-in: requires agent.holographic_memory
    vector_weight: float = 0.4
    fts5_weight: float = 0.4
    hrr_weight: float = 0.2
    # NOTE(review): declared as a similarity threshold, but merge_results()
    # currently dedups on an exact session_id + content-prefix key and never
    # reads this value.
    dedup_threshold: float = 0.9  # similarity threshold for dedup
|
||||
|
||||
|
||||
def search_fts5(query: str, db, limit: int = 50, role_filter: Optional[list] = None) -> List[SearchResult]:
    """Search using FTS5 full-text search.

    Args:
        query: Full-text match query.
        db: Session database exposing ``search_messages()``.
        limit: Maximum number of rows to fetch.
        role_filter: Optional list of roles to restrict matches to.

    Returns:
        List of SearchResult with source="fts5"; empty on any failure
        (FTS5 errors degrade gracefully instead of propagating).
    """
    try:
        raw = db.search_messages(
            query=query,
            role_filter=role_filter,
            limit=limit,
            offset=0,
        )
        return [
            SearchResult(
                session_id=r.get("session_id", ""),
                message_content=r.get("content", ""),
                score=r.get("rank", 0.0),
                source="fts5",
                role=r.get("role", ""),
                timestamp=str(r.get("timestamp", "")),
            )
            for r in raw
        ]
    except Exception as e:
        # Lazy %-formatting: the message is only built if the record is emitted.
        logger.warning("FTS5 search failed: %s", e)
        return []
|
||||
|
||||
|
||||
def search_vector(query: str, limit: int = 50) -> List[SearchResult]:
|
||||
"""Search using vector similarity (Qdrant/ChromaDB).
|
||||
|
||||
Returns empty list if vector backend unavailable.
|
||||
"""
|
||||
try:
|
||||
# Try ChromaDB first
|
||||
import chromadb
|
||||
client = chromadb.PersistentClient(path="~/.hermes/memory/chroma")
|
||||
collection = client.get_or_create_collection("sessions")
|
||||
results = collection.query(
|
||||
query_texts=[query],
|
||||
n_results=limit,
|
||||
)
|
||||
search_results = []
|
||||
for i, doc in enumerate(results.get("documents", [[]])[0]):
|
||||
metadata = results.get("metadatas", [[]])[0]
|
||||
meta = metadata[i] if i < len(metadata) else {}
|
||||
distance = results.get("distances", [[]])[0]
|
||||
score = 1.0 - (distance[i] if i < len(distance) else 1.0)
|
||||
search_results.append(SearchResult(
|
||||
session_id=meta.get("session_id", ""),
|
||||
message_content=doc,
|
||||
score=score,
|
||||
source="vector",
|
||||
role=meta.get("role", ""),
|
||||
timestamp=meta.get("timestamp", ""),
|
||||
))
|
||||
return search_results
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
# Try Qdrant
|
||||
from qdrant_client import QdrantClient
|
||||
client = QdrantClient(host="localhost", port=6333)
|
||||
results = client.query_points(
|
||||
collection_name="sessions",
|
||||
query_text=query,
|
||||
limit=limit,
|
||||
)
|
||||
return [
|
||||
SearchResult(
|
||||
session_id=pt.payload.get("session_id", ""),
|
||||
message_content=pt.payload.get("content", ""),
|
||||
score=pt.score,
|
||||
source="vector",
|
||||
role=pt.payload.get("role", ""),
|
||||
)
|
||||
for pt in results.points
|
||||
]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def search_hrr(query: str, limit: int = 50) -> List[SearchResult]:
|
||||
"""Search using Holographic Reduced Representations.
|
||||
|
||||
Returns empty list if HRR backend unavailable.
|
||||
"""
|
||||
try:
|
||||
from agent.holographic_memory import holographic_recall
|
||||
results = holographic_recall(query, limit=limit)
|
||||
return [
|
||||
SearchResult(
|
||||
session_id=r.get("session_id", ""),
|
||||
message_content=r.get("content", ""),
|
||||
score=r.get("binding_score", 0.0),
|
||||
source="hrr",
|
||||
role=r.get("role", ""),
|
||||
)
|
||||
for r in results
|
||||
]
|
||||
except Exception:
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def merge_results(
    fts5_results: List[SearchResult],
    vector_results: List[SearchResult],
    hrr_results: List[SearchResult],
    config: HybridSearchConfig,
    limit: int = 10,
) -> List[SearchResult]:
    """Merge results from multiple backends with weighted scoring.

    Builds weighted *copies* of the inputs — the previous implementation
    multiplied scores in place, silently mutating the caller's objects —
    then sorts by weighted score and dedups on session_id + content prefix.

    Args:
        fts5_results: Results from search_fts5 (backend-native scores).
        vector_results: Results from search_vector.
        hrr_results: Results from search_hrr.
        config: Supplies the per-backend weights.
        limit: Maximum merged results to return.

    Returns:
        Up to `limit` SearchResult objects, highest weighted score first.
    """
    weighted: List[SearchResult] = []
    for batch, weight in (
        (fts5_results, config.fts5_weight),
        (vector_results, config.vector_weight),
        (hrr_results, config.hrr_weight),
    ):
        for r in batch:
            # Copy rather than mutate the caller's result objects.
            weighted.append(SearchResult(
                session_id=r.session_id,
                message_content=r.message_content,
                score=r.score * weight,
                source=r.source,
                role=r.role,
                timestamp=r.timestamp,
                metadata=r.metadata,
            ))

    # Sort by weighted score, best first.
    weighted.sort(key=lambda res: res.score, reverse=True)

    # Deduplicate by session_id + first 100 chars of content.
    # NOTE(review): config.dedup_threshold is not consulted here — dedup is
    # an exact-match key, not similarity-based.
    seen = set()
    deduped = []
    for res in weighted:
        key = f"{res.session_id}:{res.message_content[:100]}"
        if key not in seen:
            seen.add(key)
            deduped.append(res)

    return deduped[:limit]
|
||||
|
||||
|
||||
def hybrid_search(
|
||||
query: str,
|
||||
db=None,
|
||||
limit: int = 10,
|
||||
role_filter: list = None,
|
||||
config: HybridSearchConfig = None,
|
||||
) -> List[SearchResult]:
|
||||
"""Hybrid search across FTS5, vector, and HRR backends.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
db: Session database (for FTS5)
|
||||
limit: Max results
|
||||
role_filter: Filter by message role
|
||||
config: Hybrid search configuration
|
||||
|
||||
Returns:
|
||||
List of SearchResult, ranked by weighted score
|
||||
"""
|
||||
if config is None:
|
||||
config = HybridSearchConfig()
|
||||
|
||||
fts5_results = []
|
||||
vector_results = []
|
||||
hrr_results = []
|
||||
|
||||
# FTS5 (always available if db provided)
|
||||
if config.fts5_enabled and db:
|
||||
fts5_results = search_fts5(query, db, limit=50, role_filter=role_filter)
|
||||
logger.debug(f"FTS5: {len(fts5_results)} results")
|
||||
|
||||
# Vector search (optional)
|
||||
if config.vector_enabled:
|
||||
vector_results = search_vector(query, limit=50)
|
||||
logger.debug(f"Vector: {len(vector_results)} results")
|
||||
|
||||
# HRR (optional)
|
||||
if config.hrr_enabled:
|
||||
hrr_results = search_hrr(query, limit=50)
|
||||
logger.debug(f"HRR: {len(hrr_results)} results")
|
||||
|
||||
# If only FTS5 available, just return those
|
||||
if not vector_results and not hrr_results:
|
||||
return fts5_results[:limit]
|
||||
|
||||
# Merge and rank
|
||||
return merge_results(fts5_results, vector_results, hrr_results, config, limit)
|
||||
302
agent/self_modify.py
Normal file
302
agent/self_modify.py
Normal file
@@ -0,0 +1,302 @@
|
||||
"""Self-Modifying Prompt Engine — agent learns from its own failures.
|
||||
|
||||
Analyzes session transcripts, identifies failure patterns, and generates
|
||||
prompt patches to prevent future failures.
|
||||
|
||||
The loop: fail → analyze → rewrite → retry → verify improvement.
|
||||
|
||||
Usage:
|
||||
from agent.self_modify import PromptLearner
|
||||
learner = PromptLearner()
|
||||
patches = learner.analyze_session(session_id)
|
||||
learner.apply_patches(patches)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Root of on-disk Hermes state; overridable via the HERMES_HOME env var.
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
# JSON logs of generated patches (written by PromptLearner.apply_patches).
PATCHES_DIR = HERMES_HOME / "prompt_patches"
# Prompt backups taken before each patch application, for rollback_last().
ROLLBACK_DIR = HERMES_HOME / "prompt_rollback"
|
||||
|
||||
|
||||
@dataclass
class FailurePattern:
    """A detected failure pattern in session transcripts."""
    # One of the FAILURE_SIGNALS keys: retry_loop, timeout, hallucination,
    # context_loss, tool_failure.
    pattern_type: str
    # Human-readable summary, copied from FAILURE_SIGNALS.
    description: str
    # Number of messages that matched this pattern.
    frequency: int
    # Up to 3 matching message excerpts (first 200 chars each).
    example_messages: List[str] = field(default_factory=list)
    # Prompt rule that would mitigate this failure (from PROMPT_FIXES).
    suggested_fix: str = ""
|
||||
|
||||
|
||||
@dataclass
class PromptPatch:
    """A modification to the system prompt based on failure analysis."""
    # "<failure_type>-<epoch seconds>".
    id: str
    # FAILURE_SIGNALS key this patch addresses.
    failure_type: str
    # Pre-existing rule being replaced (placeholder text when there is none).
    original_rule: str
    # Rule text appended to the system prompt.
    new_rule: str
    # Frequency-based heuristic in [0.5, 0.9]; see generate_patches().
    confidence: float
    # Epoch time set by apply_patches() when the rule is written out.
    applied_at: Optional[float] = None
    # NOTE(review): never set within this module — presumably flipped by an
    # external rollback/telemetry path; confirm before relying on it.
    reverted: bool = False
|
||||
|
||||
|
||||
# Failure detection patterns
# Maps failure type -> {"patterns": [regex, ...], "description": str}.
# Each regex is applied with re.search to assistant/tool message content
# (case-insensitive via the inline (?i) flag); see PromptLearner.analyze_session.
FAILURE_SIGNALS = {
    "retry_loop": {
        "patterns": [
            r"(?i)retry(?:ing)?\s*(?:attempt|again)",
            r"(?i)failed.*retrying",
            r"(?i)error.*again",
            r"(?i)attempt\s+\d+\s*(?:of|/)\s*\d+",
        ],
        "description": "Agent stuck in retry loop",
    },
    "timeout": {
        "patterns": [
            r"(?i)timed?\s*out",
            r"(?i)deadline\s+exceeded",
            r"(?i)took\s+(?:too\s+)?long",
        ],
        "description": "Operation timed out",
    },
    "hallucination": {
        "patterns": [
            r"(?i)i\s+(?:don't|do\s+not)\s+(?:have|see|find)\s+(?:any|that|this)\s+(?:information|data|file)",
            r"(?i)the\s+file\s+doesn't\s+exist",
            r"(?i)i\s+(?:made|invented|fabricated)\s+(?:that\s+up|this)",
        ],
        "description": "Agent hallucinated or fabricated information",
    },
    "context_loss": {
        "patterns": [
            r"(?i)i\s+(?:don't|do\s+not)\s+(?:remember|recall|know)\s+(?:what|where|when|how)",
            r"(?i)could\s+you\s+remind\s+me",
            r"(?i)what\s+were\s+we\s+(?:doing|working|talking)\s+(?:on|about)",
        ],
        "description": "Agent lost context from earlier in conversation",
    },
    "tool_failure": {
        "patterns": [
            r"(?i)tool\s+(?:call|execution)\s+failed",
            r"(?i)command\s+not\s+found",
            r"(?i)permission\s+denied",
            r"(?i)no\s+such\s+file",
        ],
        "description": "Tool execution failed",
    },
}
|
||||
|
||||
# Prompt improvement templates
# Remediation rule text keyed by FAILURE_SIGNALS failure type; becomes
# PromptPatch.new_rule and is appended to the system prompt by
# PromptLearner.apply_patches().
PROMPT_FIXES = {
    "retry_loop": (
        "If an operation fails more than twice, stop retrying. "
        "Report the failure and ask the user for guidance. "
        "Do not enter retry loops — they waste tokens."
    ),
    "timeout": (
        "For operations that may take long, set a timeout and report "
        "progress. If an operation takes more than 30 seconds, report "
        "what you've done so far and ask if you should continue."
    ),
    "hallucination": (
        "If you cannot find information, say 'I don't know' or "
        "'I couldn't find that.' Never fabricate information. "
        "If a file doesn't exist, say so — don't guess its contents."
    ),
    "context_loss": (
        "When you need context from earlier in the conversation, "
        "use session_search to find it. Don't ask the user to repeat themselves."
    ),
    "tool_failure": (
        "If a tool fails, check the error message and try a different approach. "
        "Don't retry the exact same command — diagnose first."
    ),
}
|
||||
|
||||
|
||||
class PromptLearner:
    """Analyze session transcripts and generate prompt improvements.

    The learning loop: analyze_session() detects failure patterns,
    generate_patches() turns them into prompt rules, and apply_patches()
    appends those rules to the system prompt, taking a rollback backup first.
    """

    def __init__(self):
        # Ensure the on-disk layout exists before any patch/backup writes.
        PATCHES_DIR.mkdir(parents=True, exist_ok=True)
        ROLLBACK_DIR.mkdir(parents=True, exist_ok=True)

    def analyze_session(self, session_data: dict) -> List[FailurePattern]:
        """Analyze a session for failure patterns.

        Args:
            session_data: Session dict with 'messages' list.

        Returns:
            List of detected failure patterns.
        """
        messages = session_data.get("messages", [])
        patterns_found: Dict[str, FailurePattern] = {}

        for msg in messages:
            content = str(msg.get("content", ""))
            role = msg.get("role", "")

            # Only analyze assistant messages and tool results — a user
            # merely quoting "permission denied" is not an agent failure.
            if role not in ("assistant", "tool"):
                continue

            for failure_type, signal in FAILURE_SIGNALS.items():
                for pattern in signal["patterns"]:
                    if re.search(pattern, content):
                        entry = patterns_found.get(failure_type)
                        if entry is None:
                            entry = FailurePattern(
                                pattern_type=failure_type,
                                description=signal["description"],
                                frequency=0,
                                suggested_fix=PROMPT_FIXES.get(failure_type, ""),
                            )
                            patterns_found[failure_type] = entry
                        entry.frequency += 1
                        # Keep at most 3 short excerpts as evidence.
                        if len(entry.example_messages) < 3:
                            entry.example_messages.append(content[:200])
                        break  # One match per message per type is enough

        return list(patterns_found.values())

    def generate_patches(self, patterns: List[FailurePattern],
                         min_confidence: float = 0.7) -> List[PromptPatch]:
        """Generate prompt patches from failure patterns.

        Confidence is a simple frequency heuristic: 3+ occurrences -> 0.9,
        2 -> 0.75, 1 -> 0.5 (below the default threshold, so one-off
        failures do not modify the prompt).

        Args:
            patterns: Detected failure patterns.
            min_confidence: Minimum confidence to generate a patch.

        Returns:
            List of prompt patches.
        """
        patches = []
        for pattern in patterns:
            # Confidence based on frequency
            if pattern.frequency >= 3:
                confidence = 0.9
            elif pattern.frequency >= 2:
                confidence = 0.75
            else:
                confidence = 0.5

            if confidence < min_confidence:
                continue

            # No template for this failure type — nothing to patch with.
            if not pattern.suggested_fix:
                continue

            patches.append(PromptPatch(
                id=f"{pattern.pattern_type}-{int(time.time())}",
                failure_type=pattern.pattern_type,
                original_rule="(missing — no existing rule for this pattern)",
                new_rule=pattern.suggested_fix,
                confidence=confidence,
            ))

        return patches

    def apply_patches(self, patches: List[PromptPatch],
                      prompt_path: Optional[str] = None) -> int:
        """Apply patches to the system prompt.

        Args:
            patches: Patches to apply.
            prompt_path: Path to prompt file (default: ~/.hermes/system_prompt.md)

        Returns:
            Number of patches applied.
        """
        if prompt_path is None:
            prompt_path = str(HERMES_HOME / "system_prompt.md")

        prompt_file = Path(prompt_path)

        # Backup current prompt so rollback_last() can undo this change.
        if prompt_file.exists():
            backup = ROLLBACK_DIR / f"{prompt_file.name}.{int(time.time())}.bak"
            backup.write_text(prompt_file.read_text())

        # Read current prompt (empty when the file does not exist yet).
        current = prompt_file.read_text() if prompt_file.exists() else ""

        # Apply patches, skipping rules already present (e.g. from an
        # earlier learning cycle) so the prompt never accumulates duplicates.
        applied = 0
        additions = []
        for patch in patches:
            if patch.new_rule not in current:
                additions.append(f"\n## Auto-learned: {patch.failure_type}\n{patch.new_rule}")
                patch.applied_at = time.time()
                applied += 1

        if additions:
            prompt_file.write_text(current + "\n".join(additions))

        # Log patches for auditability — only when there is something to
        # record (previously an empty JSON file was written on no-op calls).
        if patches:
            patches_file = PATCHES_DIR / f"patches-{int(time.time())}.json"
            patches_file.write_text(
                json.dumps([p.__dict__ for p in patches], indent=2, default=str)
            )

        logger.info("Applied %d prompt patches", applied)
        return applied

    def rollback_last(self, prompt_path: Optional[str] = None) -> bool:
        """Rollback to the most recent backup of this prompt file.

        Args:
            prompt_path: Path to prompt file.

        Returns:
            True if rollback succeeded.
        """
        if prompt_path is None:
            prompt_path = str(HERMES_HOME / "system_prompt.md")

        target = Path(prompt_path)
        # Only consider backups of *this* prompt file (apply_patches names
        # them "<name>.<epoch>.bak") and pick the newest by mtime — a plain
        # lexicographic glob over "*.bak" could restore a backup belonging
        # to a different prompt file.
        backups = sorted(
            ROLLBACK_DIR.glob(f"{target.name}.*.bak"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )
        if not backups:
            logger.warning("No backups to rollback to")
            return False

        latest = backups[0]
        target.write_text(latest.read_text())
        logger.info("Rolled back to %s", latest.name)
        return True

    def learn_from_session(self, session_data: dict) -> Dict[str, Any]:
        """Full learning cycle: analyze -> patch -> apply.

        Args:
            session_data: Session dict.

        Returns:
            Summary of what was learned and applied.
        """
        patterns = self.analyze_session(session_data)
        patches = self.generate_patches(patterns)
        applied = self.apply_patches(patches)

        return {
            "patterns_detected": len(patterns),
            "patches_generated": len(patches),
            "patches_applied": applied,
            "patterns": [
                {"type": p.pattern_type, "frequency": p.frequency, "description": p.description}
                for p in patterns
            ],
        }
|
||||
@@ -1,58 +0,0 @@
|
||||
"""Tests for hybrid search router."""
|
||||
|
||||
import pytest
|
||||
from agent.hybrid_search import (
|
||||
SearchResult,
|
||||
HybridSearchConfig,
|
||||
merge_results,
|
||||
hybrid_search,
|
||||
search_fts5,
|
||||
)
|
||||
|
||||
|
||||
class TestSearchResult:
    def test_creation(self):
        """Constructor stores the identifying fields verbatim."""
        hit = SearchResult(
            session_id="s1",
            message_content="hello",
            score=0.9,
            source="fts5",
        )
        assert hit.session_id == "s1"
        assert hit.source == "fts5"
|
||||
|
||||
|
||||
class TestMergeResults:
    def test_merges_and_ranks(self):
        """Highest weighted score wins across backends."""
        from_fts = [SearchResult("s1", "alpha content", 1.0, "fts5")]
        from_vec = [SearchResult("s2", "beta content", 0.9, "vector")]
        from_hrr = [SearchResult("s3", "gamma content", 0.5, "hrr")]
        cfg = HybridSearchConfig(fts5_weight=0.4, vector_weight=0.4, hrr_weight=0.2)
        merged = merge_results(from_fts, from_vec, from_hrr, cfg, limit=10)
        assert len(merged) == 3
        # Weighted: s1 = 1.0*0.4 = 0.4, s2 = 0.9*0.4 = 0.36, s3 = 0.5*0.2 = 0.1
        assert merged[0].session_id == "s1"

    def test_deduplicates(self):
        """Identical session/content pairs collapse to a single result."""
        dup_a = [SearchResult("s1", "same content", 1.0, "fts5")]
        dup_b = [SearchResult("s1", "same content", 0.8, "vector")]
        merged = merge_results(dup_a, dup_b, [], HybridSearchConfig(), limit=10)
        assert len(merged) == 1

    def test_respects_limit(self):
        """Never returns more than `limit` results."""
        many = [
            SearchResult(f"s{i}", f"content {i}", 1.0 / i, "fts5")
            for i in range(1, 20)
        ]
        merged = merge_results(many, [], [], HybridSearchConfig(), limit=5)
        assert len(merged) == 5

    def test_empty_inputs(self):
        """All-empty backends yield an empty merge."""
        merged = merge_results([], [], [], HybridSearchConfig())
        assert len(merged) == 0
|
||||
|
||||
|
||||
class TestHybridSearchFallback:
    def test_falls_back_to_fts5_only(self):
        """When vector and HRR unavailable, returns FTS5 results."""
        class StubDB:
            # Minimal stand-in for the session db's FTS5 entry point.
            def search_messages(self, **kwargs):
                return [{"session_id": "s1", "content": "test", "rank": 1.0, "role": "user"}]

        cfg = HybridSearchConfig(vector_enabled=False, hrr_enabled=False)
        hits = hybrid_search("test", db=StubDB(), config=cfg)
        assert len(hits) == 1
        assert hits[0].source == "fts5"
|
||||
Reference in New Issue
Block a user