Replaces the subprocess call to mempalace CLI binary with direct SovereignStore import. L1 palace search now uses SQLite + FTS5 + HRR vectors in-process. No ONNX, no subprocess, no API calls. Removes: import subprocess, MEMPALACE_BIN constant Adds: SovereignStore lazy singleton, _get_store(), SOVEREIGN_DB path Closes #383 Depends on #380 (sovereign_store.py)
311 lines · 10 KiB · Python
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
|
|
|
|
Ensures the agent checks durable memory before falling back to free generation.
|
|
Gracefully degrades if any layer is unavailable (missing files, etc).
|
|
|
|
Layer order:
|
|
L0: Identity (~/.mempalace/identity.txt)
|
|
L1: Palace rooms (SovereignStore — SQLite + FTS5 + HRR, zero API calls)
|
|
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
|
|
L3: Gitea artifacts (API search for issues/PRs)
|
|
L4: Procedures (skills directory search)
|
|
L5: Free generation (only if L0-L4 produced nothing)
|
|
|
|
Refs: Epic #367, Sub-issue #369, Wiring: #383
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sovereign Store (replaces mempalace CLI subprocess)
|
|
# ---------------------------------------------------------------------------
|
|
# Prefer the package-relative import; fall back to a top-level import for the
# case where this module is executed outside its package (loose script / test
# harness). If the dependency is missing entirely, leave SovereignStore as
# None so _get_store() and the L1 layer can degrade gracefully.
try:
    from .sovereign_store import SovereignStore
except ImportError:
    try:
        from sovereign_store import SovereignStore
    except ImportError:
        SovereignStore = None  # type: ignore[misc,assignment]
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# L0: agent identity file (plain text; capped to ~200 words on load).
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
# L2: per-session scratchpad JSON files, keyed by session_id.
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
# L4: skills directory; each skill is a subdirectory containing SKILL.md.
SKILLS_DIR = Path.home() / ".hermes" / "skills"
# L1: SQLite database file backing the SovereignStore.
SOVEREIGN_DB = Path.home() / ".hermes" / "palace" / "sovereign.db"

# Patterns that indicate a recall-style query (case-insensitive, word-bounded
# alternation; used by is_recall_query to gate enforcement).
RECALL_PATTERNS = re.compile(
    r"(?i)\b("
    r"what did|status of|remember|last time|yesterday|previously|"
    r"we discussed|we talked|we worked|you said|you mentioned|"
    r"remind me|what was|what were|how did|when did|"
    r"earlier today|last session|before this"
    r")\b"
)

# Singleton store instance (lazy-init; populated only by _get_store()).
_store: Optional["SovereignStore"] = None
|
|
|
|
|
|
def _get_store() -> Optional["SovereignStore"]:
    """Return the shared SovereignStore, creating it on first use.

    Returns None when the sovereign_store module could not be imported or
    when construction fails, so callers can degrade gracefully. A failed
    construction is retried on the next call (the singleton stays unset).
    """
    global _store
    if _store is None and SovereignStore is not None:
        try:
            _store = SovereignStore(db_path=str(SOVEREIGN_DB))
        except Exception:
            return None
    return _store
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L0: Identity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_identity() -> str:
    """Read the agent identity file (L0 layer).

    Returns:
        The identity text, truncated to roughly 200 words to keep the
        wake-up context lean, or "" when the file is missing or unreadable.
    """
    try:
        if IDENTITY_PATH.exists():
            text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
            # Cap at ~200 tokens to keep wake-up lean; split once and reuse
            # (the original split the text twice).
            words = text.split()
            if len(words) > 200:
                text = " ".join(words[:200]) + "..."
            return text
    # PermissionError is a subclass of OSError, so listing both was redundant.
    except OSError:
        pass
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L1: Palace search (now via SovereignStore — zero subprocess, zero API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def search_palace(query: str, room: Optional[str] = None) -> str:
    """Search the sovereign memory store for relevant memories (L1).

    Hybrid keyword + semantic search via SovereignStore (SQLite + FTS5 +
    HRR) — no subprocess calls, no ONNX, no API keys.

    Gracefully degrades to an empty string if the store is unavailable or
    the search raises.
    """
    store = _get_store()
    if store is None:
        return ""
    try:
        hits = store.search(query, room=room, limit=5, min_trust=0.2)
        if not hits:
            return ""
        # One indented line per memory: " [room] (trust:0.00) content".
        return "\n".join(
            f" [{hit.get('room', 'general')}]"
            f" (trust:{hit.get('trust_score', 0.5):.2f})"
            f" {hit.get('content', '')}"
            for hit in hits
        )
    except Exception:
        return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L2: Session scratchpad
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_scratchpad(session_id: str) -> str:
    """Load the session scratchpad (L2) as formatted text.

    Returns "" when the file is absent, unreadable, not valid JSON, or not
    a non-empty JSON object.
    """
    path = SCRATCHPAD_DIR / f"{session_id}.json"
    try:
        if path.exists():
            payload = json.loads(path.read_text(encoding="utf-8"))
            if isinstance(payload, dict) and payload:
                return "\n".join(f" {key}: {value}" for key, value in payload.items())
    except (OSError, json.JSONDecodeError):
        pass
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L3: Gitea artifact search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_gitea_token() -> str:
|
|
"""Read the Gitea API token."""
|
|
token_path = Path.home() / ".hermes" / "gitea_token_vps"
|
|
try:
|
|
if token_path.exists():
|
|
return token_path.read_text(encoding="utf-8").strip()
|
|
except OSError:
|
|
pass
|
|
return ""
|
|
|
|
|
|
def search_gitea(query: str) -> str:
    """Search Gitea for related artifacts (L3). Returns formatted text or "".

    Builds a short keyword query from *query* (first three words longer than
    3 chars), calls the Gitea REST API with the locally stored token, and
    returns up to three matches as indented lines. Any failure — missing
    token, network error, bad response — degrades to an empty string.

    NOTE(review): the module docs describe L3 as issue/PR search, but this
    hits the /repos/search endpoint and formats repository results — confirm
    which endpoint was intended (#383).
    """
    token = _load_gitea_token()
    if not token:
        return ""

    api_base = "https://forge.alexanderwhitestone.com/api/v1"
    # Extract key terms for search (first 3 significant words)
    terms = [w for w in query.split() if len(w) > 3][:3]
    search_q = " ".join(terms) if terms else query[:50]

    try:
        # Imported lazily so module import never pays for urllib setup.
        import urllib.request
        import urllib.parse

        url = (
            f"{api_base}/repos/search?"
            f"q={urllib.parse.quote(search_q)}&limit=3"
        )
        req = urllib.request.Request(url, headers={
            "Authorization": f"token {token}",
            "Accept": "application/json",
        })
        # 8-second timeout keeps the retrieval path from hanging the agent.
        with urllib.request.urlopen(req, timeout=8) as resp:
            data = json.loads(resp.read().decode())
            if data.get("data"):
                lines = []
                for repo in data["data"][:3]:
                    lines.append(f" {repo['full_name']}: {repo.get('description', 'no desc')}")
                return "\n".join(lines)
    except Exception:
        # Best-effort layer: any failure means "no L3 context", never a crash.
        pass
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L4: Procedures (skills search)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def search_skills(query: str) -> str:
    """Search the skills directory (L4) for matching procedures.

    A skill matches when any significant query term (lowercased, longer
    than 3 chars) appears in its SKILL.md. Returns up to five
    " skill: <name>" lines, or "" when nothing matches or the directory
    is unavailable.
    """
    try:
        if not SKILLS_DIR.exists():
            return ""

        needles = [word for word in query.lower().split() if len(word) > 3]
        if not needles:
            return ""

        found = []
        for entry in SKILLS_DIR.iterdir():
            if not entry.is_dir():
                continue
            manifest = entry / "SKILL.md"
            if not manifest.exists():
                continue
            try:
                haystack = manifest.read_text(encoding="utf-8").lower()
            except OSError:
                continue
            if any(needle in haystack for needle in needles):
                found.append(f" skill: {entry.name}")

        if found:
            return "\n".join(found[:5])
    except OSError:
        pass
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main enforcer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_recall_query(query: str) -> bool:
    """Return True when *query* asks for recalled/historical information."""
    return RECALL_PATTERNS.search(query) is not None
|
|
|
|
|
|
def enforce_retrieval_order(
    query: str,
    session_id: Optional[str] = None,
    skip_if_not_recall: bool = True,
) -> dict:
    """Check palace layers before allowing free generation.

    Walks the memory hierarchy L0→L4, aggregating whatever context each
    layer yields. L0 identity is always prepended but never claims
    ``retrieved_from``; L3/L4 are consulted only when L1/L2 found nothing;
    L5 (free generation) is marked only when no durable layer produced
    context.

    Args:
        query: The user's query text.
        session_id: Current session ID for scratchpad access.
        skip_if_not_recall: If True (default), skip enforcement for
            non-recall queries and return an empty result.

    Returns:
        dict with keys:
            retrieved_from: Highest layer that produced results (e.g. 'L1')
            context: Aggregated context string
            tokens: Approximate word count of context
            layers_checked: List of layers that were consulted
    """
    outcome = {
        "retrieved_from": None,
        "context": "",
        "tokens": 0,
        "layers_checked": [],
    }

    # Gate: non-recall queries bypass enforcement entirely when configured.
    if skip_if_not_recall and not is_recall_query(query):
        return outcome

    # L0: Identity — always prepended, never counts as the retrieval source.
    identity = load_identity()
    if identity:
        outcome["context"] += f"## Identity\n{identity}\n\n"
        outcome["layers_checked"].append("L0")

    # L1: Palace search (SovereignStore — zero API, zero subprocess).
    memories = search_palace(query)
    if memories:
        outcome["context"] += f"## Palace Memory\n{memories}\n\n"
        outcome["retrieved_from"] = "L1"
        outcome["layers_checked"].append("L1")

    # L2: Scratchpad — consulted even when L1 hit; claims retrieved_from
    # only when no earlier layer did.
    if session_id:
        notes = load_scratchpad(session_id)
        if notes:
            outcome["context"] += f"## Session Notes\n{notes}\n\n"
            if not outcome["retrieved_from"]:
                outcome["retrieved_from"] = "L2"
            outcome["layers_checked"].append("L2")

    # L3: Gitea artifacts (only if L1/L2 produced nothing).
    if not outcome["retrieved_from"]:
        artifacts = search_gitea(query)
        if artifacts:
            outcome["context"] += f"## Gitea Context\n{artifacts}\n\n"
            outcome["retrieved_from"] = "L3"
            outcome["layers_checked"].append("L3")

    # L4: Procedures (only if still no context).
    if not outcome["retrieved_from"]:
        procedures = search_skills(query)
        if procedures:
            outcome["context"] += f"## Related Skills\n{procedures}\n\n"
            outcome["retrieved_from"] = "L4"
            outcome["layers_checked"].append("L4")

    # L5: Free generation — no durable context found, just mark it.
    if not outcome["retrieved_from"]:
        outcome["retrieved_from"] = "L5"
        outcome["layers_checked"].append("L5")

    outcome["tokens"] = len(outcome["context"].split())
    return outcome
|