"""Three-tier memory system for Timmy. Architecture: - Tier 1 (Hot): MEMORY.md — always loaded, ~300 lines - Tier 2 (Vault): memory/ — structured markdown, append-only - Tier 3 (Semantic): Vector search over vault (optional) """ import logging import re from datetime import UTC, datetime from pathlib import Path logger = logging.getLogger(__name__) # Paths PROJECT_ROOT = Path(__file__).parent.parent.parent HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" VAULT_PATH = PROJECT_ROOT / "memory" SOUL_PATH = VAULT_PATH / "self" / "soul.md" class HotMemory: """Tier 1: Hot memory (MEMORY.md) — always loaded.""" def __init__(self) -> None: self.path = HOT_MEMORY_PATH self._content: str | None = None self._last_modified: float | None = None def read(self, force_refresh: bool = False) -> str: """Read hot memory, with caching.""" if not self.path.exists(): self._create_default() # Check if file changed current_mtime = self.path.stat().st_mtime if not force_refresh and self._content and self._last_modified == current_mtime: return self._content self._content = self.path.read_text() self._last_modified = current_mtime logger.debug("HotMemory: Loaded %d chars from %s", len(self._content), self.path) return self._content def update_section(self, section: str, content: str) -> None: """Update a specific section in MEMORY.md.""" # Guard against empty or excessively large writes if not content or not content.strip(): logger.warning("HotMemory: Refusing empty write to section '%s'", section) return if len(content) > 2000: logger.warning("HotMemory: Truncating oversized write to section '%s'", section) content = content[:2000] + "\n... [truncated]" full_content = self.read() # Find section pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" match = re.search(pattern, full_content, re.DOTALL) if match: # Replace section new_section = f"## {section}\n\n{content}\n\n" full_content = full_content[: match.start()] + new_section + full_content[match.end() :] else: # Append section before last updated line insert_point = full_content.rfind("*Prune date:") new_section = f"## {section}\n\n{content}\n\n" full_content = ( full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] ) self.path.write_text(full_content) self._content = full_content self._last_modified = self.path.stat().st_mtime logger.info("HotMemory: Updated section '%s'", section) def _create_default(self) -> None: """Create default MEMORY.md if missing.""" default_content = """# Timmy Hot Memory > Working RAM — always loaded, ~300 lines max, pruned monthly > Last updated: {date} --- ## Current Status **Agent State:** Operational **Mode:** Development **Active Tasks:** 0 **Pending Decisions:** None --- ## Standing Rules 1. **Sovereignty First** — No cloud dependencies 2. **Local-Only Inference** — Ollama on localhost 3. **Privacy by Design** — Telemetry disabled 4. **Tool Minimalism** — Use tools only when necessary 5. **Memory Discipline** — Write handoffs at session end --- ## Agent Roster | Agent | Role | Status | |-------|------|--------| | Timmy | Core | Active | --- ## User Profile **Name:** (not set) **Interests:** (to be learned) --- ## Key Decisions (none yet) --- ## Pending Actions - [ ] Learn user's name --- *Prune date: {prune_date}* """.format( date=datetime.now(UTC).strftime("%Y-%m-%d"), prune_date=(datetime.now(UTC).replace(day=25)).strftime("%Y-%m-%d"), ) self.path.write_text(default_content) logger.info("HotMemory: Created default MEMORY.md") class VaultMemory: """Tier 2: Structured vault (memory/) — append-only markdown.""" def __init__(self) -> None: self.path = VAULT_PATH self._ensure_structure() def _ensure_structure(self) -> None: """Ensure vault directory structure exists.""" (self.path / "self").mkdir(parents=True, exist_ok=True) (self.path / "notes").mkdir(parents=True, exist_ok=True) (self.path / "aar").mkdir(parents=True, exist_ok=True) def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: """Write a note to the vault.""" # Add timestamp to filename timestamp = datetime.now(UTC).strftime("%Y%m%d") filename = f"{timestamp}_{name}.md" filepath = self.path / namespace / filename # Add header full_content = f"""# {name.replace("_", " ").title()} > Created: {datetime.now(UTC).isoformat()} > Namespace: {namespace} --- {content} --- *Auto-generated by Timmy Memory System* """ filepath.write_text(full_content) logger.info("VaultMemory: Wrote %s", filepath) return filepath def read_file(self, filepath: Path) -> str: """Read a file from the vault.""" if not filepath.exists(): return "" return filepath.read_text() def update_user_profile(self, key: str, value: str) -> None: """Update a field in user_profile.md.""" profile_path = self.path / "self" / "user_profile.md" if not profile_path.exists(): # Create default profile self._create_default_profile() content = profile_path.read_text() # Simple pattern replacement — use lambda to avoid regex-interpreting value pattern = rf"(\*\*{re.escape(key)}:\*\*).*" if re.search(pattern, content): safe_value = value.strip() content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content) else: # Add to Important Facts section facts_section = "## Important Facts" if facts_section in content: insert_point = content.find(facts_section) + len(facts_section) content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:] # Update last_updated content = re.sub( r"\*Last updated:.*\*", f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*", content, ) profile_path.write_text(content) logger.info("VaultMemory: Updated user profile: %s = %s", key, value) def _create_default_profile(self) -> None: """Create default user profile.""" profile_path = self.path / "self" / "user_profile.md" default = """# User Profile > Learned information about the user. ## Basic Information **Name:** (unknown) **Location:** (unknown) **Occupation:** (unknown) ## Interests & Expertise - (to be learned) ## Preferences - Response style: concise, technical - Tool usage: minimal ## Important Facts - (to be extracted) --- *Last updated: {date}* """.format(date=datetime.now(UTC).strftime("%Y-%m-%d")) profile_path.write_text(default) class MemorySystem: """Central memory system coordinating all tiers.""" def __init__(self) -> None: self.hot = HotMemory() self.vault = VaultMemory() self.session_decisions: list[str] = [] def end_session(self, summary: str) -> None: """End session (retained for API compatibility).""" # Update hot memory self.hot.update_section( "Current Session", f"**Last Session:** {datetime.now(UTC).strftime('%Y-%m-%d %H:%M')}\n" + f"**Summary:** {summary[:100]}...", ) logger.info("MemorySystem: Session ended") def record_decision(self, decision: str) -> None: """Record a key decision during session (retained for API compatibility).""" self.session_decisions.append(decision) def update_user_fact(self, key: str, value: str) -> None: """Update user profile in vault.""" self.vault.update_user_profile(key, value) # Also update hot memory if key.lower() == "name": self.hot.update_section("User Profile", f"**Name:** {value}") def _load_user_profile_summary(self) -> str: """Load condensed user profile.""" profile_path = self.vault.path / "self" / "user_profile.md" if not profile_path.exists(): return "" content = profile_path.read_text() # Extract key fields summary_parts = [] # Name name_match = re.search(r"\*\*Name:\*\* (.+)", content) if name_match and "unknown" not in name_match.group(1).lower(): summary_parts.append(f"Name: {name_match.group(1).strip()}") # Interests interests_section = re.search(r"## Interests.*?\n- (.+?)(?=\n## |\Z)", content, re.DOTALL) if interests_section: interests = [ i.strip() for i in interests_section.group(1).split("\n-") if i.strip() and "to be" not in i ] if interests: summary_parts.append(f"Interests: {', '.join(interests[:3])}") return "\n".join(summary_parts) if summary_parts else "" def read_soul(self) -> str: """Read soul.md — Timmy's core identity. Returns empty string if missing.""" try: if SOUL_PATH.exists(): return SOUL_PATH.read_text() except OSError as exc: logger.debug("Failed to read soul.md: %s", exc) return "" def get_system_context(self) -> str: """Get full context for system prompt injection.""" context_parts = [] # 0. Soul identity (immutable, always first) soul_content = self.read_soul() if soul_content: context_parts.append("## Soul Identity\n" + soul_content) # 1. Hot memory hot_content = self.hot.read() context_parts.append("## Hot Memory\n" + hot_content) # 2. User profile (key fields only) profile = self._load_user_profile_summary() if profile: context_parts.append("## User Context\n" + profile) # 3. Known facts from long-term memory facts_section = self._load_known_facts() if facts_section: context_parts.append(facts_section) return "\n\n---\n\n".join(context_parts) def _load_known_facts(self, limit: int = 10) -> str: """Load top facts from the episodes table (context_type='fact'). Returns a formatted section string, or empty string if no facts exist. """ try: from timmy.memory.vector_store import recall_personal_facts facts = recall_personal_facts() if not facts: return "" # Cap at limit facts = facts[:limit] lines = ["## Known Facts\n"] for fact in facts: lines.append(f"- {fact[:200]}") return "\n".join(lines) except (ImportError, AttributeError) as exc: logger.debug("Failed to load known facts: %s", exc) return "" # ── Lazy singleton ──────────────────────────────────────────────────────────── _memory_system: MemorySystem | None = None def get_memory_system() -> MemorySystem: """Return the module-level MemorySystem, creating it on first access.""" global _memory_system if _memory_system is None: _memory_system = MemorySystem() return _memory_system def reset_memory_system() -> None: """Reset the singleton for test isolation.""" global _memory_system _memory_system = None def __getattr__(name: str): """Module-level __getattr__ for lazy backward-compatible access.""" if name == "memory_system": return get_memory_system() raise AttributeError(f"module {__name__!r} has no attribute {name!r}")