397 lines
12 KiB
Python
397 lines
12 KiB
Python
"""
|
|
agent.memory — Cross-session agent memory via MemPalace.
|
|
|
|
Gives agents persistent memory across sessions. On wake-up, agents
|
|
recall relevant context from past sessions. On session end, they
|
|
write a diary entry summarizing what happened.
|
|
|
|
Architecture:
|
|
Session Start → memory.recall_context() → inject L0/L1 into prompt
|
|
During Session → memory.remember() → store important facts
|
|
Session End → memory.write_diary() → summarize session
|
|
|
|
All operations degrade gracefully — if MemPalace is unavailable,
|
|
the agent continues without memory and logs a warning.
|
|
|
|
Usage:
|
|
from agent.memory import AgentMemory
|
|
|
|
mem = AgentMemory(agent_name="bezalel", wing="wing_bezalel")
|
|
|
|
# Session start — load context
|
|
context = mem.recall_context("What was I working on last time?")
|
|
|
|
# During session — store important decisions
|
|
mem.remember("Switched CI runner from GitHub Actions to self-hosted", room="forge")
|
|
|
|
# Session end — write diary
|
|
mem.write_diary("Fixed PR #1386, reconciled fleet registry locations")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger("agent.memory")
|
|
|
|
|
|
@dataclass
class MemoryContext:
    """Context loaded at session start from MemPalace.

    Holds the three groups of recalled entries plus a load status, and
    knows how to render them as a text block for the agent prompt.
    """

    # Query search hits; rendering reads the "text" and "score" keys.
    relevant_memories: list[dict] = field(default_factory=list)
    # Session diary entries; rendering reads the "timestamp" and "text" keys.
    recent_diaries: list[dict] = field(default_factory=list)
    # Stored facts; rendering reads the "text" key.
    facts: list[dict] = field(default_factory=list)
    # Rendering produces output only when this flag is True.
    loaded: bool = False
    # Optional error description; not consulted by to_prompt_block().
    error: Optional[str] = None

    @staticmethod
    def _clip(entry: dict, key: str, limit: int) -> str:
        """Return entry[key] as a string cut to *limit* chars ("" if missing/None)."""
        value = entry.get(key)
        if value is None:
            return ""
        return str(value)[:limit]

    def to_prompt_block(self) -> str:
        """Format context as a text block to inject into the agent prompt.

        Renders at most 3 diaries (500 chars each), 10 facts (200 chars
        each) and 5 relevant memories (300 chars each). Sections are
        separated by one blank line; the block never starts with a blank
        line, whichever sections are present.

        Returns:
            The formatted block, or "" when the context is not loaded or
            every section is empty.
        """
        if not self.loaded:
            return ""

        sections: list[str] = []

        if self.recent_diaries:
            lines = ["=== Recent Session Summaries ==="]
            for diary in self.recent_diaries[:3]:
                ts = diary.get("timestamp")
                ts = "" if ts is None else ts
                lines.append(f"[{ts}] {self._clip(diary, 'text', 500)}")
            sections.append("\n".join(lines))

        if self.facts:
            lines = ["=== Known Facts ==="]
            lines.extend(f"- {self._clip(fact, 'text', 200)}" for fact in self.facts[:10])
            sections.append("\n".join(lines))

        if self.relevant_memories:
            lines = ["=== Relevant Past Memories ==="]
            for memory in self.relevant_memories[:5]:
                score = memory.get("score")
                # A missing or non-numeric score must not break prompt building.
                if not isinstance(score, (int, float)):
                    score = 0.0
                lines.append(f"[{score:.2f}] {self._clip(memory, 'text', 300)}")
            sections.append("\n".join(lines))

        # Joining per-section keeps the blank line between sections without
        # emitting a leading newline when the first section is absent.
        return "\n\n".join(sections)
|
|
|
|
|
|
@dataclass
class SessionTranscript:
    """In-memory record of the current session, used to produce a diary entry.

    Each recorded event is a dict appended to ``entries`` with a "role"
    key ("user", "agent" or "tool") and a "ts" wall-clock timestamp.
    """

    agent_name: str
    wing: str
    # ISO-8601 UTC timestamp captured when the transcript is created.
    started_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    entries: list[dict] = field(default_factory=list)

    def add_user_turn(self, text: str):
        """Record a user message, keeping at most the first 2000 characters."""
        record = {"role": "user", "text": text[:2000], "ts": time.time()}
        self.entries.append(record)

    def add_agent_turn(self, text: str):
        """Record an agent reply, keeping at most the first 2000 characters."""
        record = {"role": "agent", "text": text[:2000], "ts": time.time()}
        self.entries.append(record)

    def add_tool_call(self, tool: str, args: str, result_summary: str):
        """Record a tool invocation; args and result are capped at 500 chars each."""
        record = {
            "role": "tool",
            "tool": tool,
            "args": args[:500],
            "result": result_summary[:500],
            "ts": time.time(),
        }
        self.entries.append(record)

    def summary(self) -> str:
        """Render the last 20 entries as one line each.

        Entries whose role is not "user", "agent" or "tool" are skipped.

        Returns:
            "Empty session." when nothing was recorded, otherwise the
            newline-joined lines.
        """
        if len(self.entries) == 0:
            return "Empty session."

        lines = []
        for entry in self.entries[-20:]:
            kind = entry["role"]
            if kind == "user":
                lines.append(f"USER: {entry['text'][:200]}")
                continue
            if kind == "agent":
                lines.append(f"AGENT: {entry['text'][:200]}")
                continue
            if kind == "tool":
                tool_name = entry.get('tool', '')
                outcome = entry.get('result', '')[:150]
                lines.append(f"TOOL({tool_name}): {outcome}")

        return "\n".join(lines)
|
|
|
|
|
|
class AgentMemory:
    """
    Persistent, cross-session memory facade for a single agent.

    Applies agent-specific conventions on top of MemPalace:
    - Every agent works inside one wing (default "wing_<agent_name>")
    - Session diaries are written to, and recalled from, the "hermes" room
    - Facts are recalled from the "nexus" room, which is also the
      default room for remember()

    Every operation catches backend failures, logs a warning and returns
    an empty/None result instead of raising.
    """

    def __init__(
        self,
        agent_name: str,
        wing: Optional[str] = None,
        palace_path: Optional[Path] = None,
    ):
        """
        Args:
            agent_name: Name of the agent that owns this memory.
            wing: Wing to use; an empty or missing value falls back to
                "wing_<agent_name>".
            palace_path: Optional palace location passed to the backend.
        """
        self.agent_name = agent_name
        self.wing = wing if wing else f"wing_{agent_name}"
        self.palace_path = palace_path
        # No transcript until start_session() is called.
        self._transcript: Optional[SessionTranscript] = None
        # Tri-state cache: None = not probed yet, otherwise the probe result.
        self._available: Optional[bool] = None

    def _check_available(self) -> bool:
        """Probe MemPalace once and cache the outcome for later calls."""
        cached = self._available
        if cached is not None:
            return cached

        try:
            # Importing all three names doubles as a check that the
            # searcher module exposes them.
            from nexus.mempalace.searcher import search_memories, add_memory, _get_client
            from nexus.mempalace.config import MEMPALACE_PATH

            path = self.palace_path or MEMPALACE_PATH
            _get_client(path)
            self._available = True
            logger.info(f"MemPalace available at {path}")
        except Exception as exc:
            self._available = False
            logger.warning(f"MemPalace unavailable: {exc}")

        return self._available

    def recall_context(
        self,
        query: Optional[str] = None,
        n_results: int = 5,
    ) -> MemoryContext:
        """
        Gather context from earlier sessions for prompt injection.

        Always looks up up to 3 diary entries ("hermes" room) and up to 5
        facts ("nexus" room); a free-text search across the wing is added
        only when a query is supplied.

        Args:
            query: What to search for. If None/empty, only diaries and
                facts are loaded.
            n_results: Max memories to recall for the query search.

        Returns:
            A MemoryContext. ``loaded`` is True only if every lookup
            succeeded; otherwise ``error`` carries the reason.
        """
        ctx = MemoryContext()

        if not self._check_available():
            ctx.error = "MemPalace unavailable"
            return ctx

        try:
            from nexus.mempalace.searcher import search_memories

            # Session summaries written by write_diary().
            diary_hits = search_memories(
                "session summary",
                palace_path=self.palace_path,
                wing=self.wing,
                room="hermes",
                n_results=3,
            )
            ctx.recent_diaries = [
                {
                    "text": hit.text,
                    "score": hit.score,
                    "timestamp": hit.metadata.get("timestamp", ""),
                }
                for hit in diary_hits
            ]

            # Known facts.
            fact_hits = search_memories(
                "important facts decisions",
                palace_path=self.palace_path,
                wing=self.wing,
                room="nexus",
                n_results=5,
            )
            ctx.facts = [{"text": hit.text, "score": hit.score} for hit in fact_hits]

            # Free-text recall, only when the caller asked for something.
            if query:
                query_hits = search_memories(
                    query,
                    palace_path=self.palace_path,
                    wing=self.wing,
                    n_results=n_results,
                )
                ctx.relevant_memories = [
                    {"text": hit.text, "score": hit.score, "room": hit.room}
                    for hit in query_hits
                ]

            ctx.loaded = True

        except Exception as exc:
            ctx.error = str(exc)
            logger.warning(f"Failed to recall context: {exc}")

        return ctx

    def remember(
        self,
        text: str,
        room: str = "nexus",
        source_file: str = "",
        metadata: Optional[dict] = None,
    ) -> Optional[str]:
        """
        Persist a single memory in this agent's wing.

        Args:
            text: The memory content.
            room: Target room (forge, hermes, nexus, issues, experiments).
            source_file: Optional source attribution.
            metadata: Extra metadata; None is sent as an empty dict.

        Returns:
            Document ID if stored, None if MemPalace is unavailable or
            the write failed.
        """
        if not self._check_available():
            logger.warning("Cannot store memory — MemPalace unavailable")
            return None

        try:
            from nexus.mempalace.searcher import add_memory

            extra = metadata or {}
            doc_id = add_memory(
                text=text,
                room=room,
                wing=self.wing,
                palace_path=self.palace_path,
                source_file=source_file,
                extra_metadata=extra,
            )
            logger.debug(f"Stored memory in {room}: {text[:80]}...")
            return doc_id

        except Exception as exc:
            logger.warning(f"Failed to store memory: {exc}")
            return None

    def write_diary(
        self,
        summary: Optional[str] = None,
    ) -> Optional[str]:
        """
        Store a diary entry for the session in the "hermes" room.

        Intended for session end. When no summary is passed and a
        transcript exists, the transcript's own summary is used.

        Args:
            summary: Override summary text. If None, generated from the
                transcript.

        Returns:
            Document ID if stored; None if there is nothing to write or
            the store is unavailable.
        """
        transcript = self._transcript
        if summary is None and transcript:
            summary = transcript.summary()

        if not summary:
            return None

        timestamp = datetime.now(timezone.utc).isoformat()
        diary_text = f"[{timestamp}] Session by {self.agent_name}:\n{summary}"
        diary_meta = {
            "type": "session_diary",
            "agent": self.agent_name,
            "timestamp": timestamp,
            "entry_count": len(transcript.entries) if transcript else 0,
        }

        return self.remember(diary_text, room="hermes", metadata=diary_meta)

    def start_session(self) -> SessionTranscript:
        """
        Create a fresh transcript, replacing any current one.

        Returns the transcript object for recording turns.
        """
        transcript = SessionTranscript(agent_name=self.agent_name, wing=self.wing)
        self._transcript = transcript
        logger.info(f"Session started for {self.agent_name}")
        return self._transcript

    def end_session(self, diary_summary: Optional[str] = None) -> Optional[str]:
        """
        Write the diary, drop the transcript, and return the diary doc ID.

        The transcript is cleared whether or not the diary was stored.
        """
        diary_id = self.write_diary(diary_summary)
        self._transcript = None
        logger.info(f"Session ended for {self.agent_name}")
        return diary_id

    def search(
        self,
        query: str,
        room: Optional[str] = None,
        n_results: int = 5,
    ) -> list[dict]:
        """
        Search this agent's wing, optionally limited to one room.

        Returns list of {text, room, wing, score} dicts; an empty list if
        MemPalace is unavailable or the search fails.
        """
        if not self._check_available():
            return []

        try:
            from nexus.mempalace.searcher import search_memories

            hits = search_memories(
                query,
                palace_path=self.palace_path,
                wing=self.wing,
                room=room,
                n_results=n_results,
            )
            found = []
            for hit in hits:
                found.append(
                    {"text": hit.text, "room": hit.room, "wing": hit.wing, "score": hit.score}
                )
            return found

        except Exception as exc:
            logger.warning(f"Search failed: {exc}")
            return []
|
|
|
|
|
|
# --- Fleet-wide memory helpers ---
|
|
|
|
def create_agent_memory(
    agent_name: str,
    palace_path: Optional[Path] = None,
) -> AgentMemory:
    """
    Build an AgentMemory using the standard wing-selection rule.

    The wing is taken from the MEMPALACE_WING environment variable when
    it is set; otherwise it is "wing_<agent_name>".

    Args:
        agent_name: Name of the agent the memory belongs to.
        palace_path: Optional palace location forwarded unchanged.
    """
    fallback_wing = f"wing_{agent_name}"
    chosen_wing = os.getenv("MEMPALACE_WING", fallback_wing)
    return AgentMemory(agent_name=agent_name, wing=chosen_wing, palace_path=palace_path)
|