Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 23s
The holographic retriever had temporal decay implemented but disabled (half_life=0). All facts scored equally regardless of age — a 2-year-old fact about a deprecated tool scored the same as yesterday's deployment config. This commit: 1. Changes default temporal_decay_half_life from 0 to 60 days - 60 days: facts lose half their relevance every 2 months - Configurable via config.yaml: plugins.hermes-memory-store.temporal_decay_half_life - Added to config schema so `hermes memory setup` exposes it 2. Adds access-recency boost to search scoring - Facts accessed within 1 half-life get up to 1.5x boost on their decay factor - Boost tapers linearly from 1.5 (just accessed) to 1.0 (1 half-life ago) - Boosted decay factor is capped at 1.0 (a boosted fact can't exceed a fresh fact's score) - Prevents actively-used facts from decaying prematurely 3. Scoring pipeline: score = relevance * trust * min(1.0, decay * access_boost) - Fresh facts: decay=1.0, boost≈1.5 → capped at 1.0, score unchanged - 60-day-old, accessed 30 days ago: decay=0.5, boost≈1.25 → decay factor=0.625 - 60-day-old, not accessed: decay=0.5, boost=1.0 → decay factor=0.5 - 120-day-old, not accessed: decay=0.25, boost=1.0 → decay factor=0.25 23 tests covering: - Temporal decay formula (fresh, 1HL, 2HL, 3HL, disabled, None, invalid, future) - Access recency boost (just accessed, halfway, at HL, beyond HL, disabled, range) - Integration (recently-accessed old fact > equally-old unaccessed fact) - Default config verification (half_life=60, not 0) Fixes #241
640 lines
23 KiB
Python
640 lines
23 KiB
Python
"""Hybrid keyword/BM25 retrieval for the memory store.
|
||
|
||
Ported from KIK memory_agent.py — combines FTS5 full-text search with
|
||
Jaccard similarity reranking and trust-weighted scoring.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
from datetime import datetime, timezone
|
||
from typing import TYPE_CHECKING
|
||
|
||
if TYPE_CHECKING:
|
||
from .store import MemoryStore
|
||
|
||
try:
|
||
from . import holographic as hrr
|
||
except ImportError:
|
||
import holographic as hrr # type: ignore[no-redef]
|
||
|
||
|
||
class FactRetriever:
    """Multi-strategy fact retrieval with trust-weighted scoring.

    Combines three relevance signals for keyword search (FTS5 rank, Jaccard
    token overlap, HRR vector similarity) and exposes HRR-algebra queries
    (``probe``, ``related``, ``reason``, ``contradict``) that operate on the
    per-fact phase vectors stored alongside each fact.
    """

    def __init__(
        self,
        store: MemoryStore,
        temporal_decay_half_life: int = 0,  # days, 0 = disabled
        fts_weight: float = 0.4,
        jaccard_weight: float = 0.3,
        hrr_weight: float = 0.3,
        hrr_dim: int = 1024,
    ):
        """Configure the retriever.

        Args:
            store: Memory store whose ``_conn`` connection is queried.
            temporal_decay_half_life: Half-life in days for temporal decay;
                0 disables decay (and the access-recency boost).
            fts_weight: Weight of the normalized FTS5 rank in ``search``.
            jaccard_weight: Weight of the Jaccard token overlap in ``search``.
            hrr_weight: Weight of the HRR similarity in ``search``.
            hrr_dim: Dimensionality used when encoding HRR vectors.
        """
        self.store = store
        self.half_life = temporal_decay_half_life
        self.hrr_dim = hrr_dim

        # Auto-redistribute weights if numpy unavailable
        if hrr_weight > 0 and not hrr._HAS_NUMPY:
            fts_weight = 0.6
            jaccard_weight = 0.4
            hrr_weight = 0.0

        self.fts_weight = fts_weight
        self.jaccard_weight = jaccard_weight
        self.hrr_weight = hrr_weight

    def search(
        self,
        query: str,
        category: str | None = None,
        min_trust: float = 0.3,
        limit: int = 10,
    ) -> list[dict]:
        """Hybrid search: FTS5 candidates → Jaccard/HRR rerank → trust weighting.

        Pipeline:
        1. FTS5 search: Get limit*3 candidates from SQLite full-text search
        2. Relevance: weighted sum of FTS5 rank, Jaccard token overlap between
           query and fact content/tags, and HRR similarity
        3. Trust weighting: score = relevance * trust_score
        4. Temporal decay (only when half_life > 0):
           decay = 0.5^(age_days / half_life), multiplied by the
           access-recency boost and capped at 1.0

        Returns list of dicts with fact data + 'score' field, sorted by score desc.
        """
        # Stage 1: Get FTS5 candidates (more than limit for reranking headroom)
        candidates = self._fts_candidates(query, category, min_trust, limit * 3)
        if not candidates:
            return []

        # Stage 2: Rerank with Jaccard + HRR + trust + optional decay
        query_tokens = self._tokenize(query)
        # The query vector is identical for every candidate; encode it at most
        # once, and only if some candidate actually carries an HRR vector.
        query_vec = None
        scored = []

        for fact in candidates:
            content_tokens = self._tokenize(fact["content"])
            tag_tokens = self._tokenize(fact.get("tags", ""))
            jaccard = self._jaccard_similarity(query_tokens, content_tokens | tag_tokens)
            fts_score = fact.get("fts_rank", 0.0)

            # HRR similarity
            if self.hrr_weight > 0 and fact.get("hrr_vector"):
                if query_vec is None:
                    query_vec = hrr.encode_text(query, self.hrr_dim)
                fact_vec = hrr.bytes_to_phases(fact["hrr_vector"])
                hrr_sim = (hrr.similarity(query_vec, fact_vec) + 1.0) / 2.0  # shift to [0,1]
            else:
                hrr_sim = 0.5  # neutral

            # Combine FTS5 + Jaccard + HRR
            relevance = (
                self.fts_weight * fts_score
                + self.jaccard_weight * jaccard
                + self.hrr_weight * hrr_sim
            )

            # Trust weighting
            score = relevance * fact["trust_score"]

            # Optional temporal decay
            if self.half_life > 0:
                decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
                # Access-recency boost: facts retrieved recently decay slower.
                # A fact accessed just now gets 1.5x its decay factor, tapering
                # linearly to 1.0x (no boost) at 1 half-life since last access.
                # The boosted factor is capped at 1.0 so a boosted fact never
                # outscores an otherwise identical fresh one.
                last_accessed = fact.get("last_accessed_at")
                if last_accessed:
                    access_boost = self._access_recency_boost(last_accessed)
                    decay = min(1.0, decay * access_boost)
                score *= decay

            fact["score"] = score
            scored.append(fact)

        # Sort by score descending, return top limit
        results = self._top_scored(scored, limit)
        # Strip raw HRR bytes — callers expect JSON-serializable dicts
        for fact in results:
            fact.pop("hrr_vector", None)
        return results

    def probe(
        self,
        entity: str,
        category: str | None = None,
        limit: int = 10,
    ) -> list[dict]:
        """Compositional entity query using HRR algebra.

        Unbinds entity from memory bank to extract associated content.
        This is NOT keyword search — it uses algebraic structure to find facts
        where the entity plays a structural role.

        Falls back to FTS5 search if numpy is unavailable or no fact has an
        HRR vector.
        """
        if not hrr._HAS_NUMPY:
            # Fallback to keyword search on entity name
            return self.search(entity, category=category, limit=limit)

        conn = self.store._conn

        # Encode entity as role-bound vector
        role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim)
        entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim)
        probe_key = hrr.bind(entity_vec, role_entity)

        # Try category-specific bank first, then all facts
        if category:
            bank_name = f"cat:{category}"
            bank_row = conn.execute(
                "SELECT vector FROM memory_banks WHERE bank_name = ?",
                (bank_name,),
            ).fetchone()
            if bank_row:
                bank_vec = hrr.bytes_to_phases(bank_row["vector"])
                extracted = hrr.unbind(bank_vec, probe_key)
                # Use extracted signal to score individual facts
                return self._score_facts_by_vector(
                    extracted, category=category, limit=limit
                )

        # Score against individual fact vectors directly
        rows = self._fetch_vector_facts(category)
        if not rows:
            # Final fallback: keyword search
            return self.search(entity, category=category, limit=limit)

        # The content-role atom does not depend on the fact; encode it once.
        role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim)

        scored = []
        for row in rows:
            fact = dict(row)
            fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector"))
            # Unbind probe key from fact to see if entity is structurally present
            residual = hrr.unbind(fact_vec, probe_key)
            # Compare residual against content signal
            content_vec = hrr.bind(hrr.encode_text(fact["content"], self.hrr_dim), role_content)
            sim = hrr.similarity(residual, content_vec)
            fact["score"] = (sim + 1.0) / 2.0 * fact["trust_score"]
            scored.append(fact)

        return self._top_scored(scored, limit)

    def related(
        self,
        entity: str,
        category: str | None = None,
        limit: int = 10,
    ) -> list[dict]:
        """Discover facts that share structural connections with an entity.

        Unlike probe (which finds facts *about* an entity), related finds
        facts that are connected through shared context — e.g., other entities
        mentioned alongside this one, or content that overlaps structurally.

        Falls back to FTS5 search if numpy is unavailable or no fact has an
        HRR vector.
        """
        if not hrr._HAS_NUMPY:
            return self.search(entity, category=category, limit=limit)

        # Encode entity as a bare atom (not role-bound — we want ANY structural match)
        entity_vec = hrr.encode_atom(entity.lower(), self.hrr_dim)

        rows = self._fetch_vector_facts(category)
        if not rows:
            return self.search(entity, category=category, limit=limit)

        # Role atoms are the same for every fact; encode them once.
        role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim)
        role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim)

        # Score each fact by how much the entity's atom appears in its vector
        # This catches both role-bound entity matches AND content word matches
        scored = []
        for row in rows:
            fact = dict(row)
            fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector"))

            # Check structural similarity: unbind entity from fact.
            # A high-similarity residual to ANY known role vector means this
            # entity plays a structural role in the fact.
            residual = hrr.unbind(fact_vec, entity_vec)
            entity_role_sim = hrr.similarity(residual, role_entity)
            content_role_sim = hrr.similarity(residual, role_content)
            # Take the max — entity could appear in either role
            best_sim = max(entity_role_sim, content_role_sim)

            fact["score"] = (best_sim + 1.0) / 2.0 * fact["trust_score"]
            scored.append(fact)

        return self._top_scored(scored, limit)

    def reason(
        self,
        entities: list[str],
        category: str | None = None,
        limit: int = 10,
    ) -> list[dict]:
        """Multi-entity compositional query — vector-space JOIN.

        Given multiple entities, algebraically intersects their structural
        connections to find facts related to ALL of them simultaneously.

        Example: reason(["peppi", "backend"]) finds facts where peppi AND
        backend both play structural roles — without keyword matching.

        Falls back to FTS5 search (all entities joined as keywords) if numpy
        is unavailable, ``entities`` is empty, or no fact has an HRR vector.
        """
        if not hrr._HAS_NUMPY or not entities:
            # Fallback: search with all entities as keywords
            query = " ".join(entities)
            return self.search(query, category=category, limit=limit)

        role_entity = hrr.encode_atom("__hrr_role_entity__", self.hrr_dim)

        # One role-bound probe key per entity; each is unbound from every
        # fact vector below.
        probe_keys = [
            hrr.bind(hrr.encode_atom(entity.lower(), self.hrr_dim), role_entity)
            for entity in entities
        ]

        rows = self._fetch_vector_facts(category)
        if not rows:
            query = " ".join(entities)
            return self.search(query, category=category, limit=limit)

        # Score each fact by how much EACH entity is structurally present.
        # A fact scores high only if ALL entities have structural presence
        # (AND semantics via min, vs OR which would use mean/max).
        role_content = hrr.encode_atom("__hrr_role_content__", self.hrr_dim)

        scored = []
        for row in rows:
            fact = dict(row)
            fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector"))

            min_sim = min(
                hrr.similarity(hrr.unbind(fact_vec, probe_key), role_content)
                for probe_key in probe_keys
            )
            fact["score"] = (min_sim + 1.0) / 2.0 * fact["trust_score"]
            scored.append(fact)

        return self._top_scored(scored, limit)

    def contradict(
        self,
        category: str | None = None,
        threshold: float = 0.3,
        limit: int = 10,
    ) -> list[dict]:
        """Find potentially contradictory facts via entity overlap + content divergence.

        Two facts contradict when they share entities (same subject) but have
        low content-vector similarity (different claims).

        Args:
            category: Restrict the comparison to facts of this category.
            threshold: Minimum contradiction score for a pair to be reported.
            limit: Maximum number of pairs returned.

        Returns:
            Pairs of facts (``fact_a``/``fact_b``) with ``entity_overlap``,
            ``content_similarity``, ``contradiction_score`` and
            ``shared_entities``, sorted by contradiction score descending.
            Empty list if numpy is unavailable or fewer than two facts have
            vectors.
        """
        if not hrr._HAS_NUMPY:
            return []

        conn = self.store._conn

        # Get all facts with vectors
        where = "WHERE f.hrr_vector IS NOT NULL"
        params: list = []
        if category:
            where += " AND f.category = ?"
            params.append(category)

        rows = conn.execute(
            f"""
            SELECT f.fact_id, f.content, f.category, f.tags, f.trust_score,
                   f.created_at, f.updated_at, f.hrr_vector
            FROM facts f
            {where}
            """,
            params,
        ).fetchall()

        if len(rows) < 2:
            return []

        # Guard against O(n²) explosion on large fact stores.
        # At 500 facts, that's ~125K comparisons — acceptable.
        # Above that, only check the most recently updated facts.
        _MAX_CONTRADICT_FACTS = 500
        if len(rows) > _MAX_CONTRADICT_FACTS:
            rows = sorted(rows, key=lambda r: r["updated_at"] or r["created_at"], reverse=True)
            rows = rows[:_MAX_CONTRADICT_FACTS]

        # Build entity sets per fact
        fact_entities: dict[int, set[str]] = {}
        for row in rows:
            fid = row["fact_id"]
            entity_rows = conn.execute(
                """
                SELECT e.name FROM entities e
                JOIN fact_entities fe ON fe.entity_id = e.entity_id
                WHERE fe.fact_id = ?
                """,
                (fid,),
            ).fetchall()
            fact_entities[fid] = {r["name"].lower() for r in entity_rows}

        facts = [dict(r) for r in rows]

        # Decode each fact's vector at most once (and only if it takes part in
        # a pair that passes the entity-overlap filter) instead of once per pair.
        decoded: dict[int, object] = {}

        def phases(index: int):
            if index not in decoded:
                decoded[index] = hrr.bytes_to_phases(facts[index]["hrr_vector"])
            return decoded[index]

        # Compare all pairs: high entity overlap + low content similarity = contradiction
        contradictions = []
        for i, f1 in enumerate(facts):
            ents1 = fact_entities.get(f1["fact_id"], set())
            if not ents1:
                continue

            for j in range(i + 1, len(facts)):
                f2 = facts[j]
                ents2 = fact_entities.get(f2["fact_id"], set())
                if not ents2:
                    continue

                # Entity overlap (Jaccard); the union is non-empty here
                shared = ents1 & ents2
                entity_overlap = len(shared) / len(ents1 | ents2)
                if entity_overlap < 0.3:
                    continue  # Not enough entity overlap to be contradictory

                # Content similarity via HRR vectors
                content_sim = hrr.similarity(phases(i), phases(j))

                # High entity overlap + low content similarity = potential contradiction
                # contradiction_score: higher = more contradictory
                contradiction_score = entity_overlap * (1.0 - (content_sim + 1.0) / 2.0)
                if contradiction_score < threshold:
                    continue

                # Strip hrr_vector from output (not JSON serializable)
                f1_clean = {k: v for k, v in f1.items() if k != "hrr_vector"}
                f2_clean = {k: v for k, v in f2.items() if k != "hrr_vector"}
                contradictions.append({
                    "fact_a": f1_clean,
                    "fact_b": f2_clean,
                    "entity_overlap": round(entity_overlap, 3),
                    "content_similarity": round(content_sim, 3),
                    "contradiction_score": round(contradiction_score, 3),
                    "shared_entities": sorted(shared),
                })

        contradictions.sort(key=lambda x: x["contradiction_score"], reverse=True)
        return contradictions[:limit]

    def _fetch_vector_facts(self, category: str | None = None) -> list:
        """Return every fact row that has an HRR vector, optionally by category."""
        where = "WHERE hrr_vector IS NOT NULL"
        params: list = []
        if category:
            where += " AND category = ?"
            params.append(category)

        return self.store._conn.execute(
            f"""
            SELECT fact_id, content, category, tags, trust_score,
                   retrieval_count, helpful_count, created_at, updated_at,
                   hrr_vector
            FROM facts
            {where}
            """,
            params,
        ).fetchall()

    @staticmethod
    def _top_scored(scored: list[dict], limit: int) -> list[dict]:
        """Sort ``scored`` in place by 'score' descending and return the top ``limit``."""
        scored.sort(key=lambda item: item["score"], reverse=True)
        return scored[:limit]

    def _score_facts_by_vector(
        self,
        target_vec: "np.ndarray",
        category: str | None = None,
        limit: int = 10,
    ) -> list[dict]:
        """Score facts by similarity to a target vector.

        Each fact's score is its HRR similarity to ``target_vec`` shifted to
        [0, 1] and multiplied by its trust score. Returns the top ``limit``
        facts, without their raw ``hrr_vector`` bytes.
        """
        scored = []
        for row in self._fetch_vector_facts(category):
            fact = dict(row)
            fact_vec = hrr.bytes_to_phases(fact.pop("hrr_vector"))
            sim = hrr.similarity(target_vec, fact_vec)
            fact["score"] = (sim + 1.0) / 2.0 * fact["trust_score"]
            scored.append(fact)

        return self._top_scored(scored, limit)

    def _fts_candidates(
        self,
        query: str,
        category: str | None,
        min_trust: float,
        limit: int,
    ) -> list[dict]:
        """Get raw FTS5 candidates from the store.

        Uses the store's database connection directly for FTS5 MATCH
        with rank scoring. Each returned dict carries an ``fts_rank`` key:
        the absolute raw rank divided by the largest absolute rank in the
        result set, so the best match scores 1.0.

        Returns an empty list if nothing matches or the query fails.
        """
        conn = self.store._conn

        # Build query - FTS5 rank is negative (lower = better match)
        # We need to join facts_fts with facts to get all columns
        where_clauses = ["facts_fts MATCH ?"]
        params: list = [query]

        if category:
            where_clauses.append("f.category = ?")
            params.append(category)

        where_clauses.append("f.trust_score >= ?")
        params.append(min_trust)

        where_sql = " AND ".join(where_clauses)

        sql = f"""
            SELECT f.*, facts_fts.rank as fts_rank_raw
            FROM facts_fts
            JOIN facts f ON f.fact_id = facts_fts.rowid
            WHERE {where_sql}
            ORDER BY facts_fts.rank
            LIMIT ?
        """
        params.append(limit)

        try:
            rows = conn.execute(sql, params).fetchall()
        except Exception:
            # FTS5 MATCH can fail on malformed queries — deliberately
            # best-effort: fall back to "no candidates" rather than raising.
            return []

        if not rows:
            return []

        # Normalize FTS5 rank: rank is negative, lower = better
        # Convert to positive score in [0, 1] range
        raw_ranks = [abs(row["fts_rank_raw"]) for row in rows]
        max_rank = max(max(raw_ranks), 1e-6)  # avoid div by zero

        results = []
        for row, raw_rank in zip(rows, raw_ranks):
            fact = dict(row)
            fact.pop("fts_rank_raw", None)
            fact["fts_rank"] = raw_rank / max_rank  # normalize to [0, 1]
            results.append(fact)

        return results

    @staticmethod
    def _tokenize(text: str) -> set[str]:
        """Simple whitespace tokenization with lowercasing.

        Strips common punctuation from both ends of each word.
        No stemming/lemmatization (Phase 1). Empty/None input gives an
        empty set.
        """
        if not text:
            return set()
        stripped = (word.strip(".,;:!?\"'()[]{}#@<>") for word in text.lower().split())
        return {token for token in stripped if token}

    @staticmethod
    def _jaccard_similarity(set_a: set, set_b: set) -> float:
        """Jaccard similarity coefficient: |A ∩ B| / |A ∪ B|.

        Returns 0.0 when either set is empty.
        """
        if not set_a or not set_b:
            return 0.0
        # Both sets are non-empty, so the union cannot be empty.
        return len(set_a & set_b) / len(set_a | set_b)

    @staticmethod
    def _age_days(timestamp) -> float | None:
        """Return the age of ``timestamp`` in days, or None if it is unusable.

        Accepts an ISO-format string (a trailing "Z" is treated as UTC) or a
        datetime object. Naive timestamps are interpreted as UTC. The result
        is negative for timestamps in the future.
        """
        try:
            if isinstance(timestamp, str):
                # Parse ISO format timestamp from SQLite
                ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            else:
                ts = timestamp

            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)

            return (datetime.now(timezone.utc) - ts).total_seconds() / 86400
        except (ValueError, TypeError, AttributeError):
            # AttributeError covers values that are neither str nor datetime
            # (e.g. an int), which have no ``tzinfo``.
            return None

    def _temporal_decay(self, timestamp_str: str | None) -> float:
        """Exponential decay: 0.5^(age_days / half_life_days).

        Returns 1.0 (no decay) if decay is disabled (half-life of 0, None or
        negative), the timestamp is missing or unparseable, or the timestamp
        lies in the future.
        """
        if not self.half_life or self.half_life < 0 or not timestamp_str:
            return 1.0

        age_days = self._age_days(timestamp_str)
        if age_days is None or age_days < 0:
            return 1.0

        return math.pow(0.5, age_days / self.half_life)

    def _access_recency_boost(self, last_accessed_str: str | None) -> float:
        """Boost factor for recently-accessed facts. Range [1.0, 1.5].

        The boost is 1.5 for a fact accessed just now and falls linearly to
        1.0 (no boost) at 1 half-life since the last access; it stays at 1.0
        beyond that. A future access timestamp counts as "just accessed".

        Returns 1.0 if decay is disabled (half-life of 0, None or negative)
        or the timestamp is missing or unparseable.
        """
        if not self.half_life or self.half_life < 0 or not last_accessed_str:
            return 1.0

        age_days = self._age_days(last_accessed_str)
        if age_days is None:
            return 1.0
        if age_days < 0:
            return 1.5  # Future timestamp = just accessed

        half_lives_since_access = age_days / self.half_life
        if half_lives_since_access >= 1.0:
            return 1.0
        # Within 1 half-life: linearly from 1.5 (just now) to 1.0 (at 1 HL)
        return 1.0 + 0.5 * (1.0 - half_lives_since_access)
|