feat: Sovereign Memory Store — zero-API durable memory (SQLite + FTS5 + HRR) #380
@@ -5,10 +5,13 @@ Provides:
|
||||
- retrieval_enforcer.py: L0-L5 retrieval order enforcement
|
||||
- wakeup.py: Session wake-up protocol (~300-900 tokens)
|
||||
- scratchpad.py: JSON-based session scratchpad with palace promotion
|
||||
- sovereign_store.py: Zero-API durable memory (SQLite + FTS5 + HRR vectors)
|
||||
- promotion.py: Quality-gated scratchpad-to-palace promotion (MP-4)
|
||||
|
||||
Epic: #367
|
||||
"""
|
||||
|
||||
from .mempalace import Mempalace, PalaceRoom, analyse_issues
|
||||
from .sovereign_store import SovereignStore
|
||||
|
||||
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"]
|
||||
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues", "SovereignStore"]
|
||||
|
||||
188
hermes-sovereign/mempalace/promotion.py
Normal file
188
hermes-sovereign/mempalace/promotion.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""Memory Promotion — quality-gated scratchpad-to-palace promotion.
|
||||
|
||||
Implements MP-4 (#371): move session notes to durable memory only when
|
||||
they pass quality gates. No LLM calls — all heuristic-based.
|
||||
|
||||
Quality gates:
|
||||
1. Minimum content length (too short = noise)
|
||||
2. Duplicate detection (FTS5 + HRR similarity check)
|
||||
3. Structural quality (has subject-verb structure, not just a fragment)
|
||||
4. Staleness check (don't promote stale notes from old sessions)
|
||||
|
||||
Refs: Epic #367, Sub-issue #371
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
from .sovereign_store import SovereignStore
|
||||
except ImportError:
|
||||
from sovereign_store import SovereignStore
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Quality gate thresholds
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
MIN_CONTENT_WORDS = 5
|
||||
MAX_CONTENT_WORDS = 500
|
||||
DUPLICATE_SIMILARITY = 0.85
|
||||
DUPLICATE_FTS_THRESHOLD = 3
|
||||
STALE_SECONDS = 86400 * 7
|
||||
MIN_TRUST_FOR_AUTO = 0.4
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Quality checks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _check_length(content: str) -> tuple[bool, str]:
|
||||
"""Gate 1: Content length check."""
|
||||
words = content.split()
|
||||
if len(words) < MIN_CONTENT_WORDS:
|
||||
return False, f"Too short ({len(words)} words, minimum {MIN_CONTENT_WORDS})"
|
||||
if len(words) > MAX_CONTENT_WORDS:
|
||||
return False, f"Too long ({len(words)} words, maximum {MAX_CONTENT_WORDS}). Summarize first."
|
||||
return True, "OK"
|
||||
|
||||
|
||||
def _check_structure(content: str) -> tuple[bool, str]:
|
||||
"""Gate 2: Basic structural quality."""
|
||||
if not re.search(r"[a-zA-Z]", content):
|
||||
return False, "No alphabetic content — pure code/numbers are not memory-worthy"
|
||||
if len(content.split()) < 3:
|
||||
return False, "Fragment — needs at least subject + predicate"
|
||||
return True, "OK"
|
||||
|
||||
|
||||
def _check_duplicate(content: str, store: SovereignStore, room: str) -> tuple[bool, str]:
|
||||
"""Gate 3: Duplicate detection via hybrid search."""
|
||||
results = store.search(content, room=room, limit=5, min_trust=0.0)
|
||||
for r in results:
|
||||
if r["score"] > DUPLICATE_SIMILARITY:
|
||||
return False, f"Duplicate detected: memory #{r['memory_id']} (score {r['score']:.3f})"
|
||||
if _text_overlap(content, r["content"]) > 0.8:
|
||||
return False, f"Near-duplicate text: memory #{r['memory_id']}"
|
||||
return True, "OK"
|
||||
|
||||
|
||||
def _check_staleness(written_at: float) -> tuple[bool, str]:
|
||||
"""Gate 4: Staleness check."""
|
||||
age = time.time() - written_at
|
||||
if age > STALE_SECONDS:
|
||||
days = int(age / 86400)
|
||||
return False, f"Stale ({days} days old). Review manually before promoting."
|
||||
return True, "OK"
|
||||
|
||||
|
||||
def _text_overlap(a: str, b: str) -> float:
|
||||
"""Jaccard similarity between two texts (word-level)."""
|
||||
words_a = set(a.lower().split())
|
||||
words_b = set(b.lower().split())
|
||||
if not words_a or not words_b:
|
||||
return 0.0
|
||||
intersection = words_a & words_b
|
||||
union = words_a | words_b
|
||||
return len(intersection) / len(union)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class PromotionResult:
|
||||
"""Result of a promotion attempt."""
|
||||
def __init__(self, success: bool, memory_id: Optional[int], reason: str, gates: dict):
|
||||
self.success = success
|
||||
self.memory_id = memory_id
|
||||
self.reason = reason
|
||||
self.gates = gates
|
||||
|
||||
def __repr__(self):
|
||||
status = "PROMOTED" if self.success else "REJECTED"
|
||||
return f"PromotionResult({status}: {self.reason})"
|
||||
|
||||
|
||||
def evaluate_for_promotion(
|
||||
content: str,
|
||||
store: SovereignStore,
|
||||
room: str = "general",
|
||||
written_at: Optional[float] = None,
|
||||
) -> dict:
|
||||
"""Run all quality gates without actually promoting."""
|
||||
if written_at is None:
|
||||
written_at = time.time()
|
||||
gates = {}
|
||||
gates["length"] = _check_length(content)
|
||||
gates["structure"] = _check_structure(content)
|
||||
gates["duplicate"] = _check_duplicate(content, store, room)
|
||||
gates["staleness"] = _check_staleness(written_at)
|
||||
all_passed = all(passed for passed, _ in gates.values())
|
||||
return {
|
||||
"eligible": all_passed,
|
||||
"gates": gates,
|
||||
"content_preview": content[:100] + ("..." if len(content) > 100 else ""),
|
||||
}
|
||||
|
||||
|
||||
def promote(
|
||||
content: str,
|
||||
store: SovereignStore,
|
||||
session_id: str,
|
||||
scratch_key: str,
|
||||
room: str = "general",
|
||||
category: str = "",
|
||||
trust: float = 0.5,
|
||||
written_at: Optional[float] = None,
|
||||
force: bool = False,
|
||||
) -> PromotionResult:
|
||||
"""Promote a scratchpad note to durable palace memory."""
|
||||
if written_at is None:
|
||||
written_at = time.time()
|
||||
gates = {}
|
||||
if not force:
|
||||
gates["length"] = _check_length(content)
|
||||
gates["structure"] = _check_structure(content)
|
||||
gates["duplicate"] = _check_duplicate(content, store, room)
|
||||
gates["staleness"] = _check_staleness(written_at)
|
||||
for gate_name, (passed, message) in gates.items():
|
||||
if not passed:
|
||||
return PromotionResult(
|
||||
success=False, memory_id=None,
|
||||
reason=f"Failed gate '{gate_name}': {message}", gates=gates,
|
||||
)
|
||||
memory_id = store.store(content, room=room, category=category, trust=trust)
|
||||
store.log_promotion(session_id, scratch_key, memory_id, reason="auto" if not force else "forced")
|
||||
return PromotionResult(success=True, memory_id=memory_id, reason="Promoted to durable memory", gates=gates)
|
||||
|
||||
|
||||
def promote_session_batch(
|
||||
store: SovereignStore,
|
||||
session_id: str,
|
||||
notes: dict[str, dict],
|
||||
room: str = "general",
|
||||
force: bool = False,
|
||||
) -> list[PromotionResult]:
|
||||
"""Promote all notes from a session scratchpad."""
|
||||
results = []
|
||||
for key, entry in notes.items():
|
||||
content = entry.get("value", str(entry)) if isinstance(entry, dict) else str(entry)
|
||||
written_at = None
|
||||
if isinstance(entry, dict) and "written_at" in entry:
|
||||
try:
|
||||
import datetime
|
||||
written_at = datetime.datetime.strptime(
|
||||
entry["written_at"], "%Y-%m-%d %H:%M:%S"
|
||||
).timestamp()
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
result = promote(
|
||||
content=str(content), store=store, session_id=session_id,
|
||||
scratch_key=key, room=room, written_at=written_at, force=force,
|
||||
)
|
||||
results.append(result)
|
||||
return results
|
||||
474
hermes-sovereign/mempalace/sovereign_store.py
Normal file
474
hermes-sovereign/mempalace/sovereign_store.py
Normal file
@@ -0,0 +1,474 @@
|
||||
"""Sovereign Memory Store — zero-API, zero-dependency durable memory.
|
||||
|
||||
Replaces the third-party `mempalace` CLI and its ONNX requirement with a
|
||||
self-contained SQLite + FTS5 + HRR (Holographic Reduced Representation)
|
||||
store. Every operation is local: no network calls, no API keys, no cloud.
|
||||
|
||||
Storage: ~/.hermes/palace/sovereign.db
|
||||
|
||||
Capabilities:
|
||||
- Durable fact storage with rooms, categories, and trust scores
|
||||
- Hybrid retrieval: FTS5 keyword search + HRR cosine similarity
|
||||
- Reciprocal Rank Fusion to merge keyword and semantic results
|
||||
- Trust scoring: facts that get retrieved and confirmed gain trust
|
||||
- Graceful numpy degradation: falls back to keyword-only if missing
|
||||
|
||||
Refs: Epic #367, MP-3 #370, MP-4 #371
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import math
|
||||
import sqlite3
|
||||
import struct
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HRR (Holographic Reduced Representations) — zero-dependency vectors
|
||||
# ---------------------------------------------------------------------------
|
||||
# Phase-encoded vectors via SHA-256. No ONNX, no embeddings API, no numpy
|
||||
# required (but uses numpy when available for speed).
|
||||
|
||||
_TWO_PI = 2.0 * math.pi
|
||||
_DIM = 512 # Compact dimension — sufficient for memory retrieval
|
||||
|
||||
try:
|
||||
import numpy as np
|
||||
_HAS_NUMPY = True
|
||||
except ImportError:
|
||||
_HAS_NUMPY = False
|
||||
|
||||
|
||||
def _encode_atom_np(word: str, dim: int = _DIM) -> "np.ndarray":
|
||||
"""Deterministic phase vector via SHA-256 (numpy path)."""
|
||||
values_per_block = 16
|
||||
blocks_needed = math.ceil(dim / values_per_block)
|
||||
uint16_values: list[int] = []
|
||||
for i in range(blocks_needed):
|
||||
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
|
||||
uint16_values.extend(struct.unpack("<16H", digest))
|
||||
return np.array(uint16_values[:dim], dtype=np.float64) * (_TWO_PI / 65536.0)
|
||||
|
||||
|
||||
def _encode_atom_pure(word: str, dim: int = _DIM) -> list[float]:
|
||||
"""Deterministic phase vector via SHA-256 (pure Python fallback)."""
|
||||
values_per_block = 16
|
||||
blocks_needed = math.ceil(dim / values_per_block)
|
||||
uint16_values: list[int] = []
|
||||
for i in range(blocks_needed):
|
||||
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
|
||||
for j in range(0, 32, 2):
|
||||
uint16_values.append(int.from_bytes(digest[j:j+2], "little"))
|
||||
return [v * (_TWO_PI / 65536.0) for v in uint16_values[:dim]]
|
||||
|
||||
|
||||
def encode_text(text: str, dim: int = _DIM):
|
||||
"""Encode a text string into an HRR phase vector by bundling word atoms.
|
||||
|
||||
Uses circular mean of per-word phase vectors — the standard HRR
|
||||
superposition operation. Result is a fixed-width vector regardless
|
||||
of input length.
|
||||
"""
|
||||
words = text.lower().split()
|
||||
if not words:
|
||||
words = ["<empty>"]
|
||||
|
||||
if _HAS_NUMPY:
|
||||
atoms = [_encode_atom_np(w, dim) for w in words]
|
||||
# Circular mean: average the unit vectors, extract phase
|
||||
unit_sum = sum(np.exp(1j * a) for a in atoms)
|
||||
return np.angle(unit_sum) % _TWO_PI
|
||||
else:
|
||||
# Pure Python circular mean
|
||||
real_sum = [0.0] * dim
|
||||
imag_sum = [0.0] * dim
|
||||
for w in words:
|
||||
atom = _encode_atom_pure(w, dim)
|
||||
for d in range(dim):
|
||||
real_sum[d] += math.cos(atom[d])
|
||||
imag_sum[d] += math.sin(atom[d])
|
||||
return [math.atan2(imag_sum[d], real_sum[d]) % _TWO_PI for d in range(dim)]
|
||||
|
||||
|
||||
def cosine_similarity_phase(a, b) -> float:
|
||||
"""Cosine similarity between two phase vectors.
|
||||
|
||||
For phase vectors, similarity = mean(cos(a - b)).
|
||||
"""
|
||||
if _HAS_NUMPY:
|
||||
return float(np.mean(np.cos(np.array(a) - np.array(b))))
|
||||
else:
|
||||
n = len(a)
|
||||
return sum(math.cos(a[i] - b[i]) for i in range(n)) / n
|
||||
|
||||
|
||||
def serialize_vector(vec) -> bytes:
|
||||
"""Serialize a vector to bytes for SQLite storage."""
|
||||
if _HAS_NUMPY:
|
||||
return vec.astype(np.float64).tobytes()
|
||||
else:
|
||||
return struct.pack(f"{len(vec)}d", *vec)
|
||||
|
||||
|
||||
def deserialize_vector(blob: bytes):
|
||||
"""Deserialize bytes back to a vector."""
|
||||
n = len(blob) // 8 # float64 = 8 bytes
|
||||
if _HAS_NUMPY:
|
||||
return np.frombuffer(blob, dtype=np.float64)
|
||||
else:
|
||||
return list(struct.unpack(f"{n}d", blob))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SQLite Schema
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS memories (
|
||||
memory_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
content TEXT NOT NULL,
|
||||
room TEXT DEFAULT 'general',
|
||||
category TEXT DEFAULT '',
|
||||
trust_score REAL DEFAULT 0.5,
|
||||
retrieval_count INTEGER DEFAULT 0,
|
||||
created_at REAL NOT NULL,
|
||||
updated_at REAL NOT NULL,
|
||||
hrr_vector BLOB
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_memories_room ON memories(room);
|
||||
CREATE INDEX IF NOT EXISTS idx_memories_trust ON memories(trust_score DESC);
|
||||
|
||||
-- FTS5 for fast keyword search
|
||||
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
|
||||
content, room, category,
|
||||
content=memories, content_rowid=memory_id,
|
||||
tokenize='porter unicode61'
|
||||
);
|
||||
|
||||
-- Sync triggers
|
||||
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
|
||||
INSERT INTO memories_fts(rowid, content, room, category)
|
||||
VALUES (new.memory_id, new.content, new.room, new.category);
|
||||
END;
|
||||
|
||||
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
|
||||
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
|
||||
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
|
||||
END;
|
||||
|
||||
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
|
||||
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
|
||||
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
|
||||
INSERT INTO memories_fts(rowid, content, room, category)
|
||||
VALUES (new.memory_id, new.content, new.room, new.category);
|
||||
END;
|
||||
|
||||
-- Promotion log: tracks what moved from scratchpad to durable memory
|
||||
CREATE TABLE IF NOT EXISTS promotion_log (
|
||||
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
session_id TEXT NOT NULL,
|
||||
scratch_key TEXT NOT NULL,
|
||||
memory_id INTEGER REFERENCES memories(memory_id),
|
||||
promoted_at REAL NOT NULL,
|
||||
reason TEXT DEFAULT ''
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SovereignStore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class SovereignStore:
|
||||
"""Zero-API durable memory store.
|
||||
|
||||
All operations are local SQLite. No network calls. No API keys.
|
||||
HRR vectors provide semantic similarity without embedding models.
|
||||
FTS5 provides fast keyword search. RRF merges both rankings.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: Optional[str] = None):
|
||||
if db_path is None:
|
||||
db_path = str(Path.home() / ".hermes" / "palace" / "sovereign.db")
|
||||
self._db_path = db_path
|
||||
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
self._conn = sqlite3.connect(db_path)
|
||||
self._conn.row_factory = sqlite3.Row
|
||||
self._conn.executescript(_SCHEMA)
|
||||
|
||||
def close(self):
|
||||
self._conn.close()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Store
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def store(
|
||||
self,
|
||||
content: str,
|
||||
room: str = "general",
|
||||
category: str = "",
|
||||
trust: float = 0.5,
|
||||
) -> int:
|
||||
"""Store a fact in durable memory. Returns the memory_id."""
|
||||
now = time.time()
|
||||
vec = encode_text(content)
|
||||
blob = serialize_vector(vec)
|
||||
cur = self._conn.execute(
|
||||
"""INSERT INTO memories (content, room, category, trust_score,
|
||||
created_at, updated_at, hrr_vector)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(content, room, category, trust, now, now, blob),
|
||||
)
|
||||
self._conn.commit()
|
||||
return cur.lastrowid
|
||||
|
||||
def store_batch(self, items: list[dict]) -> list[int]:
|
||||
"""Store multiple facts. Each item: {content, room?, category?, trust?}."""
|
||||
ids = []
|
||||
now = time.time()
|
||||
for item in items:
|
||||
content = item["content"]
|
||||
vec = encode_text(content)
|
||||
blob = serialize_vector(vec)
|
||||
cur = self._conn.execute(
|
||||
"""INSERT INTO memories (content, room, category, trust_score,
|
||||
created_at, updated_at, hrr_vector)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
content,
|
||||
item.get("room", "general"),
|
||||
item.get("category", ""),
|
||||
item.get("trust", 0.5),
|
||||
now, now, blob,
|
||||
),
|
||||
)
|
||||
ids.append(cur.lastrowid)
|
||||
self._conn.commit()
|
||||
return ids
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Search — hybrid FTS5 + HRR with Reciprocal Rank Fusion
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def search(
|
||||
self,
|
||||
query: str,
|
||||
room: Optional[str] = None,
|
||||
limit: int = 10,
|
||||
min_trust: float = 0.0,
|
||||
fts_weight: float = 0.5,
|
||||
hrr_weight: float = 0.5,
|
||||
) -> list[dict]:
|
||||
"""Hybrid search: FTS5 keywords + HRR semantic similarity.
|
||||
|
||||
Uses Reciprocal Rank Fusion (RRF) to merge both rankings.
|
||||
Returns list of dicts with content, room, score, trust_score.
|
||||
"""
|
||||
k_rrf = 60 # Standard RRF constant
|
||||
|
||||
# Stage 1: FTS5 candidates
|
||||
fts_results = self._fts_search(query, room, min_trust, limit * 3)
|
||||
|
||||
# Stage 2: HRR candidates (scan top N by trust)
|
||||
hrr_results = self._hrr_search(query, room, min_trust, limit * 3)
|
||||
|
||||
# Stage 3: RRF fusion
|
||||
scores: dict[int, float] = {}
|
||||
meta: dict[int, dict] = {}
|
||||
|
||||
for rank, row in enumerate(fts_results):
|
||||
mid = row["memory_id"]
|
||||
scores[mid] = scores.get(mid, 0) + fts_weight / (k_rrf + rank + 1)
|
||||
meta[mid] = dict(row)
|
||||
|
||||
for rank, row in enumerate(hrr_results):
|
||||
mid = row["memory_id"]
|
||||
scores[mid] = scores.get(mid, 0) + hrr_weight / (k_rrf + rank + 1)
|
||||
if mid not in meta:
|
||||
meta[mid] = dict(row)
|
||||
|
||||
# Sort by fused score
|
||||
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
|
||||
|
||||
results = []
|
||||
for mid, score in ranked:
|
||||
m = meta[mid]
|
||||
# Bump retrieval count
|
||||
self._conn.execute(
|
||||
"UPDATE memories SET retrieval_count = retrieval_count + 1 WHERE memory_id = ?",
|
||||
(mid,),
|
||||
)
|
||||
results.append({
|
||||
"memory_id": mid,
|
||||
"content": m["content"],
|
||||
"room": m["room"],
|
||||
"category": m.get("category", ""),
|
||||
"trust_score": m["trust_score"],
|
||||
"score": round(score, 6),
|
||||
})
|
||||
|
||||
if results:
|
||||
self._conn.commit()
|
||||
return results
|
||||
|
||||
def _fts_search(
|
||||
self, query: str, room: Optional[str], min_trust: float, limit: int
|
||||
) -> list[dict]:
|
||||
"""FTS5 full-text search."""
|
||||
try:
|
||||
if room:
|
||||
rows = self._conn.execute(
|
||||
"""SELECT m.memory_id, m.content, m.room, m.category,
|
||||
m.trust_score, m.retrieval_count
|
||||
FROM memories_fts f
|
||||
JOIN memories m ON f.rowid = m.memory_id
|
||||
WHERE memories_fts MATCH ? AND m.room = ?
|
||||
AND m.trust_score >= ?
|
||||
ORDER BY rank LIMIT ?""",
|
||||
(query, room, min_trust, limit),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = self._conn.execute(
|
||||
"""SELECT m.memory_id, m.content, m.room, m.category,
|
||||
m.trust_score, m.retrieval_count
|
||||
FROM memories_fts f
|
||||
JOIN memories m ON f.rowid = m.memory_id
|
||||
WHERE memories_fts MATCH ?
|
||||
AND m.trust_score >= ?
|
||||
ORDER BY rank LIMIT ?""",
|
||||
(query, min_trust, limit),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
except sqlite3.OperationalError:
|
||||
# Bad FTS query syntax — degrade gracefully
|
||||
return []
|
||||
|
||||
def _hrr_search(
|
||||
self, query: str, room: Optional[str], min_trust: float, limit: int
|
||||
) -> list[dict]:
|
||||
"""HRR cosine similarity search (brute-force scan, fast for <100K facts)."""
|
||||
query_vec = encode_text(query)
|
||||
|
||||
if room:
|
||||
rows = self._conn.execute(
|
||||
"""SELECT memory_id, content, room, category, trust_score,
|
||||
retrieval_count, hrr_vector
|
||||
FROM memories
|
||||
WHERE room = ? AND trust_score >= ? AND hrr_vector IS NOT NULL""",
|
||||
(room, min_trust),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = self._conn.execute(
|
||||
"""SELECT memory_id, content, room, category, trust_score,
|
||||
retrieval_count, hrr_vector
|
||||
FROM memories
|
||||
WHERE trust_score >= ? AND hrr_vector IS NOT NULL""",
|
||||
(min_trust,),
|
||||
).fetchall()
|
||||
|
||||
scored = []
|
||||
for r in rows:
|
||||
stored_vec = deserialize_vector(r["hrr_vector"])
|
||||
sim = cosine_similarity_phase(query_vec, stored_vec)
|
||||
scored.append((sim, dict(r)))
|
||||
|
||||
scored.sort(key=lambda x: x[0], reverse=True)
|
||||
return [item[1] for item in scored[:limit]]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Trust management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def boost_trust(self, memory_id: int, delta: float = 0.05) -> None:
|
||||
"""Increase trust score when a memory proves useful."""
|
||||
self._conn.execute(
|
||||
"""UPDATE memories SET trust_score = MIN(1.0, trust_score + ?),
|
||||
updated_at = ? WHERE memory_id = ?""",
|
||||
(delta, time.time(), memory_id),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def decay_trust(self, memory_id: int, delta: float = 0.02) -> None:
|
||||
"""Decrease trust score when a memory is contradicted."""
|
||||
self._conn.execute(
|
||||
"""UPDATE memories SET trust_score = MAX(0.0, trust_score - ?),
|
||||
updated_at = ? WHERE memory_id = ?""",
|
||||
(delta, time.time(), memory_id),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Room operations
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def list_rooms(self) -> list[dict]:
|
||||
"""List all rooms with fact counts."""
|
||||
rows = self._conn.execute(
|
||||
"""SELECT room, COUNT(*) as count,
|
||||
AVG(trust_score) as avg_trust
|
||||
FROM memories GROUP BY room ORDER BY count DESC"""
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
def room_contents(self, room: str, limit: int = 50) -> list[dict]:
|
||||
"""Get all facts in a room, ordered by trust."""
|
||||
rows = self._conn.execute(
|
||||
"""SELECT memory_id, content, category, trust_score,
|
||||
retrieval_count, created_at
|
||||
FROM memories WHERE room = ?
|
||||
ORDER BY trust_score DESC, created_at DESC LIMIT ?""",
|
||||
(room, limit),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Stats
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def stats(self) -> dict:
|
||||
"""Return store statistics."""
|
||||
row = self._conn.execute(
|
||||
"""SELECT COUNT(*) as total,
|
||||
AVG(trust_score) as avg_trust,
|
||||
SUM(retrieval_count) as total_retrievals,
|
||||
COUNT(DISTINCT room) as room_count
|
||||
FROM memories"""
|
||||
).fetchone()
|
||||
return dict(row)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Promotion support (scratchpad → durable)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def log_promotion(
|
||||
self,
|
||||
session_id: str,
|
||||
scratch_key: str,
|
||||
memory_id: int,
|
||||
reason: str = "",
|
||||
) -> None:
|
||||
"""Record a scratchpad-to-palace promotion in the audit log."""
|
||||
self._conn.execute(
|
||||
"""INSERT INTO promotion_log
|
||||
(session_id, scratch_key, memory_id, promoted_at, reason)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
(session_id, scratch_key, memory_id, time.time(), reason),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def recent_promotions(self, limit: int = 20) -> list[dict]:
|
||||
"""Get recent promotion log entries."""
|
||||
rows = self._conn.execute(
|
||||
"""SELECT p.*, m.content, m.room
|
||||
FROM promotion_log p
|
||||
LEFT JOIN memories m ON p.memory_id = m.memory_id
|
||||
ORDER BY p.promoted_at DESC LIMIT ?""",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
255
hermes-sovereign/mempalace/tests/test_sovereign_store.py
Normal file
255
hermes-sovereign/mempalace/tests/test_sovereign_store.py
Normal file
@@ -0,0 +1,255 @@
|
||||
"""Tests for the Sovereign Memory Store and Promotion system.
|
||||
|
||||
Zero-API, zero-network — everything runs against an in-memory SQLite DB.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
# Allow imports from parent package
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from sovereign_store import (
|
||||
SovereignStore,
|
||||
encode_text,
|
||||
cosine_similarity_phase,
|
||||
serialize_vector,
|
||||
deserialize_vector,
|
||||
)
|
||||
from promotion import (
|
||||
evaluate_for_promotion,
|
||||
promote,
|
||||
promote_session_batch,
|
||||
)
|
||||
|
||||
|
||||
class TestHRRVectors(unittest.TestCase):
|
||||
"""Test the HRR encoding and similarity functions."""
|
||||
|
||||
def test_deterministic_encoding(self):
|
||||
"""Same text always produces the same vector."""
|
||||
v1 = encode_text("hello world")
|
||||
v2 = encode_text("hello world")
|
||||
self.assertAlmostEqual(cosine_similarity_phase(v1, v2), 1.0, places=5)
|
||||
|
||||
def test_similar_texts_higher_similarity(self):
|
||||
"""Related texts should be more similar than unrelated ones."""
|
||||
v_agent = encode_text("agent memory palace retrieval")
|
||||
v_similar = encode_text("agent recall memory search")
|
||||
v_unrelated = encode_text("banana strawberry fruit smoothie")
|
||||
sim_related = cosine_similarity_phase(v_agent, v_similar)
|
||||
sim_unrelated = cosine_similarity_phase(v_agent, v_unrelated)
|
||||
self.assertGreater(sim_related, sim_unrelated)
|
||||
|
||||
def test_serialize_roundtrip(self):
|
||||
"""Vectors survive serialization to/from bytes."""
|
||||
vec = encode_text("test serialization")
|
||||
blob = serialize_vector(vec)
|
||||
restored = deserialize_vector(blob)
|
||||
sim = cosine_similarity_phase(vec, restored)
|
||||
self.assertAlmostEqual(sim, 1.0, places=5)
|
||||
|
||||
def test_empty_text(self):
|
||||
"""Empty text gets a fallback encoding."""
|
||||
vec = encode_text("")
|
||||
self.assertEqual(len(vec) if hasattr(vec, '__len__') else len(list(vec)), 512)
|
||||
|
||||
|
||||
class TestSovereignStore(unittest.TestCase):
|
||||
"""Test the SQLite-backed sovereign store."""
|
||||
|
||||
def setUp(self):
|
||||
self.db_path = os.path.join(tempfile.mkdtemp(), "test.db")
|
||||
self.store = SovereignStore(db_path=self.db_path)
|
||||
|
||||
def tearDown(self):
|
||||
self.store.close()
|
||||
if os.path.exists(self.db_path):
|
||||
os.remove(self.db_path)
|
||||
|
||||
def test_store_and_retrieve(self):
|
||||
"""Store a fact and find it via search."""
|
||||
mid = self.store.store("Timmy is a sovereign AI agent on Hermes VPS", room="identity")
|
||||
results = self.store.search("sovereign agent", room="identity")
|
||||
self.assertTrue(any(r["memory_id"] == mid for r in results))
|
||||
|
||||
def test_fts_search(self):
|
||||
"""FTS5 keyword search works."""
|
||||
self.store.store("The beacon game uses paperclips mechanics", room="projects")
|
||||
self.store.store("Fleet agents handle delegation and dispatch", room="fleet")
|
||||
results = self.store.search("paperclips")
|
||||
self.assertTrue(len(results) > 0)
|
||||
self.assertIn("paperclips", results[0]["content"].lower())
|
||||
|
||||
def test_hrr_search_semantic(self):
|
||||
"""HRR similarity finds related content even without exact keywords."""
|
||||
self.store.store("Memory palace rooms organize facts spatially", room="memory")
|
||||
self.store.store("Pizza delivery service runs on weekends", room="unrelated")
|
||||
results = self.store.search("organize knowledge rooms", room="memory")
|
||||
self.assertTrue(len(results) > 0)
|
||||
self.assertIn("palace", results[0]["content"].lower())
|
||||
|
||||
def test_room_filtering(self):
|
||||
"""Room filter restricts search scope."""
|
||||
self.store.store("Hermes harness manages tool calls", room="infrastructure")
|
||||
self.store.store("Hermes mythology Greek god", room="lore")
|
||||
results = self.store.search("Hermes", room="infrastructure")
|
||||
self.assertTrue(all(r["room"] == "infrastructure" for r in results))
|
||||
|
||||
def test_trust_boost(self):
|
||||
"""Trust score increases when boosted."""
|
||||
mid = self.store.store("fact", trust=0.5)
|
||||
self.store.boost_trust(mid, delta=0.1)
|
||||
results = self.store.room_contents("general")
|
||||
fact = next(r for r in results if r["memory_id"] == mid)
|
||||
self.assertAlmostEqual(fact["trust_score"], 0.6, places=2)
|
||||
|
||||
def test_trust_decay(self):
|
||||
"""Trust score decreases when decayed."""
|
||||
mid = self.store.store("questionable fact", trust=0.5)
|
||||
self.store.decay_trust(mid, delta=0.2)
|
||||
results = self.store.room_contents("general")
|
||||
fact = next(r for r in results if r["memory_id"] == mid)
|
||||
self.assertAlmostEqual(fact["trust_score"], 0.3, places=2)
|
||||
|
||||
def test_batch_store(self):
|
||||
"""Batch store works."""
|
||||
ids = self.store.store_batch([
|
||||
{"content": "fact one", "room": "test"},
|
||||
{"content": "fact two", "room": "test"},
|
||||
{"content": "fact three", "room": "test"},
|
||||
])
|
||||
self.assertEqual(len(ids), 3)
|
||||
rooms = self.store.list_rooms()
|
||||
test_room = next(r for r in rooms if r["room"] == "test")
|
||||
self.assertEqual(test_room["count"], 3)
|
||||
|
||||
def test_stats(self):
|
||||
"""Stats returns correct counts."""
|
||||
self.store.store("a fact", room="r1")
|
||||
self.store.store("another fact", room="r2")
|
||||
s = self.store.stats()
|
||||
self.assertEqual(s["total"], 2)
|
||||
self.assertEqual(s["room_count"], 2)
|
||||
|
||||
def test_retrieval_count_increments(self):
|
||||
"""Retrieval count goes up when a fact is found via search."""
|
||||
self.store.store("unique searchable content xyz123", room="test")
|
||||
self.store.search("xyz123")
|
||||
results = self.store.room_contents("test")
|
||||
self.assertTrue(any(r["retrieval_count"] > 0 for r in results))
|
||||
|
||||
|
||||
class TestPromotion(unittest.TestCase):
|
||||
"""Test the quality-gated promotion system."""
|
||||
|
||||
def setUp(self):
|
||||
self.db_path = os.path.join(tempfile.mkdtemp(), "promo_test.db")
|
||||
self.store = SovereignStore(db_path=self.db_path)
|
||||
|
||||
def tearDown(self):
|
||||
self.store.close()
|
||||
|
||||
def test_successful_promotion(self):
|
||||
"""Good content passes all gates."""
|
||||
result = promote(
|
||||
content="Timmy runs on the Hermes VPS at 143.198.27.163 with local Ollama inference",
|
||||
store=self.store,
|
||||
session_id="test-session-001",
|
||||
scratch_key="vps_info",
|
||||
room="infrastructure",
|
||||
)
|
||||
self.assertTrue(result.success)
|
||||
self.assertIsNotNone(result.memory_id)
|
||||
|
||||
def test_reject_too_short(self):
|
||||
"""Short fragments get rejected."""
|
||||
result = promote(
|
||||
content="yes",
|
||||
store=self.store,
|
||||
session_id="test",
|
||||
scratch_key="short",
|
||||
)
|
||||
self.assertFalse(result.success)
|
||||
self.assertIn("Too short", result.reason)
|
||||
|
||||
def test_reject_duplicate(self):
|
||||
"""Duplicate content gets rejected."""
|
||||
self.store.store("SOUL.md is the canonical identity document for Timmy", room="identity")
|
||||
result = promote(
|
||||
content="SOUL.md is the canonical identity document for Timmy",
|
||||
store=self.store,
|
||||
session_id="test",
|
||||
scratch_key="soul",
|
||||
room="identity",
|
||||
)
|
||||
self.assertFalse(result.success)
|
||||
self.assertIn("uplicate", result.reason)
|
||||
|
||||
def test_reject_stale(self):
|
||||
"""Old notes get flagged as stale."""
|
||||
old_time = time.time() - (86400 * 10)
|
||||
result = promote(
|
||||
content="This is a note from long ago about something important",
|
||||
store=self.store,
|
||||
session_id="test",
|
||||
scratch_key="old",
|
||||
written_at=old_time,
|
||||
)
|
||||
self.assertFalse(result.success)
|
||||
self.assertIn("Stale", result.reason)
|
||||
|
||||
def test_force_bypasses_gates(self):
|
||||
"""Force flag overrides quality gates."""
|
||||
result = promote(
|
||||
content="ok",
|
||||
store=self.store,
|
||||
session_id="test",
|
||||
scratch_key="forced",
|
||||
force=True,
|
||||
)
|
||||
self.assertTrue(result.success)
|
||||
|
||||
def test_evaluate_dry_run(self):
|
||||
"""Evaluate returns gate details without promoting."""
|
||||
eval_result = evaluate_for_promotion(
|
||||
content="The fleet uses kimi-k2.5 as the primary model for all agent operations",
|
||||
store=self.store,
|
||||
room="fleet",
|
||||
)
|
||||
self.assertTrue(eval_result["eligible"])
|
||||
self.assertTrue(all(p for p, _ in eval_result["gates"].values()))
|
||||
|
||||
def test_batch_promotion(self):
|
||||
"""Batch promotion processes all notes."""
|
||||
notes = {
|
||||
"infra": {"value": "Hermes VPS runs Ubuntu 22.04 with 2 vCPUs and 4GB RAM", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
|
||||
"short": {"value": "no", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
|
||||
"model": {"value": "The primary local model is gemma4:latest running on Ollama", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
|
||||
}
|
||||
results = promote_session_batch(self.store, "batch-session", notes, room="config")
|
||||
promoted = [r for r in results if r.success]
|
||||
rejected = [r for r in results if not r.success]
|
||||
self.assertEqual(len(promoted), 2)
|
||||
self.assertEqual(len(rejected), 1)
|
||||
|
||||
def test_promotion_logged(self):
|
||||
"""Successful promotions appear in the audit log."""
|
||||
promote(
|
||||
content="Forge is hosted at forge.alexanderwhitestone.com running Gitea",
|
||||
store=self.store,
|
||||
session_id="log-test",
|
||||
scratch_key="forge",
|
||||
room="infrastructure",
|
||||
)
|
||||
log = self.store.recent_promotions()
|
||||
self.assertTrue(len(log) > 0)
|
||||
self.assertEqual(log[0]["session_id"], "log-test")
|
||||
self.assertEqual(log[0]["scratch_key"], "forge")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user