"""Consolidated memory system for Timmy — single source of truth. Architecture: - Database: Single `memories` table with unified schema - Embeddings: Local sentence-transformers with hash fallback - CRUD: store_memory, search_memories, delete_memory, etc. - Tool functions: memory_search, memory_read, memory_write, memory_forget - Classes: HotMemory, VaultMemory, MemorySystem, SemanticMemory, MemorySearcher """ import hashlib import json import logging import math import re import sqlite3 import uuid from collections.abc import Generator from contextlib import closing, contextmanager from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from pathlib import Path logger = logging.getLogger(__name__) # Paths PROJECT_ROOT = Path(__file__).parent.parent.parent HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" VAULT_PATH = PROJECT_ROOT / "memory" SOUL_PATH = VAULT_PATH / "self" / "soul.md" DB_PATH = PROJECT_ROOT / "data" / "memory.db" # Embedding model - small, fast, local EMBEDDING_MODEL = None EMBEDDING_DIM = 384 # MiniLM dimension # ─────────────────────────────────────────────────────────────────────────────── # Embedding Functions # ─────────────────────────────────────────────────────────────────────────────── def _get_embedding_model(): """Lazy-load embedding model.""" global EMBEDDING_MODEL if EMBEDDING_MODEL is None: try: from config import settings if settings.timmy_skip_embeddings: EMBEDDING_MODEL = False return EMBEDDING_MODEL except ImportError: pass try: from sentence_transformers import SentenceTransformer EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2") logger.info("MemorySystem: Loaded embedding model") except ImportError: logger.warning("MemorySystem: sentence-transformers not installed, using fallback") EMBEDDING_MODEL = False # Use fallback return EMBEDDING_MODEL def _simple_hash_embedding(text: str) -> list[float]: """Fallback: Simple hash-based embedding when transformers unavailable.""" words = text.lower().split() vec = [0.0] * 128 for i, word in enumerate(words[:50]): # First 50 words h = hashlib.md5(word.encode()).hexdigest() for j in range(8): idx = (i * 8 + j) % 128 vec[idx] += int(h[j * 2 : j * 2 + 2], 16) / 255.0 # Normalize mag = math.sqrt(sum(x * x for x in vec)) or 1.0 return [x / mag for x in vec] def embed_text(text: str) -> list[float]: """Generate embedding for text.""" model = _get_embedding_model() if model and model is not False: embedding = model.encode(text) return embedding.tolist() return _simple_hash_embedding(text) def cosine_similarity(a: list[float], b: list[float]) -> float: """Calculate cosine similarity between two vectors.""" dot = sum(x * y for x, y in zip(a, b, strict=False)) mag_a = math.sqrt(sum(x * x for x in a)) mag_b = math.sqrt(sum(x * x for x in b)) if mag_a == 0 or mag_b == 0: return 0.0 return dot / (mag_a * mag_b) # Alias for backward compatibility _cosine_similarity = cosine_similarity def _keyword_overlap(query: str, content: str) -> float: """Simple keyword overlap score as fallback.""" query_words = set(query.lower().split()) content_words = set(content.lower().split()) if not query_words: return 0.0 overlap = len(query_words & content_words) return overlap / len(query_words) # ─────────────────────────────────────────────────────────────────────────────── # Database Connection # ─────────────────────────────────────────────────────────────────────────────── @contextmanager def get_connection() -> Generator[sqlite3.Connection, None, None]: """Get database connection to unified memory database.""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) with closing(sqlite3.connect(str(DB_PATH))) as conn: conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=5000") _ensure_schema(conn) yield conn def _ensure_schema(conn: sqlite3.Connection) -> None: """Create the unified memories table and indexes if they don't exist.""" conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id TEXT PRIMARY KEY, content TEXT NOT NULL, memory_type TEXT NOT NULL DEFAULT 'fact', source TEXT NOT NULL DEFAULT 'agent', embedding TEXT, metadata TEXT, source_hash TEXT, agent_id TEXT, task_id TEXT, session_id TEXT, confidence REAL NOT NULL DEFAULT 0.8, tags TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL, last_accessed TEXT, access_count INTEGER NOT NULL DEFAULT 0 ) """) # Create indexes for efficient querying conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") conn.commit() # Run migration if needed _migrate_schema(conn) def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: """Get the column names for a table.""" cursor = conn.execute(f"PRAGMA table_info({table_name})") return {row[1] for row in cursor.fetchall()} def _migrate_schema(conn: sqlite3.Connection) -> None: """Migrate from old three-table schema to unified memories table. Migration paths: - episodes table -> memories (context_type -> memory_type) - chunks table -> memories with memory_type='vault_chunk' - facts table -> dropped (unused, 0 rows expected) """ cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = {row[0] for row in cursor.fetchall()} has_memories = "memories" in tables has_episodes = "episodes" in tables has_chunks = "chunks" in tables has_facts = "facts" in tables # Check if we need to migrate (old schema exists) if not has_memories and (has_episodes or has_chunks or has_facts): logger.info("Migration: Creating unified memories table") # Schema will be created by _ensure_schema above # Migrate episodes -> memories if has_episodes and has_memories: logger.info("Migration: Converting episodes table to memories") try: cols = _get_table_columns(conn, "episodes") context_type_col = "context_type" if "context_type" in cols else "'conversation'" conn.execute(f""" INSERT INTO memories ( id, content, memory_type, source, embedding, metadata, agent_id, task_id, session_id, created_at, access_count, last_accessed ) SELECT id, content, COALESCE({context_type_col}, 'conversation'), COALESCE(source, 'agent'), embedding, metadata, agent_id, task_id, session_id, COALESCE(timestamp, datetime('now')), 0, NULL FROM episodes """) conn.execute("DROP TABLE episodes") logger.info("Migration: Migrated episodes to memories") except sqlite3.Error as exc: logger.warning("Migration: Failed to migrate episodes: %s", exc) # Migrate chunks -> memories as vault_chunk if has_chunks and has_memories: logger.info("Migration: Converting chunks table to memories") try: cols = _get_table_columns(conn, "chunks") id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" content_col = "content" if "content" in cols else "text" source_col = ( "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") ) embedding_col = "embedding" if "embedding" in cols else "NULL" created_col = "created_at" if "created_at" in cols else "datetime('now')" conn.execute(f""" INSERT INTO memories ( id, content, memory_type, source, embedding, created_at, access_count ) SELECT {id_col}, {content_col}, 'vault_chunk', {source_col}, {embedding_col}, {created_col}, 0 FROM chunks """) conn.execute("DROP TABLE chunks") logger.info("Migration: Migrated chunks to memories") except sqlite3.Error as exc: logger.warning("Migration: Failed to migrate chunks: %s", exc) # Drop old tables if has_facts: try: conn.execute("DROP TABLE facts") logger.info("Migration: Dropped old facts table") except sqlite3.Error as exc: logger.warning("Migration: Failed to drop facts: %s", exc) conn.commit() # Alias for backward compatibility get_conn = get_connection # ─────────────────────────────────────────────────────────────────────────────── # Data Classes # ─────────────────────────────────────────────────────────────────────────────── @dataclass class MemoryEntry: """A memory entry with vector embedding. Note: The DB column is `memory_type` but this field is named `context_type` for backward API compatibility. """ id: str = field(default_factory=lambda: str(uuid.uuid4())) content: str = "" # The actual text content source: str = "" # Where it came from (agent, user, system) context_type: str = "conversation" # API field name; DB column is memory_type agent_id: str | None = None task_id: str | None = None session_id: str | None = None metadata: dict | None = None embedding: list[float] | None = None timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) relevance_score: float | None = None # Set during search @dataclass class MemoryChunk: """A searchable chunk of memory.""" id: str source: str # filepath content: str embedding: list[float] created_at: str # ─────────────────────────────────────────────────────────────────────────────── # CRUD Operations # ─────────────────────────────────────────────────────────────────────────────── def store_memory( content: str, source: str, context_type: str = "conversation", agent_id: str | None = None, task_id: str | None = None, session_id: str | None = None, metadata: dict | None = None, compute_embedding: bool = True, ) -> MemoryEntry: """Store a memory entry with optional embedding. Args: content: The text content to store source: Source of the memory (agent name, user, system) context_type: Type of context (conversation, document, fact, vault_chunk) agent_id: Associated agent ID task_id: Associated task ID session_id: Session identifier metadata: Additional structured data compute_embedding: Whether to compute vector embedding Returns: The stored MemoryEntry """ embedding = None if compute_embedding: embedding = embed_text(content) entry = MemoryEntry( content=content, source=source, context_type=context_type, agent_id=agent_id, task_id=task_id, session_id=session_id, metadata=metadata, embedding=embedding, ) with get_connection() as conn: conn.execute( """ INSERT INTO memories (id, content, memory_type, source, agent_id, task_id, session_id, metadata, embedding, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry.id, entry.content, entry.context_type, # DB column is memory_type entry.source, entry.agent_id, entry.task_id, entry.session_id, json.dumps(metadata) if metadata else None, json.dumps(embedding) if embedding else None, entry.timestamp, ), ) conn.commit() return entry def search_memories( query: str, limit: int = 10, context_type: str | None = None, agent_id: str | None = None, session_id: str | None = None, min_relevance: float = 0.0, ) -> list[MemoryEntry]: """Search for memories by semantic similarity. Args: query: Search query text limit: Maximum results context_type: Filter by memory type (maps to DB memory_type column) agent_id: Filter by agent session_id: Filter by session min_relevance: Minimum similarity score (0-1) Returns: List of MemoryEntry objects sorted by relevance """ query_embedding = embed_text(query) # Build query with filters conditions = [] params = [] if context_type: conditions.append("memory_type = ?") params.append(context_type) if agent_id: conditions.append("agent_id = ?") params.append(agent_id) if session_id: conditions.append("session_id = ?") params.append(session_id) where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" # Fetch candidates (we'll do in-memory similarity for now) query_sql = f""" SELECT * FROM memories {where_clause} ORDER BY created_at DESC LIMIT ? """ params.append(limit * 3) # Get more candidates for ranking with get_connection() as conn: rows = conn.execute(query_sql, params).fetchall() # Compute similarity scores results = [] for row in rows: entry = MemoryEntry( id=row["id"], content=row["content"], source=row["source"], context_type=row["memory_type"], # DB column -> API field agent_id=row["agent_id"], task_id=row["task_id"], session_id=row["session_id"], metadata=json.loads(row["metadata"]) if row["metadata"] else None, embedding=json.loads(row["embedding"]) if row["embedding"] else None, timestamp=row["created_at"], ) if entry.embedding: score = cosine_similarity(query_embedding, entry.embedding) entry.relevance_score = score if score >= min_relevance: results.append(entry) else: # Fallback: check for keyword overlap score = _keyword_overlap(query, entry.content) entry.relevance_score = score if score >= min_relevance: results.append(entry) # Sort by relevance and return top results results.sort(key=lambda x: x.relevance_score or 0, reverse=True) return results[:limit] def delete_memory(memory_id: str) -> bool: """Delete a memory entry by ID. Returns: True if deleted, False if not found """ with get_connection() as conn: cursor = conn.execute( "DELETE FROM memories WHERE id = ?", (memory_id,), ) conn.commit() return cursor.rowcount > 0 def get_memory_stats() -> dict: """Get statistics about the memory store. Returns: Dict with counts by type, total entries, etc. """ with get_connection() as conn: total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"] by_type = {} rows = conn.execute( "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type" ).fetchall() for row in rows: by_type[row["memory_type"]] = row["count"] with_embeddings = conn.execute( "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL" ).fetchone()["count"] return { "total_entries": total, "by_type": by_type, "with_embeddings": with_embeddings, "has_embedding_model": _get_embedding_model() is not False, } def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int: """Delete old memories to manage storage. Args: older_than_days: Delete memories older than this keep_facts: Whether to preserve fact-type memories Returns: Number of entries deleted """ cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat() with get_connection() as conn: if keep_facts: cursor = conn.execute( """ DELETE FROM memories WHERE created_at < ? AND memory_type != 'fact' """, (cutoff,), ) else: cursor = conn.execute( "DELETE FROM memories WHERE created_at < ?", (cutoff,), ) deleted = cursor.rowcount conn.commit() return deleted def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str: """Get relevant memory context as formatted text for LLM prompts. Args: query: Search query max_tokens: Approximate maximum tokens to return **filters: Additional filters (agent_id, session_id, etc.) Returns: Formatted context string for inclusion in prompts """ memories = search_memories(query, limit=20, **filters) context_parts = [] total_chars = 0 max_chars = max_tokens * 4 # Rough approximation for mem in memories: formatted = f"[{mem.source}]: {mem.content}" if total_chars + len(formatted) > max_chars: break context_parts.append(formatted) total_chars += len(formatted) if not context_parts: return "" return "Relevant context from memory:\n" + "\n\n".join(context_parts) # ─────────────────────────────────────────────────────────────────────────────── # Personal Facts Functions # ─────────────────────────────────────────────────────────────────────────────── def recall_personal_facts(agent_id: str | None = None) -> list[str]: """Recall personal facts about the user or system. Args: agent_id: Optional agent filter Returns: List of fact strings """ with get_connection() as conn: if agent_id: rows = conn.execute( """ SELECT content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100 """, (agent_id,), ).fetchall() else: rows = conn.execute( """ SELECT content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100 """, ).fetchall() return [r["content"] for r in rows] def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]: """Recall personal facts with their IDs for edit/delete operations.""" with get_connection() as conn: if agent_id: rows = conn.execute( "SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100", (agent_id,), ).fetchall() else: rows = conn.execute( "SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100", ).fetchall() return [{"id": r["id"], "content": r["content"]} for r in rows] def update_personal_fact(memory_id: str, new_content: str) -> bool: """Update a personal fact's content.""" with get_connection() as conn: cursor = conn.execute( "UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'", (new_content, memory_id), ) conn.commit() return cursor.rowcount > 0 def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: """Store a personal fact about the user or system. Args: fact: The fact to store agent_id: Associated agent Returns: The stored MemoryEntry """ return store_memory( content=fact, source="system", context_type="fact", agent_id=agent_id, metadata={"auto_extracted": False}, ) # ─────────────────────────────────────────────────────────────────────────────── # Hot Memory (computed from DB instead of MEMORY.md) # ─────────────────────────────────────────────────────────────────────────────── class HotMemory: """Tier 1: Hot memory — computed view of top facts from DB.""" def __init__(self) -> None: self.path = HOT_MEMORY_PATH self._content: str | None = None self._last_modified: float | None = None def read(self, force_refresh: bool = False) -> str: """Read hot memory — computed view of top facts from DB.""" try: facts = recall_personal_facts() if facts: lines = ["# Timmy Hot Memory\n", "## Known Facts\n"] for f in facts[:15]: lines.append(f"- {f}") return "\n".join(lines) except Exception: pass # Fallback to file if DB unavailable if self.path.exists(): return self.path.read_text() return "# Timmy Hot Memory\n\nNo memories stored yet.\n" def update_section(self, section: str, content: str) -> None: """Update a specific section in MEMORY.md. DEPRECATED: Hot memory is now computed from the database. This method is kept for backward compatibility during transition. Use memory_write() to store facts in the database. """ logger.warning( "HotMemory.update_section() is deprecated. " "Use memory_write() to store facts in the database." ) # Keep file-writing for backward compatibility during transition # Guard against empty or excessively large writes if not content or not content.strip(): logger.warning("HotMemory: Refusing empty write to section '%s'", section) return if len(content) > 2000: logger.warning("HotMemory: Truncating oversized write to section '%s'", section) content = content[:2000] + "\n... [truncated]" if not self.path.exists(): self._create_default() full_content = self.read() # Find section pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" match = re.search(pattern, full_content, re.DOTALL) if match: # Replace section new_section = f"## {section}\n\n{content}\n\n" full_content = full_content[: match.start()] + new_section + full_content[match.end() :] else: # Append section before last updated line insert_point = full_content.rfind("*Prune date:") new_section = f"## {section}\n\n{content}\n\n" full_content = ( full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] ) self.path.write_text(full_content) self._content = full_content self._last_modified = self.path.stat().st_mtime logger.info("HotMemory: Updated section '%s'", section) def _create_default(self) -> None: """Create default MEMORY.md if missing. DEPRECATED: Hot memory is now computed from the database. This method is kept for backward compatibility during transition. """ logger.debug( "HotMemory._create_default() - creating default MEMORY.md for backward compatibility" ) default_content = """# Timmy Hot Memory > Working RAM — always loaded, ~300 lines max, pruned monthly > Last updated: {date} --- ## Current Status **Agent State:** Operational **Mode:** Development **Active Tasks:** 0 **Pending Decisions:** None --- ## Standing Rules 1. **Sovereignty First** — No cloud dependencies 2. **Local-Only Inference** — Ollama on localhost 3. **Privacy by Design** — Telemetry disabled 4. **Tool Minimalism** — Use tools only when necessary 5. **Memory Discipline** — Write handoffs at session end --- ## Agent Roster | Agent | Role | Status | |-------|------|--------| | Timmy | Core | Active | --- ## User Profile **Name:** (not set) **Interests:** (to be learned) --- ## Key Decisions (none yet) --- ## Pending Actions - [ ] Learn user's name --- *Prune date: {prune_date}* """.format( date=datetime.now(UTC).strftime("%Y-%m-%d"), prune_date=(datetime.now(UTC).replace(day=25)).strftime("%Y-%m-%d"), ) self.path.write_text(default_content) logger.info("HotMemory: Created default MEMORY.md") # ─────────────────────────────────────────────────────────────────────────────── # Vault Memory (memory/ directory) # ─────────────────────────────────────────────────────────────────────────────── class VaultMemory: """Tier 2: Structured vault (memory/) — append-only markdown.""" def __init__(self) -> None: self.path = VAULT_PATH self._ensure_structure() def _ensure_structure(self) -> None: """Ensure vault directory structure exists.""" (self.path / "self").mkdir(parents=True, exist_ok=True) (self.path / "notes").mkdir(parents=True, exist_ok=True) (self.path / "aar").mkdir(parents=True, exist_ok=True) def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: """Write a note to the vault.""" # Add timestamp to filename timestamp = datetime.now(UTC).strftime("%Y%m%d") filename = f"{timestamp}_{name}.md" filepath = self.path / namespace / filename # Add header full_content = f"""# {name.replace("_", " ").title()} > Created: {datetime.now(UTC).isoformat()} > Namespace: {namespace} --- {content} --- *Auto-generated by Timmy Memory System* """ filepath.write_text(full_content) logger.info("VaultMemory: Wrote %s", filepath) return filepath def read_file(self, filepath: Path) -> str: """Read a file from the vault.""" if not filepath.exists(): return "" return filepath.read_text() def update_user_profile(self, key: str, value: str) -> None: """Update a field in user_profile.md. DEPRECATED: User profile updates should now use memory_write() to store facts in the database. This method is kept for backward compatibility. """ logger.warning( "VaultMemory.update_user_profile() is deprecated. " "Use memory_write() to store user facts in the database." ) # Still update the file for backward compatibility during transition profile_path = self.path / "self" / "user_profile.md" if not profile_path.exists(): self._create_default_profile() content = profile_path.read_text() pattern = rf"(\*\*{re.escape(key)}:\*\*).*" if re.search(pattern, content): safe_value = value.strip() content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content) else: facts_section = "## Important Facts" if facts_section in content: insert_point = content.find(facts_section) + len(facts_section) content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:] content = re.sub( r"\*Last updated:.*\*", f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*", content, ) profile_path.write_text(content) logger.info("VaultMemory: Updated user profile: %s = %s", key, value) def _create_default_profile(self) -> None: """Create default user profile.""" profile_path = self.path / "self" / "user_profile.md" default = """# User Profile > Learned information about the user. ## Basic Information **Name:** (unknown) **Location:** (unknown) **Occupation:** (unknown) ## Interests & Expertise - (to be learned) ## Preferences - Response style: concise, technical - Tool usage: minimal ## Important Facts - (to be extracted) --- *Last updated: {date}* """.format(date=datetime.now(UTC).strftime("%Y-%m-%d")) profile_path.write_text(default) # ─────────────────────────────────────────────────────────────────────────────── # Semantic Memory (Vector search) # ─────────────────────────────────────────────────────────────────────────────── class SemanticMemory: """Vector-based semantic search over vault content.""" def __init__(self) -> None: self.db_path = DB_PATH self.vault_path = VAULT_PATH @contextmanager def _get_conn(self) -> Generator[sqlite3.Connection, None, None]: """Get connection to the instance's db_path (backward compatibility). Uses self.db_path if set differently from global DB_PATH, otherwise uses the global get_connection(). """ if self.db_path == DB_PATH: # Use global connection (normal production path) with get_connection() as conn: yield conn else: # Use instance-specific db_path (test path) self.db_path.parent.mkdir(parents=True, exist_ok=True) with closing(sqlite3.connect(str(self.db_path))) as conn: conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=5000") # Ensure schema exists conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id TEXT PRIMARY KEY, content TEXT NOT NULL, memory_type TEXT NOT NULL DEFAULT 'fact', source TEXT NOT NULL DEFAULT 'agent', embedding TEXT, metadata TEXT, source_hash TEXT, agent_id TEXT, task_id TEXT, session_id TEXT, confidence REAL NOT NULL DEFAULT 0.8, tags TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL, last_accessed TEXT, access_count INTEGER NOT NULL DEFAULT 0 ) """) conn.execute( "CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)" ) conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") conn.commit() yield conn def _init_db(self) -> None: """Initialize database at self.db_path (backward compatibility). This method is kept for backward compatibility with existing code and tests. Schema creation is handled by _get_conn. """ # Trigger schema creation via _get_conn with self._get_conn(): pass def index_file(self, filepath: Path) -> int: """Index a single file into semantic memory.""" if not filepath.exists(): return 0 content = filepath.read_text() file_hash = hashlib.md5(content.encode()).hexdigest() with self._get_conn() as conn: # Check if already indexed with same hash cursor = conn.execute( "SELECT metadata FROM memories WHERE source = ? AND memory_type = 'vault_chunk' LIMIT 1", (str(filepath),), ) existing = cursor.fetchone() if existing and existing[0]: try: meta = json.loads(existing[0]) if meta.get("source_hash") == file_hash: return 0 # Already indexed except json.JSONDecodeError: pass # Delete old chunks for this file conn.execute( "DELETE FROM memories WHERE source = ? AND memory_type = 'vault_chunk'", (str(filepath),), ) # Split into chunks (paragraphs) chunks = self._split_into_chunks(content) # Index each chunk now = datetime.now(UTC).isoformat() for i, chunk_text in enumerate(chunks): if len(chunk_text.strip()) < 20: # Skip tiny chunks continue chunk_id = f"{filepath.stem}_{i}" chunk_embedding = embed_text(chunk_text) conn.execute( """INSERT INTO memories (id, content, memory_type, source, metadata, embedding, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", ( chunk_id, chunk_text, "vault_chunk", str(filepath), json.dumps({"source_hash": file_hash, "chunk_index": i}), json.dumps(chunk_embedding), now, ), ) conn.commit() logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks)) return len(chunks) def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]: """Split text into semantic chunks.""" # Split by paragraphs first paragraphs = text.split("\n\n") chunks = [] for para in paragraphs: para = para.strip() if not para: continue # If paragraph is small enough, keep as one chunk if len(para) <= max_chunk_size: chunks.append(para) else: # Split long paragraphs by sentences sentences = para.replace(". ", ".\n").split("\n") current_chunk = "" for sent in sentences: if len(current_chunk) + len(sent) < max_chunk_size: current_chunk += " " + sent if current_chunk else sent else: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = sent if current_chunk: chunks.append(current_chunk.strip()) return chunks def index_vault(self) -> int: """Index entire vault directory.""" total_chunks = 0 for md_file in self.vault_path.rglob("*.md"): # Skip handoff file (handled separately) if "last-session-handoff" in md_file.name: continue total_chunks += self.index_file(md_file) logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks) return total_chunks def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]: """Search for relevant memory chunks.""" query_embedding = embed_text(query) with self._get_conn() as conn: conn.row_factory = sqlite3.Row # Get all vault chunks rows = conn.execute( "SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'" ).fetchall() # Calculate similarities scored = [] for row in rows: embedding = json.loads(row["embedding"]) score = cosine_similarity(query_embedding, embedding) scored.append((row["source"], row["content"], score)) # Sort by score descending scored.sort(key=lambda x: x[2], reverse=True) # Return top_k return [(content, score) for _, content, score in scored[:top_k]] def get_relevant_context(self, query: str, max_chars: int = 2000) -> str: """Get formatted context string for a query.""" results = self.search(query, top_k=3) if not results: return "" parts = [] total_chars = 0 for content, score in results: if score < 0.3: # Similarity threshold continue chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..." if total_chars + len(chunk) > max_chars: break parts.append(chunk) total_chars += len(chunk) return "\n\n".join(parts) if parts else "" def stats(self) -> dict: """Get indexing statistics.""" with self._get_conn() as conn: cursor = conn.execute( "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'" ) total_chunks, total_files = cursor.fetchone() return { "total_chunks": total_chunks, "total_files": total_files, "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, } class MemorySearcher: """High-level interface for memory search.""" def __init__(self) -> None: self.semantic = SemanticMemory() def search(self, query: str, tiers: list[str] = None) -> dict: """Search across memory tiers. Args: query: Search query tiers: List of tiers to search ["hot", "vault", "semantic"] Returns: Dict with results from each tier """ tiers = tiers or ["semantic"] # Default to semantic only results = {} if "semantic" in tiers: semantic_results = self.semantic.search(query, top_k=5) results["semantic"] = [ {"content": content, "score": score} for content, score in semantic_results ] return results def get_context_for_query(self, query: str) -> str: """Get comprehensive context for a user query.""" # Get semantic context semantic_context = self.semantic.get_relevant_context(query) if semantic_context: return f"## Relevant Past Context\n\n{semantic_context}" return "" # Module-level singletons semantic_memory = SemanticMemory() memory_searcher = MemorySearcher() # ─────────────────────────────────────────────────────────────────────────────── # Tool Functions # ─────────────────────────────────────────────────────────────────────────────── def memory_search(query: str, top_k: int = 5) -> str: """Search past conversations, notes, and stored facts for relevant context. Searches across both the vault (indexed markdown files) and the runtime memory store (facts and conversation fragments stored via memory_write). Args: query: What to search for (e.g. "Bitcoin strategy", "server setup"). top_k: Number of results to return (default 5). Returns: Formatted string of relevant memory results. """ # Guard: model sometimes passes None for top_k if top_k is None: top_k = 5 parts: list[str] = [] # 1. Search semantic vault (indexed markdown files) vault_results = semantic_memory.search(query, top_k) for content, score in vault_results: if score < 0.2: continue parts.append(f"[vault score {score:.2f}] {content[:300]}") # 2. Search runtime vector store (stored facts/conversations) try: runtime_results = search_memories(query, limit=top_k, min_relevance=0.2) for entry in runtime_results: label = entry.context_type or "memory" parts.append(f"[{label}] {entry.content[:300]}") except Exception as exc: logger.debug("Vector store search unavailable: %s", exc) if not parts: return "No relevant memories found." return "\n\n".join(parts) def memory_read(query: str = "", top_k: int = 5) -> str: """Read from persistent memory — search facts, notes, and past conversations. This is the primary tool for recalling stored information. If no query is given, returns the most recent personal facts. With a query, it searches semantically across all stored memories. Args: query: Optional search term. Leave empty to list recent facts. top_k: Maximum results to return (default 5). Returns: Formatted string of memory contents. """ if top_k is None: top_k = 5 parts: list[str] = [] # Always include personal facts first try: facts = search_memories(query or "", limit=top_k, min_relevance=0.0) fact_entries = [e for e in facts if (e.context_type or "") == "fact"] if fact_entries: parts.append("## Personal Facts") for entry in fact_entries[:top_k]: parts.append(f"- {entry.content[:300]}") except Exception as exc: logger.debug("Vector store unavailable for memory_read: %s", exc) # If a query was provided, also do semantic search if query: search_result = memory_search(query, top_k) if search_result and search_result != "No relevant memories found.": parts.append("\n## Search Results") parts.append(search_result) if not parts: return "No memories stored yet. Use memory_write to store information." return "\n".join(parts) def memory_write(content: str, context_type: str = "fact") -> str: """Store a piece of information in persistent memory. Use this tool when the user explicitly asks you to remember something. Stored memories are searchable via memory_search across all channels (web GUI, Discord, Telegram, etc.). Args: content: The information to remember (e.g. a phrase, fact, or note). context_type: Type of memory — "fact" for permanent facts, "conversation" for conversation context, "document" for document fragments. Returns: Confirmation that the memory was stored. """ if not content or not content.strip(): return "Nothing to store — content is empty." valid_types = ("fact", "conversation", "document") if context_type not in valid_types: context_type = "fact" try: # Dedup check for facts — skip if a similar fact already exists # Threshold 0.75 catches paraphrases (was 0.9 which only caught near-exact) if context_type == "fact": existing = search_memories( content.strip(), limit=3, context_type="fact", min_relevance=0.75 ) if existing: return f"Similar fact already stored (id={existing[0].id[:8]}). Skipping duplicate." entry = store_memory( content=content.strip(), source="agent", context_type=context_type, ) return f"Stored in memory (type={context_type}, id={entry.id[:8]}). This is now searchable across all channels." except Exception as exc: logger.error("Failed to write memory: %s", exc) return f"Failed to store memory: {exc}" def memory_forget(query: str) -> str: """Remove a stored memory that is outdated, incorrect, or no longer relevant. Searches for memories matching the query and deletes the closest match. Use this when the user says to forget something or when stored information has changed. Args: query: Description of the memory to forget (e.g. "my phone number", "the old server address"). Returns: Confirmation of what was forgotten, or a message if nothing matched. """ if not query or not query.strip(): return "Nothing to forget — query is empty." try: results = search_memories(query.strip(), limit=3, min_relevance=0.3) if not results: return "No matching memories found to forget." # Delete the closest match best = results[0] deleted = delete_memory(best.id) if deleted: return f'Forgotten: "{best.content[:80]}" (type={best.context_type})' return "Memory not found (may have already been deleted)." except Exception as exc: logger.error("Failed to forget memory: %s", exc) return f"Failed to forget: {exc}" # ─────────────────────────────────────────────────────────────────────────────── # Memory System (Central Coordinator) # ─────────────────────────────────────────────────────────────────────────────── class MemorySystem: """Central memory system coordinating all tiers.""" def __init__(self) -> None: self.hot = HotMemory() self.vault = VaultMemory() self.session_decisions: list[str] = [] def end_session(self, summary: str) -> None: """End session - stores session summary in database.""" # Store session end fact in database instead of updating MEMORY.md try: store_memory( content=f"Session ended: {summary}", source="system", context_type="conversation", metadata={"event": "session_end"}, ) except Exception as exc: logger.warning("Failed to store session end: %s", exc) logger.info("MemorySystem: Session ended") def record_decision(self, decision: str) -> None: """Record a key decision during session (retained for API compatibility).""" self.session_decisions.append(decision) def update_user_fact(self, key: str, value: str) -> None: """Update user fact in the database.""" # Store as a fact in the database try: store_memory( content=f"{key}: {value}", source="system", context_type="fact", metadata={"key": key, "value": value}, ) except Exception as exc: logger.warning("Failed to store user fact: %s", exc) def read_soul(self) -> str: """Read soul.md — Timmy's core identity. Returns empty string if missing.""" try: if SOUL_PATH.exists(): return SOUL_PATH.read_text() except OSError as exc: logger.debug("Failed to read soul.md: %s", exc) return "" def get_system_context(self) -> str: """Get full context for system prompt injection.""" context_parts = [] # 0. Soul identity (immutable, always first) soul_content = self.read_soul() if soul_content: context_parts.append("## Soul Identity\n" + soul_content) # 1. Hot memory (computed from DB facts) hot_content = self.hot.read() context_parts.append("## Hot Memory\n" + hot_content) # 2. User facts are now included in hot memory via recall_personal_facts return "\n\n---\n\n".join(context_parts) # ── Lazy singleton ──────────────────────────────────────────────────────────── _memory_system: MemorySystem | None = None def get_memory_system() -> MemorySystem: """Return the module-level MemorySystem, creating it on first access.""" global _memory_system if _memory_system is None: _memory_system = MemorySystem() return _memory_system def reset_memory_system() -> None: """Reset the singleton for test isolation.""" global _memory_system _memory_system = None def __getattr__(name: str): """Module-level __getattr__ for lazy backward-compatible access.""" if name == "memory_system": return get_memory_system() raise AttributeError(f"module {__name__!r} has no attribute {name!r}")