MP-1 (#368): Port PalaceRoom + Mempalace classes with 22 unit tests
MP-2 (#369): L0-L5 retrieval order enforcer with recall-query detection
MP-5 (#372): Wake-up protocol (300-900 token context), session scratchpad

Modules:
- mempalace.py: PalaceRoom + Mempalace dataclasses, factory constructors
- retrieval_enforcer.py: Layered memory retrieval (identity → palace → scratch → gitea → skills)
- wakeup.py: Session wake-up with caching (5min TTL)
- scratchpad.py: JSON-based session notes with palace promotion

All 65 tests pass. Pure stdlib + graceful degradation for ONNX issues (#373).
278 lines · 9.3 KiB · Python
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
|
|
|
|
Ensures the agent checks durable memory before falling back to free generation.
|
|
Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc).
|
|
|
|
Layer order:
|
|
L0: Identity (~/.mempalace/identity.txt)
|
|
L1: Palace rooms (mempalace CLI search)
|
|
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
|
|
L3: Gitea artifacts (API search for issues/PRs)
|
|
L4: Procedures (skills directory search)
|
|
L5: Free generation (only if L0-L4 produced nothing)
|
|
|
|
Refs: Epic #367, Sub-issue #369
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# L0: the agent's identity file, prepended to every enforced retrieval.
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"

# L2: directory of per-session scratchpad JSON files ({session_id}.json).
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"

# L4: skills directory, one subdirectory per skill containing a SKILL.md.
SKILLS_DIR = Path.home() / ".hermes" / "skills"

# Preferred absolute path to the mempalace CLI; search_palace() falls back
# to a bare "mempalace" PATH lookup when this path does not exist.
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"

# Patterns that indicate a recall-style query
# (case-insensitive, word-boundary-anchored alternation).
RECALL_PATTERNS = re.compile(
    r"(?i)\b("
    r"what did|status of|remember|last time|yesterday|previously|"
    r"we discussed|we talked|we worked|you said|you mentioned|"
    r"remind me|what was|what were|how did|when did|"
    r"earlier today|last session|before this"
    r")\b"
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L0: Identity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_identity() -> str:
    """Read the agent identity file (L0).

    Returns:
        The stripped identity text, truncated to roughly 200 words with a
        trailing "..." when longer, or "" when the file is missing or
        unreadable.
    """
    try:
        if not IDENTITY_PATH.exists():
            return ""
        text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
    except (OSError, PermissionError):
        return ""
    words = text.split()
    # Cap at ~200 tokens to keep wake-up lean
    if len(words) > 200:
        return " ".join(words[:200]) + "..."
    return text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L1: Palace search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def search_palace(query: str) -> str:
    """Search the mempalace (L1) for relevant memories.

    Runs the mempalace CLI with a 10s timeout and degrades gracefully:
    any failure (missing binary, ONNX issues per #373, timeout) yields "".
    """
    # Prefer the pinned interpreter path; otherwise rely on PATH lookup.
    cli = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
    try:
        proc = subprocess.run(
            [cli, "search", query],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        # ONNX issues (#373) or mempalace not installed — degrade gracefully
        return ""
    if proc.returncode != 0:
        return ""
    return proc.stdout.strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L2: Session scratchpad
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_scratchpad(session_id: str) -> str:
    """Load the session scratchpad (L2) as formatted text.

    Reads ~/.hermes/scratchpad/{session_id}.json; returns one indented
    "key: value" line per entry, or "" when the file is absent, unreadable,
    malformed, or not a non-empty JSON object.
    """
    scratch_file = SCRATCHPAD_DIR / f"{session_id}.json"
    try:
        if not scratch_file.exists():
            return ""
        notes = json.loads(scratch_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return ""
    if not isinstance(notes, dict) or not notes:
        return ""
    return "\n".join(f"  {key}: {value}" for key, value in notes.items())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L3: Gitea artifact search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_gitea_token() -> str:
|
|
"""Read the Gitea API token."""
|
|
token_path = Path.home() / ".hermes" / "gitea_token_vps"
|
|
try:
|
|
if token_path.exists():
|
|
return token_path.read_text(encoding="utf-8").strip()
|
|
except OSError:
|
|
pass
|
|
return ""
|
|
|
|
|
|
def search_gitea(query: str) -> str:
    """Search Gitea (L3) for context relevant to *query*.

    NOTE(review): despite the module docstring's "issues/PRs" description,
    this queries the repository-search endpoint (``/repos/search``) —
    confirm whether ``/repos/issues/search`` was intended before changing
    the endpoint, since the response schema differs.

    Args:
        query: Free-text query; the first three words longer than three
            characters become the search terms (falls back to the first
            50 characters of the query).

    Returns:
        Up to three "  full_name: description" lines, or "" when no token
        is configured, the request fails, or nothing matches.
    """
    token = _load_gitea_token()
    if not token:
        return ""

    api_base = "https://forge.alexanderwhitestone.com/api/v1"
    # Extract key terms for search (first 3 significant words)
    terms = [w for w in query.split() if len(w) > 3][:3]
    search_q = " ".join(terms) if terms else query[:50]

    try:
        import urllib.request
        import urllib.parse

        url = (
            f"{api_base}/repos/search?"
            f"q={urllib.parse.quote(search_q)}&limit=3"
        )
        req = urllib.request.Request(url, headers={
            "Authorization": f"token {token}",
            "Accept": "application/json",
        })
        with urllib.request.urlopen(req, timeout=8) as resp:
            payload = json.loads(resp.read().decode())
        hits = payload.get("data")
        if hits:
            return "\n".join(
                f"  {repo['full_name']}: {repo.get('description', 'no desc')}"
                for repo in hits[:3]
            )
    except Exception:
        # Best-effort layer: network/auth/JSON problems all degrade to "".
        pass
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# L4: Procedures (skills search)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def search_skills(query: str) -> str:
    """Search the skills directory (L4) for matching procedures.

    Matches any query word longer than three characters against each
    skill's SKILL.md (case-insensitive substring match). Returns up to
    five "  skill: <name>" lines, or "" on no match or any I/O failure.
    """
    significant = [word for word in query.lower().split() if len(word) > 3]
    try:
        if not SKILLS_DIR.exists() or not significant:
            return ""

        found = []
        for entry in SKILLS_DIR.iterdir():
            if not entry.is_dir():
                continue
            manifest = entry / "SKILL.md"
            if not manifest.exists():
                continue
            try:
                body = manifest.read_text(encoding="utf-8").lower()
            except OSError:
                continue
            if any(term in body for term in significant):
                # Directory name doubles as the skill title.
                found.append(f"  skill: {entry.name}")

        if found:
            return "\n".join(found[:5])
    except OSError:
        pass
    return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main enforcer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_recall_query(query: str) -> bool:
    """Detect whether a query is asking for recalled/historical information."""
    return RECALL_PATTERNS.search(query) is not None
|
|
|
|
|
|
def enforce_retrieval_order(
    query: str,
    session_id: Optional[str] = None,
    skip_if_not_recall: bool = True,
) -> dict:
    """Check palace layers before allowing free generation.

    Args:
        query: The user's query text.
        session_id: Current session ID for scratchpad access.
        skip_if_not_recall: If True (default), skip enforcement for
            non-recall queries and return an empty result.

    Returns:
        dict with keys:
            retrieved_from: Highest-priority layer that produced results
                (e.g. 'L1'), or 'L5' when nothing durable was found.
            context: Aggregated context string.
            tokens: Approximate word count of context.
            layers_checked: Layers that yielded content ('L5' is recorded
                when free generation is the fallback).
    """
    result = {
        "retrieved_from": None,
        "context": "",
        "tokens": 0,
        "layers_checked": [],
    }

    # Gate: non-recall queries bypass enforcement entirely when configured.
    if skip_if_not_recall and not is_recall_query(query):
        return result

    sections = []

    # L0: Identity (always prepended when available)
    identity = load_identity()
    if identity:
        sections.append(f"## Identity\n{identity}\n\n")
        result["layers_checked"].append("L0")

    # L1: Palace search
    palace_hits = search_palace(query)
    if palace_hits:
        sections.append(f"## Palace Memory\n{palace_hits}\n\n")
        result["retrieved_from"] = "L1"
        result["layers_checked"].append("L1")

    # L2: Scratchpad (L1, if it hit, keeps priority as retrieved_from)
    if session_id:
        notes = load_scratchpad(session_id)
        if notes:
            sections.append(f"## Session Notes\n{notes}\n\n")
            result["retrieved_from"] = result["retrieved_from"] or "L2"
            result["layers_checked"].append("L2")

    # L3: Gitea artifacts — consulted only when L1/L2 found nothing
    if result["retrieved_from"] is None:
        artifacts = search_gitea(query)
        if artifacts:
            sections.append(f"## Gitea Context\n{artifacts}\n\n")
            result["retrieved_from"] = "L3"
            result["layers_checked"].append("L3")

    # L4: Procedures — consulted only when everything above came up empty
    if result["retrieved_from"] is None:
        procedures = search_skills(query)
        if procedures:
            sections.append(f"## Related Skills\n{procedures}\n\n")
            result["retrieved_from"] = "L4"
            result["layers_checked"].append("L4")

    # L5: Free generation — nothing durable found; just record the fact.
    if result["retrieved_from"] is None:
        result["retrieved_from"] = "L5"
        result["layers_checked"].append("L5")

    result["context"] = "".join(sections)
    result["tokens"] = len(result["context"].split())
    return result
|