Files
timmy-config/hermes-sovereign/mempalace/sovereign_store.py
perplexity 593621c5e0 feat: sovereign memory store — zero-API durable memory (SQLite + FTS5 + HRR)
Implements the missing pieces of the MemPalace epic (#367):

- sovereign_store.py: Self-contained memory store replacing the third-party
  mempalace CLI and its ONNX dependency. Uses:
  * SQLite + FTS5 for keyword search (porter stemmer, unicode61)
  * HRR phase vectors (SHA-256 deterministic, numpy optional) for semantic similarity
  * Reciprocal Rank Fusion to merge keyword and semantic rankings
  * Trust scoring with boost/decay lifecycle
  * Room-based organization matching the existing PalaceRoom model

- promotion.py (MP-4, #371): Quality-gated scratchpad-to-palace promotion.
  Four heuristic gates, no LLM call:
  1. Length gate (min 5 words, max 500)
  2. Structure gate (rejects fragments and pure code)
  3. Duplicate gate (FTS5 + Jaccard overlap detection)
  4. Staleness gate (7-day threshold for old notes)
  Includes force override, batch promotion, and audit logging.

- 21 unit tests covering HRR vectors, store operations, search,
  trust lifecycle, and all promotion gates.

Zero external dependencies. Zero API calls. Zero cloud.

Refs: #367 #370 #371
2026-04-07 22:41:37 +00:00

475 lines
17 KiB
Python

"""Sovereign Memory Store — zero-API, zero-dependency durable memory.
Replaces the third-party `mempalace` CLI and its ONNX requirement with a
self-contained SQLite + FTS5 + HRR (Holographic Reduced Representation)
store. Every operation is local: no network calls, no API keys, no cloud.
Storage: ~/.hermes/palace/sovereign.db
Capabilities:
- Durable fact storage with rooms, categories, and trust scores
- Hybrid retrieval: FTS5 keyword search + HRR cosine similarity
- Reciprocal Rank Fusion to merge keyword and semantic results
- Trust scoring: facts that get retrieved and confirmed gain trust
- Graceful numpy degradation: falls back to keyword-only if missing
Refs: Epic #367, MP-3 #370, MP-4 #371
"""
from __future__ import annotations
import hashlib
import json
import math
import sqlite3
import struct
import time
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# HRR (Holographic Reduced Representations) — zero-dependency vectors
# ---------------------------------------------------------------------------
# Phase-encoded vectors via SHA-256. No ONNX, no embeddings API, no numpy
# required (but uses numpy when available for speed).
_TWO_PI = 2.0 * math.pi
_DIM = 512 # Compact dimension — sufficient for memory retrieval
try:
import numpy as np
_HAS_NUMPY = True
except ImportError:
_HAS_NUMPY = False
def _encode_atom_np(word: str, dim: int = _DIM) -> "np.ndarray":
"""Deterministic phase vector via SHA-256 (numpy path)."""
values_per_block = 16
blocks_needed = math.ceil(dim / values_per_block)
uint16_values: list[int] = []
for i in range(blocks_needed):
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
uint16_values.extend(struct.unpack("<16H", digest))
return np.array(uint16_values[:dim], dtype=np.float64) * (_TWO_PI / 65536.0)
def _encode_atom_pure(word: str, dim: int = _DIM) -> list[float]:
"""Deterministic phase vector via SHA-256 (pure Python fallback)."""
values_per_block = 16
blocks_needed = math.ceil(dim / values_per_block)
uint16_values: list[int] = []
for i in range(blocks_needed):
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
for j in range(0, 32, 2):
uint16_values.append(int.from_bytes(digest[j:j+2], "little"))
return [v * (_TWO_PI / 65536.0) for v in uint16_values[:dim]]
def encode_text(text: str, dim: int = _DIM):
"""Encode a text string into an HRR phase vector by bundling word atoms.
Uses circular mean of per-word phase vectors — the standard HRR
superposition operation. Result is a fixed-width vector regardless
of input length.
"""
words = text.lower().split()
if not words:
words = ["<empty>"]
if _HAS_NUMPY:
atoms = [_encode_atom_np(w, dim) for w in words]
# Circular mean: average the unit vectors, extract phase
unit_sum = sum(np.exp(1j * a) for a in atoms)
return np.angle(unit_sum) % _TWO_PI
else:
# Pure Python circular mean
real_sum = [0.0] * dim
imag_sum = [0.0] * dim
for w in words:
atom = _encode_atom_pure(w, dim)
for d in range(dim):
real_sum[d] += math.cos(atom[d])
imag_sum[d] += math.sin(atom[d])
return [math.atan2(imag_sum[d], real_sum[d]) % _TWO_PI for d in range(dim)]
def cosine_similarity_phase(a, b) -> float:
"""Cosine similarity between two phase vectors.
For phase vectors, similarity = mean(cos(a - b)).
"""
if _HAS_NUMPY:
return float(np.mean(np.cos(np.array(a) - np.array(b))))
else:
n = len(a)
return sum(math.cos(a[i] - b[i]) for i in range(n)) / n
def serialize_vector(vec) -> bytes:
"""Serialize a vector to bytes for SQLite storage."""
if _HAS_NUMPY:
return vec.astype(np.float64).tobytes()
else:
return struct.pack(f"{len(vec)}d", *vec)
def deserialize_vector(blob: bytes):
"""Deserialize bytes back to a vector."""
n = len(blob) // 8 # float64 = 8 bytes
if _HAS_NUMPY:
return np.frombuffer(blob, dtype=np.float64)
else:
return list(struct.unpack(f"{n}d", blob))
# ---------------------------------------------------------------------------
# SQLite Schema
# ---------------------------------------------------------------------------
_SCHEMA = """
CREATE TABLE IF NOT EXISTS memories (
memory_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
room TEXT DEFAULT 'general',
category TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
hrr_vector BLOB
);
CREATE INDEX IF NOT EXISTS idx_memories_room ON memories(room);
CREATE INDEX IF NOT EXISTS idx_memories_trust ON memories(trust_score DESC);
-- FTS5 for fast keyword search
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
content, room, category,
content=memories, content_rowid=memory_id,
tokenize='porter unicode61'
);
-- Sync triggers
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, content, room, category)
VALUES (new.memory_id, new.content, new.room, new.category);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
END;
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
INSERT INTO memories_fts(rowid, content, room, category)
VALUES (new.memory_id, new.content, new.room, new.category);
END;
-- Promotion log: tracks what moved from scratchpad to durable memory
CREATE TABLE IF NOT EXISTS promotion_log (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
scratch_key TEXT NOT NULL,
memory_id INTEGER REFERENCES memories(memory_id),
promoted_at REAL NOT NULL,
reason TEXT DEFAULT ''
);
"""
# ---------------------------------------------------------------------------
# SovereignStore
# ---------------------------------------------------------------------------
class SovereignStore:
"""Zero-API durable memory store.
All operations are local SQLite. No network calls. No API keys.
HRR vectors provide semantic similarity without embedding models.
FTS5 provides fast keyword search. RRF merges both rankings.
"""
def __init__(self, db_path: Optional[str] = None):
if db_path is None:
db_path = str(Path.home() / ".hermes" / "palace" / "sovereign.db")
self._db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path)
self._conn.row_factory = sqlite3.Row
self._conn.executescript(_SCHEMA)
def close(self):
self._conn.close()
# ------------------------------------------------------------------
# Store
# ------------------------------------------------------------------
def store(
self,
content: str,
room: str = "general",
category: str = "",
trust: float = 0.5,
) -> int:
"""Store a fact in durable memory. Returns the memory_id."""
now = time.time()
vec = encode_text(content)
blob = serialize_vector(vec)
cur = self._conn.execute(
"""INSERT INTO memories (content, room, category, trust_score,
created_at, updated_at, hrr_vector)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(content, room, category, trust, now, now, blob),
)
self._conn.commit()
return cur.lastrowid
def store_batch(self, items: list[dict]) -> list[int]:
"""Store multiple facts. Each item: {content, room?, category?, trust?}."""
ids = []
now = time.time()
for item in items:
content = item["content"]
vec = encode_text(content)
blob = serialize_vector(vec)
cur = self._conn.execute(
"""INSERT INTO memories (content, room, category, trust_score,
created_at, updated_at, hrr_vector)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
content,
item.get("room", "general"),
item.get("category", ""),
item.get("trust", 0.5),
now, now, blob,
),
)
ids.append(cur.lastrowid)
self._conn.commit()
return ids
# ------------------------------------------------------------------
# Search — hybrid FTS5 + HRR with Reciprocal Rank Fusion
# ------------------------------------------------------------------
def search(
self,
query: str,
room: Optional[str] = None,
limit: int = 10,
min_trust: float = 0.0,
fts_weight: float = 0.5,
hrr_weight: float = 0.5,
) -> list[dict]:
"""Hybrid search: FTS5 keywords + HRR semantic similarity.
Uses Reciprocal Rank Fusion (RRF) to merge both rankings.
Returns list of dicts with content, room, score, trust_score.
"""
k_rrf = 60 # Standard RRF constant
# Stage 1: FTS5 candidates
fts_results = self._fts_search(query, room, min_trust, limit * 3)
# Stage 2: HRR candidates (scan top N by trust)
hrr_results = self._hrr_search(query, room, min_trust, limit * 3)
# Stage 3: RRF fusion
scores: dict[int, float] = {}
meta: dict[int, dict] = {}
for rank, row in enumerate(fts_results):
mid = row["memory_id"]
scores[mid] = scores.get(mid, 0) + fts_weight / (k_rrf + rank + 1)
meta[mid] = dict(row)
for rank, row in enumerate(hrr_results):
mid = row["memory_id"]
scores[mid] = scores.get(mid, 0) + hrr_weight / (k_rrf + rank + 1)
if mid not in meta:
meta[mid] = dict(row)
# Sort by fused score
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
results = []
for mid, score in ranked:
m = meta[mid]
# Bump retrieval count
self._conn.execute(
"UPDATE memories SET retrieval_count = retrieval_count + 1 WHERE memory_id = ?",
(mid,),
)
results.append({
"memory_id": mid,
"content": m["content"],
"room": m["room"],
"category": m.get("category", ""),
"trust_score": m["trust_score"],
"score": round(score, 6),
})
if results:
self._conn.commit()
return results
def _fts_search(
self, query: str, room: Optional[str], min_trust: float, limit: int
) -> list[dict]:
"""FTS5 full-text search."""
try:
if room:
rows = self._conn.execute(
"""SELECT m.memory_id, m.content, m.room, m.category,
m.trust_score, m.retrieval_count
FROM memories_fts f
JOIN memories m ON f.rowid = m.memory_id
WHERE memories_fts MATCH ? AND m.room = ?
AND m.trust_score >= ?
ORDER BY rank LIMIT ?""",
(query, room, min_trust, limit),
).fetchall()
else:
rows = self._conn.execute(
"""SELECT m.memory_id, m.content, m.room, m.category,
m.trust_score, m.retrieval_count
FROM memories_fts f
JOIN memories m ON f.rowid = m.memory_id
WHERE memories_fts MATCH ?
AND m.trust_score >= ?
ORDER BY rank LIMIT ?""",
(query, min_trust, limit),
).fetchall()
return [dict(r) for r in rows]
except sqlite3.OperationalError:
# Bad FTS query syntax — degrade gracefully
return []
def _hrr_search(
self, query: str, room: Optional[str], min_trust: float, limit: int
) -> list[dict]:
"""HRR cosine similarity search (brute-force scan, fast for <100K facts)."""
query_vec = encode_text(query)
if room:
rows = self._conn.execute(
"""SELECT memory_id, content, room, category, trust_score,
retrieval_count, hrr_vector
FROM memories
WHERE room = ? AND trust_score >= ? AND hrr_vector IS NOT NULL""",
(room, min_trust),
).fetchall()
else:
rows = self._conn.execute(
"""SELECT memory_id, content, room, category, trust_score,
retrieval_count, hrr_vector
FROM memories
WHERE trust_score >= ? AND hrr_vector IS NOT NULL""",
(min_trust,),
).fetchall()
scored = []
for r in rows:
stored_vec = deserialize_vector(r["hrr_vector"])
sim = cosine_similarity_phase(query_vec, stored_vec)
scored.append((sim, dict(r)))
scored.sort(key=lambda x: x[0], reverse=True)
return [item[1] for item in scored[:limit]]
# ------------------------------------------------------------------
# Trust management
# ------------------------------------------------------------------
def boost_trust(self, memory_id: int, delta: float = 0.05) -> None:
"""Increase trust score when a memory proves useful."""
self._conn.execute(
"""UPDATE memories SET trust_score = MIN(1.0, trust_score + ?),
updated_at = ? WHERE memory_id = ?""",
(delta, time.time(), memory_id),
)
self._conn.commit()
def decay_trust(self, memory_id: int, delta: float = 0.02) -> None:
"""Decrease trust score when a memory is contradicted."""
self._conn.execute(
"""UPDATE memories SET trust_score = MAX(0.0, trust_score - ?),
updated_at = ? WHERE memory_id = ?""",
(delta, time.time(), memory_id),
)
self._conn.commit()
# ------------------------------------------------------------------
# Room operations
# ------------------------------------------------------------------
def list_rooms(self) -> list[dict]:
"""List all rooms with fact counts."""
rows = self._conn.execute(
"""SELECT room, COUNT(*) as count,
AVG(trust_score) as avg_trust
FROM memories GROUP BY room ORDER BY count DESC"""
).fetchall()
return [dict(r) for r in rows]
def room_contents(self, room: str, limit: int = 50) -> list[dict]:
"""Get all facts in a room, ordered by trust."""
rows = self._conn.execute(
"""SELECT memory_id, content, category, trust_score,
retrieval_count, created_at
FROM memories WHERE room = ?
ORDER BY trust_score DESC, created_at DESC LIMIT ?""",
(room, limit),
).fetchall()
return [dict(r) for r in rows]
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
def stats(self) -> dict:
"""Return store statistics."""
row = self._conn.execute(
"""SELECT COUNT(*) as total,
AVG(trust_score) as avg_trust,
SUM(retrieval_count) as total_retrievals,
COUNT(DISTINCT room) as room_count
FROM memories"""
).fetchone()
return dict(row)
# ------------------------------------------------------------------
# Promotion support (scratchpad → durable)
# ------------------------------------------------------------------
def log_promotion(
self,
session_id: str,
scratch_key: str,
memory_id: int,
reason: str = "",
) -> None:
"""Record a scratchpad-to-palace promotion in the audit log."""
self._conn.execute(
"""INSERT INTO promotion_log
(session_id, scratch_key, memory_id, promoted_at, reason)
VALUES (?, ?, ?, ?, ?)""",
(session_id, scratch_key, memory_id, time.time(), reason),
)
self._conn.commit()
def recent_promotions(self, limit: int = 20) -> list[dict]:
"""Get recent promotion log entries."""
rows = self._conn.execute(
"""SELECT p.*, m.content, m.room
FROM promotion_log p
LEFT JOIN memories m ON p.memory_id = m.memory_id
ORDER BY p.promoted_at DESC LIMIT ?""",
(limit,),
).fetchall()
return [dict(r) for r in rows]