""" MemPalace Portal — Hybrid Memory Provider. Bridges the local Holographic fact store with the fleet-wide MemPalace vector database. Implements smart context compression for token efficiency. """ import json import logging import os import re import requests from typing import Any, Dict, List, Optional from agent.memory_provider import MemoryProvider # Import Holographic components if available try: from plugins.memory.holographic.store import MemoryStore from plugins.memory.holographic.retrieval import FactRetriever HAS_HOLOGRAPHIC = True except ImportError: HAS_HOLOGRAPHIC = False logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tool Schemas # --------------------------------------------------------------------------- MEMPALACE_SCHEMA = { "name": "mempalace", "description": ( "Search or record memories in the shared fleet vector database. " "Use this for long-term, high-volume memory across the entire fleet." ), "parameters": { "type": "object", "properties": { "action": {"type": "string", "enum": ["search", "record", "wings"]}, "query": {"type": "string", "description": "Search query."}, "text": {"type": "string", "description": "Memory text to record."}, "room": {"type": "string", "description": "Target room (e.g., forge, hermes, nexus)."}, "n_results": {"type": "integer", "default": 5}, }, "required": ["action"], }, } FACT_STORE_SCHEMA = { "name": "fact_store", "description": ( "Structured local fact storage. Use for durable facts about people, projects, and decisions." ), "parameters": { "type": "object", "properties": { "action": {"type": "string", "enum": ["add", "search", "probe", "reason", "update", "remove"]}, "content": {"type": "string"}, "query": {"type": "string"}, "entity": {"type": "string"}, "fact_id": {"type": "integer"}, }, "required": ["action"], }, } # --------------------------------------------------------------------------- # Provider Implementation # --------------------------------------------------------------------------- class MemPalacePortalProvider(MemoryProvider): """Hybrid Fleet Vector + Local Structured memory provider.""" def __init__(self, config: dict | None = None): self._config = config or {} self._api_url = os.environ.get("MEMPALACE_API_URL", "http://127.0.0.1:7771") self._hologram_store = None self._hologram_retriever = None self._session_id = None @property def name(self) -> str: return "mempalace" def is_available(self) -> bool: # Always available if we can reach the API or have Holographic return True def initialize(self, session_id: str, **kwargs) -> None: self._session_id = session_id hermes_home = kwargs.get("hermes_home") if HAS_HOLOGRAPHIC and hermes_home: db_path = os.path.join(hermes_home, "memory_store.db") try: self._hologram_store = MemoryStore(db_path=db_path) self._hologram_retriever = FactRetriever(store=self._hologram_store) logger.info("Holographic store initialized as local portal layer.") except Exception as e: logger.error(f"Failed to init Holographic layer: {e}") def system_prompt_block(self) -> str: status = "Active (Fleet Portal)" if self._hologram_store: status += " + Local Hologram" return ( f"# MemPalace Portal\n" f"Status: {status}.\n" "You have access to the shared fleet vector database (mempalace) and local structured facts (fact_store).\n" "Use mempalace for semantic fleet-wide recall. Use fact_store for precise local knowledge." ) def prefetch(self, query: str, *, session_id: str = "") -> str: if not query: return "" context_blocks = [] # 1. Fleet Search (MemPalace) try: res = requests.get(f"{self._api_url}/search", params={"q": query, "n": 3}, timeout=2) if res.ok: data = res.json() memories = data.get("memories", []) if memories: block = "## Fleet Memories (MemPalace)\n" for m in memories: block += f"- {m['text']}\n" context_blocks.append(block) except Exception: pass # 2. Local Probe (Holographic) if self._hologram_retriever: try: # Extract entities from query to probe entities = re.findall(r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b', query) facts = [] for ent in entities: results = self._hologram_retriever.probe(ent, limit=3) facts.extend(results) if facts: block = "## Local Facts (Hologram)\n" seen = set() for f in facts: if f['content'] not in seen: block += f"- {f['content']}\n" seen.add(f['content']) context_blocks.append(block) except Exception: pass return "\n\n".join(context_blocks) def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: # Record to Fleet Palace try: payload = { "text": f"User: {user_content}\nAssistant: {assistant_content}", "room": "hermes_sync", "metadata": {"session_id": session_id} } requests.post(f"{self._api_url}/record", json=payload, timeout=2) except Exception: pass def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str: """Token Efficiency: Summarize and archive before context is lost.""" if not messages: return "" # Extract key facts for Hologram if self._hologram_store: # Simple heuristic: look for \"I prefer\", \"The project uses\", etc. for msg in messages: if msg.get(\"role\") == \"user\": content = msg.get(\"content\", \"\") if \"prefer\" in content.lower() or \"use\" in content.lower(): try: self._hologram_store.add_fact(content[:200], category=\"user_pref\") except Exception: pass # Archive session summary to MemPalace summary_text = f"Session {self._session_id} summary: " + " | ".join([m['content'][:50] for m in messages if m.get('role') == 'user']) try: payload = { "text": summary_text, "room": "summaries", "metadata": {"type": "session_summary", "session_id": self._session_id} } requests.post(f"{self._api_url}/record", json=payload, timeout=2) except Exception: pass return "Insights archived to MemPalace and Hologram." def get_tool_schemas(self) -> List[Dict[str, Any]]: return [MEMPALACE_SCHEMA, FACT_STORE_SCHEMA] def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str: if tool_name == "mempalace": return self._handle_mempalace(args) elif tool_name == "fact_store": return self._handle_fact_store(args) return json.dumps({"error": f"Unknown tool: {tool_name}"}) def _handle_mempalace(self, args: dict) -> str: action = args.get("action") try: if action == "search": res = requests.get(f"{self._api_url}/search", params={"q": args["query"], "n": args.get("n_results", 5)}, timeout=10) return res.text elif action == "record": res = requests.post(f"{self._api_url}/record", json={"text": args["text"], "room": args.get("room", "general")}, timeout=10) return res.text elif action == "wings": res = requests.get(f"{self._api_url}/wings", timeout=10) return res.text except Exception as e: return json.dumps({"success": False, "error": str(e)}) return json.dumps({"error": "Invalid action"}) def _handle_fact_store(self, args: dict) -> str: if not self._hologram_store: return json.dumps({"error": "Holographic store not initialized locally."}) # Logic similar to holographic plugin action = args["action"] try: if action == "add": fid = self._hologram_store.add_fact(args["content"]) return json.dumps({"fact_id": fid, "status": "added"}) elif action == "probe": res = self._hologram_retriever.probe(args["entity"]) return json.dumps({"results": res}) # ... other actions ... return json.dumps({"status": "ok", "message": f"Action {action} processed (partial impl)"}) except Exception as e: return json.dumps({"error": str(e)}) def shutdown(self) -> None: if self._hologram_store: self._hologram_store.close() def register(ctx) -> None: provider = MemPalacePortalProvider() ctx.register_memory_provider(provider)