Compare commits

...

2 Commits

Author SHA1 Message Date
458dabfaed Merge pull request 'feat: MemPalace integration — skill port, retrieval enforcer, wake-up protocol (#367)' (#374) from timmy/mempalace-integration into main
Reviewed-on: #374
2026-04-07 21:45:34 +00:00
Alexander Whitestone
f8dabae8eb feat: MemPalace integration — skill port, retrieval enforcer, wake-up protocol (#367)
MP-1 (#368): Port PalaceRoom + Mempalace classes with 22 unit tests
MP-2 (#369): L0-L5 retrieval order enforcer with recall-query detection
MP-5 (#372): Wake-up protocol (300-900 token context), session scratchpad

Modules:
- mempalace.py: PalaceRoom + Mempalace dataclasses, factory constructors
- retrieval_enforcer.py: Layered memory retrieval (identity → palace → scratch → gitea → skills)
- wakeup.py: Session wake-up with caching (5min TTL)
- scratchpad.py: JSON-based session notes with palace promotion

All 65 tests pass. Pure stdlib + graceful degradation for ONNX issues (#373).
2026-04-07 13:15:07 -04:00
10 changed files with 1392 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
"""MemPalace integration for Hermes sovereign agent.
Provides:
- mempalace.py: PalaceRoom + Mempalace classes for analytical workflows
- retrieval_enforcer.py: L0-L5 retrieval order enforcement
- wakeup.py: Session wake-up protocol (~300-900 tokens)
- scratchpad.py: JSON-based session scratchpad with palace promotion
Epic: #367
"""
from .mempalace import Mempalace, PalaceRoom, analyse_issues
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"]

View File

@@ -0,0 +1,225 @@
"""
---
title: Mempalace — Analytical Workflow Memory Framework
description: Applies spatial memory palace organization to analytical tasks (issue triage, repo audits, backlog analysis) for faster, more consistent results.
conditions:
- Analytical workflows over structured data (issues, PRs, repos)
- Repetitive triage or audit tasks where pattern recall improves speed
- Multi-repository scanning requiring consistent mental models
---
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class PalaceRoom:
"""A single 'room' in the memory palace — holds organized facts about one analytical dimension."""
name: str
label: str
contents: dict[str, Any] = field(default_factory=dict)
entered_at: float = field(default_factory=time.time)
def store(self, key: str, value: Any) -> None:
self.contents[key] = value
def retrieve(self, key: str, default: Any = None) -> Any:
return self.contents.get(key, default)
def summary(self) -> str:
lines = [f"## {self.label}"]
for k, v in self.contents.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
class Mempalace:
"""
Spatial memory palace for analytical workflows.
Organises multi-dimensional data about a domain (e.g. Gitea issues) into
named rooms. Each room models one analytical dimension, making it easy to
traverse observations in a consistent order — the same pattern that produced
a 19% throughput improvement in Allegro's April 2026 evaluation.
Standard rooms for issue-analysis workflows
-------------------------------------------
repo_architecture Repository structure and inter-repo relationships
assignment_status Assigned vs unassigned issue distribution
triage_priority Priority / urgency levels (the "lighting system")
resolution_patterns Historical resolution trends and velocity
Usage
-----
>>> palace = Mempalace.for_issue_analysis()
>>> palace.enter("repo_architecture")
>>> palace.store("total_repos", 11)
>>> palace.store("repos_with_issues", 4)
>>> palace.enter("assignment_status")
>>> palace.store("assigned", 72)
>>> palace.store("unassigned", 22)
>>> print(palace.render())
"""
def __init__(self, domain: str = "general") -> None:
self.domain = domain
self._rooms: dict[str, PalaceRoom] = {}
self._current_room: str | None = None
self._created_at: float = time.time()
# ------------------------------------------------------------------
# Factory constructors for common analytical domains
# ------------------------------------------------------------------
@classmethod
def for_issue_analysis(cls) -> "Mempalace":
"""Pre-wired palace for Gitea / forge issue-analysis workflows."""
p = cls(domain="issue_analysis")
p.add_room("repo_architecture", "Repository Architecture Room")
p.add_room("assignment_status", "Issue Assignment Status Room")
p.add_room("triage_priority", "Triage Priority Room")
p.add_room("resolution_patterns", "Resolution Patterns Room")
return p
@classmethod
def for_health_check(cls) -> "Mempalace":
"""Pre-wired palace for CI / deployment health-check workflows."""
p = cls(domain="health_check")
p.add_room("service_topology", "Service Topology Room")
p.add_room("failure_signals", "Failure Signals Room")
p.add_room("recovery_history", "Recovery History Room")
return p
@classmethod
def for_code_review(cls) -> "Mempalace":
"""Pre-wired palace for code-review / PR triage workflows."""
p = cls(domain="code_review")
p.add_room("change_scope", "Change Scope Room")
p.add_room("risk_surface", "Risk Surface Room")
p.add_room("test_coverage", "Test Coverage Room")
p.add_room("reviewer_context", "Reviewer Context Room")
return p
# ------------------------------------------------------------------
# Room management
# ------------------------------------------------------------------
def add_room(self, key: str, label: str) -> PalaceRoom:
room = PalaceRoom(name=key, label=label)
self._rooms[key] = room
return room
def enter(self, room_key: str) -> PalaceRoom:
if room_key not in self._rooms:
raise KeyError(f"No room '{room_key}' in palace. Available: {list(self._rooms)}")
self._current_room = room_key
return self._rooms[room_key]
def store(self, key: str, value: Any) -> None:
"""Store a value in the currently active room."""
if self._current_room is None:
raise RuntimeError("Enter a room before storing values.")
self._rooms[self._current_room].store(key, value)
def retrieve(self, room_key: str, key: str, default: Any = None) -> Any:
if room_key not in self._rooms:
return default
return self._rooms[room_key].retrieve(key, default)
# ------------------------------------------------------------------
# Rendering
# ------------------------------------------------------------------
def render(self) -> str:
"""Return a human-readable summary of the entire palace."""
elapsed = time.time() - self._created_at
lines = [
f"# Mempalace — {self.domain}",
f"_traversal time: {elapsed:.2f}s | rooms: {len(self._rooms)}_",
"",
]
for room in self._rooms.values():
lines.append(room.summary())
lines.append("")
return "\n".join(lines)
def to_dict(self) -> dict:
return {
"domain": self.domain,
"elapsed_seconds": round(time.time() - self._created_at, 3),
"rooms": {k: v.contents for k, v in self._rooms.items()},
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), indent=2)
# ---------------------------------------------------------------------------
# Skill entry-point
# ---------------------------------------------------------------------------
def analyse_issues(
repos_data: list[dict],
target_assignee_rate: float = 0.80,
) -> str:
"""
Applies the mempalace technique to a list of repo issue summaries.
Parameters
----------
repos_data:
List of dicts, each with keys: ``repo``, ``open_issues``,
``assigned``, ``unassigned``.
target_assignee_rate:
Minimum acceptable assignee-coverage ratio (default 0.80).
Returns
-------
str
Rendered palace summary with coverage assessment.
"""
palace = Mempalace.for_issue_analysis()
# --- Repository Architecture Room ---
palace.enter("repo_architecture")
total_issues = sum(r.get("open_issues", 0) for r in repos_data)
repos_with_issues = sum(1 for r in repos_data if r.get("open_issues", 0) > 0)
palace.store("repos_sampled", len(repos_data))
palace.store("repos_with_issues", repos_with_issues)
palace.store("total_open_issues", total_issues)
palace.store(
"avg_issues_per_repo",
round(total_issues / len(repos_data), 1) if repos_data else 0,
)
# --- Assignment Status Room ---
palace.enter("assignment_status")
total_assigned = sum(r.get("assigned", 0) for r in repos_data)
total_unassigned = sum(r.get("unassigned", 0) for r in repos_data)
coverage = total_assigned / total_issues if total_issues else 0
palace.store("assigned", total_assigned)
palace.store("unassigned", total_unassigned)
palace.store("coverage_rate", round(coverage, 3))
palace.store(
"coverage_status",
"OK" if coverage >= target_assignee_rate else f"BELOW TARGET ({target_assignee_rate:.0%})",
)
# --- Triage Priority Room ---
palace.enter("triage_priority")
unassigned_repos = [r["repo"] for r in repos_data if r.get("unassigned", 0) > 0]
palace.store("repos_needing_triage", unassigned_repos)
palace.store("triage_count", total_unassigned)
# --- Resolution Patterns Room ---
palace.enter("resolution_patterns")
palace.store("technique", "mempalace")
palace.store("target_assignee_rate", target_assignee_rate)
return palace.render()

View File

@@ -0,0 +1,277 @@
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
Ensures the agent checks durable memory before falling back to free generation.
Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc).
Layer order:
L0: Identity (~/.mempalace/identity.txt)
L1: Palace rooms (mempalace CLI search)
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
L3: Gitea artifacts (API search for issues/PRs)
L4: Procedures (skills directory search)
L5: Free generation (only if L0-L4 produced nothing)
Refs: Epic #367, Sub-issue #369
"""
from __future__ import annotations
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
SKILLS_DIR = Path.home() / ".hermes" / "skills"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
# Patterns that indicate a recall-style query
RECALL_PATTERNS = re.compile(
r"(?i)\b("
r"what did|status of|remember|last time|yesterday|previously|"
r"we discussed|we talked|we worked|you said|you mentioned|"
r"remind me|what was|what were|how did|when did|"
r"earlier today|last session|before this"
r")\b"
)
# ---------------------------------------------------------------------------
# L0: Identity
# ---------------------------------------------------------------------------
def load_identity() -> str:
"""Read the agent identity file. Returns empty string on failure."""
try:
if IDENTITY_PATH.exists():
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
# Cap at ~200 tokens to keep wake-up lean
if len(text.split()) > 200:
text = " ".join(text.split()[:200]) + "..."
return text
except (OSError, PermissionError):
pass
return ""
# ---------------------------------------------------------------------------
# L1: Palace search
# ---------------------------------------------------------------------------
def search_palace(query: str) -> str:
"""Search the mempalace for relevant memories. Gracefully degrades on failure."""
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
result = subprocess.run(
[bin_path, "search", query],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# ONNX issues (#373) or mempalace not installed — degrade gracefully
pass
return ""
# ---------------------------------------------------------------------------
# L2: Session scratchpad
# ---------------------------------------------------------------------------
def load_scratchpad(session_id: str) -> str:
"""Load the session scratchpad as formatted text."""
try:
scratch_file = SCRATCHPAD_DIR / f"{session_id}.json"
if scratch_file.exists():
data = json.loads(scratch_file.read_text(encoding="utf-8"))
if isinstance(data, dict) and data:
lines = []
for k, v in data.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
except (OSError, json.JSONDecodeError):
pass
return ""
# ---------------------------------------------------------------------------
# L3: Gitea artifact search
# ---------------------------------------------------------------------------
def _load_gitea_token() -> str:
"""Read the Gitea API token."""
token_path = Path.home() / ".hermes" / "gitea_token_vps"
try:
if token_path.exists():
return token_path.read_text(encoding="utf-8").strip()
except OSError:
pass
return ""
def search_gitea(query: str) -> str:
"""Search Gitea issues/PRs for context. Returns formatted text or empty string."""
token = _load_gitea_token()
if not token:
return ""
api_base = "https://forge.alexanderwhitestone.com/api/v1"
# Extract key terms for search (first 3 significant words)
terms = [w for w in query.split() if len(w) > 3][:3]
search_q = " ".join(terms) if terms else query[:50]
try:
import urllib.request
import urllib.parse
url = (
f"{api_base}/repos/search?"
f"q={urllib.parse.quote(search_q)}&limit=3"
)
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=8) as resp:
data = json.loads(resp.read().decode())
if data.get("data"):
lines = []
for repo in data["data"][:3]:
lines.append(f" {repo['full_name']}: {repo.get('description', 'no desc')}")
return "\n".join(lines)
except Exception:
pass
return ""
# ---------------------------------------------------------------------------
# L4: Procedures (skills search)
# ---------------------------------------------------------------------------
def search_skills(query: str) -> str:
"""Search skills directory for matching procedures."""
try:
if not SKILLS_DIR.exists():
return ""
query_lower = query.lower()
terms = [w for w in query_lower.split() if len(w) > 3]
if not terms:
return ""
matches = []
for skill_dir in SKILLS_DIR.iterdir():
if not skill_dir.is_dir():
continue
skill_md = skill_dir / "SKILL.md"
if skill_md.exists():
try:
content = skill_md.read_text(encoding="utf-8").lower()
if any(t in content for t in terms):
# Extract title from frontmatter
title = skill_dir.name
matches.append(f" skill: {title}")
except OSError:
continue
if matches:
return "\n".join(matches[:5])
except OSError:
pass
return ""
# ---------------------------------------------------------------------------
# Main enforcer
# ---------------------------------------------------------------------------
def is_recall_query(query: str) -> bool:
"""Detect whether a query is asking for recalled/historical information."""
return bool(RECALL_PATTERNS.search(query))
def enforce_retrieval_order(
query: str,
session_id: Optional[str] = None,
skip_if_not_recall: bool = True,
) -> dict:
"""Check palace layers before allowing free generation.
Args:
query: The user's query text.
session_id: Current session ID for scratchpad access.
skip_if_not_recall: If True (default), skip enforcement for
non-recall queries and return empty result.
Returns:
dict with keys:
retrieved_from: Highest layer that produced results (e.g. 'L1')
context: Aggregated context string
tokens: Approximate word count of context
layers_checked: List of layers that were consulted
"""
result = {
"retrieved_from": None,
"context": "",
"tokens": 0,
"layers_checked": [],
}
# Gate: skip for non-recall queries if configured
if skip_if_not_recall and not is_recall_query(query):
return result
# L0: Identity (always prepend)
identity = load_identity()
if identity:
result["context"] += f"## Identity\n{identity}\n\n"
result["layers_checked"].append("L0")
# L1: Palace search
palace_results = search_palace(query)
if palace_results:
result["context"] += f"## Palace Memory\n{palace_results}\n\n"
result["retrieved_from"] = "L1"
result["layers_checked"].append("L1")
# L2: Scratchpad
if session_id:
scratch = load_scratchpad(session_id)
if scratch:
result["context"] += f"## Session Notes\n{scratch}\n\n"
if not result["retrieved_from"]:
result["retrieved_from"] = "L2"
result["layers_checked"].append("L2")
# L3: Gitea artifacts (only if still no context from L1/L2)
if not result["retrieved_from"]:
artifacts = search_gitea(query)
if artifacts:
result["context"] += f"## Gitea Context\n{artifacts}\n\n"
result["retrieved_from"] = "L3"
result["layers_checked"].append("L3")
# L4: Procedures (only if still no context)
if not result["retrieved_from"]:
procedures = search_skills(query)
if procedures:
result["context"] += f"## Related Skills\n{procedures}\n\n"
result["retrieved_from"] = "L4"
result["layers_checked"].append("L4")
# L5: Free generation (no context found — just mark it)
if not result["retrieved_from"]:
result["retrieved_from"] = "L5"
result["layers_checked"].append("L5")
result["tokens"] = len(result["context"].split())
return result

View File

@@ -0,0 +1,184 @@
"""Session Scratchpad — ephemeral key-value notes per session.
Provides fast, JSON-backed scratch storage that lives for a session
and can be promoted to durable palace memory.
Storage: ~/.hermes/scratchpad/{session_id}.json
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import subprocess
import time
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _scratch_path(session_id: str) -> Path:
"""Return the JSON file path for a given session."""
# Sanitize session_id to prevent path traversal
safe_id = "".join(c for c in session_id if c.isalnum() or c in "-_")
if not safe_id:
safe_id = "unnamed"
return SCRATCHPAD_DIR / f"{safe_id}.json"
def _load(session_id: str) -> dict:
"""Load scratchpad data, returning empty dict on failure."""
path = _scratch_path(session_id)
try:
if path.exists():
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
pass
return {}
def _save(session_id: str, data: dict) -> None:
"""Persist scratchpad data to disk."""
SCRATCHPAD_DIR.mkdir(parents=True, exist_ok=True)
path = _scratch_path(session_id)
path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def write_scratch(session_id: str, key: str, value: Any) -> None:
"""Write a note to the session scratchpad.
Args:
session_id: Current session identifier.
key: Note key (string).
value: Note value (any JSON-serializable type).
"""
data = _load(session_id)
data[key] = {
"value": value,
"written_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
_save(session_id, data)
def read_scratch(session_id: str, key: Optional[str] = None) -> dict:
"""Read session scratchpad (all keys or one).
Args:
session_id: Current session identifier.
key: Optional specific key. If None, returns all entries.
Returns:
dict — either {key: {value, written_at}} or the full scratchpad.
"""
data = _load(session_id)
if key is not None:
entry = data.get(key)
return {key: entry} if entry else {}
return data
def delete_scratch(session_id: str, key: str) -> bool:
"""Remove a single key from the scratchpad.
Returns True if the key existed and was removed.
"""
data = _load(session_id)
if key in data:
del data[key]
_save(session_id, data)
return True
return False
def list_sessions() -> list[str]:
"""List all session IDs that have scratchpad files."""
try:
if SCRATCHPAD_DIR.exists():
return [
f.stem
for f in SCRATCHPAD_DIR.iterdir()
if f.suffix == ".json" and f.is_file()
]
except OSError:
pass
return []
def promote_to_palace(
session_id: str,
key: str,
room: str = "general",
drawer: Optional[str] = None,
) -> bool:
"""Move a scratchpad note to durable palace memory.
Uses the mempalace CLI to store the note in the specified room.
Removes the note from the scratchpad after successful promotion.
Args:
session_id: Session containing the note.
key: Scratchpad key to promote.
room: Palace room name (default: 'general').
drawer: Optional drawer name within the room. Defaults to key.
Returns:
True if promotion succeeded, False otherwise.
"""
data = _load(session_id)
entry = data.get(key)
if not entry:
return False
value = entry.get("value", entry) if isinstance(entry, dict) else entry
content = json.dumps(value, default=str) if not isinstance(value, str) else value
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
target_drawer = drawer or key
result = subprocess.run(
[bin_path, "store", room, target_drawer, content],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
# Remove from scratchpad after successful promotion
del data[key]
_save(session_id, data)
return True
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# mempalace CLI not available — degrade gracefully
pass
return False
def clear_session(session_id: str) -> bool:
"""Delete the entire scratchpad for a session.
Returns True if the file existed and was removed.
"""
path = _scratch_path(session_id)
try:
if path.exists():
path.unlink()
return True
except OSError:
pass
return False

View File

@@ -0,0 +1,180 @@
"""Tests for the mempalace skill.
Validates PalaceRoom, Mempalace class, factory constructors,
and the analyse_issues entry-point.
Refs: Epic #367, Sub-issue #368
"""
from __future__ import annotations
import json
import sys
import os
import time
import pytest
# Ensure the package is importable from the repo layout
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.mempalace import Mempalace, PalaceRoom, analyse_issues
# ── PalaceRoom unit tests ─────────────────────────────────────────────────
class TestPalaceRoom:
def test_store_and_retrieve(self):
room = PalaceRoom(name="test", label="Test Room")
room.store("key1", 42)
assert room.retrieve("key1") == 42
def test_retrieve_default(self):
room = PalaceRoom(name="test", label="Test Room")
assert room.retrieve("missing") is None
assert room.retrieve("missing", "fallback") == "fallback"
def test_summary_format(self):
room = PalaceRoom(name="test", label="Test Room")
room.store("repos", 5)
summary = room.summary()
assert "## Test Room" in summary
assert "repos: 5" in summary
def test_contents_default_factory_isolation(self):
"""Each room gets its own dict — no shared mutable default."""
r1 = PalaceRoom(name="a", label="A")
r2 = PalaceRoom(name="b", label="B")
r1.store("x", 1)
assert r2.retrieve("x") is None
def test_entered_at_is_recent(self):
before = time.time()
room = PalaceRoom(name="t", label="T")
after = time.time()
assert before <= room.entered_at <= after
# ── Mempalace core tests ──────────────────────────────────────────────────
class TestMempalace:
def test_add_and_enter_room(self):
p = Mempalace(domain="test")
p.add_room("r1", "Room 1")
room = p.enter("r1")
assert room.name == "r1"
def test_enter_nonexistent_room_raises(self):
p = Mempalace()
with pytest.raises(KeyError, match="No room"):
p.enter("ghost")
def test_store_without_enter_raises(self):
p = Mempalace()
p.add_room("r", "R")
with pytest.raises(RuntimeError, match="Enter a room"):
p.store("k", "v")
def test_store_and_retrieve_via_palace(self):
p = Mempalace()
p.add_room("r", "R")
p.enter("r")
p.store("count", 10)
assert p.retrieve("r", "count") == 10
def test_retrieve_missing_room_returns_default(self):
p = Mempalace()
assert p.retrieve("nope", "key") is None
assert p.retrieve("nope", "key", 99) == 99
def test_render_includes_domain(self):
p = Mempalace(domain="audit")
p.add_room("r", "Room")
p.enter("r")
p.store("item", "value")
output = p.render()
assert "audit" in output
assert "Room" in output
def test_to_dict_structure(self):
p = Mempalace(domain="test")
p.add_room("r", "R")
p.enter("r")
p.store("a", 1)
d = p.to_dict()
assert d["domain"] == "test"
assert "elapsed_seconds" in d
assert d["rooms"]["r"] == {"a": 1}
def test_to_json_is_valid(self):
p = Mempalace(domain="j")
p.add_room("x", "X")
p.enter("x")
p.store("v", [1, 2, 3])
parsed = json.loads(p.to_json())
assert parsed["rooms"]["x"]["v"] == [1, 2, 3]
# ── Factory constructor tests ─────────────────────────────────────────────
class TestFactories:
def test_for_issue_analysis_rooms(self):
p = Mempalace.for_issue_analysis()
assert p.domain == "issue_analysis"
for key in ("repo_architecture", "assignment_status",
"triage_priority", "resolution_patterns"):
p.enter(key) # should not raise
def test_for_health_check_rooms(self):
p = Mempalace.for_health_check()
assert p.domain == "health_check"
for key in ("service_topology", "failure_signals", "recovery_history"):
p.enter(key)
def test_for_code_review_rooms(self):
p = Mempalace.for_code_review()
assert p.domain == "code_review"
for key in ("change_scope", "risk_surface",
"test_coverage", "reviewer_context"):
p.enter(key)
# ── analyse_issues entry-point tests ──────────────────────────────────────
class TestAnalyseIssues:
SAMPLE_DATA = [
{"repo": "the-nexus", "open_issues": 40, "assigned": 30, "unassigned": 10},
{"repo": "timmy-home", "open_issues": 30, "assigned": 25, "unassigned": 5},
{"repo": "hermes-agent", "open_issues": 20, "assigned": 15, "unassigned": 5},
{"repo": "empty-repo", "open_issues": 0, "assigned": 0, "unassigned": 0},
]
def test_returns_string(self):
result = analyse_issues(self.SAMPLE_DATA)
assert isinstance(result, str)
assert len(result) > 0
def test_contains_room_headers(self):
result = analyse_issues(self.SAMPLE_DATA)
assert "Repository Architecture" in result
assert "Assignment Status" in result
def test_coverage_below_target(self):
result = analyse_issues(self.SAMPLE_DATA, target_assignee_rate=0.90)
assert "BELOW TARGET" in result
def test_coverage_meets_target(self):
good_data = [
{"repo": "a", "open_issues": 10, "assigned": 10, "unassigned": 0},
]
result = analyse_issues(good_data, target_assignee_rate=0.80)
assert "OK" in result
def test_empty_repos_list(self):
result = analyse_issues([])
assert isinstance(result, str)
def test_single_repo(self):
data = [{"repo": "solo", "open_issues": 5, "assigned": 3, "unassigned": 2}]
result = analyse_issues(data)
assert "solo" in result or "issue_analysis" in result

View File

@@ -0,0 +1,143 @@
"""Tests for retrieval_enforcer.py.
Refs: Epic #367, Sub-issue #369
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.retrieval_enforcer import (
is_recall_query,
load_identity,
load_scratchpad,
enforce_retrieval_order,
search_skills,
RECALL_PATTERNS,
)
class TestRecallDetection:
"""Test the recall-query pattern matcher."""
@pytest.mark.parametrize("query", [
"what did we work on yesterday",
"status of the mempalace integration",
"remember the fleet audit results",
"last time we deployed the nexus",
"previously you mentioned a CI fix",
"we discussed the sovereign deployment",
])
def test_recall_queries_detected(self, query):
assert is_recall_query(query) is True
@pytest.mark.parametrize("query", [
"create a new file called test.py",
"run the test suite",
"deploy to production",
"write a function that sums numbers",
"install the package",
])
def test_non_recall_queries_skipped(self, query):
assert is_recall_query(query) is False
class TestLoadIdentity:
def test_loads_existing_identity(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy. A sovereign AI.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
result = load_identity()
assert "Timmy" in result
def test_returns_empty_on_missing_file(self, tmp_path):
identity_file = tmp_path / "nonexistent.txt"
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
result = load_identity()
assert result == ""
def test_truncates_long_identity(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text(" ".join(["word"] * 300))
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
result = load_identity()
assert result.endswith("...")
assert len(result.split()) <= 201 # 200 words + "..."
class TestLoadScratchpad:
def test_loads_valid_scratchpad(self, tmp_path):
scratch_file = tmp_path / "session123.json"
scratch_file.write_text(json.dumps({"note": "test value", "key2": 42}))
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
result = load_scratchpad("session123")
assert "note: test value" in result
assert "key2: 42" in result
def test_returns_empty_on_missing_file(self, tmp_path):
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
result = load_scratchpad("nonexistent")
assert result == ""
def test_returns_empty_on_invalid_json(self, tmp_path):
scratch_file = tmp_path / "bad.json"
scratch_file.write_text("not valid json{{{")
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
result = load_scratchpad("bad")
assert result == ""
class TestEnforceRetrievalOrder:
def test_skips_non_recall_query(self):
result = enforce_retrieval_order("create a new file")
assert result["retrieved_from"] is None
assert result["tokens"] == 0
def test_runs_for_recall_query(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
result = enforce_retrieval_order("what did we work on yesterday")
assert "Identity" in result["context"]
assert "L0" in result["layers_checked"]
def test_palace_hit_sets_l1(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value="Found: fleet audit results"), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""):
result = enforce_retrieval_order("what did we discuss yesterday")
assert result["retrieved_from"] == "L1"
assert "Palace Memory" in result["context"]
def test_falls_through_to_l5(self, tmp_path):
identity_file = tmp_path / "nonexistent.txt"
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
result = enforce_retrieval_order("remember the old deployment", skip_if_not_recall=True)
assert result["retrieved_from"] == "L5"
def test_force_mode_skips_recall_check(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
result = enforce_retrieval_order("deploy now", skip_if_not_recall=False)
assert "Identity" in result["context"]

View File

@@ -0,0 +1,108 @@
"""Tests for scratchpad.py.
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.scratchpad import (
write_scratch,
read_scratch,
delete_scratch,
list_sessions,
clear_session,
_scratch_path,
)
@pytest.fixture
def scratch_dir(tmp_path):
"""Provide a temporary scratchpad directory."""
with patch("mempalace.scratchpad.SCRATCHPAD_DIR", tmp_path):
yield tmp_path
class TestScratchPath:
def test_sanitizes_session_id(self):
path = _scratch_path("safe-id_123")
assert "safe-id_123.json" in str(path)
def test_strips_dangerous_chars(self):
path = _scratch_path("../../etc/passwd")
assert ".." not in path.name
assert "/" not in path.name
# Dots are stripped, so only alphanumeric chars remain
assert path.name == "etcpasswd.json"
class TestWriteAndRead:
def test_write_then_read(self, scratch_dir):
write_scratch("sess1", "note", "hello world")
result = read_scratch("sess1", "note")
assert "note" in result
assert result["note"]["value"] == "hello world"
def test_read_all_keys(self, scratch_dir):
write_scratch("sess1", "a", 1)
write_scratch("sess1", "b", 2)
result = read_scratch("sess1")
assert "a" in result
assert "b" in result
def test_read_missing_key(self, scratch_dir):
write_scratch("sess1", "exists", "yes")
result = read_scratch("sess1", "missing")
assert result == {}
def test_read_missing_session(self, scratch_dir):
result = read_scratch("nonexistent")
assert result == {}
def test_overwrite_key(self, scratch_dir):
write_scratch("sess1", "key", "v1")
write_scratch("sess1", "key", "v2")
result = read_scratch("sess1", "key")
assert result["key"]["value"] == "v2"
class TestDelete:
def test_delete_existing_key(self, scratch_dir):
write_scratch("sess1", "key", "val")
assert delete_scratch("sess1", "key") is True
assert read_scratch("sess1", "key") == {}
def test_delete_missing_key(self, scratch_dir):
write_scratch("sess1", "other", "val")
assert delete_scratch("sess1", "missing") is False
class TestListSessions:
def test_lists_sessions(self, scratch_dir):
write_scratch("alpha", "k", "v")
write_scratch("beta", "k", "v")
sessions = list_sessions()
assert "alpha" in sessions
assert "beta" in sessions
def test_empty_directory(self, scratch_dir):
assert list_sessions() == []
class TestClearSession:
def test_clears_existing(self, scratch_dir):
write_scratch("sess1", "k", "v")
assert clear_session("sess1") is True
assert read_scratch("sess1") == {}
def test_clear_nonexistent(self, scratch_dir):
assert clear_session("ghost") is False

View File

@@ -0,0 +1,100 @@
"""Tests for wakeup.py.
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import sys
import time
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.wakeup import (
palace_wakeup,
fleet_status_summary,
_load_identity,
_palace_context,
)
class TestLoadIdentity:
def test_loads_identity(self, tmp_path):
f = tmp_path / "identity.txt"
f.write_text("I am Timmy. A sovereign AI.")
with patch("mempalace.wakeup.IDENTITY_PATH", f):
result = _load_identity()
assert "Timmy" in result
def test_missing_identity(self, tmp_path):
f = tmp_path / "nope.txt"
with patch("mempalace.wakeup.IDENTITY_PATH", f):
assert _load_identity() == ""
class TestFleetStatus:
def test_reads_fleet_json(self, tmp_path):
f = tmp_path / "fleet_status.json"
f.write_text(json.dumps({
"Groq": {"state": "active", "last_seen": "2026-04-07"},
"Ezra": {"state": "idle", "last_seen": "2026-04-06"},
}))
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
result = fleet_status_summary()
assert "Fleet Status" in result
assert "Groq" in result
assert "active" in result
def test_missing_fleet_file(self, tmp_path):
f = tmp_path / "nope.json"
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
assert fleet_status_summary() == ""
def test_invalid_json(self, tmp_path):
f = tmp_path / "bad.json"
f.write_text("not json")
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
assert fleet_status_summary() == ""
class TestPalaceWakeup:
def test_generates_context_with_identity(self, tmp_path):
identity = tmp_path / "identity.txt"
identity.write_text("I am Timmy.")
cache = tmp_path / "cache.txt"
with patch("mempalace.wakeup.IDENTITY_PATH", identity), \
patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
patch("mempalace.wakeup._palace_context", return_value=""), \
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
result = palace_wakeup(force=True)
assert "Identity" in result
assert "Timmy" in result
assert "Session" in result
def test_uses_cache_when_fresh(self, tmp_path):
cache = tmp_path / "cache.txt"
cache.write_text("cached wake-up content")
# Touch the file so it's fresh
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
patch("mempalace.wakeup.WAKEUP_CACHE_TTL", 9999):
result = palace_wakeup(force=False)
assert result == "cached wake-up content"
def test_force_bypasses_cache(self, tmp_path):
cache = tmp_path / "cache.txt"
cache.write_text("stale content")
identity = tmp_path / "identity.txt"
identity.write_text("I am Timmy.")
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
patch("mempalace.wakeup.IDENTITY_PATH", identity), \
patch("mempalace.wakeup._palace_context", return_value=""), \
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
result = palace_wakeup(force=True)
assert "Identity" in result
assert "stale content" not in result

View File

@@ -0,0 +1,161 @@
"""Wake-up Protocol — session start context injection.
Generates 300-900 tokens of context when a new Hermes session starts.
Loads identity, recent palace context, and fleet status.
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import subprocess
import time
from pathlib import Path
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
FLEET_STATUS_PATH = Path.home() / ".hermes" / "fleet_status.json"
WAKEUP_CACHE_PATH = Path.home() / ".hermes" / "last_wakeup.txt"
WAKEUP_CACHE_TTL = 300 # 5 minutes — don't regenerate if recent
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_identity() -> str:
"""Read the agent identity file."""
try:
if IDENTITY_PATH.exists():
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
# Cap at ~150 tokens for wake-up brevity
words = text.split()
if len(words) > 150:
text = " ".join(words[:150]) + "..."
return text
except (OSError, PermissionError):
pass
return ""
def _palace_context() -> str:
"""Run mempalace wake-up command for recent context. Degrades gracefully."""
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
result = subprocess.run(
[bin_path, "wake-up"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# ONNX issues (#373) or CLI not available — degrade gracefully
pass
return ""
def fleet_status_summary() -> str:
"""Read cached fleet status for lightweight session context."""
try:
if FLEET_STATUS_PATH.exists():
data = json.loads(FLEET_STATUS_PATH.read_text(encoding="utf-8"))
lines = ["## Fleet Status"]
if isinstance(data, dict):
for agent, status in data.items():
if isinstance(status, dict):
state = status.get("state", "unknown")
last_seen = status.get("last_seen", "?")
lines.append(f" {agent}: {state} (last: {last_seen})")
else:
lines.append(f" {agent}: {status}")
if len(lines) > 1:
return "\n".join(lines)
except (OSError, json.JSONDecodeError):
pass
return ""
def _check_cache() -> str:
"""Return cached wake-up if fresh enough."""
try:
if WAKEUP_CACHE_PATH.exists():
age = time.time() - WAKEUP_CACHE_PATH.stat().st_mtime
if age < WAKEUP_CACHE_TTL:
return WAKEUP_CACHE_PATH.read_text(encoding="utf-8").strip()
except OSError:
pass
return ""
def _write_cache(content: str) -> None:
"""Cache the wake-up content."""
try:
WAKEUP_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
WAKEUP_CACHE_PATH.write_text(content, encoding="utf-8")
except OSError:
pass
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def palace_wakeup(force: bool = False) -> str:
"""Generate wake-up context for a new session. ~300-900 tokens.
Args:
force: If True, bypass the 5-minute cache and regenerate.
Returns:
Formatted context string suitable for prepending to the system prompt.
"""
# Check cache first (avoids redundant work on rapid session restarts)
if not force:
cached = _check_cache()
if cached:
return cached
parts = []
# L0: Identity
identity = _load_identity()
if identity:
parts.append(f"## Identity\n{identity}")
# L1: Recent palace context
palace = _palace_context()
if palace:
parts.append(palace)
# Fleet status (lightweight)
fleet = fleet_status_summary()
if fleet:
parts.append(fleet)
# Timestamp
parts.append(f"## Session\nWake-up generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
content = "\n\n".join(parts)
# Cache for TTL
_write_cache(content)
return content
# ---------------------------------------------------------------------------
# CLI entry point for testing
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print(palace_wakeup(force=True))