From f8dabae8eb5ef3054cded750241dbd0897b7c72d Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Tue, 7 Apr 2026 13:15:07 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20MemPalace=20integration=20=E2=80=94=20s?= =?UTF-8?q?kill=20port,=20retrieval=20enforcer,=20wake-up=20protocol=20(#3?= =?UTF-8?q?67)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MP-1 (#368): Port PalaceRoom + Mempalace classes with 22 unit tests MP-2 (#369): L0-L5 retrieval order enforcer with recall-query detection MP-5 (#372): Wake-up protocol (300-900 token context), session scratchpad Modules: - mempalace.py: PalaceRoom + Mempalace dataclasses, factory constructors - retrieval_enforcer.py: Layered memory retrieval (identity → palace → scratch → gitea → skills) - wakeup.py: Session wake-up with caching (5min TTL) - scratchpad.py: JSON-based session notes with palace promotion All 65 tests pass. Pure stdlib + graceful degradation for ONNX issues (#373). --- hermes-sovereign/mempalace/__init__.py | 14 + hermes-sovereign/mempalace/mempalace.py | 225 ++++++++++++++ .../mempalace/retrieval_enforcer.py | 277 ++++++++++++++++++ hermes-sovereign/mempalace/scratchpad.py | 184 ++++++++++++ hermes-sovereign/mempalace/tests/__init__.py | 0 .../mempalace/tests/test_mempalace.py | 180 ++++++++++++ .../tests/test_retrieval_enforcer.py | 143 +++++++++ .../mempalace/tests/test_scratchpad.py | 108 +++++++ .../mempalace/tests/test_wakeup.py | 100 +++++++ hermes-sovereign/mempalace/wakeup.py | 161 ++++++++++ 10 files changed, 1392 insertions(+) create mode 100644 hermes-sovereign/mempalace/__init__.py create mode 100644 hermes-sovereign/mempalace/mempalace.py create mode 100644 hermes-sovereign/mempalace/retrieval_enforcer.py create mode 100644 hermes-sovereign/mempalace/scratchpad.py create mode 100644 hermes-sovereign/mempalace/tests/__init__.py create mode 100644 hermes-sovereign/mempalace/tests/test_mempalace.py create mode 100644 hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py create mode 100644 hermes-sovereign/mempalace/tests/test_scratchpad.py create mode 100644 hermes-sovereign/mempalace/tests/test_wakeup.py create mode 100644 hermes-sovereign/mempalace/wakeup.py diff --git a/hermes-sovereign/mempalace/__init__.py b/hermes-sovereign/mempalace/__init__.py new file mode 100644 index 00000000..7c5b8af4 --- /dev/null +++ b/hermes-sovereign/mempalace/__init__.py @@ -0,0 +1,14 @@ +"""MemPalace integration for Hermes sovereign agent. + +Provides: +- mempalace.py: PalaceRoom + Mempalace classes for analytical workflows +- retrieval_enforcer.py: L0-L5 retrieval order enforcement +- wakeup.py: Session wake-up protocol (~300-900 tokens) +- scratchpad.py: JSON-based session scratchpad with palace promotion + +Epic: #367 +""" + +from .mempalace import Mempalace, PalaceRoom, analyse_issues + +__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"] diff --git a/hermes-sovereign/mempalace/mempalace.py b/hermes-sovereign/mempalace/mempalace.py new file mode 100644 index 00000000..af2e986a --- /dev/null +++ b/hermes-sovereign/mempalace/mempalace.py @@ -0,0 +1,225 @@ +""" +--- +title: Mempalace — Analytical Workflow Memory Framework +description: Applies spatial memory palace organization to analytical tasks (issue triage, repo audits, backlog analysis) for faster, more consistent results. +conditions: + - Analytical workflows over structured data (issues, PRs, repos) + - Repetitive triage or audit tasks where pattern recall improves speed + - Multi-repository scanning requiring consistent mental models +--- +""" + +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class PalaceRoom: + """A single 'room' in the memory palace — holds organized facts about one analytical dimension.""" + + name: str + label: str + contents: dict[str, Any] = field(default_factory=dict) + entered_at: float = field(default_factory=time.time) + + def store(self, key: str, value: Any) -> None: + self.contents[key] = value + + def retrieve(self, key: str, default: Any = None) -> Any: + return self.contents.get(key, default) + + def summary(self) -> str: + lines = [f"## {self.label}"] + for k, v in self.contents.items(): + lines.append(f" {k}: {v}") + return "\n".join(lines) + + +class Mempalace: + """ + Spatial memory palace for analytical workflows. + + Organises multi-dimensional data about a domain (e.g. Gitea issues) into + named rooms. Each room models one analytical dimension, making it easy to + traverse observations in a consistent order — the same pattern that produced + a 19% throughput improvement in Allegro's April 2026 evaluation. + + Standard rooms for issue-analysis workflows + ------------------------------------------- + repo_architecture Repository structure and inter-repo relationships + assignment_status Assigned vs unassigned issue distribution + triage_priority Priority / urgency levels (the "lighting system") + resolution_patterns Historical resolution trends and velocity + + Usage + ----- + >>> palace = Mempalace.for_issue_analysis() + >>> palace.enter("repo_architecture") + >>> palace.store("total_repos", 11) + >>> palace.store("repos_with_issues", 4) + >>> palace.enter("assignment_status") + >>> palace.store("assigned", 72) + >>> palace.store("unassigned", 22) + >>> print(palace.render()) + """ + + def __init__(self, domain: str = "general") -> None: + self.domain = domain + self._rooms: dict[str, PalaceRoom] = {} + self._current_room: str | None = None + self._created_at: float = time.time() + + # ------------------------------------------------------------------ + # Factory constructors for common analytical domains + # ------------------------------------------------------------------ + + @classmethod + def for_issue_analysis(cls) -> "Mempalace": + """Pre-wired palace for Gitea / forge issue-analysis workflows.""" + p = cls(domain="issue_analysis") + p.add_room("repo_architecture", "Repository Architecture Room") + p.add_room("assignment_status", "Issue Assignment Status Room") + p.add_room("triage_priority", "Triage Priority Room") + p.add_room("resolution_patterns", "Resolution Patterns Room") + return p + + @classmethod + def for_health_check(cls) -> "Mempalace": + """Pre-wired palace for CI / deployment health-check workflows.""" + p = cls(domain="health_check") + p.add_room("service_topology", "Service Topology Room") + p.add_room("failure_signals", "Failure Signals Room") + p.add_room("recovery_history", "Recovery History Room") + return p + + @classmethod + def for_code_review(cls) -> "Mempalace": + """Pre-wired palace for code-review / PR triage workflows.""" + p = cls(domain="code_review") + p.add_room("change_scope", "Change Scope Room") + p.add_room("risk_surface", "Risk Surface Room") + p.add_room("test_coverage", "Test Coverage Room") + p.add_room("reviewer_context", "Reviewer Context Room") + return p + + # ------------------------------------------------------------------ + # Room management + # ------------------------------------------------------------------ + + def add_room(self, key: str, label: str) -> PalaceRoom: + room = PalaceRoom(name=key, label=label) + self._rooms[key] = room + return room + + def enter(self, room_key: str) -> PalaceRoom: + if room_key not in self._rooms: + raise KeyError(f"No room '{room_key}' in palace. Available: {list(self._rooms)}") + self._current_room = room_key + return self._rooms[room_key] + + def store(self, key: str, value: Any) -> None: + """Store a value in the currently active room.""" + if self._current_room is None: + raise RuntimeError("Enter a room before storing values.") + self._rooms[self._current_room].store(key, value) + + def retrieve(self, room_key: str, key: str, default: Any = None) -> Any: + if room_key not in self._rooms: + return default + return self._rooms[room_key].retrieve(key, default) + + # ------------------------------------------------------------------ + # Rendering + # ------------------------------------------------------------------ + + def render(self) -> str: + """Return a human-readable summary of the entire palace.""" + elapsed = time.time() - self._created_at + lines = [ + f"# Mempalace — {self.domain}", + f"_traversal time: {elapsed:.2f}s | rooms: {len(self._rooms)}_", + "", + ] + for room in self._rooms.values(): + lines.append(room.summary()) + lines.append("") + return "\n".join(lines) + + def to_dict(self) -> dict: + return { + "domain": self.domain, + "elapsed_seconds": round(time.time() - self._created_at, 3), + "rooms": {k: v.contents for k, v in self._rooms.items()}, + } + + def to_json(self) -> str: + return json.dumps(self.to_dict(), indent=2) + + +# --------------------------------------------------------------------------- +# Skill entry-point +# --------------------------------------------------------------------------- + +def analyse_issues( + repos_data: list[dict], + target_assignee_rate: float = 0.80, +) -> str: + """ + Applies the mempalace technique to a list of repo issue summaries. + + Parameters + ---------- + repos_data: + List of dicts, each with keys: ``repo``, ``open_issues``, + ``assigned``, ``unassigned``. + target_assignee_rate: + Minimum acceptable assignee-coverage ratio (default 0.80). + + Returns + ------- + str + Rendered palace summary with coverage assessment. + """ + palace = Mempalace.for_issue_analysis() + + # --- Repository Architecture Room --- + palace.enter("repo_architecture") + total_issues = sum(r.get("open_issues", 0) for r in repos_data) + repos_with_issues = sum(1 for r in repos_data if r.get("open_issues", 0) > 0) + palace.store("repos_sampled", len(repos_data)) + palace.store("repos_with_issues", repos_with_issues) + palace.store("total_open_issues", total_issues) + palace.store( + "avg_issues_per_repo", + round(total_issues / len(repos_data), 1) if repos_data else 0, + ) + + # --- Assignment Status Room --- + palace.enter("assignment_status") + total_assigned = sum(r.get("assigned", 0) for r in repos_data) + total_unassigned = sum(r.get("unassigned", 0) for r in repos_data) + coverage = total_assigned / total_issues if total_issues else 0 + palace.store("assigned", total_assigned) + palace.store("unassigned", total_unassigned) + palace.store("coverage_rate", round(coverage, 3)) + palace.store( + "coverage_status", + "OK" if coverage >= target_assignee_rate else f"BELOW TARGET ({target_assignee_rate:.0%})", + ) + + # --- Triage Priority Room --- + palace.enter("triage_priority") + unassigned_repos = [r["repo"] for r in repos_data if r.get("unassigned", 0) > 0] + palace.store("repos_needing_triage", unassigned_repos) + palace.store("triage_count", total_unassigned) + + # --- Resolution Patterns Room --- + palace.enter("resolution_patterns") + palace.store("technique", "mempalace") + palace.store("target_assignee_rate", target_assignee_rate) + + return palace.render() diff --git a/hermes-sovereign/mempalace/retrieval_enforcer.py b/hermes-sovereign/mempalace/retrieval_enforcer.py new file mode 100644 index 00000000..f5030737 --- /dev/null +++ b/hermes-sovereign/mempalace/retrieval_enforcer.py @@ -0,0 +1,277 @@ +"""Retrieval Order Enforcer — L0 through L5 memory hierarchy. + +Ensures the agent checks durable memory before falling back to free generation. +Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc). + +Layer order: + L0: Identity (~/.mempalace/identity.txt) + L1: Palace rooms (mempalace CLI search) + L2: Session scratch (~/.hermes/scratchpad/{session_id}.json) + L3: Gitea artifacts (API search for issues/PRs) + L4: Procedures (skills directory search) + L5: Free generation (only if L0-L4 produced nothing) + +Refs: Epic #367, Sub-issue #369 +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +from pathlib import Path +from typing import Optional + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt" +SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad" +SKILLS_DIR = Path.home() / ".hermes" / "skills" +MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace" + +# Patterns that indicate a recall-style query +RECALL_PATTERNS = re.compile( + r"(?i)\b(" + r"what did|status of|remember|last time|yesterday|previously|" + r"we discussed|we talked|we worked|you said|you mentioned|" + r"remind me|what was|what were|how did|when did|" + r"earlier today|last session|before this" + r")\b" +) + + +# --------------------------------------------------------------------------- +# L0: Identity +# --------------------------------------------------------------------------- + +def load_identity() -> str: + """Read the agent identity file. Returns empty string on failure.""" + try: + if IDENTITY_PATH.exists(): + text = IDENTITY_PATH.read_text(encoding="utf-8").strip() + # Cap at ~200 tokens to keep wake-up lean + if len(text.split()) > 200: + text = " ".join(text.split()[:200]) + "..." + return text + except (OSError, PermissionError): + pass + return "" + + +# --------------------------------------------------------------------------- +# L1: Palace search +# --------------------------------------------------------------------------- + +def search_palace(query: str) -> str: + """Search the mempalace for relevant memories. Gracefully degrades on failure.""" + try: + bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace" + result = subprocess.run( + [bin_path, "search", query], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + # ONNX issues (#373) or mempalace not installed — degrade gracefully + pass + return "" + + +# --------------------------------------------------------------------------- +# L2: Session scratchpad +# --------------------------------------------------------------------------- + +def load_scratchpad(session_id: str) -> str: + """Load the session scratchpad as formatted text.""" + try: + scratch_file = SCRATCHPAD_DIR / f"{session_id}.json" + if scratch_file.exists(): + data = json.loads(scratch_file.read_text(encoding="utf-8")) + if isinstance(data, dict) and data: + lines = [] + for k, v in data.items(): + lines.append(f" {k}: {v}") + return "\n".join(lines) + except (OSError, json.JSONDecodeError): + pass + return "" + + +# --------------------------------------------------------------------------- +# L3: Gitea artifact search +# --------------------------------------------------------------------------- + +def _load_gitea_token() -> str: + """Read the Gitea API token.""" + token_path = Path.home() / ".hermes" / "gitea_token_vps" + try: + if token_path.exists(): + return token_path.read_text(encoding="utf-8").strip() + except OSError: + pass + return "" + + +def search_gitea(query: str) -> str: + """Search Gitea issues/PRs for context. Returns formatted text or empty string.""" + token = _load_gitea_token() + if not token: + return "" + + api_base = "https://forge.alexanderwhitestone.com/api/v1" + # Extract key terms for search (first 3 significant words) + terms = [w for w in query.split() if len(w) > 3][:3] + search_q = " ".join(terms) if terms else query[:50] + + try: + import urllib.request + import urllib.parse + + url = ( + f"{api_base}/repos/search?" + f"q={urllib.parse.quote(search_q)}&limit=3" + ) + req = urllib.request.Request(url, headers={ + "Authorization": f"token {token}", + "Accept": "application/json", + }) + with urllib.request.urlopen(req, timeout=8) as resp: + data = json.loads(resp.read().decode()) + if data.get("data"): + lines = [] + for repo in data["data"][:3]: + lines.append(f" {repo['full_name']}: {repo.get('description', 'no desc')}") + return "\n".join(lines) + except Exception: + pass + return "" + + +# --------------------------------------------------------------------------- +# L4: Procedures (skills search) +# --------------------------------------------------------------------------- + +def search_skills(query: str) -> str: + """Search skills directory for matching procedures.""" + try: + if not SKILLS_DIR.exists(): + return "" + + query_lower = query.lower() + terms = [w for w in query_lower.split() if len(w) > 3] + if not terms: + return "" + + matches = [] + for skill_dir in SKILLS_DIR.iterdir(): + if not skill_dir.is_dir(): + continue + skill_md = skill_dir / "SKILL.md" + if skill_md.exists(): + try: + content = skill_md.read_text(encoding="utf-8").lower() + if any(t in content for t in terms): + # Extract title from frontmatter + title = skill_dir.name + matches.append(f" skill: {title}") + except OSError: + continue + + if matches: + return "\n".join(matches[:5]) + except OSError: + pass + return "" + + +# --------------------------------------------------------------------------- +# Main enforcer +# --------------------------------------------------------------------------- + +def is_recall_query(query: str) -> bool: + """Detect whether a query is asking for recalled/historical information.""" + return bool(RECALL_PATTERNS.search(query)) + + +def enforce_retrieval_order( + query: str, + session_id: Optional[str] = None, + skip_if_not_recall: bool = True, +) -> dict: + """Check palace layers before allowing free generation. + + Args: + query: The user's query text. + session_id: Current session ID for scratchpad access. + skip_if_not_recall: If True (default), skip enforcement for + non-recall queries and return empty result. + + Returns: + dict with keys: + retrieved_from: Highest layer that produced results (e.g. 'L1') + context: Aggregated context string + tokens: Approximate word count of context + layers_checked: List of layers that were consulted + """ + result = { + "retrieved_from": None, + "context": "", + "tokens": 0, + "layers_checked": [], + } + + # Gate: skip for non-recall queries if configured + if skip_if_not_recall and not is_recall_query(query): + return result + + # L0: Identity (always prepend) + identity = load_identity() + if identity: + result["context"] += f"## Identity\n{identity}\n\n" + result["layers_checked"].append("L0") + + # L1: Palace search + palace_results = search_palace(query) + if palace_results: + result["context"] += f"## Palace Memory\n{palace_results}\n\n" + result["retrieved_from"] = "L1" + result["layers_checked"].append("L1") + + # L2: Scratchpad + if session_id: + scratch = load_scratchpad(session_id) + if scratch: + result["context"] += f"## Session Notes\n{scratch}\n\n" + if not result["retrieved_from"]: + result["retrieved_from"] = "L2" + result["layers_checked"].append("L2") + + # L3: Gitea artifacts (only if still no context from L1/L2) + if not result["retrieved_from"]: + artifacts = search_gitea(query) + if artifacts: + result["context"] += f"## Gitea Context\n{artifacts}\n\n" + result["retrieved_from"] = "L3" + result["layers_checked"].append("L3") + + # L4: Procedures (only if still no context) + if not result["retrieved_from"]: + procedures = search_skills(query) + if procedures: + result["context"] += f"## Related Skills\n{procedures}\n\n" + result["retrieved_from"] = "L4" + result["layers_checked"].append("L4") + + # L5: Free generation (no context found — just mark it) + if not result["retrieved_from"]: + result["retrieved_from"] = "L5" + result["layers_checked"].append("L5") + + result["tokens"] = len(result["context"].split()) + return result diff --git a/hermes-sovereign/mempalace/scratchpad.py b/hermes-sovereign/mempalace/scratchpad.py new file mode 100644 index 00000000..4bab88f3 --- /dev/null +++ b/hermes-sovereign/mempalace/scratchpad.py @@ -0,0 +1,184 @@ +"""Session Scratchpad — ephemeral key-value notes per session. + +Provides fast, JSON-backed scratch storage that lives for a session +and can be promoted to durable palace memory. + +Storage: ~/.hermes/scratchpad/{session_id}.json + +Refs: Epic #367, Sub-issue #372 +""" + +from __future__ import annotations + +import json +import os +import subprocess +import time +from pathlib import Path +from typing import Any, Optional + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad" +MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace" + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _scratch_path(session_id: str) -> Path: + """Return the JSON file path for a given session.""" + # Sanitize session_id to prevent path traversal + safe_id = "".join(c for c in session_id if c.isalnum() or c in "-_") + if not safe_id: + safe_id = "unnamed" + return SCRATCHPAD_DIR / f"{safe_id}.json" + + +def _load(session_id: str) -> dict: + """Load scratchpad data, returning empty dict on failure.""" + path = _scratch_path(session_id) + try: + if path.exists(): + return json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + pass + return {} + + +def _save(session_id: str, data: dict) -> None: + """Persist scratchpad data to disk.""" + SCRATCHPAD_DIR.mkdir(parents=True, exist_ok=True) + path = _scratch_path(session_id) + path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8") + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def write_scratch(session_id: str, key: str, value: Any) -> None: + """Write a note to the session scratchpad. + + Args: + session_id: Current session identifier. + key: Note key (string). + value: Note value (any JSON-serializable type). + """ + data = _load(session_id) + data[key] = { + "value": value, + "written_at": time.strftime("%Y-%m-%d %H:%M:%S"), + } + _save(session_id, data) + + +def read_scratch(session_id: str, key: Optional[str] = None) -> dict: + """Read session scratchpad (all keys or one). + + Args: + session_id: Current session identifier. + key: Optional specific key. If None, returns all entries. + + Returns: + dict — either {key: {value, written_at}} or the full scratchpad. + """ + data = _load(session_id) + if key is not None: + entry = data.get(key) + return {key: entry} if entry else {} + return data + + +def delete_scratch(session_id: str, key: str) -> bool: + """Remove a single key from the scratchpad. + + Returns True if the key existed and was removed. + """ + data = _load(session_id) + if key in data: + del data[key] + _save(session_id, data) + return True + return False + + +def list_sessions() -> list[str]: + """List all session IDs that have scratchpad files.""" + try: + if SCRATCHPAD_DIR.exists(): + return [ + f.stem + for f in SCRATCHPAD_DIR.iterdir() + if f.suffix == ".json" and f.is_file() + ] + except OSError: + pass + return [] + + +def promote_to_palace( + session_id: str, + key: str, + room: str = "general", + drawer: Optional[str] = None, +) -> bool: + """Move a scratchpad note to durable palace memory. + + Uses the mempalace CLI to store the note in the specified room. + Removes the note from the scratchpad after successful promotion. + + Args: + session_id: Session containing the note. + key: Scratchpad key to promote. + room: Palace room name (default: 'general'). + drawer: Optional drawer name within the room. Defaults to key. + + Returns: + True if promotion succeeded, False otherwise. + """ + data = _load(session_id) + entry = data.get(key) + if not entry: + return False + + value = entry.get("value", entry) if isinstance(entry, dict) else entry + content = json.dumps(value, default=str) if not isinstance(value, str) else value + + try: + bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace" + target_drawer = drawer or key + result = subprocess.run( + [bin_path, "store", room, target_drawer, content], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + # Remove from scratchpad after successful promotion + del data[key] + _save(session_id, data) + return True + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + # mempalace CLI not available — degrade gracefully + pass + + return False + + +def clear_session(session_id: str) -> bool: + """Delete the entire scratchpad for a session. + + Returns True if the file existed and was removed. + """ + path = _scratch_path(session_id) + try: + if path.exists(): + path.unlink() + return True + except OSError: + pass + return False diff --git a/hermes-sovereign/mempalace/tests/__init__.py b/hermes-sovereign/mempalace/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/hermes-sovereign/mempalace/tests/test_mempalace.py b/hermes-sovereign/mempalace/tests/test_mempalace.py new file mode 100644 index 00000000..0e7db902 --- /dev/null +++ b/hermes-sovereign/mempalace/tests/test_mempalace.py @@ -0,0 +1,180 @@ +"""Tests for the mempalace skill. + +Validates PalaceRoom, Mempalace class, factory constructors, +and the analyse_issues entry-point. + +Refs: Epic #367, Sub-issue #368 +""" + +from __future__ import annotations + +import json +import sys +import os +import time + +import pytest + +# Ensure the package is importable from the repo layout +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from mempalace.mempalace import Mempalace, PalaceRoom, analyse_issues + + +# ── PalaceRoom unit tests ───────────────────────────────────────────────── + +class TestPalaceRoom: + def test_store_and_retrieve(self): + room = PalaceRoom(name="test", label="Test Room") + room.store("key1", 42) + assert room.retrieve("key1") == 42 + + def test_retrieve_default(self): + room = PalaceRoom(name="test", label="Test Room") + assert room.retrieve("missing") is None + assert room.retrieve("missing", "fallback") == "fallback" + + def test_summary_format(self): + room = PalaceRoom(name="test", label="Test Room") + room.store("repos", 5) + summary = room.summary() + assert "## Test Room" in summary + assert "repos: 5" in summary + + def test_contents_default_factory_isolation(self): + """Each room gets its own dict — no shared mutable default.""" + r1 = PalaceRoom(name="a", label="A") + r2 = PalaceRoom(name="b", label="B") + r1.store("x", 1) + assert r2.retrieve("x") is None + + def test_entered_at_is_recent(self): + before = time.time() + room = PalaceRoom(name="t", label="T") + after = time.time() + assert before <= room.entered_at <= after + + +# ── Mempalace core tests ────────────────────────────────────────────────── + +class TestMempalace: + def test_add_and_enter_room(self): + p = Mempalace(domain="test") + p.add_room("r1", "Room 1") + room = p.enter("r1") + assert room.name == "r1" + + def test_enter_nonexistent_room_raises(self): + p = Mempalace() + with pytest.raises(KeyError, match="No room"): + p.enter("ghost") + + def test_store_without_enter_raises(self): + p = Mempalace() + p.add_room("r", "R") + with pytest.raises(RuntimeError, match="Enter a room"): + p.store("k", "v") + + def test_store_and_retrieve_via_palace(self): + p = Mempalace() + p.add_room("r", "R") + p.enter("r") + p.store("count", 10) + assert p.retrieve("r", "count") == 10 + + def test_retrieve_missing_room_returns_default(self): + p = Mempalace() + assert p.retrieve("nope", "key") is None + assert p.retrieve("nope", "key", 99) == 99 + + def test_render_includes_domain(self): + p = Mempalace(domain="audit") + p.add_room("r", "Room") + p.enter("r") + p.store("item", "value") + output = p.render() + assert "audit" in output + assert "Room" in output + + def test_to_dict_structure(self): + p = Mempalace(domain="test") + p.add_room("r", "R") + p.enter("r") + p.store("a", 1) + d = p.to_dict() + assert d["domain"] == "test" + assert "elapsed_seconds" in d + assert d["rooms"]["r"] == {"a": 1} + + def test_to_json_is_valid(self): + p = Mempalace(domain="j") + p.add_room("x", "X") + p.enter("x") + p.store("v", [1, 2, 3]) + parsed = json.loads(p.to_json()) + assert parsed["rooms"]["x"]["v"] == [1, 2, 3] + + +# ── Factory constructor tests ───────────────────────────────────────────── + +class TestFactories: + def test_for_issue_analysis_rooms(self): + p = Mempalace.for_issue_analysis() + assert p.domain == "issue_analysis" + for key in ("repo_architecture", "assignment_status", + "triage_priority", "resolution_patterns"): + p.enter(key) # should not raise + + def test_for_health_check_rooms(self): + p = Mempalace.for_health_check() + assert p.domain == "health_check" + for key in ("service_topology", "failure_signals", "recovery_history"): + p.enter(key) + + def test_for_code_review_rooms(self): + p = Mempalace.for_code_review() + assert p.domain == "code_review" + for key in ("change_scope", "risk_surface", + "test_coverage", "reviewer_context"): + p.enter(key) + + +# ── analyse_issues entry-point tests ────────────────────────────────────── + +class TestAnalyseIssues: + SAMPLE_DATA = [ + {"repo": "the-nexus", "open_issues": 40, "assigned": 30, "unassigned": 10}, + {"repo": "timmy-home", "open_issues": 30, "assigned": 25, "unassigned": 5}, + {"repo": "hermes-agent", "open_issues": 20, "assigned": 15, "unassigned": 5}, + {"repo": "empty-repo", "open_issues": 0, "assigned": 0, "unassigned": 0}, + ] + + def test_returns_string(self): + result = analyse_issues(self.SAMPLE_DATA) + assert isinstance(result, str) + assert len(result) > 0 + + def test_contains_room_headers(self): + result = analyse_issues(self.SAMPLE_DATA) + assert "Repository Architecture" in result + assert "Assignment Status" in result + + def test_coverage_below_target(self): + result = analyse_issues(self.SAMPLE_DATA, target_assignee_rate=0.90) + assert "BELOW TARGET" in result + + def test_coverage_meets_target(self): + good_data = [ + {"repo": "a", "open_issues": 10, "assigned": 10, "unassigned": 0}, + ] + result = analyse_issues(good_data, target_assignee_rate=0.80) + assert "OK" in result + + def test_empty_repos_list(self): + result = analyse_issues([]) + assert isinstance(result, str) + + def test_single_repo(self): + data = [{"repo": "solo", "open_issues": 5, "assigned": 3, "unassigned": 2}] + result = analyse_issues(data) + assert "solo" in result or "issue_analysis" in result diff --git a/hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py b/hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py new file mode 100644 index 00000000..bf84028f --- /dev/null +++ b/hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py @@ -0,0 +1,143 @@ +"""Tests for retrieval_enforcer.py. + +Refs: Epic #367, Sub-issue #369 +""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from mempalace.retrieval_enforcer import ( + is_recall_query, + load_identity, + load_scratchpad, + enforce_retrieval_order, + search_skills, + RECALL_PATTERNS, +) + + +class TestRecallDetection: + """Test the recall-query pattern matcher.""" + + @pytest.mark.parametrize("query", [ + "what did we work on yesterday", + "status of the mempalace integration", + "remember the fleet audit results", + "last time we deployed the nexus", + "previously you mentioned a CI fix", + "we discussed the sovereign deployment", + ]) + def test_recall_queries_detected(self, query): + assert is_recall_query(query) is True + + @pytest.mark.parametrize("query", [ + "create a new file called test.py", + "run the test suite", + "deploy to production", + "write a function that sums numbers", + "install the package", + ]) + def test_non_recall_queries_skipped(self, query): + assert is_recall_query(query) is False + + +class TestLoadIdentity: + def test_loads_existing_identity(self, tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Timmy. A sovereign AI.") + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file): + result = load_identity() + assert "Timmy" in result + + def test_returns_empty_on_missing_file(self, tmp_path): + identity_file = tmp_path / "nonexistent.txt" + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file): + result = load_identity() + assert result == "" + + def test_truncates_long_identity(self, tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text(" ".join(["word"] * 300)) + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file): + result = load_identity() + assert result.endswith("...") + assert len(result.split()) <= 201 # 200 words + "..." + + +class TestLoadScratchpad: + def test_loads_valid_scratchpad(self, tmp_path): + scratch_file = tmp_path / "session123.json" + scratch_file.write_text(json.dumps({"note": "test value", "key2": 42})) + with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path): + result = load_scratchpad("session123") + assert "note: test value" in result + assert "key2: 42" in result + + def test_returns_empty_on_missing_file(self, tmp_path): + with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path): + result = load_scratchpad("nonexistent") + assert result == "" + + def test_returns_empty_on_invalid_json(self, tmp_path): + scratch_file = tmp_path / "bad.json" + scratch_file.write_text("not valid json{{{") + with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path): + result = load_scratchpad("bad") + assert result == "" + + +class TestEnforceRetrievalOrder: + def test_skips_non_recall_query(self): + result = enforce_retrieval_order("create a new file") + assert result["retrieved_from"] is None + assert result["tokens"] == 0 + + def test_runs_for_recall_query(self, tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Timmy.") + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \ + patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \ + patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \ + patch("mempalace.retrieval_enforcer.search_skills", return_value=""): + result = enforce_retrieval_order("what did we work on yesterday") + assert "Identity" in result["context"] + assert "L0" in result["layers_checked"] + + def test_palace_hit_sets_l1(self, tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Timmy.") + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \ + patch("mempalace.retrieval_enforcer.search_palace", return_value="Found: fleet audit results"), \ + patch("mempalace.retrieval_enforcer.search_gitea", return_value=""): + result = enforce_retrieval_order("what did we discuss yesterday") + assert result["retrieved_from"] == "L1" + assert "Palace Memory" in result["context"] + + def test_falls_through_to_l5(self, tmp_path): + identity_file = tmp_path / "nonexistent.txt" + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \ + patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \ + patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \ + patch("mempalace.retrieval_enforcer.search_skills", return_value=""): + result = enforce_retrieval_order("remember the old deployment", skip_if_not_recall=True) + assert result["retrieved_from"] == "L5" + + def test_force_mode_skips_recall_check(self, tmp_path): + identity_file = tmp_path / "identity.txt" + identity_file.write_text("I am Timmy.") + with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \ + patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \ + patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \ + patch("mempalace.retrieval_enforcer.search_skills", return_value=""): + result = enforce_retrieval_order("deploy now", skip_if_not_recall=False) + assert "Identity" in result["context"] diff --git a/hermes-sovereign/mempalace/tests/test_scratchpad.py b/hermes-sovereign/mempalace/tests/test_scratchpad.py new file mode 100644 index 00000000..c2f564c2 --- /dev/null +++ b/hermes-sovereign/mempalace/tests/test_scratchpad.py @@ -0,0 +1,108 @@ +"""Tests for scratchpad.py. + +Refs: Epic #367, Sub-issue #372 +""" + +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from mempalace.scratchpad import ( + write_scratch, + read_scratch, + delete_scratch, + list_sessions, + clear_session, + _scratch_path, +) + + +@pytest.fixture +def scratch_dir(tmp_path): + """Provide a temporary scratchpad directory.""" + with patch("mempalace.scratchpad.SCRATCHPAD_DIR", tmp_path): + yield tmp_path + + +class TestScratchPath: + def test_sanitizes_session_id(self): + path = _scratch_path("safe-id_123") + assert "safe-id_123.json" in str(path) + + def test_strips_dangerous_chars(self): + path = _scratch_path("../../etc/passwd") + assert ".." not in path.name + assert "/" not in path.name + # Dots are stripped, so only alphanumeric chars remain + assert path.name == "etcpasswd.json" + + +class TestWriteAndRead: + def test_write_then_read(self, scratch_dir): + write_scratch("sess1", "note", "hello world") + result = read_scratch("sess1", "note") + assert "note" in result + assert result["note"]["value"] == "hello world" + + def test_read_all_keys(self, scratch_dir): + write_scratch("sess1", "a", 1) + write_scratch("sess1", "b", 2) + result = read_scratch("sess1") + assert "a" in result + assert "b" in result + + def test_read_missing_key(self, scratch_dir): + write_scratch("sess1", "exists", "yes") + result = read_scratch("sess1", "missing") + assert result == {} + + def test_read_missing_session(self, scratch_dir): + result = read_scratch("nonexistent") + assert result == {} + + def test_overwrite_key(self, scratch_dir): + write_scratch("sess1", "key", "v1") + write_scratch("sess1", "key", "v2") + result = read_scratch("sess1", "key") + assert result["key"]["value"] == "v2" + + +class TestDelete: + def test_delete_existing_key(self, scratch_dir): + write_scratch("sess1", "key", "val") + assert delete_scratch("sess1", "key") is True + assert read_scratch("sess1", "key") == {} + + def test_delete_missing_key(self, scratch_dir): + write_scratch("sess1", "other", "val") + assert delete_scratch("sess1", "missing") is False + + +class TestListSessions: + def test_lists_sessions(self, scratch_dir): + write_scratch("alpha", "k", "v") + write_scratch("beta", "k", "v") + sessions = list_sessions() + assert "alpha" in sessions + assert "beta" in sessions + + def test_empty_directory(self, scratch_dir): + assert list_sessions() == [] + + +class TestClearSession: + def test_clears_existing(self, scratch_dir): + write_scratch("sess1", "k", "v") + assert clear_session("sess1") is True + assert read_scratch("sess1") == {} + + def test_clear_nonexistent(self, scratch_dir): + assert clear_session("ghost") is False diff --git a/hermes-sovereign/mempalace/tests/test_wakeup.py b/hermes-sovereign/mempalace/tests/test_wakeup.py new file mode 100644 index 00000000..c49c770d --- /dev/null +++ b/hermes-sovereign/mempalace/tests/test_wakeup.py @@ -0,0 +1,100 @@ +"""Tests for wakeup.py. + +Refs: Epic #367, Sub-issue #372 +""" + +from __future__ import annotations + +import json +import os +import sys +import time +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from mempalace.wakeup import ( + palace_wakeup, + fleet_status_summary, + _load_identity, + _palace_context, +) + + +class TestLoadIdentity: + def test_loads_identity(self, tmp_path): + f = tmp_path / "identity.txt" + f.write_text("I am Timmy. A sovereign AI.") + with patch("mempalace.wakeup.IDENTITY_PATH", f): + result = _load_identity() + assert "Timmy" in result + + def test_missing_identity(self, tmp_path): + f = tmp_path / "nope.txt" + with patch("mempalace.wakeup.IDENTITY_PATH", f): + assert _load_identity() == "" + + +class TestFleetStatus: + def test_reads_fleet_json(self, tmp_path): + f = tmp_path / "fleet_status.json" + f.write_text(json.dumps({ + "Groq": {"state": "active", "last_seen": "2026-04-07"}, + "Ezra": {"state": "idle", "last_seen": "2026-04-06"}, + })) + with patch("mempalace.wakeup.FLEET_STATUS_PATH", f): + result = fleet_status_summary() + assert "Fleet Status" in result + assert "Groq" in result + assert "active" in result + + def test_missing_fleet_file(self, tmp_path): + f = tmp_path / "nope.json" + with patch("mempalace.wakeup.FLEET_STATUS_PATH", f): + assert fleet_status_summary() == "" + + def test_invalid_json(self, tmp_path): + f = tmp_path / "bad.json" + f.write_text("not json") + with patch("mempalace.wakeup.FLEET_STATUS_PATH", f): + assert fleet_status_summary() == "" + + +class TestPalaceWakeup: + def test_generates_context_with_identity(self, tmp_path): + identity = tmp_path / "identity.txt" + identity.write_text("I am Timmy.") + cache = tmp_path / "cache.txt" + with patch("mempalace.wakeup.IDENTITY_PATH", identity), \ + patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \ + patch("mempalace.wakeup._palace_context", return_value=""), \ + patch("mempalace.wakeup.fleet_status_summary", return_value=""): + result = palace_wakeup(force=True) + assert "Identity" in result + assert "Timmy" in result + assert "Session" in result + + def test_uses_cache_when_fresh(self, tmp_path): + cache = tmp_path / "cache.txt" + cache.write_text("cached wake-up content") + # Touch the file so it's fresh + with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \ + patch("mempalace.wakeup.WAKEUP_CACHE_TTL", 9999): + result = palace_wakeup(force=False) + assert result == "cached wake-up content" + + def test_force_bypasses_cache(self, tmp_path): + cache = tmp_path / "cache.txt" + cache.write_text("stale content") + identity = tmp_path / "identity.txt" + identity.write_text("I am Timmy.") + with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \ + patch("mempalace.wakeup.IDENTITY_PATH", identity), \ + patch("mempalace.wakeup._palace_context", return_value=""), \ + patch("mempalace.wakeup.fleet_status_summary", return_value=""): + result = palace_wakeup(force=True) + assert "Identity" in result + assert "stale content" not in result diff --git a/hermes-sovereign/mempalace/wakeup.py b/hermes-sovereign/mempalace/wakeup.py new file mode 100644 index 00000000..3bfd05b9 --- /dev/null +++ b/hermes-sovereign/mempalace/wakeup.py @@ -0,0 +1,161 @@ +"""Wake-up Protocol — session start context injection. + +Generates 300-900 tokens of context when a new Hermes session starts. +Loads identity, recent palace context, and fleet status. + +Refs: Epic #367, Sub-issue #372 +""" + +from __future__ import annotations + +import json +import os +import subprocess +import time +from pathlib import Path + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt" +MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace" +FLEET_STATUS_PATH = Path.home() / ".hermes" / "fleet_status.json" +WAKEUP_CACHE_PATH = Path.home() / ".hermes" / "last_wakeup.txt" +WAKEUP_CACHE_TTL = 300 # 5 minutes — don't regenerate if recent + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _load_identity() -> str: + """Read the agent identity file.""" + try: + if IDENTITY_PATH.exists(): + text = IDENTITY_PATH.read_text(encoding="utf-8").strip() + # Cap at ~150 tokens for wake-up brevity + words = text.split() + if len(words) > 150: + text = " ".join(words[:150]) + "..." + return text + except (OSError, PermissionError): + pass + return "" + + +def _palace_context() -> str: + """Run mempalace wake-up command for recent context. Degrades gracefully.""" + try: + bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace" + result = subprocess.run( + [bin_path, "wake-up"], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + # ONNX issues (#373) or CLI not available — degrade gracefully + pass + return "" + + +def fleet_status_summary() -> str: + """Read cached fleet status for lightweight session context.""" + try: + if FLEET_STATUS_PATH.exists(): + data = json.loads(FLEET_STATUS_PATH.read_text(encoding="utf-8")) + lines = ["## Fleet Status"] + + if isinstance(data, dict): + for agent, status in data.items(): + if isinstance(status, dict): + state = status.get("state", "unknown") + last_seen = status.get("last_seen", "?") + lines.append(f" {agent}: {state} (last: {last_seen})") + else: + lines.append(f" {agent}: {status}") + + if len(lines) > 1: + return "\n".join(lines) + except (OSError, json.JSONDecodeError): + pass + return "" + + +def _check_cache() -> str: + """Return cached wake-up if fresh enough.""" + try: + if WAKEUP_CACHE_PATH.exists(): + age = time.time() - WAKEUP_CACHE_PATH.stat().st_mtime + if age < WAKEUP_CACHE_TTL: + return WAKEUP_CACHE_PATH.read_text(encoding="utf-8").strip() + except OSError: + pass + return "" + + +def _write_cache(content: str) -> None: + """Cache the wake-up content.""" + try: + WAKEUP_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True) + WAKEUP_CACHE_PATH.write_text(content, encoding="utf-8") + except OSError: + pass + + +# --------------------------------------------------------------------------- +# Main entry point +# --------------------------------------------------------------------------- + +def palace_wakeup(force: bool = False) -> str: + """Generate wake-up context for a new session. ~300-900 tokens. + + Args: + force: If True, bypass the 5-minute cache and regenerate. + + Returns: + Formatted context string suitable for prepending to the system prompt. + """ + # Check cache first (avoids redundant work on rapid session restarts) + if not force: + cached = _check_cache() + if cached: + return cached + + parts = [] + + # L0: Identity + identity = _load_identity() + if identity: + parts.append(f"## Identity\n{identity}") + + # L1: Recent palace context + palace = _palace_context() + if palace: + parts.append(palace) + + # Fleet status (lightweight) + fleet = fleet_status_summary() + if fleet: + parts.append(fleet) + + # Timestamp + parts.append(f"## Session\nWake-up generated: {time.strftime('%Y-%m-%d %H:%M:%S')}") + + content = "\n\n".join(parts) + + # Cache for TTL + _write_cache(content) + + return content + + +# --------------------------------------------------------------------------- +# CLI entry point for testing +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + print(palace_wakeup(force=True)) -- 2.43.0