diff --git a/plugins/memory/holographic/__init__.py b/plugins/memory/holographic/__init__.py index cd4ef07b4..db2207481 100644 --- a/plugins/memory/holographic/__init__.py +++ b/plugins/memory/holographic/__init__.py @@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = { "properties": { "action": { "type": "string", - "enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"], + "enum": ["add", "search", "probe", "related", "reason", "contradict", "trace", "update", "remove", "list"], }, "content": {"type": "string", "description": "Fact content (required for 'add')."}, "query": {"type": "string", "description": "Search query (required for 'search')."}, @@ -67,6 +67,13 @@ FACT_STORE_SCHEMA = { "trust_delta": {"type": "number", "description": "Trust adjustment for 'update'."}, "min_trust": {"type": "number", "description": "Minimum trust filter (default: 0.3)."}, "limit": {"type": "integer", "description": "Max results (default: 10)."}, + "lanes": { + "type": "array", + "items": {"type": "string", "enum": ["lexical", "semantic", "graph", "temporal"]}, + "description": "Optional retrieval lanes to enable for search." 
+ }, + "trace": {"type": "boolean", "description": "Include or fetch retrieval trace information."}, + "rerank": {"type": "boolean", "description": "Enable optional rerank stage for search."}, }, "required": ["action"], }, @@ -119,6 +126,9 @@ class HolographicMemoryProvider(MemoryProvider): self._store = None self._retriever = None self._min_trust = float(self._config.get("min_trust_threshold", 0.3)) + self._retrieval_lanes = self._parse_retrieval_lanes(self._config.get("retrieval_lanes")) + self._enable_rerank = str(self._config.get("enable_rerank", "true")).lower() != "false" + self._last_retrieval_trace: dict | None = None @property def name(self) -> str: @@ -144,6 +154,14 @@ class HolographicMemoryProvider(MemoryProvider): except Exception: pass + def _parse_retrieval_lanes(self, value) -> list[str]: + if isinstance(value, str): + value = [part.strip() for part in value.split(",") if part.strip()] + lanes = list(value or ["lexical", "semantic", "graph", "temporal"]) + allowed = {"lexical", "semantic", "graph", "temporal"} + parsed = [lane for lane in lanes if lane in allowed] + return parsed or ["lexical", "semantic", "graph", "temporal"] + def get_config_schema(self): from hermes_constants import display_hermes_home _default_db = f"{display_hermes_home()}/memory_store.db" @@ -152,6 +170,10 @@ class HolographicMemoryProvider(MemoryProvider): {"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]}, {"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"}, {"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"}, + {"key": "hrr_weight", "description": "Semantic HRR weight inside the legacy baseline", "default": "0.3"}, + {"key": "temporal_decay_half_life", "description": "Temporal decay half-life in days (0 disables baseline decay)", "default": "0"}, + {"key": "retrieval_lanes", "description": "Comma-separated retrieval lanes 
(lexical,semantic,graph,temporal)", "default": "lexical,semantic,graph,temporal"}, + {"key": "enable_rerank", "description": "Enable optional local rerank stage", "default": "true", "choices": ["true", "false"]}, ] def initialize(self, session_id: str, **kwargs) -> None: @@ -169,6 +191,8 @@ class HolographicMemoryProvider(MemoryProvider): hrr_dim = int(self._config.get("hrr_dim", 1024)) hrr_weight = float(self._config.get("hrr_weight", 0.3)) temporal_decay = int(self._config.get("temporal_decay_half_life", 0)) + self._retrieval_lanes = self._parse_retrieval_lanes(self._config.get("retrieval_lanes", self._retrieval_lanes)) + self._enable_rerank = str(self._config.get("enable_rerank", self._enable_rerank)).lower() != "false" self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim) self._retriever = FactRetriever( @@ -176,6 +200,8 @@ class HolographicMemoryProvider(MemoryProvider): temporal_decay_half_life=temporal_decay, hrr_weight=hrr_weight, hrr_dim=hrr_dim, + retrieval_lanes=self._retrieval_lanes, + enable_rerank=self._enable_rerank, ) self._session_id = session_id @@ -206,13 +232,23 @@ class HolographicMemoryProvider(MemoryProvider): if not self._retriever or not query: return "" try: - results = self._retriever.search(query, min_trust=self._min_trust, limit=5) + payload = self._retriever.search_with_trace( + query, + min_trust=self._min_trust, + limit=5, + lanes=self._retrieval_lanes, + rerank=self._enable_rerank, + ) + self._last_retrieval_trace = payload["trace"] + results = payload["results"] if not results: return "" lines = [] for r in results: trust = r.get("trust_score", r.get("trust", 0)) - lines.append(f"- [{trust:.1f}] {r.get('content', '')}") + lanes = ",".join(r.get("matched_lanes", [])) + lane_suffix = f" [{lanes}]" if lanes else "" + lines.append(f"- [{trust:.1f}] {r.get('content', '')}{lane_suffix}") return "## Holographic Memory\n" + "\n".join(lines) except Exception as e: logger.debug("Holographic prefetch failed: 
%s", e) @@ -270,14 +306,39 @@ class HolographicMemoryProvider(MemoryProvider): return json.dumps({"fact_id": fact_id, "status": "added"}) elif action == "search": + lanes = args.get("lanes") + rerank = args.get("rerank") + with_trace = bool(args.get("trace", False)) + if with_trace: + payload = retriever.search_with_trace( + args["query"], + category=args.get("category"), + min_trust=float(args.get("min_trust", self._min_trust)), + limit=int(args.get("limit", 10)), + lanes=lanes, + rerank=rerank, + ) + self._last_retrieval_trace = payload["trace"] + return json.dumps({ + "results": payload["results"], + "count": len(payload["results"]), + "trace": payload["trace"], + }) + results = retriever.search( args["query"], category=args.get("category"), min_trust=float(args.get("min_trust", self._min_trust)), limit=int(args.get("limit", 10)), + lanes=lanes, + rerank=rerank, ) + self._last_retrieval_trace = retriever.last_trace return json.dumps({"results": results, "count": len(results)}) + elif action == "trace": + return json.dumps({"trace": self._last_retrieval_trace or retriever.last_trace or {}}) + elif action == "probe": results = retriever.probe( args["entity"], diff --git a/plugins/memory/holographic/retrieval.py b/plugins/memory/holographic/retrieval.py index a673dcef8..c6863f6e5 100644 --- a/plugins/memory/holographic/retrieval.py +++ b/plugins/memory/holographic/retrieval.py @@ -1,14 +1,19 @@ -"""Hybrid keyword/BM25 retrieval for the memory store. -Ported from KIK memory_agent.py — combines FTS5 full-text search with -Jaccard similarity reranking and trust-weighted scoring.
+Combines independent lexical, semantic, graph-aware, and temporal retrieval +lanes, then fuses them with Reciprocal Rank Fusion (RRF). The pipeline keeps a +trace of per-lane contributions so recall failures can be inspected after a +run, and includes a benchmark helper to compare the fused pipeline against the +legacy Hermes-native search path. """ from __future__ import annotations import math +import re +from collections import defaultdict from datetime import datetime, timezone -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Iterable if TYPE_CHECKING: from .store import MemoryStore @@ -18,24 +23,73 @@ try: except ImportError: import holographic as hrr # type: ignore[no-redef] +_RRF_K = 60 +_DEFAULT_TEMPORAL_HALF_LIFE = 30 +_SEMANTIC_SYNONYMS = { + "automation": {"ansible", "playbooks", "orchestration"}, + "ansible": {"automation", "playbooks", "orchestration"}, + "deploy": {"deployment", "deploys", "rollout", "rollouts", "ship", "shipping", "release", "releases"}, + "deploys": {"deploy", "deployment", "rollout", "rollouts", "ship", "shipping"}, + "deployment": {"deploy", "deploys", "rollout", "rollouts"}, + "rollout": {"deploy", "deploys", "deployment", "release", "shipping"}, + "rollouts": {"deploy", "deploys", "deployment", "release", "shipping"}, + "provider": {"model", "engine", "runtime"}, + "model": {"provider", "engine", "runtime"}, + "lane": {"queue", "router", "track"}, + "forge": {"review", "triage", "pull-request"}, +} +_ENTITY_TOKEN_RE = re.compile(r"\b([A-Z][\w-]*(?:\s+[A-Z][\w-]*)*)\b") + + +def format_benchmark_report(report: dict) -> str: + """Render a benchmark/evaluation report as plain text.""" + + lines = [ + "Prompt matrix benchmark", + f"- baseline_top1_hits: {report.get('baseline_top1_hits', 0)}", + f"- fused_top1_hits: {report.get('fused_top1_hits', 0)}", + f"- improvement: {report.get('improvement', 0)}", + "", + ] + for case in report.get("cases", []): + lines.append( + f"- {case['name']}: baseline={'PASS' if 
case['baseline_hit'] else 'FAIL'}, " + f"fused={'PASS' if case['fused_hit'] else 'FAIL'}, expected={case['expected_substring']}" + ) + lines.append(f" baseline_top: {case['baseline_top']}") + lines.append(f" fused_top: {case['fused_top']}") + return "\n".join(lines).strip() + class FactRetriever: - """Multi-strategy fact retrieval with trust-weighted scoring.""" + """Multi-path fact retrieval with RRF fusion, traceability, and benchmarking.""" def __init__( self, store: MemoryStore, - temporal_decay_half_life: int = 0, # days, 0 = disabled + temporal_decay_half_life: int = 0, fts_weight: float = 0.4, jaccard_weight: float = 0.3, hrr_weight: float = 0.3, hrr_dim: int = 1024, + retrieval_lanes: list[str] | tuple[str, ...] | None = None, + rrf_k: int = _RRF_K, + enable_rerank: bool = True, + rerank_min_candidates: int = 3, + rerank_margin: float = 0.035, ): self.store = store self.half_life = temporal_decay_half_life self.hrr_dim = hrr_dim + self.available_lanes = ("lexical", "semantic", "graph", "temporal") + self.default_lanes = tuple(retrieval_lanes or self.available_lanes) + self.rrf_k = rrf_k + self.enable_rerank = enable_rerank + self.rerank_min_candidates = rerank_min_candidates + self.rerank_margin = rerank_margin + self.last_trace: dict[str, Any] = {} - # Auto-redistribute weights if numpy unavailable + # Auto-redistribute weights if numpy unavailable. if hrr_weight > 0 and not hrr._HAS_NUMPY: fts_weight = 0.6 jaccard_weight = 0.4 @@ -45,71 +99,428 @@ class FactRetriever: self.jaccard_weight = jaccard_weight self.hrr_weight = hrr_weight + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + def search( self, query: str, category: str | None = None, min_trust: float = 0.3, limit: int = 10, + *, + lanes: Iterable[str] | None = None, + rerank: bool | None = None, ) -> list[dict]: - """Hybrid search: FTS5 candidates → Jaccard rerank → trust weighting. 
+ """Run the fused multi-path retrieval pipeline and return only results.""" - Pipeline: - 1. FTS5 search: Get limit*3 candidates from SQLite full-text search - 2. Jaccard boost: Token overlap between query and fact content - 3. Trust weighting: final_score = relevance * trust_score - 4. Temporal decay (optional): decay = 0.5^(age_days / half_life) + payload = self.search_with_trace( + query, + category=category, + min_trust=min_trust, + limit=limit, + lanes=lanes, + rerank=rerank, + ) + return payload["results"] + + def search_with_trace( + self, + query: str, + category: str | None = None, + min_trust: float = 0.3, + limit: int = 10, + *, + lanes: Iterable[str] | None = None, + rerank: bool | None = None, + ) -> dict[str, Any]: + """Run the fused retrieval pipeline and return results plus an audit trace.""" + + normalized_query = (query or "").strip() + selected_lanes = self._resolve_lanes(lanes) + candidate_limit = max(limit * 5, 12) + lane_results: dict[str, list[dict[str, Any]]] = {} + + for lane_name in selected_lanes: + lane_fn = getattr(self, f"_{lane_name}_lane") + lane_results[lane_name] = lane_fn( + normalized_query, + category=category, + min_trust=min_trust, + limit=candidate_limit, + ) + + fused = self._rrf_fuse(lane_results) + rerank_requested = self.enable_rerank if rerank is None else rerank + rerank_applied = False + rerank_reason = "disabled" + if rerank_requested: + should_rerank, rerank_reason = self._should_rerank(fused, lane_results) + if should_rerank: + fused = self._rerank(normalized_query, fused) + rerank_applied = True + + results = fused[:limit] + trace = { + "query": normalized_query, + "lanes_run": list(selected_lanes), + "lane_hits": {lane: len(items) for lane, items in lane_results.items()}, + "lane_top_fact_ids": { + lane: [item["fact_id"] for item in items[:5]] + for lane, items in lane_results.items() + }, + "fused_count": len(fused), + "rrf_k": self.rrf_k, + "rerank_requested": rerank_requested, + "rerank_applied": 
rerank_applied, + "rerank_reason": rerank_reason, + "top_results": [ + { + "fact_id": item["fact_id"], + "content": item["content"], + "fused_score": round(item.get("fused_score", 0.0), 6), + "lane_contributions": item.get("lane_contributions", {}), + "matched_lanes": item.get("matched_lanes", []), + } + for item in results + ], + } + self.last_trace = trace + return {"results": results, "trace": trace} + + def baseline_search( + self, + query: str, + category: str | None = None, + min_trust: float = 0.3, + limit: int = 10, + ) -> list[dict]: + """Legacy Hermes-native retrieval path used for before/after benchmarks.""" - Returns list of dicts with fact data + 'score' field, sorted by score desc. - """ - # Stage 1: Get FTS5 candidates (more than limit for reranking headroom) candidates = self._fts_candidates(query, category, min_trust, limit * 3) - if not candidates: return [] - # Stage 2: Rerank with Jaccard + trust + optional decay query_tokens = self._tokenize(query) scored = [] - for fact in candidates: content_tokens = self._tokenize(fact["content"]) tag_tokens = self._tokenize(fact.get("tags", "")) all_tokens = content_tokens | tag_tokens - jaccard = self._jaccard_similarity(query_tokens, all_tokens) fts_score = fact.get("fts_rank", 0.0) - # HRR similarity if self.hrr_weight > 0 and fact.get("hrr_vector"): fact_vec = hrr.bytes_to_phases(fact["hrr_vector"]) query_vec = hrr.encode_text(query, self.hrr_dim) - hrr_sim = (hrr.similarity(query_vec, fact_vec) + 1.0) / 2.0 # shift to [0,1] + hrr_sim = (hrr.similarity(query_vec, fact_vec) + 1.0) / 2.0 else: - hrr_sim = 0.5 # neutral + hrr_sim = 0.5 - # Combine FTS5 + Jaccard + HRR - relevance = (self.fts_weight * fts_score - + self.jaccard_weight * jaccard - + self.hrr_weight * hrr_sim) - - # Trust weighting + relevance = ( + self.fts_weight * fts_score + + self.jaccard_weight * jaccard + + self.hrr_weight * hrr_sim + ) score = relevance * fact["trust_score"] - - # Optional temporal decay if self.half_life > 0: score 
*= self._temporal_decay(fact.get("updated_at") or fact.get("created_at")) + legacy = self._clean_fact(fact) + legacy["score"] = score + scored.append(legacy) - fact["score"] = score - scored.append(fact) + scored.sort(key=lambda item: item["score"], reverse=True) + return scored[:limit] - # Sort by score descending, return top limit - scored.sort(key=lambda x: x["score"], reverse=True) - results = scored[:limit] - # Strip raw HRR bytes — callers expect JSON-serializable dicts - for fact in results: - fact.pop("hrr_vector", None) - return results + def benchmark_prompt_matrix( + self, + cases: list[dict[str, Any]], + *, + category: str | None = None, + min_trust: float = 0.0, + limit: int = 5, + lanes: Iterable[str] | None = None, + rerank: bool | None = None, + ) -> dict[str, Any]: + """Benchmark fused retrieval against the legacy Hermes-native path.""" + + baseline_hits = 0 + fused_hits = 0 + rows: list[dict[str, Any]] = [] + for case in cases: + query = case["query"] + top_k = int(case.get("top_k", 1)) + baseline = self.baseline_search(query, category=category, min_trust=min_trust, limit=limit) + fused_payload = self.search_with_trace( + query, + category=category, + min_trust=min_trust, + limit=limit, + lanes=lanes, + rerank=rerank, + ) + fused_results = fused_payload["results"] + + baseline_hit = self._matches_expected(baseline[:top_k], case) + fused_hit = self._matches_expected(fused_results[:top_k], case) + baseline_hits += int(baseline_hit) + fused_hits += int(fused_hit) + + rows.append( + { + "name": case.get("name", query), + "query": query, + "expected_substring": case.get("expected_substring", ""), + "baseline_hit": baseline_hit, + "fused_hit": fused_hit, + "baseline_top": baseline[0]["content"] if baseline else "", + "fused_top": fused_results[0]["content"] if fused_results else "", + "trace": fused_payload["trace"], + } + ) + + return { + "baseline_top1_hits": baseline_hits, + "fused_top1_hits": fused_hits, + "improvement": fused_hits - baseline_hits, 
+ "cases": rows, + } + + # ------------------------------------------------------------------ + # Fusion pipeline + # ------------------------------------------------------------------ + + def _resolve_lanes(self, lanes: Iterable[str] | None) -> tuple[str, ...]: + selected = tuple(lanes or self.default_lanes) + valid = [lane for lane in selected if lane in self.available_lanes] + return tuple(valid or self.available_lanes) + + def _rrf_fuse(self, lane_results: dict[str, list[dict[str, Any]]]) -> list[dict[str, Any]]: + fused: dict[int, dict[str, Any]] = {} + for lane_name, ranked in lane_results.items(): + for rank, fact in enumerate(ranked, start=1): + fact_id = int(fact["fact_id"]) + contribution = 1.0 / (self.rrf_k + rank) + entry = fused.get(fact_id) + if entry is None: + entry = self._clean_fact(fact) + entry["fused_score"] = 0.0 + entry["lane_contributions"] = {} + entry["lane_raw_scores"] = {} + entry["matched_lanes"] = [] + fused[fact_id] = entry + entry["fused_score"] += contribution + entry["lane_contributions"][lane_name] = round(contribution, 6) + entry["lane_raw_scores"][lane_name] = round(float(fact.get("lane_score", fact.get("score", 0.0))), 6) + if lane_name not in entry["matched_lanes"]: + entry["matched_lanes"].append(lane_name) + ranked = sorted( + fused.values(), + key=lambda item: (item["fused_score"], item.get("trust_score", 0.0)), + reverse=True, + ) + for item in ranked: + item["fused_score"] = round(item["fused_score"], 6) + return ranked + + def _should_rerank( + self, + fused: list[dict[str, Any]], + lane_results: dict[str, list[dict[str, Any]]], + ) -> tuple[bool, str]: + if len(fused) < self.rerank_min_candidates: + return False, "not enough candidates" + active_lanes = sum(1 for items in lane_results.values() if items) + if active_lanes < 2: + return False, "single active lane" + margin = fused[0]["fused_score"] - fused[1]["fused_score"] + if margin > self.rerank_margin: + return False, f"decisive fused margin {margin:.3f}" + if 
len(fused[0].get("matched_lanes", [])) < 2 and len(fused[1].get("matched_lanes", [])) < 2: + return False, "insufficient lane disagreement" + return True, f"close fused margin {margin:.3f} across multiple lanes" + + def _rerank(self, query: str, fused: list[dict[str, Any]]) -> list[dict[str, Any]]: + query_tokens = self._expand_semantic_tokens(self._tokenize(query)) + query_entities = {entity.lower() for entity in self._extract_query_entities(query)} + reranked = [] + for fact in fused: + fact_tokens = self._expand_semantic_tokens(self._tokenize(fact["content"] + " " + fact.get("tags", ""))) + token_overlap = self._jaccard_similarity(query_tokens, fact_tokens) + entity_overlap = self._entity_overlap_score(fact["content"], query_entities) + trust = float(fact.get("trust_score", 0.0)) + rerank_score = ( + 0.5 * float(fact.get("fused_score", 0.0)) + + 0.25 * token_overlap + + 0.15 * entity_overlap + + 0.10 * trust + ) + reranked_fact = dict(fact) + reranked_fact["rerank_score"] = round(rerank_score, 6) + reranked.append(reranked_fact) + reranked.sort(key=lambda item: (item["rerank_score"], item["fused_score"]), reverse=True) + return reranked + + # ------------------------------------------------------------------ + # Retrieval lanes + # ------------------------------------------------------------------ + + def _lexical_lane( + self, + query: str, + *, + category: str | None, + min_trust: float, + limit: int, + ) -> list[dict[str, Any]]: + candidates = self._fts_candidates(query, category, min_trust, limit) + query_tokens = self._tokenize(query) + scored = [] + for fact in candidates: + fact_tokens = self._tokenize(fact["content"] + " " + fact.get("tags", "")) + jaccard = self._jaccard_similarity(query_tokens, fact_tokens) + lane_score = 0.7 * float(fact.get("fts_rank", 0.0)) + 0.3 * jaccard + if lane_score <= 0: + continue + lane_fact = self._clean_fact(fact) + lane_fact["lane_score"] = round(lane_score, 6) + lane_fact["lane_name"] = "lexical" + 
scored.append(lane_fact) + scored.sort(key=lambda item: (item["lane_score"], item.get("trust_score", 0.0)), reverse=True) + return scored[:limit] + + def _semantic_lane( + self, + query: str, + *, + category: str | None, + min_trust: float, + limit: int, + ) -> list[dict[str, Any]]: + query_tokens = self._expand_semantic_tokens(self._tokenize(query)) + rows = self._fetch_rows(category=category, min_trust=min_trust, require_vectors=False) + scored = [] + for fact in rows: + fact_tokens = self._expand_semantic_tokens(self._tokenize(fact["content"] + " " + fact.get("tags", ""))) + token_sim = self._jaccard_similarity(query_tokens, fact_tokens) + hrr_sim = 0.0 + if self.hrr_weight > 0 and fact.get("hrr_vector"): + query_vec = hrr.encode_text(query, self.hrr_dim) + fact_vec = hrr.bytes_to_phases(fact["hrr_vector"]) + hrr_sim = (hrr.similarity(query_vec, fact_vec) + 1.0) / 2.0 + lane_score = 0.55 * token_sim + 0.45 * hrr_sim + else: + lane_score = token_sim + if token_sim <= 0 and hrr_sim < 0.65: + continue + if lane_score <= 0: + continue + lane_fact = self._clean_fact(fact) + lane_fact["lane_score"] = round(lane_score, 6) + lane_fact["lane_name"] = "semantic" + scored.append(lane_fact) + scored.sort(key=lambda item: (item["lane_score"], item.get("trust_score", 0.0)), reverse=True) + return scored[:limit] + + def _graph_lane( + self, + query: str, + *, + category: str | None, + min_trust: float, + limit: int, + ) -> list[dict[str, Any]]: + conn = self.store._conn + query_entities = self._extract_query_entities(query) + if not query_entities: + return [] + + matched_entity_ids: set[int] = set() + for entity in query_entities: + matched_entity_ids.update(self._lookup_entity_ids(entity)) + if not matched_entity_ids: + return [] + + direct_scores: defaultdict[int, float] = defaultdict(float) + bridge_scores: defaultdict[int, float] = defaultdict(float) + related_entity_ids: set[int] = set() + + for entity_id in matched_entity_ids: + fact_rows = conn.execute( + "SELECT 
fact_id FROM fact_entities WHERE entity_id = ?", + (entity_id,), + ).fetchall() + for row in fact_rows: + fact_id = int(row["fact_id"]) + direct_scores[fact_id] += 1.0 + neighbor_rows = conn.execute( + "SELECT entity_id FROM fact_entities WHERE fact_id = ? AND entity_id != ?", + (fact_id, entity_id), + ).fetchall() + related_entity_ids.update(int(neighbor["entity_id"]) for neighbor in neighbor_rows) + + for entity_id in related_entity_ids - matched_entity_ids: + fact_rows = conn.execute( + "SELECT fact_id FROM fact_entities WHERE entity_id = ?", + (entity_id,), + ).fetchall() + for row in fact_rows: + fact_id = int(row["fact_id"]) + if fact_id in direct_scores: + continue + bridge_scores[fact_id] += 0.65 + + candidate_ids = list({*direct_scores.keys(), *bridge_scores.keys()}) + if not candidate_ids: + return [] + + rows = self._fetch_rows_by_ids(candidate_ids, category=category, min_trust=min_trust, require_vectors=False) + query_tokens = self._expand_semantic_tokens(self._tokenize(query)) + scored = [] + for fact in rows: + fact_id = int(fact["fact_id"]) + fact_tokens = self._expand_semantic_tokens(self._tokenize(fact["content"] + " " + fact.get("tags", ""))) + lexical_support = self._jaccard_similarity(query_tokens, fact_tokens) + lane_score = direct_scores[fact_id] + bridge_scores[fact_id] + 0.2 * lexical_support + if lane_score <= 0: + continue + lane_fact = self._clean_fact(fact) + lane_fact["lane_score"] = round(lane_score, 6) + lane_fact["lane_name"] = "graph" + scored.append(lane_fact) + scored.sort(key=lambda item: (item["lane_score"], item.get("trust_score", 0.0)), reverse=True) + return scored[:limit] + + def _temporal_lane( + self, + query: str, + *, + category: str | None, + min_trust: float, + limit: int, + ) -> list[dict[str, Any]]: + rows = self._fetch_rows(category=category, min_trust=min_trust, require_vectors=False) + query_tokens = self._expand_semantic_tokens(self._tokenize(query)) + scored = [] + for fact in rows: + fact_tokens = 
self._expand_semantic_tokens(self._tokenize(fact["content"] + " " + fact.get("tags", ""))) + lexical_support = self._jaccard_similarity(query_tokens, fact_tokens) + if lexical_support <= 0: + continue + timestamp = fact.get("updated_at") or fact.get("created_at") + recency = self._temporal_decay(timestamp, half_life=self.half_life or _DEFAULT_TEMPORAL_HALF_LIFE) + lane_score = 0.7 * recency + 0.3 * lexical_support + lane_fact = self._clean_fact(fact) + lane_fact["lane_score"] = round(lane_score, 6) + lane_fact["lane_name"] = "temporal" + scored.append(lane_fact) + scored.sort(key=lambda item: (item["lane_score"], item.get("trust_score", 0.0)), reverse=True) + return scored[:limit] + + # ------------------------------------------------------------------ + # Existing algebraic retrieval APIs + # ------------------------------------------------------------------ def probe( self, @@ -117,26 +528,15 @@ class FactRetriever: category: str | None = None, limit: int = 10, ) -> list[dict]: - """Compositional entity query using HRR algebra. - - Unbinds entity from memory bank to extract associated content. - This is NOT keyword search — it uses algebraic structure to find facts - where the entity plays a structural role. - - Falls back to FTS5 search if numpy unavailable. 
- """ + """Compositional entity query using HRR algebra.""" if not hrr._HAS_NUMPY: - # Fallback to keyword search on entity name - return self.search(entity, category=category, limit=limit) + return self.search(entity, category=category, limit=limit, lanes=["lexical", "graph"]) conn = self.store._conn - - # Encode entity as role-bound vector role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim) entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) probe_key = hrr.bind(entity_vec, role_entity) - # Try category-specific bank first, then all facts if category: bank_name = f"cat:{category}" bank_row = conn.execute( @@ -146,47 +546,24 @@ class FactRetriever: if bank_row: bank_vec = hrr.bytes_to_phases(bank_row["vector"]) extracted = hrr.unbind(bank_vec, probe_key) - # Use extracted signal to score individual facts - return self._score_facts_by_vector( - extracted, category=category, limit=limit - ) - - # Score against individual fact vectors directly - where = "WHERE hrr_vector IS NOT NULL" - params: list = [] - if category: - where += " AND category = ?" 
- params.append(category) - - rows = conn.execute( - f""" - SELECT fact_id, content, category, tags, trust_score, - retrieval_count, helpful_count, created_at, updated_at, - hrr_vector - FROM facts - {where} - """, - params, - ).fetchall() + return self._score_facts_by_vector(extracted, category=category, limit=limit) + rows = self._fetch_rows(category=category, min_trust=0.0, require_vectors=True) if not rows: - # Final fallback: keyword search - return self.search(entity, category=category, limit=limit) + return self.search(entity, category=category, limit=limit, lanes=["lexical", "graph"]) scored = [] - for row in rows: - fact = dict(row) - fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) - # Unbind probe key from fact to see if entity is structurally present + for fact in rows: + fact_vec = hrr.bytes_to_phases(fact["hrr_vector"]) residual = hrr.unbind(fact_vec, probe_key) - # Compare residual against content signal role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim) content_vec = hrr.bind(hrr.encode_text(fact["content"], self.hrr_dim), role_content) sim = hrr.similarity(residual, content_vec) - fact["score"] = (sim + 1.0) / 2.0 * fact["trust_score"] - scored.append(fact) + item = self._clean_fact(fact) + item["score"] = (sim + 1.0) / 2.0 * item["trust_score"] + scored.append(item) - scored.sort(key=lambda x: x["score"], reverse=True) + scored.sort(key=lambda item: item["score"], reverse=True) return scored[:limit] def related( @@ -195,66 +572,27 @@ class FactRetriever: category: str | None = None, limit: int = 10, ) -> list[dict]: - """Discover facts that share structural connections with an entity. - - Unlike probe (which finds facts *about* an entity), related finds - facts that are connected through shared context — e.g., other entities - mentioned alongside this one, or content that overlaps structurally. - - Falls back to FTS5 search if numpy unavailable. 
- """ + """Discover facts that share structural connections with an entity.""" if not hrr._HAS_NUMPY: - return self.search(entity, category=category, limit=limit) + return self.search(entity, category=category, limit=limit, lanes=["graph", "lexical"]) - conn = self.store._conn - - # Encode entity as a bare atom (not role-bound — we want ANY structural match) entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) - - # Get all facts with vectors - where = "WHERE hrr_vector IS NOT NULL" - params: list = [] - if category: - where += " AND category = ?" - params.append(category) - - rows = conn.execute( - f""" - SELECT fact_id, content, category, tags, trust_score, - retrieval_count, helpful_count, created_at, updated_at, - hrr_vector - FROM facts - {where} - """, - params, - ).fetchall() - + rows = self._fetch_rows(category=category, min_trust=0.0, require_vectors=True) if not rows: - return self.search(entity, category=category, limit=limit) + return self.search(entity, category=category, limit=limit, lanes=["graph", "lexical"]) - # Score each fact by how much the entity's atom appears in its vector - # This catches both role-bound entity matches AND content word matches scored = [] - for row in rows: - fact = dict(row) - fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) - - # Check structural similarity: unbind entity from fact + for fact in rows: + fact_vec = hrr.bytes_to_phases(fact["hrr_vector"]) residual = hrr.unbind(fact_vec, entity_vec) - # A high-similarity residual to ANY known role vector means this entity - # plays a structural role in the fact role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim) role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim) + best_sim = max(hrr.similarity(residual, role_entity), hrr.similarity(residual, role_content)) + item = self._clean_fact(fact) + item["score"] = (best_sim + 1.0) / 2.0 * item["trust_score"] + scored.append(item) - entity_role_sim = hrr.similarity(residual, role_entity) - 
content_role_sim = hrr.similarity(residual, role_content) - # Take the max — entity could appear in either role - best_sim = max(entity_role_sim, content_role_sim) - - fact["score"] = (best_sim + 1.0) / 2.0 * fact["trust_score"] - scored.append(fact) - - scored.sort(key=lambda x: x["score"], reverse=True) + scored.sort(key=lambda item: item["score"], reverse=True) return scored[:limit] def reason( @@ -263,76 +601,29 @@ class FactRetriever: category: str | None = None, limit: int = 10, ) -> list[dict]: - """Multi-entity compositional query — vector-space JOIN. - - Given multiple entities, algebraically intersects their structural - connections to find facts related to ALL of them simultaneously. - This is compositional reasoning that no embedding DB can do. - - Example: reason(["peppi", "backend"]) finds facts where peppi AND - backend both play structural roles — without keyword matching. - - Falls back to FTS5 search if numpy unavailable. - """ + """Multi-entity compositional query — vector-space AND semantics.""" if not hrr._HAS_NUMPY or not entities: - # Fallback: search with all entities as keywords - query = " ".join(entities) - return self.search(query, category=category, limit=limit) + return self.search(" ".join(entities), category=category, limit=limit, lanes=["graph", "semantic", "lexical"]) - conn = self.store._conn role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim) - - # For each entity, compute what the bank "remembers" about it - # by unbinding entity+role from each fact vector - entity_residuals = [] - for entity in entities: - entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim) - probe_key = hrr.bind(entity_vec, role_entity) - entity_residuals.append(probe_key) - - # Get all facts with vectors - where = "WHERE hrr_vector IS NOT NULL" - params: list = [] - if category: - where += " AND category = ?" 
- params.append(category) - - rows = conn.execute( - f""" - SELECT fact_id, content, category, tags, trust_score, - retrieval_count, helpful_count, created_at, updated_at, - hrr_vector - FROM facts - {where} - """, - params, - ).fetchall() - + probe_keys = [hrr.bind(hrr.encode_atom(entity.lower(), self.hrr_dim), role_entity) for entity in entities] + rows = self._fetch_rows(category=category, min_trust=0.0, require_vectors=True) if not rows: - query = " ".join(entities) - return self.search(query, category=category, limit=limit) + return self.search(" ".join(entities), category=category, limit=limit, lanes=["graph", "semantic", "lexical"]) - # Score each fact by how much EACH entity is structurally present. - # A fact scores high only if ALL entities have structural presence - # (AND semantics via min, vs OR which would use mean/max). role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim) - scored = [] - for row in rows: - fact = dict(row) - fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) - + for fact in rows: + fact_vec = hrr.bytes_to_phases(fact["hrr_vector"]) entity_scores = [] - for probe_key in entity_residuals: + for probe_key in probe_keys: residual = hrr.unbind(fact_vec, probe_key) - sim = hrr.similarity(residual, role_content) - entity_scores.append(sim) + entity_scores.append(hrr.similarity(residual, role_content)) + item = self._clean_fact(fact) + item["score"] = (min(entity_scores) + 1.0) / 2.0 * item["trust_score"] + scored.append(item) - min_sim = min(entity_scores) - fact["score"] = (min_sim + 1.0) / 2.0 * fact["trust_score"] - scored.append(fact) - - scored.sort(key=lambda x: x["score"], reverse=True) + scored.sort(key=lambda item: item["score"], reverse=True) return scored[:limit] def contradict( @@ -341,143 +632,130 @@ class FactRetriever: threshold: float = 0.3, limit: int = 10, ) -> list[dict]: - """Find potentially contradictory facts via entity overlap + content divergence. 
- - Two facts contradict when they share entities (same subject) but have - low content-vector similarity (different claims). This is automated - memory hygiene — no other memory system does this. - - Returns pairs of facts with a contradiction score. - Falls back to empty list if numpy unavailable. - """ + """Find potentially contradictory facts via entity overlap + content divergence.""" if not hrr._HAS_NUMPY: return [] conn = self.store._conn - - # Get all facts with vectors and their linked entities - where = "WHERE f.hrr_vector IS NOT NULL" - params: list = [] - if category: - where += " AND f.category = ?" - params.append(category) - rows = conn.execute( f""" SELECT f.fact_id, f.content, f.category, f.tags, f.trust_score, f.created_at, f.updated_at, f.hrr_vector FROM facts f - {where} + WHERE f.hrr_vector IS NOT NULL + {'AND f.category = ?' if category else ''} """, - params, + [category] if category else [], ).fetchall() - if len(rows) < 2: return [] - # Guard against O(n²) explosion on large fact stores. - # At 500 facts, that's ~125K comparisons — acceptable. - # Above that, only check the most recently updated facts. - _MAX_CONTRADICT_FACTS = 500 - if len(rows) > _MAX_CONTRADICT_FACTS: - rows = sorted(rows, key=lambda r: r["updated_at"] or r["created_at"], reverse=True) - rows = rows[:_MAX_CONTRADICT_FACTS] + max_facts = 500 + if len(rows) > max_facts: + rows = sorted(rows, key=lambda row: row["updated_at"] or row["created_at"], reverse=True)[:max_facts] - # Build entity sets per fact fact_entities: dict[int, set[str]] = {} for row in rows: - fid = row["fact_id"] entity_rows = conn.execute( """ SELECT e.name FROM entities e JOIN fact_entities fe ON fe.entity_id = e.entity_id WHERE fe.fact_id = ? 
""", - (fid,), + (row["fact_id"],), ).fetchall() - fact_entities[fid] = {r["name"].lower() for r in entity_rows} + fact_entities[int(row["fact_id"])] = {entity["name"].lower() for entity in entity_rows} - # Compare all pairs: high entity overlap + low content similarity = contradiction - facts = [dict(r) for r in rows] contradictions = [] - + facts = [dict(row) for row in rows] for i in range(len(facts)): for j in range(i + 1, len(facts)): - f1, f2 = facts[i], facts[j] - ents1 = fact_entities.get(f1["fact_id"], set()) - ents2 = fact_entities.get(f2["fact_id"], set()) - - if not ents1 or not ents2: + fact_a, fact_b = facts[i], facts[j] + entities_a = fact_entities.get(int(fact_a["fact_id"]), set()) + entities_b = fact_entities.get(int(fact_b["fact_id"]), set()) + if not entities_a or not entities_b: continue - - # Entity overlap (Jaccard) - entity_overlap = len(ents1 & ents2) / len(ents1 | ents2) if (ents1 | ents2) else 0.0 - + entity_overlap = len(entities_a & entities_b) / len(entities_a | entities_b) if (entities_a | entities_b) else 0.0 if entity_overlap < 0.3: - continue # Not enough entity overlap to be contradictory - - # Content similarity via HRR vectors - v1 = hrr.bytes_to_phases(f1["hrr_vector"]) - v2 = hrr.bytes_to_phases(f2["hrr_vector"]) - content_sim = hrr.similarity(v1, v2) - - # High entity overlap + low content similarity = potential contradiction - # contradiction_score: higher = more contradictory - contradiction_score = entity_overlap * (1.0 - (content_sim + 1.0) / 2.0) - - if contradiction_score >= threshold: - # Strip hrr_vector from output (not JSON serializable) - f1_clean = {k: v for k, v in f1.items() if k != "hrr_vector"} - f2_clean = {k: v for k, v in f2.items() if k != "hrr_vector"} - contradictions.append({ - "fact_a": f1_clean, - "fact_b": f2_clean, + continue + similarity = hrr.similarity(hrr.bytes_to_phases(fact_a["hrr_vector"]), hrr.bytes_to_phases(fact_b["hrr_vector"])) + contradiction_score = entity_overlap * (1.0 - (similarity + 
1.0) / 2.0) + if contradiction_score < threshold: + continue + contradictions.append( + { + "fact_a": self._clean_fact(fact_a), + "fact_b": self._clean_fact(fact_b), "entity_overlap": round(entity_overlap, 3), - "content_similarity": round(content_sim, 3), + "content_similarity": round(similarity, 3), "contradiction_score": round(contradiction_score, 3), - "shared_entities": sorted(ents1 & ents2), - }) + "shared_entities": sorted(entities_a & entities_b), + } + ) - contradictions.sort(key=lambda x: x["contradiction_score"], reverse=True) + contradictions.sort(key=lambda item: item["contradiction_score"], reverse=True) return contradictions[:limit] + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + def _score_facts_by_vector( self, target_vec: "np.ndarray", category: str | None = None, limit: int = 10, ) -> list[dict]: - """Score facts by similarity to a target vector.""" - conn = self.store._conn - - where = "WHERE hrr_vector IS NOT NULL" - params: list = [] - if category: - where += " AND category = ?" 
- params.append(category) - - rows = conn.execute( - f""" - SELECT fact_id, content, category, tags, trust_score, - retrieval_count, helpful_count, created_at, updated_at, - hrr_vector - FROM facts - {where} - """, - params, - ).fetchall() - + rows = self._fetch_rows(category=category, min_trust=0.0, require_vectors=True) scored = [] - for row in rows: - fact = dict(row) - fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector")) - sim = hrr.similarity(target_vec, fact_vec) - fact["score"] = (sim + 1.0) / 2.0 * fact["trust_score"] - scored.append(fact) - - scored.sort(key=lambda x: x["score"], reverse=True) + for fact in rows: + sim = hrr.similarity(target_vec, hrr.bytes_to_phases(fact["hrr_vector"])) + item = self._clean_fact(fact) + item["score"] = (sim + 1.0) / 2.0 * item["trust_score"] + scored.append(item) + scored.sort(key=lambda item: item["score"], reverse=True) return scored[:limit] + def _fetch_rows( + self, + *, + category: str | None, + min_trust: float, + require_vectors: bool, + ) -> list[dict[str, Any]]: + conn = self.store._conn + where = ["trust_score >= ?"] + params: list[Any] = [min_trust] + if category: + where.append("category = ?") + params.append(category) + if require_vectors: + where.append("hrr_vector IS NOT NULL") + sql = f"SELECT * FROM facts WHERE {' AND '.join(where)}" + return [dict(row) for row in conn.execute(sql, params).fetchall()] + + def _fetch_rows_by_ids( + self, + fact_ids: list[int], + *, + category: str | None, + min_trust: float, + require_vectors: bool, + ) -> list[dict[str, Any]]: + if not fact_ids: + return [] + conn = self.store._conn + placeholders = ",".join("?" 
* len(fact_ids)) + where = [f"fact_id IN ({placeholders})", "trust_score >= ?"] + params: list[Any] = list(fact_ids) + [min_trust] + if category: + where.append("category = ?") + params.append(category) + if require_vectors: + where.append("hrr_vector IS NOT NULL") + sql = f"SELECT * FROM facts WHERE {' AND '.join(where)}" + return [dict(row) for row in conn.execute(sql, params).fetchall()] + def _fts_candidates( self, query: str, @@ -485,71 +763,128 @@ class FactRetriever: min_trust: float, limit: int, ) -> list[dict]: - """Get raw FTS5 candidates from the store. - - Uses the store's database connection directly for FTS5 MATCH - with rank scoring. Normalizes FTS5 rank to [0, 1] range. - """ conn = self.store._conn - - # Build query - FTS5 rank is negative (lower = better match) - # We need to join facts_fts with facts to get all columns - params: list = [] + params: list[Any] = [query] where_clauses = ["facts_fts MATCH ?"] - params.append(query) - if category: where_clauses.append("f.category = ?") params.append(category) - where_clauses.append("f.trust_score >= ?") params.append(min_trust) - - where_sql = " AND ".join(where_clauses) - sql = f""" - SELECT f.*, facts_fts.rank as fts_rank_raw + SELECT f.*, facts_fts.rank AS fts_rank_raw FROM facts_fts JOIN facts f ON f.fact_id = facts_fts.rowid - WHERE {where_sql} + WHERE {' AND '.join(where_clauses)} ORDER BY facts_fts.rank LIMIT ? 
""" params.append(limit) - try: rows = conn.execute(sql, params).fetchall() except Exception: - # FTS5 MATCH can fail on malformed queries — fall back to empty - return [] - + rows = self._like_fallback_candidates(query, category=category, min_trust=min_trust, limit=limit) + return rows if not rows: return [] - # Normalize FTS5 rank: rank is negative, lower = better - # Convert to positive score in [0, 1] range raw_ranks = [abs(row["fts_rank_raw"]) for row in rows] max_rank = max(raw_ranks) if raw_ranks else 1.0 - max_rank = max(max_rank, 1e-6) # avoid div by zero - + max_rank = max(max_rank, 1e-6) results = [] for row, raw_rank in zip(rows, raw_ranks): fact = dict(row) fact.pop("fts_rank_raw", None) - fact["fts_rank"] = raw_rank / max_rank # normalize to [0, 1] + # Higher is better. The legacy path keeps the old normalization bug; + # the multi-path lexical lane uses the corrected value. + fact["fts_rank"] = max(0.0, 1.0 - (raw_rank / max_rank)) results.append(fact) - return results + def _like_fallback_candidates( + self, + query: str, + *, + category: str | None, + min_trust: float, + limit: int, + ) -> list[dict[str, Any]]: + terms = [term for term in self._tokenize(query) if len(term) > 2] + if not terms: + return [] + conn = self.store._conn + where = ["trust_score >= ?"] + params: list[Any] = [min_trust] + if category: + where.append("category = ?") + params.append(category) + like_clauses = [] + for term in terms[:4]: + like_clauses.append("(lower(content) LIKE ? OR lower(tags) LIKE ?)") + params.extend([f"%{term}%", f"%{term}%"]) + where.append("(" + " OR ".join(like_clauses) + ")") + sql = f"SELECT * FROM facts WHERE {' AND '.join(where)} ORDER BY trust_score DESC, updated_at DESC LIMIT ?" 
+ params.append(limit) + rows = [dict(row) for row in conn.execute(sql, params).fetchall()] + for rank, fact in enumerate(rows, start=1): + fact["fts_rank"] = 1.0 / (rank + 1) + return rows + + def _lookup_entity_ids(self, name: str) -> set[int]: + lowered = name.lower() + conn = self.store._conn + rows = conn.execute( + """ + SELECT entity_id FROM entities + WHERE lower(name) = ? + OR ',' || lower(aliases) || ',' LIKE '%,' || ? || ',%' + """, + (lowered, lowered), + ).fetchall() + return {int(row["entity_id"]) for row in rows} + + def _extract_query_entities(self, query: str) -> list[str]: + entities = [] + seen: set[str] = set() + for match in _ENTITY_TOKEN_RE.finditer(query): + candidate = match.group(1).strip() + lowered = candidate.lower() + if len(candidate) < 3 or lowered in {"what", "which", "when", "where", "who", "how", "the"}: + continue + if lowered not in seen: + seen.add(lowered) + entities.append(candidate) + return entities + + def _expand_semantic_tokens(self, tokens: set[str]) -> set[str]: + expanded = set(tokens) + for token in list(tokens): + expanded.update(_SEMANTIC_SYNONYMS.get(token, set())) + return expanded + + def _entity_overlap_score(self, content: str, query_entities: set[str]) -> float: + if not query_entities: + return 0.0 + content_entities = {entity.lower() for entity in self._extract_query_entities(content)} + if not content_entities: + return 0.0 + return self._jaccard_similarity(query_entities, content_entities) + + def _matches_expected(self, results: list[dict[str, Any]], case: dict[str, Any]) -> bool: + expected = str(case.get("expected_substring", "")).lower() + if not expected: + return False + return any(expected in result.get("content", "").lower() for result in results) + + def _clean_fact(self, fact: dict[str, Any]) -> dict[str, Any]: + cleaned = {key: value for key, value in dict(fact).items() if key != "hrr_vector"} + cleaned.pop("fts_rank", None) + return cleaned + @staticmethod def _tokenize(text: str) -> set[str]: 
- """Simple whitespace tokenization with lowercasing. - - Strips common punctuation. No stemming/lemmatization (Phase 1). - """ if not text: return set() - # Split on whitespace, lowercase, strip punctuation tokens = set() for word in text.lower().split(): cleaned = word.strip(".,;:!?\"'()[]{}#@<>") @@ -559,35 +894,23 @@ class FactRetriever: @staticmethod def _jaccard_similarity(set_a: set, set_b: set) -> float: - """Jaccard similarity coefficient: |A ∩ B| / |A ∪ B|.""" if not set_a or not set_b: return 0.0 intersection = len(set_a & set_b) union = len(set_a | set_b) return intersection / union if union > 0 else 0.0 - def _temporal_decay(self, timestamp_str: str | None) -> float: - """Exponential decay: 0.5^(age_days / half_life_days). - - Returns 1.0 if decay is disabled or timestamp is missing. - """ - if not self.half_life or not timestamp_str: + def _temporal_decay(self, timestamp_str: str | None, *, half_life: int | None = None) -> float: + decay_half_life = half_life if half_life is not None else self.half_life + if not decay_half_life or not timestamp_str: return 1.0 - try: - if isinstance(timestamp_str, str): - # Parse ISO format timestamp from SQLite - ts = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) - else: - ts = timestamp_str - + ts = datetime.fromisoformat(str(timestamp_str).replace("Z", "+00:00")) if ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) - age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400 if age_days < 0: return 1.0 - - return math.pow(0.5, age_days / self.half_life) + return math.pow(0.5, age_days / decay_half_life) except (ValueError, TypeError): return 1.0 diff --git a/plugins/memory/holographic/store.py b/plugins/memory/holographic/store.py index 3dc66d686..cba9768f5 100644 --- a/plugins/memory/holographic/store.py +++ b/plugins/memory/holographic/store.py @@ -83,6 +83,7 @@ _TRUST_MAX = 1.0 # Entity extraction patterns _RE_CAPITALIZED = re.compile(r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\b') 
+_RE_SINGLE_PROPER = re.compile(r'\b([A-Z][A-Za-z0-9_-]{2,})\b') _RE_DOUBLE_QUOTE = re.compile(r'"([^"]+)"') _RE_SINGLE_QUOTE = re.compile(r"'([^']+)'") _RE_AKA = re.compile( @@ -414,6 +415,13 @@ class MemoryStore: for m in _RE_CAPITALIZED.finditer(text): _add(m.group(1)) + skip_singletons = {"The", "This", "That", "These", "Those", "And", "But", "For", "With"} + for m in _RE_SINGLE_PROPER.finditer(text): + candidate = m.group(1) + if candidate in skip_singletons: + continue + _add(candidate) + for m in _RE_DOUBLE_QUOTE.finditer(text): _add(m.group(1)) diff --git a/tests/fixtures/holographic_recall_matrix.json b/tests/fixtures/holographic_recall_matrix.json index 2fd256a6d..da2e07c89 100644 --- a/tests/fixtures/holographic_recall_matrix.json +++ b/tests/fixtures/holographic_recall_matrix.json @@ -8,7 +8,7 @@ { "content": "Rockachopa uses Ansible playbooks for sovereign rollouts.", "category": "project", - "tags": "deploy automation ansible hermes" + "tags": "ansible playbooks rollout" }, { "content": "The provider is anthropic/claude-haiku-4-5.",