"""Three-tier memory system for Timmy. Architecture: - Tier 1 (Hot): MEMORY.md — always loaded, ~300 lines - Tier 2 (Vault): memory/ — structured markdown, append-only - Tier 3 (Semantic): Vector search over vault (optional) Handoff Protocol: - Write last-session-handoff.md at session end - Inject into next session automatically """ import logging import re from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) # Paths PROJECT_ROOT = Path(__file__).parent.parent.parent HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" VAULT_PATH = PROJECT_ROOT / "memory" HANDOFF_PATH = VAULT_PATH / "notes" / "last-session-handoff.md" class HotMemory: """Tier 1: Hot memory (MEMORY.md) — always loaded.""" def __init__(self) -> None: self.path = HOT_MEMORY_PATH self._content: Optional[str] = None self._last_modified: Optional[float] = None def read(self, force_refresh: bool = False) -> str: """Read hot memory, with caching.""" if not self.path.exists(): self._create_default() # Check if file changed current_mtime = self.path.stat().st_mtime if not force_refresh and self._content and self._last_modified == current_mtime: return self._content self._content = self.path.read_text() self._last_modified = current_mtime logger.debug("HotMemory: Loaded %d chars from %s", len(self._content), self.path) return self._content def update_section(self, section: str, content: str) -> None: """Update a specific section in MEMORY.md.""" full_content = self.read() # Find section pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" match = re.search(pattern, full_content, re.DOTALL) if match: # Replace section new_section = f"## {section}\n\n{content}\n\n" full_content = full_content[: match.start()] + new_section + full_content[match.end() :] else: # Append section before last updated line insert_point = full_content.rfind("*Prune date:") new_section = f"## {section}\n\n{content}\n\n" full_content = ( full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] ) self.path.write_text(full_content) self._content = full_content self._last_modified = self.path.stat().st_mtime logger.info("HotMemory: Updated section '%s'", section) def _create_default(self) -> None: """Create default MEMORY.md if missing.""" default_content = """# Timmy Hot Memory > Working RAM — always loaded, ~300 lines max, pruned monthly > Last updated: {date} --- ## Current Status **Agent State:** Operational **Mode:** Development **Active Tasks:** 0 **Pending Decisions:** None --- ## Standing Rules 1. **Sovereignty First** — No cloud dependencies 2. **Local-Only Inference** — Ollama on localhost 3. **Privacy by Design** — Telemetry disabled 4. **Tool Minimalism** — Use tools only when necessary 5. **Memory Discipline** — Write handoffs at session end --- ## Agent Roster | Agent | Role | Status | |-------|------|--------| | Timmy | Core | Active | --- ## User Profile **Name:** (not set) **Interests:** (to be learned) --- ## Key Decisions (none yet) --- ## Pending Actions - [ ] Learn user's name --- *Prune date: {prune_date}* """.format( date=datetime.now(timezone.utc).strftime("%Y-%m-%d"), prune_date=(datetime.now(timezone.utc).replace(day=25)).strftime("%Y-%m-%d"), ) self.path.write_text(default_content) logger.info("HotMemory: Created default MEMORY.md") class VaultMemory: """Tier 2: Structured vault (memory/) — append-only markdown.""" def __init__(self) -> None: self.path = VAULT_PATH self._ensure_structure() def _ensure_structure(self) -> None: """Ensure vault directory structure exists.""" (self.path / "self").mkdir(parents=True, exist_ok=True) (self.path / "notes").mkdir(parents=True, exist_ok=True) (self.path / "aar").mkdir(parents=True, exist_ok=True) def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: """Write a note to the vault.""" # Add timestamp to filename timestamp = datetime.now(timezone.utc).strftime("%Y%m%d") filename = f"{timestamp}_{name}.md" filepath = self.path / namespace / filename # Add header full_content = f"""# {name.replace("_", " ").title()} > Created: {datetime.now(timezone.utc).isoformat()} > Namespace: {namespace} --- {content} --- *Auto-generated by Timmy Memory System* """ filepath.write_text(full_content) logger.info("VaultMemory: Wrote %s", filepath) return filepath def read_file(self, filepath: Path) -> str: """Read a file from the vault.""" if not filepath.exists(): return "" return filepath.read_text() def list_files(self, namespace: str = "notes", pattern: str = "*.md") -> list[Path]: """List files in a namespace.""" dir_path = self.path / namespace if not dir_path.exists(): return [] return sorted(dir_path.glob(pattern)) def get_latest(self, namespace: str = "notes", pattern: str = "*.md") -> Optional[Path]: """Get most recent file in namespace.""" files = self.list_files(namespace, pattern) return files[-1] if files else None def update_user_profile(self, key: str, value: str) -> None: """Update a field in user_profile.md.""" profile_path = self.path / "self" / "user_profile.md" if not profile_path.exists(): # Create default profile self._create_default_profile() content = profile_path.read_text() # Simple pattern replacement pattern = rf"(\*\*{re.escape(key)}:\*\*).*" if re.search(pattern, content): content = re.sub(pattern, rf"\1 {value}", content) else: # Add to Important Facts section facts_section = "## Important Facts" if facts_section in content: insert_point = content.find(facts_section) + len(facts_section) content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:] # Update last_updated content = re.sub( r"\*Last updated:.*\*", f"*Last updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}*", content, ) profile_path.write_text(content) logger.info("VaultMemory: Updated user profile: %s = %s", key, value) def _create_default_profile(self) -> None: """Create default user profile.""" profile_path = self.path / "self" / "user_profile.md" default = """# User Profile > Learned information about the user. ## Basic Information **Name:** (unknown) **Location:** (unknown) **Occupation:** (unknown) ## Interests & Expertise - (to be learned) ## Preferences - Response style: concise, technical - Tool usage: minimal ## Important Facts - (to be extracted) --- *Last updated: {date}* """.format(date=datetime.now(timezone.utc).strftime("%Y-%m-%d")) profile_path.write_text(default) class HandoffProtocol: """Session handoff protocol for continuity.""" def __init__(self) -> None: self.path = HANDOFF_PATH self.vault = VaultMemory() def write_handoff( self, session_summary: str, key_decisions: list[str], open_items: list[str], next_steps: list[str], ) -> None: """Write handoff at session end.""" content = f"""# Last Session Handoff **Session End:** {datetime.now(timezone.utc).isoformat()} **Duration:** (calculated on read) ## Summary {session_summary} ## Key Decisions {chr(10).join(f"- {d}" for d in key_decisions) if key_decisions else "- (none)"} ## Open Items {chr(10).join(f"- [ ] {i}" for i in open_items) if open_items else "- (none)"} ## Next Steps {chr(10).join(f"- {s}" for s in next_steps) if next_steps else "- (none)"} ## Context for Next Session The user was last working on: {session_summary[:200]}... --- *This handoff will be auto-loaded at next session start* """ self.path.write_text(content) # Also archive to notes self.vault.write_note("session_handoff", content, namespace="notes") logger.info( "HandoffProtocol: Wrote handoff with %d decisions, %d open items", len(key_decisions), len(open_items), ) def read_handoff(self) -> Optional[str]: """Read handoff if exists.""" if not self.path.exists(): return None return self.path.read_text() def clear_handoff(self) -> None: """Clear handoff after loading.""" if self.path.exists(): self.path.unlink() logger.debug("HandoffProtocol: Cleared handoff") class MemorySystem: """Central memory system coordinating all tiers.""" def __init__(self) -> None: self.hot = HotMemory() self.vault = VaultMemory() self.handoff = HandoffProtocol() self.session_start_time: Optional[datetime] = None self.session_decisions: list[str] = [] self.session_open_items: list[str] = [] def start_session(self) -> str: """Start a new session, loading context from memory.""" self.session_start_time = datetime.now(timezone.utc) # Build context context_parts = [] # 1. Hot memory hot_content = self.hot.read() context_parts.append("## Hot Memory\n" + hot_content) # 2. Last session handoff handoff_content = self.handoff.read_handoff() if handoff_content: context_parts.append("## Previous Session\n" + handoff_content) self.handoff.clear_handoff() # 3. User profile (key fields only) profile = self._load_user_profile_summary() if profile: context_parts.append("## User Context\n" + profile) full_context = "\n\n---\n\n".join(context_parts) logger.info("MemorySystem: Session started with %d chars context", len(full_context)) return full_context def end_session(self, summary: str) -> None: """End session, write handoff.""" self.handoff.write_handoff( session_summary=summary, key_decisions=self.session_decisions, open_items=self.session_open_items, next_steps=[], ) # Update hot memory self.hot.update_section( "Current Session", f"**Last Session:** {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n" + f"**Summary:** {summary[:100]}...", ) logger.info("MemorySystem: Session ended, handoff written") def record_decision(self, decision: str) -> None: """Record a key decision during session.""" self.session_decisions.append(decision) # Also add to hot memory current = self.hot.read() if "## Key Decisions" in current: # Append to section pass # Handled at session end def record_open_item(self, item: str) -> None: """Record an open item for follow-up.""" self.session_open_items.append(item) def update_user_fact(self, key: str, value: str) -> None: """Update user profile in vault.""" self.vault.update_user_profile(key, value) # Also update hot memory if key.lower() == "name": self.hot.update_section("User Profile", f"**Name:** {value}") def _load_user_profile_summary(self) -> str: """Load condensed user profile.""" profile_path = self.vault.path / "self" / "user_profile.md" if not profile_path.exists(): return "" content = profile_path.read_text() # Extract key fields summary_parts = [] # Name name_match = re.search(r"\*\*Name:\*\* (.+)", content) if name_match and "unknown" not in name_match.group(1).lower(): summary_parts.append(f"Name: {name_match.group(1).strip()}") # Interests interests_section = re.search(r"## Interests.*?\n- (.+?)(?=\n## |\Z)", content, re.DOTALL) if interests_section: interests = [ i.strip() for i in interests_section.group(1).split("\n-") if i.strip() and "to be" not in i ] if interests: summary_parts.append(f"Interests: {', '.join(interests[:3])}") return "\n".join(summary_parts) if summary_parts else "" def get_system_context(self) -> str: """Get full context for system prompt injection. Unlike start_session(), this does NOT clear the handoff. Safe to call multiple times without data loss. """ context_parts = [] # 1. Hot memory hot_content = self.hot.read() context_parts.append("## Hot Memory\n" + hot_content) # 2. Last session handoff (read-only, do NOT clear) handoff_content = self.handoff.read_handoff() if handoff_content: context_parts.append("## Previous Session\n" + handoff_content) # 3. User profile (key fields only) profile = self._load_user_profile_summary() if profile: context_parts.append("## User Context\n" + profile) return "\n\n---\n\n".join(context_parts) # ── Lazy singleton ──────────────────────────────────────────────────────────── _memory_system: MemorySystem | None = None def get_memory_system() -> MemorySystem: """Return the module-level MemorySystem, creating it on first access.""" global _memory_system if _memory_system is None: _memory_system = MemorySystem() return _memory_system def reset_memory_system() -> None: """Reset the singleton for test isolation.""" global _memory_system _memory_system = None def __getattr__(name: str): """Module-level __getattr__ for lazy backward-compatible access.""" if name == "memory_system": return get_memory_system() raise AttributeError(f"module {__name__!r} has no attribute {name!r}")