Compare commits
1 Commits
main
...
timmy/memp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f8dabae8eb |
14
hermes-sovereign/mempalace/__init__.py
Normal file
14
hermes-sovereign/mempalace/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""MemPalace integration for Hermes sovereign agent.
|
||||
|
||||
Provides:
|
||||
- mempalace.py: PalaceRoom + Mempalace classes for analytical workflows
|
||||
- retrieval_enforcer.py: L0-L5 retrieval order enforcement
|
||||
- wakeup.py: Session wake-up protocol (~300-900 tokens)
|
||||
- scratchpad.py: JSON-based session scratchpad with palace promotion
|
||||
|
||||
Epic: #367
|
||||
"""
|
||||
|
||||
from .mempalace import Mempalace, PalaceRoom, analyse_issues
|
||||
|
||||
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"]
|
||||
225
hermes-sovereign/mempalace/mempalace.py
Normal file
225
hermes-sovereign/mempalace/mempalace.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""
|
||||
---
|
||||
title: Mempalace — Analytical Workflow Memory Framework
|
||||
description: Applies spatial memory palace organization to analytical tasks (issue triage, repo audits, backlog analysis) for faster, more consistent results.
|
||||
conditions:
|
||||
- Analytical workflows over structured data (issues, PRs, repos)
|
||||
- Repetitive triage or audit tasks where pattern recall improves speed
|
||||
- Multi-repository scanning requiring consistent mental models
|
||||
---
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class PalaceRoom:
|
||||
"""A single 'room' in the memory palace — holds organized facts about one analytical dimension."""
|
||||
|
||||
name: str
|
||||
label: str
|
||||
contents: dict[str, Any] = field(default_factory=dict)
|
||||
entered_at: float = field(default_factory=time.time)
|
||||
|
||||
def store(self, key: str, value: Any) -> None:
|
||||
self.contents[key] = value
|
||||
|
||||
def retrieve(self, key: str, default: Any = None) -> Any:
|
||||
return self.contents.get(key, default)
|
||||
|
||||
def summary(self) -> str:
|
||||
lines = [f"## {self.label}"]
|
||||
for k, v in self.contents.items():
|
||||
lines.append(f" {k}: {v}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class Mempalace:
|
||||
"""
|
||||
Spatial memory palace for analytical workflows.
|
||||
|
||||
Organises multi-dimensional data about a domain (e.g. Gitea issues) into
|
||||
named rooms. Each room models one analytical dimension, making it easy to
|
||||
traverse observations in a consistent order — the same pattern that produced
|
||||
a 19% throughput improvement in Allegro's April 2026 evaluation.
|
||||
|
||||
Standard rooms for issue-analysis workflows
|
||||
-------------------------------------------
|
||||
repo_architecture Repository structure and inter-repo relationships
|
||||
assignment_status Assigned vs unassigned issue distribution
|
||||
triage_priority Priority / urgency levels (the "lighting system")
|
||||
resolution_patterns Historical resolution trends and velocity
|
||||
|
||||
Usage
|
||||
-----
|
||||
>>> palace = Mempalace.for_issue_analysis()
|
||||
>>> palace.enter("repo_architecture")
|
||||
>>> palace.store("total_repos", 11)
|
||||
>>> palace.store("repos_with_issues", 4)
|
||||
>>> palace.enter("assignment_status")
|
||||
>>> palace.store("assigned", 72)
|
||||
>>> palace.store("unassigned", 22)
|
||||
>>> print(palace.render())
|
||||
"""
|
||||
|
||||
def __init__(self, domain: str = "general") -> None:
|
||||
self.domain = domain
|
||||
self._rooms: dict[str, PalaceRoom] = {}
|
||||
self._current_room: str | None = None
|
||||
self._created_at: float = time.time()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Factory constructors for common analytical domains
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@classmethod
|
||||
def for_issue_analysis(cls) -> "Mempalace":
|
||||
"""Pre-wired palace for Gitea / forge issue-analysis workflows."""
|
||||
p = cls(domain="issue_analysis")
|
||||
p.add_room("repo_architecture", "Repository Architecture Room")
|
||||
p.add_room("assignment_status", "Issue Assignment Status Room")
|
||||
p.add_room("triage_priority", "Triage Priority Room")
|
||||
p.add_room("resolution_patterns", "Resolution Patterns Room")
|
||||
return p
|
||||
|
||||
@classmethod
|
||||
def for_health_check(cls) -> "Mempalace":
|
||||
"""Pre-wired palace for CI / deployment health-check workflows."""
|
||||
p = cls(domain="health_check")
|
||||
p.add_room("service_topology", "Service Topology Room")
|
||||
p.add_room("failure_signals", "Failure Signals Room")
|
||||
p.add_room("recovery_history", "Recovery History Room")
|
||||
return p
|
||||
|
||||
@classmethod
|
||||
def for_code_review(cls) -> "Mempalace":
|
||||
"""Pre-wired palace for code-review / PR triage workflows."""
|
||||
p = cls(domain="code_review")
|
||||
p.add_room("change_scope", "Change Scope Room")
|
||||
p.add_room("risk_surface", "Risk Surface Room")
|
||||
p.add_room("test_coverage", "Test Coverage Room")
|
||||
p.add_room("reviewer_context", "Reviewer Context Room")
|
||||
return p
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Room management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def add_room(self, key: str, label: str) -> PalaceRoom:
|
||||
room = PalaceRoom(name=key, label=label)
|
||||
self._rooms[key] = room
|
||||
return room
|
||||
|
||||
def enter(self, room_key: str) -> PalaceRoom:
|
||||
if room_key not in self._rooms:
|
||||
raise KeyError(f"No room '{room_key}' in palace. Available: {list(self._rooms)}")
|
||||
self._current_room = room_key
|
||||
return self._rooms[room_key]
|
||||
|
||||
def store(self, key: str, value: Any) -> None:
|
||||
"""Store a value in the currently active room."""
|
||||
if self._current_room is None:
|
||||
raise RuntimeError("Enter a room before storing values.")
|
||||
self._rooms[self._current_room].store(key, value)
|
||||
|
||||
def retrieve(self, room_key: str, key: str, default: Any = None) -> Any:
|
||||
if room_key not in self._rooms:
|
||||
return default
|
||||
return self._rooms[room_key].retrieve(key, default)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Rendering
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def render(self) -> str:
|
||||
"""Return a human-readable summary of the entire palace."""
|
||||
elapsed = time.time() - self._created_at
|
||||
lines = [
|
||||
f"# Mempalace — {self.domain}",
|
||||
f"_traversal time: {elapsed:.2f}s | rooms: {len(self._rooms)}_",
|
||||
"",
|
||||
]
|
||||
for room in self._rooms.values():
|
||||
lines.append(room.summary())
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"domain": self.domain,
|
||||
"elapsed_seconds": round(time.time() - self._created_at, 3),
|
||||
"rooms": {k: v.contents for k, v in self._rooms.items()},
|
||||
}
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self.to_dict(), indent=2)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Skill entry-point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyse_issues(
|
||||
repos_data: list[dict],
|
||||
target_assignee_rate: float = 0.80,
|
||||
) -> str:
|
||||
"""
|
||||
Applies the mempalace technique to a list of repo issue summaries.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
repos_data:
|
||||
List of dicts, each with keys: ``repo``, ``open_issues``,
|
||||
``assigned``, ``unassigned``.
|
||||
target_assignee_rate:
|
||||
Minimum acceptable assignee-coverage ratio (default 0.80).
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Rendered palace summary with coverage assessment.
|
||||
"""
|
||||
palace = Mempalace.for_issue_analysis()
|
||||
|
||||
# --- Repository Architecture Room ---
|
||||
palace.enter("repo_architecture")
|
||||
total_issues = sum(r.get("open_issues", 0) for r in repos_data)
|
||||
repos_with_issues = sum(1 for r in repos_data if r.get("open_issues", 0) > 0)
|
||||
palace.store("repos_sampled", len(repos_data))
|
||||
palace.store("repos_with_issues", repos_with_issues)
|
||||
palace.store("total_open_issues", total_issues)
|
||||
palace.store(
|
||||
"avg_issues_per_repo",
|
||||
round(total_issues / len(repos_data), 1) if repos_data else 0,
|
||||
)
|
||||
|
||||
# --- Assignment Status Room ---
|
||||
palace.enter("assignment_status")
|
||||
total_assigned = sum(r.get("assigned", 0) for r in repos_data)
|
||||
total_unassigned = sum(r.get("unassigned", 0) for r in repos_data)
|
||||
coverage = total_assigned / total_issues if total_issues else 0
|
||||
palace.store("assigned", total_assigned)
|
||||
palace.store("unassigned", total_unassigned)
|
||||
palace.store("coverage_rate", round(coverage, 3))
|
||||
palace.store(
|
||||
"coverage_status",
|
||||
"OK" if coverage >= target_assignee_rate else f"BELOW TARGET ({target_assignee_rate:.0%})",
|
||||
)
|
||||
|
||||
# --- Triage Priority Room ---
|
||||
palace.enter("triage_priority")
|
||||
unassigned_repos = [r["repo"] for r in repos_data if r.get("unassigned", 0) > 0]
|
||||
palace.store("repos_needing_triage", unassigned_repos)
|
||||
palace.store("triage_count", total_unassigned)
|
||||
|
||||
# --- Resolution Patterns Room ---
|
||||
palace.enter("resolution_patterns")
|
||||
palace.store("technique", "mempalace")
|
||||
palace.store("target_assignee_rate", target_assignee_rate)
|
||||
|
||||
return palace.render()
|
||||
277
hermes-sovereign/mempalace/retrieval_enforcer.py
Normal file
277
hermes-sovereign/mempalace/retrieval_enforcer.py
Normal file
@@ -0,0 +1,277 @@
|
||||
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
|
||||
|
||||
Ensures the agent checks durable memory before falling back to free generation.
|
||||
Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc).
|
||||
|
||||
Layer order:
|
||||
L0: Identity (~/.mempalace/identity.txt)
|
||||
L1: Palace rooms (mempalace CLI search)
|
||||
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
|
||||
L3: Gitea artifacts (API search for issues/PRs)
|
||||
L4: Procedures (skills directory search)
|
||||
L5: Free generation (only if L0-L4 produced nothing)
|
||||
|
||||
Refs: Epic #367, Sub-issue #369
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
|
||||
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
|
||||
SKILLS_DIR = Path.home() / ".hermes" / "skills"
|
||||
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
|
||||
|
||||
# Patterns that indicate a recall-style query
|
||||
RECALL_PATTERNS = re.compile(
|
||||
r"(?i)\b("
|
||||
r"what did|status of|remember|last time|yesterday|previously|"
|
||||
r"we discussed|we talked|we worked|you said|you mentioned|"
|
||||
r"remind me|what was|what were|how did|when did|"
|
||||
r"earlier today|last session|before this"
|
||||
r")\b"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L0: Identity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_identity() -> str:
|
||||
"""Read the agent identity file. Returns empty string on failure."""
|
||||
try:
|
||||
if IDENTITY_PATH.exists():
|
||||
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
|
||||
# Cap at ~200 tokens to keep wake-up lean
|
||||
if len(text.split()) > 200:
|
||||
text = " ".join(text.split()[:200]) + "..."
|
||||
return text
|
||||
except (OSError, PermissionError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L1: Palace search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def search_palace(query: str) -> str:
|
||||
"""Search the mempalace for relevant memories. Gracefully degrades on failure."""
|
||||
try:
|
||||
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
|
||||
result = subprocess.run(
|
||||
[bin_path, "search", query],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip()
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
|
||||
# ONNX issues (#373) or mempalace not installed — degrade gracefully
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L2: Session scratchpad
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_scratchpad(session_id: str) -> str:
|
||||
"""Load the session scratchpad as formatted text."""
|
||||
try:
|
||||
scratch_file = SCRATCHPAD_DIR / f"{session_id}.json"
|
||||
if scratch_file.exists():
|
||||
data = json.loads(scratch_file.read_text(encoding="utf-8"))
|
||||
if isinstance(data, dict) and data:
|
||||
lines = []
|
||||
for k, v in data.items():
|
||||
lines.append(f" {k}: {v}")
|
||||
return "\n".join(lines)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L3: Gitea artifact search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_gitea_token() -> str:
|
||||
"""Read the Gitea API token."""
|
||||
token_path = Path.home() / ".hermes" / "gitea_token_vps"
|
||||
try:
|
||||
if token_path.exists():
|
||||
return token_path.read_text(encoding="utf-8").strip()
|
||||
except OSError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def search_gitea(query: str) -> str:
|
||||
"""Search Gitea issues/PRs for context. Returns formatted text or empty string."""
|
||||
token = _load_gitea_token()
|
||||
if not token:
|
||||
return ""
|
||||
|
||||
api_base = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
# Extract key terms for search (first 3 significant words)
|
||||
terms = [w for w in query.split() if len(w) > 3][:3]
|
||||
search_q = " ".join(terms) if terms else query[:50]
|
||||
|
||||
try:
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
url = (
|
||||
f"{api_base}/repos/search?"
|
||||
f"q={urllib.parse.quote(search_q)}&limit=3"
|
||||
)
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=8) as resp:
|
||||
data = json.loads(resp.read().decode())
|
||||
if data.get("data"):
|
||||
lines = []
|
||||
for repo in data["data"][:3]:
|
||||
lines.append(f" {repo['full_name']}: {repo.get('description', 'no desc')}")
|
||||
return "\n".join(lines)
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# L4: Procedures (skills search)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def search_skills(query: str) -> str:
|
||||
"""Search skills directory for matching procedures."""
|
||||
try:
|
||||
if not SKILLS_DIR.exists():
|
||||
return ""
|
||||
|
||||
query_lower = query.lower()
|
||||
terms = [w for w in query_lower.split() if len(w) > 3]
|
||||
if not terms:
|
||||
return ""
|
||||
|
||||
matches = []
|
||||
for skill_dir in SKILLS_DIR.iterdir():
|
||||
if not skill_dir.is_dir():
|
||||
continue
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
if skill_md.exists():
|
||||
try:
|
||||
content = skill_md.read_text(encoding="utf-8").lower()
|
||||
if any(t in content for t in terms):
|
||||
# Extract title from frontmatter
|
||||
title = skill_dir.name
|
||||
matches.append(f" skill: {title}")
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
if matches:
|
||||
return "\n".join(matches[:5])
|
||||
except OSError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main enforcer
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def is_recall_query(query: str) -> bool:
|
||||
"""Detect whether a query is asking for recalled/historical information."""
|
||||
return bool(RECALL_PATTERNS.search(query))
|
||||
|
||||
|
||||
def enforce_retrieval_order(
|
||||
query: str,
|
||||
session_id: Optional[str] = None,
|
||||
skip_if_not_recall: bool = True,
|
||||
) -> dict:
|
||||
"""Check palace layers before allowing free generation.
|
||||
|
||||
Args:
|
||||
query: The user's query text.
|
||||
session_id: Current session ID for scratchpad access.
|
||||
skip_if_not_recall: If True (default), skip enforcement for
|
||||
non-recall queries and return empty result.
|
||||
|
||||
Returns:
|
||||
dict with keys:
|
||||
retrieved_from: Highest layer that produced results (e.g. 'L1')
|
||||
context: Aggregated context string
|
||||
tokens: Approximate word count of context
|
||||
layers_checked: List of layers that were consulted
|
||||
"""
|
||||
result = {
|
||||
"retrieved_from": None,
|
||||
"context": "",
|
||||
"tokens": 0,
|
||||
"layers_checked": [],
|
||||
}
|
||||
|
||||
# Gate: skip for non-recall queries if configured
|
||||
if skip_if_not_recall and not is_recall_query(query):
|
||||
return result
|
||||
|
||||
# L0: Identity (always prepend)
|
||||
identity = load_identity()
|
||||
if identity:
|
||||
result["context"] += f"## Identity\n{identity}\n\n"
|
||||
result["layers_checked"].append("L0")
|
||||
|
||||
# L1: Palace search
|
||||
palace_results = search_palace(query)
|
||||
if palace_results:
|
||||
result["context"] += f"## Palace Memory\n{palace_results}\n\n"
|
||||
result["retrieved_from"] = "L1"
|
||||
result["layers_checked"].append("L1")
|
||||
|
||||
# L2: Scratchpad
|
||||
if session_id:
|
||||
scratch = load_scratchpad(session_id)
|
||||
if scratch:
|
||||
result["context"] += f"## Session Notes\n{scratch}\n\n"
|
||||
if not result["retrieved_from"]:
|
||||
result["retrieved_from"] = "L2"
|
||||
result["layers_checked"].append("L2")
|
||||
|
||||
# L3: Gitea artifacts (only if still no context from L1/L2)
|
||||
if not result["retrieved_from"]:
|
||||
artifacts = search_gitea(query)
|
||||
if artifacts:
|
||||
result["context"] += f"## Gitea Context\n{artifacts}\n\n"
|
||||
result["retrieved_from"] = "L3"
|
||||
result["layers_checked"].append("L3")
|
||||
|
||||
# L4: Procedures (only if still no context)
|
||||
if not result["retrieved_from"]:
|
||||
procedures = search_skills(query)
|
||||
if procedures:
|
||||
result["context"] += f"## Related Skills\n{procedures}\n\n"
|
||||
result["retrieved_from"] = "L4"
|
||||
result["layers_checked"].append("L4")
|
||||
|
||||
# L5: Free generation (no context found — just mark it)
|
||||
if not result["retrieved_from"]:
|
||||
result["retrieved_from"] = "L5"
|
||||
result["layers_checked"].append("L5")
|
||||
|
||||
result["tokens"] = len(result["context"].split())
|
||||
return result
|
||||
184
hermes-sovereign/mempalace/scratchpad.py
Normal file
184
hermes-sovereign/mempalace/scratchpad.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Session Scratchpad — ephemeral key-value notes per session.
|
||||
|
||||
Provides fast, JSON-backed scratch storage that lives for a session
|
||||
and can be promoted to durable palace memory.
|
||||
|
||||
Storage: ~/.hermes/scratchpad/{session_id}.json
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
|
||||
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _scratch_path(session_id: str) -> Path:
|
||||
"""Return the JSON file path for a given session."""
|
||||
# Sanitize session_id to prevent path traversal
|
||||
safe_id = "".join(c for c in session_id if c.isalnum() or c in "-_")
|
||||
if not safe_id:
|
||||
safe_id = "unnamed"
|
||||
return SCRATCHPAD_DIR / f"{safe_id}.json"
|
||||
|
||||
|
||||
def _load(session_id: str) -> dict:
|
||||
"""Load scratchpad data, returning empty dict on failure."""
|
||||
path = _scratch_path(session_id)
|
||||
try:
|
||||
if path.exists():
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def _save(session_id: str, data: dict) -> None:
|
||||
"""Persist scratchpad data to disk."""
|
||||
SCRATCHPAD_DIR.mkdir(parents=True, exist_ok=True)
|
||||
path = _scratch_path(session_id)
|
||||
path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def write_scratch(session_id: str, key: str, value: Any) -> None:
|
||||
"""Write a note to the session scratchpad.
|
||||
|
||||
Args:
|
||||
session_id: Current session identifier.
|
||||
key: Note key (string).
|
||||
value: Note value (any JSON-serializable type).
|
||||
"""
|
||||
data = _load(session_id)
|
||||
data[key] = {
|
||||
"value": value,
|
||||
"written_at": time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
}
|
||||
_save(session_id, data)
|
||||
|
||||
|
||||
def read_scratch(session_id: str, key: Optional[str] = None) -> dict:
|
||||
"""Read session scratchpad (all keys or one).
|
||||
|
||||
Args:
|
||||
session_id: Current session identifier.
|
||||
key: Optional specific key. If None, returns all entries.
|
||||
|
||||
Returns:
|
||||
dict — either {key: {value, written_at}} or the full scratchpad.
|
||||
"""
|
||||
data = _load(session_id)
|
||||
if key is not None:
|
||||
entry = data.get(key)
|
||||
return {key: entry} if entry else {}
|
||||
return data
|
||||
|
||||
|
||||
def delete_scratch(session_id: str, key: str) -> bool:
|
||||
"""Remove a single key from the scratchpad.
|
||||
|
||||
Returns True if the key existed and was removed.
|
||||
"""
|
||||
data = _load(session_id)
|
||||
if key in data:
|
||||
del data[key]
|
||||
_save(session_id, data)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def list_sessions() -> list[str]:
|
||||
"""List all session IDs that have scratchpad files."""
|
||||
try:
|
||||
if SCRATCHPAD_DIR.exists():
|
||||
return [
|
||||
f.stem
|
||||
for f in SCRATCHPAD_DIR.iterdir()
|
||||
if f.suffix == ".json" and f.is_file()
|
||||
]
|
||||
except OSError:
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def promote_to_palace(
|
||||
session_id: str,
|
||||
key: str,
|
||||
room: str = "general",
|
||||
drawer: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Move a scratchpad note to durable palace memory.
|
||||
|
||||
Uses the mempalace CLI to store the note in the specified room.
|
||||
Removes the note from the scratchpad after successful promotion.
|
||||
|
||||
Args:
|
||||
session_id: Session containing the note.
|
||||
key: Scratchpad key to promote.
|
||||
room: Palace room name (default: 'general').
|
||||
drawer: Optional drawer name within the room. Defaults to key.
|
||||
|
||||
Returns:
|
||||
True if promotion succeeded, False otherwise.
|
||||
"""
|
||||
data = _load(session_id)
|
||||
entry = data.get(key)
|
||||
if not entry:
|
||||
return False
|
||||
|
||||
value = entry.get("value", entry) if isinstance(entry, dict) else entry
|
||||
content = json.dumps(value, default=str) if not isinstance(value, str) else value
|
||||
|
||||
try:
|
||||
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
|
||||
target_drawer = drawer or key
|
||||
result = subprocess.run(
|
||||
[bin_path, "store", room, target_drawer, content],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
# Remove from scratchpad after successful promotion
|
||||
del data[key]
|
||||
_save(session_id, data)
|
||||
return True
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
|
||||
# mempalace CLI not available — degrade gracefully
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def clear_session(session_id: str) -> bool:
|
||||
"""Delete the entire scratchpad for a session.
|
||||
|
||||
Returns True if the file existed and was removed.
|
||||
"""
|
||||
path = _scratch_path(session_id)
|
||||
try:
|
||||
if path.exists():
|
||||
path.unlink()
|
||||
return True
|
||||
except OSError:
|
||||
pass
|
||||
return False
|
||||
0
hermes-sovereign/mempalace/tests/__init__.py
Normal file
0
hermes-sovereign/mempalace/tests/__init__.py
Normal file
180
hermes-sovereign/mempalace/tests/test_mempalace.py
Normal file
180
hermes-sovereign/mempalace/tests/test_mempalace.py
Normal file
@@ -0,0 +1,180 @@
|
||||
"""Tests for the mempalace skill.
|
||||
|
||||
Validates PalaceRoom, Mempalace class, factory constructors,
|
||||
and the analyse_issues entry-point.
|
||||
|
||||
Refs: Epic #367, Sub-issue #368
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure the package is importable from the repo layout
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.mempalace import Mempalace, PalaceRoom, analyse_issues
|
||||
|
||||
|
||||
# ── PalaceRoom unit tests ─────────────────────────────────────────────────
|
||||
|
||||
class TestPalaceRoom:
|
||||
def test_store_and_retrieve(self):
|
||||
room = PalaceRoom(name="test", label="Test Room")
|
||||
room.store("key1", 42)
|
||||
assert room.retrieve("key1") == 42
|
||||
|
||||
def test_retrieve_default(self):
|
||||
room = PalaceRoom(name="test", label="Test Room")
|
||||
assert room.retrieve("missing") is None
|
||||
assert room.retrieve("missing", "fallback") == "fallback"
|
||||
|
||||
def test_summary_format(self):
|
||||
room = PalaceRoom(name="test", label="Test Room")
|
||||
room.store("repos", 5)
|
||||
summary = room.summary()
|
||||
assert "## Test Room" in summary
|
||||
assert "repos: 5" in summary
|
||||
|
||||
def test_contents_default_factory_isolation(self):
|
||||
"""Each room gets its own dict — no shared mutable default."""
|
||||
r1 = PalaceRoom(name="a", label="A")
|
||||
r2 = PalaceRoom(name="b", label="B")
|
||||
r1.store("x", 1)
|
||||
assert r2.retrieve("x") is None
|
||||
|
||||
def test_entered_at_is_recent(self):
|
||||
before = time.time()
|
||||
room = PalaceRoom(name="t", label="T")
|
||||
after = time.time()
|
||||
assert before <= room.entered_at <= after
|
||||
|
||||
|
||||
# ── Mempalace core tests ──────────────────────────────────────────────────
|
||||
|
||||
class TestMempalace:
|
||||
def test_add_and_enter_room(self):
|
||||
p = Mempalace(domain="test")
|
||||
p.add_room("r1", "Room 1")
|
||||
room = p.enter("r1")
|
||||
assert room.name == "r1"
|
||||
|
||||
def test_enter_nonexistent_room_raises(self):
|
||||
p = Mempalace()
|
||||
with pytest.raises(KeyError, match="No room"):
|
||||
p.enter("ghost")
|
||||
|
||||
def test_store_without_enter_raises(self):
|
||||
p = Mempalace()
|
||||
p.add_room("r", "R")
|
||||
with pytest.raises(RuntimeError, match="Enter a room"):
|
||||
p.store("k", "v")
|
||||
|
||||
def test_store_and_retrieve_via_palace(self):
|
||||
p = Mempalace()
|
||||
p.add_room("r", "R")
|
||||
p.enter("r")
|
||||
p.store("count", 10)
|
||||
assert p.retrieve("r", "count") == 10
|
||||
|
||||
def test_retrieve_missing_room_returns_default(self):
|
||||
p = Mempalace()
|
||||
assert p.retrieve("nope", "key") is None
|
||||
assert p.retrieve("nope", "key", 99) == 99
|
||||
|
||||
def test_render_includes_domain(self):
|
||||
p = Mempalace(domain="audit")
|
||||
p.add_room("r", "Room")
|
||||
p.enter("r")
|
||||
p.store("item", "value")
|
||||
output = p.render()
|
||||
assert "audit" in output
|
||||
assert "Room" in output
|
||||
|
||||
def test_to_dict_structure(self):
|
||||
p = Mempalace(domain="test")
|
||||
p.add_room("r", "R")
|
||||
p.enter("r")
|
||||
p.store("a", 1)
|
||||
d = p.to_dict()
|
||||
assert d["domain"] == "test"
|
||||
assert "elapsed_seconds" in d
|
||||
assert d["rooms"]["r"] == {"a": 1}
|
||||
|
||||
def test_to_json_is_valid(self):
|
||||
p = Mempalace(domain="j")
|
||||
p.add_room("x", "X")
|
||||
p.enter("x")
|
||||
p.store("v", [1, 2, 3])
|
||||
parsed = json.loads(p.to_json())
|
||||
assert parsed["rooms"]["x"]["v"] == [1, 2, 3]
|
||||
|
||||
|
||||
# ── Factory constructor tests ─────────────────────────────────────────────
|
||||
|
||||
class TestFactories:
|
||||
def test_for_issue_analysis_rooms(self):
|
||||
p = Mempalace.for_issue_analysis()
|
||||
assert p.domain == "issue_analysis"
|
||||
for key in ("repo_architecture", "assignment_status",
|
||||
"triage_priority", "resolution_patterns"):
|
||||
p.enter(key) # should not raise
|
||||
|
||||
def test_for_health_check_rooms(self):
|
||||
p = Mempalace.for_health_check()
|
||||
assert p.domain == "health_check"
|
||||
for key in ("service_topology", "failure_signals", "recovery_history"):
|
||||
p.enter(key)
|
||||
|
||||
def test_for_code_review_rooms(self):
|
||||
p = Mempalace.for_code_review()
|
||||
assert p.domain == "code_review"
|
||||
for key in ("change_scope", "risk_surface",
|
||||
"test_coverage", "reviewer_context"):
|
||||
p.enter(key)
|
||||
|
||||
|
||||
# ── analyse_issues entry-point tests ──────────────────────────────────────
|
||||
|
||||
class TestAnalyseIssues:
|
||||
SAMPLE_DATA = [
|
||||
{"repo": "the-nexus", "open_issues": 40, "assigned": 30, "unassigned": 10},
|
||||
{"repo": "timmy-home", "open_issues": 30, "assigned": 25, "unassigned": 5},
|
||||
{"repo": "hermes-agent", "open_issues": 20, "assigned": 15, "unassigned": 5},
|
||||
{"repo": "empty-repo", "open_issues": 0, "assigned": 0, "unassigned": 0},
|
||||
]
|
||||
|
||||
def test_returns_string(self):
|
||||
result = analyse_issues(self.SAMPLE_DATA)
|
||||
assert isinstance(result, str)
|
||||
assert len(result) > 0
|
||||
|
||||
def test_contains_room_headers(self):
|
||||
result = analyse_issues(self.SAMPLE_DATA)
|
||||
assert "Repository Architecture" in result
|
||||
assert "Assignment Status" in result
|
||||
|
||||
def test_coverage_below_target(self):
|
||||
result = analyse_issues(self.SAMPLE_DATA, target_assignee_rate=0.90)
|
||||
assert "BELOW TARGET" in result
|
||||
|
||||
def test_coverage_meets_target(self):
|
||||
good_data = [
|
||||
{"repo": "a", "open_issues": 10, "assigned": 10, "unassigned": 0},
|
||||
]
|
||||
result = analyse_issues(good_data, target_assignee_rate=0.80)
|
||||
assert "OK" in result
|
||||
|
||||
def test_empty_repos_list(self):
|
||||
result = analyse_issues([])
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_single_repo(self):
|
||||
data = [{"repo": "solo", "open_issues": 5, "assigned": 3, "unassigned": 2}]
|
||||
result = analyse_issues(data)
|
||||
assert "solo" in result or "issue_analysis" in result
|
||||
143
hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py
Normal file
143
hermes-sovereign/mempalace/tests/test_retrieval_enforcer.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Tests for retrieval_enforcer.py.
|
||||
|
||||
Refs: Epic #367, Sub-issue #369
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.retrieval_enforcer import (
|
||||
is_recall_query,
|
||||
load_identity,
|
||||
load_scratchpad,
|
||||
enforce_retrieval_order,
|
||||
search_skills,
|
||||
RECALL_PATTERNS,
|
||||
)
|
||||
|
||||
|
||||
class TestRecallDetection:
|
||||
"""Test the recall-query pattern matcher."""
|
||||
|
||||
@pytest.mark.parametrize("query", [
|
||||
"what did we work on yesterday",
|
||||
"status of the mempalace integration",
|
||||
"remember the fleet audit results",
|
||||
"last time we deployed the nexus",
|
||||
"previously you mentioned a CI fix",
|
||||
"we discussed the sovereign deployment",
|
||||
])
|
||||
def test_recall_queries_detected(self, query):
|
||||
assert is_recall_query(query) is True
|
||||
|
||||
@pytest.mark.parametrize("query", [
|
||||
"create a new file called test.py",
|
||||
"run the test suite",
|
||||
"deploy to production",
|
||||
"write a function that sums numbers",
|
||||
"install the package",
|
||||
])
|
||||
def test_non_recall_queries_skipped(self, query):
|
||||
assert is_recall_query(query) is False
|
||||
|
||||
|
||||
class TestLoadIdentity:
|
||||
def test_loads_existing_identity(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy. A sovereign AI.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
|
||||
result = load_identity()
|
||||
assert "Timmy" in result
|
||||
|
||||
def test_returns_empty_on_missing_file(self, tmp_path):
|
||||
identity_file = tmp_path / "nonexistent.txt"
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
|
||||
result = load_identity()
|
||||
assert result == ""
|
||||
|
||||
def test_truncates_long_identity(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text(" ".join(["word"] * 300))
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
|
||||
result = load_identity()
|
||||
assert result.endswith("...")
|
||||
assert len(result.split()) <= 201 # 200 words + "..."
|
||||
|
||||
|
||||
class TestLoadScratchpad:
|
||||
def test_loads_valid_scratchpad(self, tmp_path):
|
||||
scratch_file = tmp_path / "session123.json"
|
||||
scratch_file.write_text(json.dumps({"note": "test value", "key2": 42}))
|
||||
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
|
||||
result = load_scratchpad("session123")
|
||||
assert "note: test value" in result
|
||||
assert "key2: 42" in result
|
||||
|
||||
def test_returns_empty_on_missing_file(self, tmp_path):
|
||||
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
|
||||
result = load_scratchpad("nonexistent")
|
||||
assert result == ""
|
||||
|
||||
def test_returns_empty_on_invalid_json(self, tmp_path):
|
||||
scratch_file = tmp_path / "bad.json"
|
||||
scratch_file.write_text("not valid json{{{")
|
||||
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
|
||||
result = load_scratchpad("bad")
|
||||
assert result == ""
|
||||
|
||||
|
||||
class TestEnforceRetrievalOrder:
|
||||
def test_skips_non_recall_query(self):
|
||||
result = enforce_retrieval_order("create a new file")
|
||||
assert result["retrieved_from"] is None
|
||||
assert result["tokens"] == 0
|
||||
|
||||
def test_runs_for_recall_query(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
|
||||
result = enforce_retrieval_order("what did we work on yesterday")
|
||||
assert "Identity" in result["context"]
|
||||
assert "L0" in result["layers_checked"]
|
||||
|
||||
def test_palace_hit_sets_l1(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value="Found: fleet audit results"), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""):
|
||||
result = enforce_retrieval_order("what did we discuss yesterday")
|
||||
assert result["retrieved_from"] == "L1"
|
||||
assert "Palace Memory" in result["context"]
|
||||
|
||||
def test_falls_through_to_l5(self, tmp_path):
|
||||
identity_file = tmp_path / "nonexistent.txt"
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
|
||||
result = enforce_retrieval_order("remember the old deployment", skip_if_not_recall=True)
|
||||
assert result["retrieved_from"] == "L5"
|
||||
|
||||
def test_force_mode_skips_recall_check(self, tmp_path):
|
||||
identity_file = tmp_path / "identity.txt"
|
||||
identity_file.write_text("I am Timmy.")
|
||||
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
|
||||
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
|
||||
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
|
||||
result = enforce_retrieval_order("deploy now", skip_if_not_recall=False)
|
||||
assert "Identity" in result["context"]
|
||||
108
hermes-sovereign/mempalace/tests/test_scratchpad.py
Normal file
108
hermes-sovereign/mempalace/tests/test_scratchpad.py
Normal file
@@ -0,0 +1,108 @@
|
||||
"""Tests for scratchpad.py.
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.scratchpad import (
|
||||
write_scratch,
|
||||
read_scratch,
|
||||
delete_scratch,
|
||||
list_sessions,
|
||||
clear_session,
|
||||
_scratch_path,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scratch_dir(tmp_path):
|
||||
"""Provide a temporary scratchpad directory."""
|
||||
with patch("mempalace.scratchpad.SCRATCHPAD_DIR", tmp_path):
|
||||
yield tmp_path
|
||||
|
||||
|
||||
class TestScratchPath:
|
||||
def test_sanitizes_session_id(self):
|
||||
path = _scratch_path("safe-id_123")
|
||||
assert "safe-id_123.json" in str(path)
|
||||
|
||||
def test_strips_dangerous_chars(self):
|
||||
path = _scratch_path("../../etc/passwd")
|
||||
assert ".." not in path.name
|
||||
assert "/" not in path.name
|
||||
# Dots are stripped, so only alphanumeric chars remain
|
||||
assert path.name == "etcpasswd.json"
|
||||
|
||||
|
||||
class TestWriteAndRead:
|
||||
def test_write_then_read(self, scratch_dir):
|
||||
write_scratch("sess1", "note", "hello world")
|
||||
result = read_scratch("sess1", "note")
|
||||
assert "note" in result
|
||||
assert result["note"]["value"] == "hello world"
|
||||
|
||||
def test_read_all_keys(self, scratch_dir):
|
||||
write_scratch("sess1", "a", 1)
|
||||
write_scratch("sess1", "b", 2)
|
||||
result = read_scratch("sess1")
|
||||
assert "a" in result
|
||||
assert "b" in result
|
||||
|
||||
def test_read_missing_key(self, scratch_dir):
|
||||
write_scratch("sess1", "exists", "yes")
|
||||
result = read_scratch("sess1", "missing")
|
||||
assert result == {}
|
||||
|
||||
def test_read_missing_session(self, scratch_dir):
|
||||
result = read_scratch("nonexistent")
|
||||
assert result == {}
|
||||
|
||||
def test_overwrite_key(self, scratch_dir):
|
||||
write_scratch("sess1", "key", "v1")
|
||||
write_scratch("sess1", "key", "v2")
|
||||
result = read_scratch("sess1", "key")
|
||||
assert result["key"]["value"] == "v2"
|
||||
|
||||
|
||||
class TestDelete:
|
||||
def test_delete_existing_key(self, scratch_dir):
|
||||
write_scratch("sess1", "key", "val")
|
||||
assert delete_scratch("sess1", "key") is True
|
||||
assert read_scratch("sess1", "key") == {}
|
||||
|
||||
def test_delete_missing_key(self, scratch_dir):
|
||||
write_scratch("sess1", "other", "val")
|
||||
assert delete_scratch("sess1", "missing") is False
|
||||
|
||||
|
||||
class TestListSessions:
|
||||
def test_lists_sessions(self, scratch_dir):
|
||||
write_scratch("alpha", "k", "v")
|
||||
write_scratch("beta", "k", "v")
|
||||
sessions = list_sessions()
|
||||
assert "alpha" in sessions
|
||||
assert "beta" in sessions
|
||||
|
||||
def test_empty_directory(self, scratch_dir):
|
||||
assert list_sessions() == []
|
||||
|
||||
|
||||
class TestClearSession:
|
||||
def test_clears_existing(self, scratch_dir):
|
||||
write_scratch("sess1", "k", "v")
|
||||
assert clear_session("sess1") is True
|
||||
assert read_scratch("sess1") == {}
|
||||
|
||||
def test_clear_nonexistent(self, scratch_dir):
|
||||
assert clear_session("ghost") is False
|
||||
100
hermes-sovereign/mempalace/tests/test_wakeup.py
Normal file
100
hermes-sovereign/mempalace/tests/test_wakeup.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""Tests for wakeup.py.
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from mempalace.wakeup import (
|
||||
palace_wakeup,
|
||||
fleet_status_summary,
|
||||
_load_identity,
|
||||
_palace_context,
|
||||
)
|
||||
|
||||
|
||||
class TestLoadIdentity:
|
||||
def test_loads_identity(self, tmp_path):
|
||||
f = tmp_path / "identity.txt"
|
||||
f.write_text("I am Timmy. A sovereign AI.")
|
||||
with patch("mempalace.wakeup.IDENTITY_PATH", f):
|
||||
result = _load_identity()
|
||||
assert "Timmy" in result
|
||||
|
||||
def test_missing_identity(self, tmp_path):
|
||||
f = tmp_path / "nope.txt"
|
||||
with patch("mempalace.wakeup.IDENTITY_PATH", f):
|
||||
assert _load_identity() == ""
|
||||
|
||||
|
||||
class TestFleetStatus:
|
||||
def test_reads_fleet_json(self, tmp_path):
|
||||
f = tmp_path / "fleet_status.json"
|
||||
f.write_text(json.dumps({
|
||||
"Groq": {"state": "active", "last_seen": "2026-04-07"},
|
||||
"Ezra": {"state": "idle", "last_seen": "2026-04-06"},
|
||||
}))
|
||||
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
|
||||
result = fleet_status_summary()
|
||||
assert "Fleet Status" in result
|
||||
assert "Groq" in result
|
||||
assert "active" in result
|
||||
|
||||
def test_missing_fleet_file(self, tmp_path):
|
||||
f = tmp_path / "nope.json"
|
||||
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
|
||||
assert fleet_status_summary() == ""
|
||||
|
||||
def test_invalid_json(self, tmp_path):
|
||||
f = tmp_path / "bad.json"
|
||||
f.write_text("not json")
|
||||
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
|
||||
assert fleet_status_summary() == ""
|
||||
|
||||
|
||||
class TestPalaceWakeup:
|
||||
def test_generates_context_with_identity(self, tmp_path):
|
||||
identity = tmp_path / "identity.txt"
|
||||
identity.write_text("I am Timmy.")
|
||||
cache = tmp_path / "cache.txt"
|
||||
with patch("mempalace.wakeup.IDENTITY_PATH", identity), \
|
||||
patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
|
||||
patch("mempalace.wakeup._palace_context", return_value=""), \
|
||||
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
|
||||
result = palace_wakeup(force=True)
|
||||
assert "Identity" in result
|
||||
assert "Timmy" in result
|
||||
assert "Session" in result
|
||||
|
||||
def test_uses_cache_when_fresh(self, tmp_path):
|
||||
cache = tmp_path / "cache.txt"
|
||||
cache.write_text("cached wake-up content")
|
||||
# Touch the file so it's fresh
|
||||
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
|
||||
patch("mempalace.wakeup.WAKEUP_CACHE_TTL", 9999):
|
||||
result = palace_wakeup(force=False)
|
||||
assert result == "cached wake-up content"
|
||||
|
||||
def test_force_bypasses_cache(self, tmp_path):
|
||||
cache = tmp_path / "cache.txt"
|
||||
cache.write_text("stale content")
|
||||
identity = tmp_path / "identity.txt"
|
||||
identity.write_text("I am Timmy.")
|
||||
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
|
||||
patch("mempalace.wakeup.IDENTITY_PATH", identity), \
|
||||
patch("mempalace.wakeup._palace_context", return_value=""), \
|
||||
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
|
||||
result = palace_wakeup(force=True)
|
||||
assert "Identity" in result
|
||||
assert "stale content" not in result
|
||||
161
hermes-sovereign/mempalace/wakeup.py
Normal file
161
hermes-sovereign/mempalace/wakeup.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Wake-up Protocol — session start context injection.
|
||||
|
||||
Generates 300-900 tokens of context when a new Hermes session starts.
|
||||
Loads identity, recent palace context, and fleet status.
|
||||
|
||||
Refs: Epic #367, Sub-issue #372
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
|
||||
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
|
||||
FLEET_STATUS_PATH = Path.home() / ".hermes" / "fleet_status.json"
|
||||
WAKEUP_CACHE_PATH = Path.home() / ".hermes" / "last_wakeup.txt"
|
||||
WAKEUP_CACHE_TTL = 300 # 5 minutes — don't regenerate if recent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_identity() -> str:
|
||||
"""Read the agent identity file."""
|
||||
try:
|
||||
if IDENTITY_PATH.exists():
|
||||
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
|
||||
# Cap at ~150 tokens for wake-up brevity
|
||||
words = text.split()
|
||||
if len(words) > 150:
|
||||
text = " ".join(words[:150]) + "..."
|
||||
return text
|
||||
except (OSError, PermissionError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _palace_context() -> str:
|
||||
"""Run mempalace wake-up command for recent context. Degrades gracefully."""
|
||||
try:
|
||||
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
|
||||
result = subprocess.run(
|
||||
[bin_path, "wake-up"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip()
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
|
||||
# ONNX issues (#373) or CLI not available — degrade gracefully
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def fleet_status_summary() -> str:
|
||||
"""Read cached fleet status for lightweight session context."""
|
||||
try:
|
||||
if FLEET_STATUS_PATH.exists():
|
||||
data = json.loads(FLEET_STATUS_PATH.read_text(encoding="utf-8"))
|
||||
lines = ["## Fleet Status"]
|
||||
|
||||
if isinstance(data, dict):
|
||||
for agent, status in data.items():
|
||||
if isinstance(status, dict):
|
||||
state = status.get("state", "unknown")
|
||||
last_seen = status.get("last_seen", "?")
|
||||
lines.append(f" {agent}: {state} (last: {last_seen})")
|
||||
else:
|
||||
lines.append(f" {agent}: {status}")
|
||||
|
||||
if len(lines) > 1:
|
||||
return "\n".join(lines)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _check_cache() -> str:
|
||||
"""Return cached wake-up if fresh enough."""
|
||||
try:
|
||||
if WAKEUP_CACHE_PATH.exists():
|
||||
age = time.time() - WAKEUP_CACHE_PATH.stat().st_mtime
|
||||
if age < WAKEUP_CACHE_TTL:
|
||||
return WAKEUP_CACHE_PATH.read_text(encoding="utf-8").strip()
|
||||
except OSError:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _write_cache(content: str) -> None:
|
||||
"""Cache the wake-up content."""
|
||||
try:
|
||||
WAKEUP_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
WAKEUP_CACHE_PATH.write_text(content, encoding="utf-8")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def palace_wakeup(force: bool = False) -> str:
|
||||
"""Generate wake-up context for a new session. ~300-900 tokens.
|
||||
|
||||
Args:
|
||||
force: If True, bypass the 5-minute cache and regenerate.
|
||||
|
||||
Returns:
|
||||
Formatted context string suitable for prepending to the system prompt.
|
||||
"""
|
||||
# Check cache first (avoids redundant work on rapid session restarts)
|
||||
if not force:
|
||||
cached = _check_cache()
|
||||
if cached:
|
||||
return cached
|
||||
|
||||
parts = []
|
||||
|
||||
# L0: Identity
|
||||
identity = _load_identity()
|
||||
if identity:
|
||||
parts.append(f"## Identity\n{identity}")
|
||||
|
||||
# L1: Recent palace context
|
||||
palace = _palace_context()
|
||||
if palace:
|
||||
parts.append(palace)
|
||||
|
||||
# Fleet status (lightweight)
|
||||
fleet = fleet_status_summary()
|
||||
if fleet:
|
||||
parts.append(fleet)
|
||||
|
||||
# Timestamp
|
||||
parts.append(f"## Session\nWake-up generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
content = "\n\n".join(parts)
|
||||
|
||||
# Cache for TTL
|
||||
_write_cache(content)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI entry point for testing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(palace_wakeup(force=True))
|
||||
Reference in New Issue
Block a user