diff --git a/src/timmy/memory/__init__.py b/src/timmy/memory/__init__.py index 5e89afc..73206c5 100644 --- a/src/timmy/memory/__init__.py +++ b/src/timmy/memory/__init__.py @@ -1,7 +1,10 @@ """Memory — Persistent conversation and knowledge memory. Sub-modules: - embeddings — text-to-vector embedding + similarity functions - unified — unified memory schema and connection management - vector_store — backward compatibility re-exports from memory_system + embeddings — text-to-vector embedding + similarity functions + unified — unified memory schema and connection management + chain — CRUD operations (store, search, delete, stats) + semantic — SemanticMemory and MemorySearcher classes + consolidation — HotMemory and VaultMemory classes + vector_store — backward compatibility re-exports from memory_system """ diff --git a/src/timmy/memory/chain.py b/src/timmy/memory/chain.py new file mode 100644 index 0000000..be22241 --- /dev/null +++ b/src/timmy/memory/chain.py @@ -0,0 +1,381 @@ +"""CRUD operations for Timmy's unified memory database. + +Provides store, search, delete, and management functions for the +`memories` table defined in timmy.memory.unified. +""" + +import json +import logging +import sqlite3 +from datetime import UTC, datetime, timedelta + +from timmy.memory.embeddings import ( + _keyword_overlap, + cosine_similarity, + embed_text, +) +from timmy.memory.unified import ( + MemoryEntry, + get_connection, +) + +logger = logging.getLogger(__name__) + + +def store_memory( + content: str, + source: str, + context_type: str = "conversation", + agent_id: str | None = None, + task_id: str | None = None, + session_id: str | None = None, + metadata: dict | None = None, + compute_embedding: bool = True, +) -> MemoryEntry: + """Store a memory entry with optional embedding. + + Args: + content: The text content to store + source: Source of the memory (agent name, user, system) + context_type: Type of context (conversation, document, fact, vault_chunk) + agent_id: Associated agent ID + task_id: Associated task ID + session_id: Session identifier + metadata: Additional structured data + compute_embedding: Whether to compute vector embedding + + Returns: + The stored MemoryEntry + """ + embedding = None + if compute_embedding: + embedding = embed_text(content) + + entry = MemoryEntry( + content=content, + source=source, + context_type=context_type, + agent_id=agent_id, + task_id=task_id, + session_id=session_id, + metadata=metadata, + embedding=embedding, + ) + + with get_connection() as conn: + conn.execute( + """ + INSERT INTO memories + (id, content, memory_type, source, agent_id, task_id, session_id, + metadata, embedding, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + entry.id, + entry.content, + entry.context_type, # DB column is memory_type + entry.source, + entry.agent_id, + entry.task_id, + entry.session_id, + json.dumps(metadata) if metadata else None, + json.dumps(embedding) if embedding else None, + entry.timestamp, + ), + ) + conn.commit() + + return entry + + +def _build_search_filters( + context_type: str | None, + agent_id: str | None, + session_id: str | None, +) -> tuple[str, list]: + """Build SQL WHERE clause and params from search filters.""" + conditions: list[str] = [] + params: list = [] + + if context_type: + conditions.append("memory_type = ?") + params.append(context_type) + if agent_id: + conditions.append("agent_id = ?") + params.append(agent_id) + if session_id: + conditions.append("session_id = ?") + params.append(session_id) + + where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" + return where_clause, params + + +def _fetch_memory_candidates( + where_clause: str, params: list, candidate_limit: int +) -> list[sqlite3.Row]: + """Fetch candidate memory rows from the database.""" + query_sql = f""" + SELECT * FROM memories + {where_clause} + ORDER BY created_at DESC + LIMIT ? + """ + params.append(candidate_limit) + + with get_connection() as conn: + return conn.execute(query_sql, params).fetchall() + + +def _row_to_entry(row: sqlite3.Row) -> MemoryEntry: + """Convert a database row to a MemoryEntry.""" + return MemoryEntry( + id=row["id"], + content=row["content"], + source=row["source"], + context_type=row["memory_type"], # DB column -> API field + agent_id=row["agent_id"], + task_id=row["task_id"], + session_id=row["session_id"], + metadata=json.loads(row["metadata"]) if row["metadata"] else None, + embedding=json.loads(row["embedding"]) if row["embedding"] else None, + timestamp=row["created_at"], + ) + + +def _score_and_filter( + rows: list[sqlite3.Row], + query: str, + query_embedding: list[float], + min_relevance: float, +) -> list[MemoryEntry]: + """Score candidate rows by similarity and filter by min_relevance.""" + results = [] + for row in rows: + entry = _row_to_entry(row) + + if entry.embedding: + score = cosine_similarity(query_embedding, entry.embedding) + else: + score = _keyword_overlap(query, entry.content) + + entry.relevance_score = score + if score >= min_relevance: + results.append(entry) + + results.sort(key=lambda x: x.relevance_score or 0, reverse=True) + return results + + +def search_memories( + query: str, + limit: int = 10, + context_type: str | None = None, + agent_id: str | None = None, + session_id: str | None = None, + min_relevance: float = 0.0, +) -> list[MemoryEntry]: + """Search for memories by semantic similarity. + + Args: + query: Search query text + limit: Maximum results + context_type: Filter by memory type (maps to DB memory_type column) + agent_id: Filter by agent + session_id: Filter by session + min_relevance: Minimum similarity score (0-1) + + Returns: + List of MemoryEntry objects sorted by relevance + """ + query_embedding = embed_text(query) + where_clause, params = _build_search_filters(context_type, agent_id, session_id) + rows = _fetch_memory_candidates(where_clause, params, limit * 3) + results = _score_and_filter(rows, query, query_embedding, min_relevance) + return results[:limit] + + +def delete_memory(memory_id: str) -> bool: + """Delete a memory entry by ID. 
+ + Returns: + True if deleted, False if not found + """ + with get_connection() as conn: + cursor = conn.execute( + "DELETE FROM memories WHERE id = ?", + (memory_id,), + ) + conn.commit() + return cursor.rowcount > 0 + + +def get_memory_stats() -> dict: + """Get statistics about the memory store. + + Returns: + Dict with counts by type, total entries, etc. + """ + from timmy.memory.embeddings import _get_embedding_model + + with get_connection() as conn: + total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"] + + by_type = {} + rows = conn.execute( + "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type" + ).fetchall() + for row in rows: + by_type[row["memory_type"]] = row["count"] + + with_embeddings = conn.execute( + "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL" + ).fetchone()["count"] + + return { + "total_entries": total, + "by_type": by_type, + "with_embeddings": with_embeddings, + "has_embedding_model": _get_embedding_model() is not False, + } + + +def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int: + """Delete old memories to manage storage. + + Args: + older_than_days: Delete memories older than this + keep_facts: Whether to preserve fact-type memories + + Returns: + Number of entries deleted + """ + cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat() + + with get_connection() as conn: + if keep_facts: + cursor = conn.execute( + """ + DELETE FROM memories + WHERE created_at < ? AND memory_type != 'fact' + """, + (cutoff,), + ) + else: + cursor = conn.execute( + "DELETE FROM memories WHERE created_at < ?", + (cutoff,), + ) + + deleted = cursor.rowcount + conn.commit() + + return deleted + + +def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str: + """Get relevant memory context as formatted text for LLM prompts. + + Args: + query: Search query + max_tokens: Approximate maximum tokens to return + **filters: Additional filters (agent_id, session_id, etc.) + + Returns: + Formatted context string for inclusion in prompts + """ + memories = search_memories(query, limit=20, **filters) + + context_parts = [] + total_chars = 0 + max_chars = max_tokens * 4 # Rough approximation + + for mem in memories: + formatted = f"[{mem.source}]: {mem.content}" + if total_chars + len(formatted) > max_chars: + break + context_parts.append(formatted) + total_chars += len(formatted) + + if not context_parts: + return "" + + return "Relevant context from memory:\n" + "\n\n".join(context_parts) + + +def recall_personal_facts(agent_id: str | None = None) -> list[str]: + """Recall personal facts about the user or system. + + Args: + agent_id: Optional agent filter + + Returns: + List of fact strings + """ + with get_connection() as conn: + if agent_id: + rows = conn.execute( + """ + SELECT content FROM memories + WHERE memory_type = 'fact' AND agent_id = ? + ORDER BY created_at DESC + LIMIT 100 + """, + (agent_id,), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT content FROM memories + WHERE memory_type = 'fact' + ORDER BY created_at DESC + LIMIT 100 + """, + ).fetchall() + + return [r["content"] for r in rows] + + +def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]: + """Recall personal facts with their IDs for edit/delete operations.""" + with get_connection() as conn: + if agent_id: + rows = conn.execute( + "SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? 
ORDER BY created_at DESC LIMIT 100", + (agent_id,), + ).fetchall() + else: + rows = conn.execute( + "SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100", + ).fetchall() + return [{"id": r["id"], "content": r["content"]} for r in rows] + + +def update_personal_fact(memory_id: str, new_content: str) -> bool: + """Update a personal fact's content.""" + with get_connection() as conn: + cursor = conn.execute( + "UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'", + (new_content, memory_id), + ) + conn.commit() + return cursor.rowcount > 0 + + +def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: + """Store a personal fact about the user or system. + + Args: + fact: The fact to store + agent_id: Associated agent + + Returns: + The stored MemoryEntry + """ + return store_memory( + content=fact, + source="system", + context_type="fact", + agent_id=agent_id, + metadata={"auto_extracted": False}, + ) diff --git a/src/timmy/memory/consolidation.py b/src/timmy/memory/consolidation.py new file mode 100644 index 0000000..b486e52 --- /dev/null +++ b/src/timmy/memory/consolidation.py @@ -0,0 +1,310 @@ +"""Hot and Vault memory classes for Timmy's memory consolidation tier. + +HotMemory: Tier 1 — computed view of top facts from the database. +VaultMemory: Tier 2 — structured vault (memory/ directory), append-only markdown. +""" + +import logging +import re +from datetime import UTC, datetime +from pathlib import Path + +from timmy.memory.unified import PROJECT_ROOT + +logger = logging.getLogger(__name__) + +VAULT_PATH = PROJECT_ROOT / "memory" + +_DEFAULT_HOT_MEMORY_TEMPLATE = """\ +# Timmy Hot Memory + +> Working RAM — always loaded, ~300 lines max, pruned monthly +> Last updated: {date} + +--- + +## Current Status + +**Agent State:** Operational +**Mode:** Development +**Active Tasks:** 0 +**Pending Decisions:** None + +--- + +## Standing Rules + +1. **Sovereignty First** — No cloud dependencies +2. **Local-Only Inference** — Ollama on localhost +3. **Privacy by Design** — Telemetry disabled +4. **Tool Minimalism** — Use tools only when necessary +5. 
**Memory Discipline** — Write handoffs at session end + +--- + +## Agent Roster + +| Agent | Role | Status | +|-------|------|--------| +| Timmy | Core | Active | + +--- + +## User Profile + +**Name:** (not set) +**Interests:** (to be learned) + +--- + +## Key Decisions + +(none yet) + +--- + +## Pending Actions + +- [ ] Learn user's name + +--- + +*Prune date: {prune_date}* +""" + + +class HotMemory: + """Tier 1: Hot memory — computed view of top facts from DB.""" + + def __init__(self, path: Path | None = None) -> None: + if path is None: + path = PROJECT_ROOT / "MEMORY.md" + self.path = path + self._content: str | None = None + self._last_modified: float | None = None + + def read(self, force_refresh: bool = False) -> str: + """Read hot memory — computed view of top facts + last reflection from DB.""" + from timmy.memory.chain import recall_personal_facts + # Import recall_last_reflection lazily to support patching in memory_system + try: + # Use the version from memory_system so patches work correctly + import timmy.memory_system as _ms + recall_last_reflection = _ms.recall_last_reflection + except Exception: + recall_last_reflection = None + + try: + facts = recall_personal_facts() + lines = ["# Timmy Hot Memory\n"] + + if facts: + lines.append("## Known Facts\n") + for f in facts[:15]: + lines.append(f"- {f}") + + # Include the last reflection if available + if recall_last_reflection is not None: + try: + reflection = recall_last_reflection() + if reflection: + lines.append("\n## Last Reflection\n") + lines.append(reflection) + except Exception: + pass + + if len(lines) > 1: + return "\n".join(lines) + except Exception: + logger.debug("DB context read failed, falling back to file") + + # Fallback to file if DB unavailable + if self.path.exists(): + return self.path.read_text() + + return "# Timmy Hot Memory\n\nNo memories stored yet.\n" + + def update_section(self, section: str, content: str) -> None: + """Update a specific section in MEMORY.md. + + DEPRECATED: Hot memory is now computed from the database. + This method is kept for backward compatibility during transition. + Use memory_write() to store facts in the database. + """ + logger.warning( + "HotMemory.update_section() is deprecated. " + "Use memory_write() to store facts in the database." + ) + + # Keep file-writing for backward compatibility during transition + # Guard against empty or excessively large writes + if not content or not content.strip(): + logger.warning("HotMemory: Refusing empty write to section '%s'", section) + return + if len(content) > 2000: + logger.warning("HotMemory: Truncating oversized write to section '%s'", section) + content = content[:2000] + "\n... 
[truncated]" + + if not self.path.exists(): + self._create_default() + + full_content = self.read() + + # Find section + pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" + match = re.search(pattern, full_content, re.DOTALL) + + if match: + # Replace section + new_section = f"## {section}\n\n{content}\n\n" + full_content = full_content[: match.start()] + new_section + full_content[match.end() :] + else: + # Append section — guard against missing prune marker + insert_point = full_content.rfind("*Prune date:") + new_section = f"## {section}\n\n{content}\n\n" + if insert_point < 0: + # No prune marker — just append at end + full_content = full_content.rstrip() + "\n\n" + new_section + else: + full_content = ( + full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] + ) + + self.path.write_text(full_content) + self._content = full_content + self._last_modified = self.path.stat().st_mtime + logger.info("HotMemory: Updated section '%s'", section) + + def _create_default(self) -> None: + """Create default MEMORY.md if missing. + + DEPRECATED: Hot memory is now computed from the database. + This method is kept for backward compatibility during transition. + """ + logger.debug( + "HotMemory._create_default() - creating default MEMORY.md for backward compatibility" + ) + now = datetime.now(UTC) + content = _DEFAULT_HOT_MEMORY_TEMPLATE.format( + date=now.strftime("%Y-%m-%d"), + prune_date=now.replace(day=25).strftime("%Y-%m-%d"), + ) + self.path.write_text(content) + logger.info("HotMemory: Created default MEMORY.md") + + +class VaultMemory: + """Tier 2: Structured vault (memory/) — append-only markdown.""" + + def __init__(self) -> None: + self.path = VAULT_PATH + self._ensure_structure() + + def _ensure_structure(self) -> None: + """Ensure vault directory structure exists.""" + (self.path / "self").mkdir(parents=True, exist_ok=True) + (self.path / "notes").mkdir(parents=True, exist_ok=True) + (self.path / "aar").mkdir(parents=True, exist_ok=True) + + def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: + """Write a note to the vault.""" + # Add timestamp to filename + timestamp = datetime.now(UTC).strftime("%Y%m%d") + filename = f"{timestamp}_{name}.md" + filepath = self.path / namespace / filename + + # Add header + full_content = f"""# {name.replace("_", " ").title()} + +> Created: {datetime.now(UTC).isoformat()} +> Namespace: {namespace} + +--- + +{content} + +--- + +*Auto-generated by Timmy Memory System* +""" + + filepath.write_text(full_content) + logger.info("VaultMemory: Wrote %s", filepath) + return filepath + + def read_file(self, filepath: Path) -> str: + """Read a file from the vault.""" + if not filepath.exists(): + return "" + return filepath.read_text() + + def update_user_profile(self, key: str, value: str) -> None: + """Update a field in user_profile.md. + + DEPRECATED: User profile updates should now use memory_write() to store + facts in the database. This method is kept for backward compatibility. + """ + logger.warning( + "VaultMemory.update_user_profile() is deprecated. " + "Use memory_write() to store user facts in the database." 
+ ) + # Still update the file for backward compatibility during transition + profile_path = self.path / "self" / "user_profile.md" + + if not profile_path.exists(): + self._create_default_profile() + + content = profile_path.read_text() + + pattern = rf"(\*\*{re.escape(key)}:\*\*).*" + if re.search(pattern, content): + safe_value = value.strip() + content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content) + else: + facts_section = "## Important Facts" + if facts_section in content: + insert_point = content.find(facts_section) + len(facts_section) + content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:] + + content = re.sub( + r"\*Last updated:.*\*", + f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*", + content, + ) + + profile_path.write_text(content) + logger.info("VaultMemory: Updated user profile: %s = %s", key, value) + + def _create_default_profile(self) -> None: + """Create default user profile.""" + profile_path = self.path / "self" / "user_profile.md" + default = """# User Profile + +> Learned information about the user. + +## Basic Information + +**Name:** (unknown) +**Location:** (unknown) +**Occupation:** (unknown) + +## Interests & Expertise + +- (to be learned) + +## Preferences + +- Response style: concise, technical +- Tool usage: minimal + +## Important Facts + +- (to be extracted) + +--- + +*Last updated: {date}* +""".format(date=datetime.now(UTC).strftime("%Y-%m-%d")) + + profile_path.write_text(default) diff --git a/src/timmy/memory/semantic.py b/src/timmy/memory/semantic.py new file mode 100644 index 0000000..345d180 --- /dev/null +++ b/src/timmy/memory/semantic.py @@ -0,0 +1,278 @@ +"""Semantic memory and search classes for Timmy. + +Provides SemanticMemory (vector search over vault content) and +MemorySearcher (high-level multi-tier search interface). +""" + +import hashlib +import json +import logging +import sqlite3 +from contextlib import closing, contextmanager +from collections.abc import Generator +from datetime import UTC, datetime +from pathlib import Path + +from config import settings +from timmy.memory.embeddings import ( + EMBEDDING_DIM, + _get_embedding_model, + cosine_similarity, + embed_text, +) +from timmy.memory.unified import ( + DB_PATH, + PROJECT_ROOT, + _ensure_schema, + get_connection, +) + +logger = logging.getLogger(__name__) + +VAULT_PATH = PROJECT_ROOT / "memory" + + +class SemanticMemory: + """Vector-based semantic search over vault content.""" + + def __init__(self) -> None: + self.db_path = DB_PATH + self.vault_path = VAULT_PATH + + @contextmanager + def _get_conn(self) -> Generator[sqlite3.Connection, None, None]: + """Get connection to the instance's db_path (backward compatibility). + + Uses self.db_path if set differently from global DB_PATH, + otherwise uses the global get_connection(). + """ + if self.db_path == DB_PATH: + # Use global connection (normal production path) + with get_connection() as conn: + yield conn + else: + # Use instance-specific db_path (test path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(self.db_path))) as conn: + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") + # Ensure schema exists + _ensure_schema(conn) + yield conn + + def _init_db(self) -> None: + """Initialize database at self.db_path (backward compatibility). + + This method is kept for backward compatibility with existing code and tests. 
+ Schema creation is handled by _get_conn. + """ + # Trigger schema creation via _get_conn + with self._get_conn(): + pass + + def index_file(self, filepath: Path) -> int: + """Index a single file into semantic memory.""" + if not filepath.exists(): + return 0 + + content = filepath.read_text() + file_hash = hashlib.md5(content.encode()).hexdigest() + + with self._get_conn() as conn: + # Check if already indexed with same hash + cursor = conn.execute( + "SELECT metadata FROM memories WHERE source = ? AND memory_type = 'vault_chunk' LIMIT 1", + (str(filepath),), + ) + existing = cursor.fetchone() + if existing and existing[0]: + try: + meta = json.loads(existing[0]) + if meta.get("source_hash") == file_hash: + return 0 # Already indexed + except json.JSONDecodeError: + pass + + # Delete old chunks for this file + conn.execute( + "DELETE FROM memories WHERE source = ? AND memory_type = 'vault_chunk'", + (str(filepath),), + ) + + # Split into chunks (paragraphs) + chunks = self._split_into_chunks(content) + + # Index each chunk + now = datetime.now(UTC).isoformat() + for i, chunk_text in enumerate(chunks): + if len(chunk_text.strip()) < 20: # Skip tiny chunks + continue + + chunk_id = f"{filepath.stem}_{i}" + chunk_embedding = embed_text(chunk_text) + + conn.execute( + """INSERT INTO memories + (id, content, memory_type, source, metadata, embedding, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + ( + chunk_id, + chunk_text, + "vault_chunk", + str(filepath), + json.dumps({"source_hash": file_hash, "chunk_index": i}), + json.dumps(chunk_embedding), + now, + ), + ) + + conn.commit() + + logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks)) + return len(chunks) + + def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]: + """Split text into semantic chunks.""" + # Split by paragraphs first + paragraphs = text.split("\n\n") + chunks = [] + + for para in paragraphs: + para = para.strip() + if not para: + continue + + # If paragraph is small enough, keep as one chunk + if len(para) <= max_chunk_size: + chunks.append(para) + else: + # Split long paragraphs by sentences + sentences = para.replace(". 
", ".\n").split("\n") + current_chunk = "" + + for sent in sentences: + if len(current_chunk) + len(sent) < max_chunk_size: + current_chunk += " " + sent if current_chunk else sent + else: + if current_chunk: + chunks.append(current_chunk.strip()) + current_chunk = sent + + if current_chunk: + chunks.append(current_chunk.strip()) + + return chunks + + def index_vault(self) -> int: + """Index entire vault directory.""" + total_chunks = 0 + + for md_file in self.vault_path.rglob("*.md"): + # Skip handoff file (handled separately) + if "last-session-handoff" in md_file.name: + continue + total_chunks += self.index_file(md_file) + + logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks) + return total_chunks + + def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]: + """Search for relevant memory chunks.""" + query_embedding = embed_text(query) + + with self._get_conn() as conn: + conn.row_factory = sqlite3.Row + + # Get all vault chunks + rows = conn.execute( + "SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'" + ).fetchall() + + # Calculate similarities + scored = [] + for row in rows: + embedding = json.loads(row["embedding"]) + score = cosine_similarity(query_embedding, embedding) + scored.append((row["source"], row["content"], score)) + + # Sort by score descending + scored.sort(key=lambda x: x[2], reverse=True) + + # Return top_k + return [(content, score) for _, content, score in scored[:top_k]] + + def get_relevant_context(self, query: str, max_chars: int = 2000) -> str: + """Get formatted context string for a query.""" + results = self.search(query, top_k=3) + + if not results: + return "" + + parts = [] + total_chars = 0 + + for content, score in results: + if score < 0.3: # Similarity threshold + continue + + chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..." + if total_chars + len(chunk) > max_chars: + break + + parts.append(chunk) + total_chars += len(chunk) + + return "\n\n".join(parts) if parts else "" + + def stats(self) -> dict: + """Get indexing statistics.""" + with self._get_conn() as conn: + cursor = conn.execute( + "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'" + ) + total_chunks, total_files = cursor.fetchone() + + return { + "total_chunks": total_chunks, + "total_files": total_files, + "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, + } + + +class MemorySearcher: + """High-level interface for memory search.""" + + def __init__(self) -> None: + self.semantic = SemanticMemory() + + def search(self, query: str, tiers: list[str] = None) -> dict: + """Search across memory tiers. 
+ + Args: + query: Search query + tiers: List of tiers to search ["hot", "vault", "semantic"] + + Returns: + Dict with results from each tier + """ + tiers = tiers or ["semantic"] # Default to semantic only + results = {} + + if "semantic" in tiers: + semantic_results = self.semantic.search(query, top_k=5) + results["semantic"] = [ + {"content": content, "score": score} for content, score in semantic_results + ] + + return results + + def get_context_for_query(self, query: str) -> str: + """Get comprehensive context for a user query.""" + # Get semantic context + semantic_context = self.semantic.get_relevant_context(query) + + if semantic_context: + return f"## Relevant Past Context\n\n{semantic_context}" + + return "" diff --git a/src/timmy/memory_system.py b/src/timmy/memory_system.py index 3f37a48..ef6ee88 100644 --- a/src/timmy/memory_system.py +++ b/src/timmy/memory_system.py @@ -1,23 +1,16 @@ -"""Consolidated memory system for Timmy — single source of truth. +"""Consolidated memory system for Timmy — thin orchestrator. -Architecture: -- Database: Single `memories` table with unified schema -- Embeddings: timmy.memory.embeddings (extracted) -- CRUD: store_memory, search_memories, delete_memory, etc. -- Tool functions: memory_search, memory_read, memory_write, memory_forget -- Classes: HotMemory, VaultMemory, MemorySystem, SemanticMemory, MemorySearcher +Submodules: memory.chain (CRUD), memory.semantic (search), +memory.consolidation (tiers), memory.unified (DB/schema). """ -import hashlib -import json import logging import re import sqlite3 import uuid from collections.abc import Generator from contextlib import closing, contextmanager -from dataclasses import dataclass, field -from datetime import UTC, datetime, timedelta +from datetime import UTC, datetime from pathlib import Path from config import settings @@ -31,78 +24,22 @@ from timmy.memory.embeddings import ( cosine_similarity, embed_text, ) +from timmy.memory.unified import ( + MemoryChunk, # noqa: F401 — re-exported for backward compatibility + MemoryEntry, # noqa: F401 — re-exported for backward compatibility + _ensure_schema, +) logger = logging.getLogger(__name__) -# Paths +# Paths — defined here so tests can patch timmy.memory_system. PROJECT_ROOT = Path(__file__).parent.parent.parent HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" VAULT_PATH = PROJECT_ROOT / "memory" SOUL_PATH = VAULT_PATH / "self" / "soul.md" DB_PATH = PROJECT_ROOT / "data" / "memory.db" - - -# ─────────────────────────────────────────────────────────────────────────────── -# Database Connection -# ─────────────────────────────────────────────────────────────────────────────── - - -_DEFAULT_HOT_MEMORY_TEMPLATE = """\ -# Timmy Hot Memory - -> Working RAM — always loaded, ~300 lines max, pruned monthly -> Last updated: {date} - ---- - -## Current Status - -**Agent State:** Operational -**Mode:** Development -**Active Tasks:** 0 -**Pending Decisions:** None - ---- - -## Standing Rules - -1. **Sovereignty First** — No cloud dependencies -2. **Local-Only Inference** — Ollama on localhost -3. **Privacy by Design** — Telemetry disabled -4. **Tool Minimalism** — Use tools only when necessary -5. 
**Memory Discipline** — Write handoffs at session end - ---- - -## Agent Roster - -| Agent | Role | Status | -|-------|------|--------| -| Timmy | Core | Active | - ---- - -## User Profile - -**Name:** (not set) -**Interests:** (to be learned) - ---- - -## Key Decisions - -(none yet) - ---- - -## Pending Actions - -- [ ] Learn user's name - ---- - -*Prune date: {prune_date}* -""" +NOTES_DIR = Path.home() / ".timmy" / "notes" +DECISION_LOG = Path.home() / ".timmy" / "decisions.md" @contextmanager @@ -117,547 +54,34 @@ def get_connection() -> Generator[sqlite3.Connection, None, None]: yield conn -def _ensure_schema(conn: sqlite3.Connection) -> None: - """Create the unified memories table and indexes if they don't exist.""" - conn.execute(""" - CREATE TABLE IF NOT EXISTS memories ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - memory_type TEXT NOT NULL DEFAULT 'fact', - source TEXT NOT NULL DEFAULT 'agent', - embedding TEXT, - metadata TEXT, - source_hash TEXT, - agent_id TEXT, - task_id TEXT, - session_id TEXT, - confidence REAL NOT NULL DEFAULT 0.8, - tags TEXT NOT NULL DEFAULT '[]', - created_at TEXT NOT NULL, - last_accessed TEXT, - access_count INTEGER NOT NULL DEFAULT 0 - ) - """) - - # Create indexes for efficient querying - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") - conn.commit() - - # Run migration if needed - _migrate_schema(conn) - - -def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: - """Get the column names for a table.""" - cursor = conn.execute(f"PRAGMA table_info({table_name})") - return {row[1] for row in cursor.fetchall()} - - -def _migrate_episodes(conn: sqlite3.Connection) -> None: - """Migrate episodes table rows into the unified memories table.""" - logger.info("Migration: Converting episodes table to memories") - try: - cols = _get_table_columns(conn, "episodes") - context_type_col = "context_type" if "context_type" in cols else "'conversation'" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - metadata, agent_id, task_id, session_id, - created_at, access_count, last_accessed - ) - SELECT - id, content, - COALESCE({context_type_col}, 'conversation'), - COALESCE(source, 'agent'), - embedding, - metadata, agent_id, task_id, session_id, - COALESCE(timestamp, datetime('now')), 0, NULL - FROM episodes - """) - conn.execute("DROP TABLE episodes") - logger.info("Migration: Migrated episodes to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate episodes: %s", exc) - - -def _migrate_chunks(conn: sqlite3.Connection) -> None: - """Migrate chunks table rows into the unified memories table.""" - logger.info("Migration: Converting chunks table to memories") - try: - cols = _get_table_columns(conn, "chunks") - - id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" - content_col = "content" if "content" in cols else "text" - source_col = ( - "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") - ) - embedding_col = "embedding" if "embedding" in cols else "NULL" - created_col = "created_at" if "created_at" in cols else "datetime('now')" - - 
conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - created_at, access_count - ) - SELECT - {id_col}, {content_col}, 'vault_chunk', {source_col}, - {embedding_col}, {created_col}, 0 - FROM chunks - """) - conn.execute("DROP TABLE chunks") - logger.info("Migration: Migrated chunks to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate chunks: %s", exc) - - -def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None: - """Drop a legacy table if it exists.""" - try: - conn.execute(f"DROP TABLE {table}") # noqa: S608 - logger.info("Migration: Dropped old %s table", table) - except sqlite3.Error as exc: - logger.warning("Migration: Failed to drop %s: %s", table, exc) - - -def _migrate_schema(conn: sqlite3.Connection) -> None: - """Migrate from old three-table schema to unified memories table. - - Migration paths: - - episodes table -> memories (context_type -> memory_type) - - chunks table -> memories with memory_type='vault_chunk' - - facts table -> dropped (unused, 0 rows expected) - """ - cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") - tables = {row[0] for row in cursor.fetchall()} - - has_memories = "memories" in tables - - if not has_memories and (tables & {"episodes", "chunks", "facts"}): - logger.info("Migration: Creating unified memories table") - - if "episodes" in tables and has_memories: - _migrate_episodes(conn) - if "chunks" in tables and has_memories: - _migrate_chunks(conn) - if "facts" in tables: - _drop_legacy_table(conn, "facts") - - conn.commit() - - # Alias for backward compatibility get_conn = get_connection -# ─────────────────────────────────────────────────────────────────────────────── -# Data Classes -# ─────────────────────────────────────────────────────────────────────────────── - - -@dataclass -class MemoryEntry: - """A memory entry with vector embedding. - - Note: The DB column is `memory_type` but this field is named `context_type` - for backward API compatibility. 
- """ - - id: str = field(default_factory=lambda: str(uuid.uuid4())) - content: str = "" # The actual text content - source: str = "" # Where it came from (agent, user, system) - context_type: str = "conversation" # API field name; DB column is memory_type - agent_id: str | None = None - task_id: str | None = None - session_id: str | None = None - metadata: dict | None = None - embedding: list[float] | None = None - timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) - relevance_score: float | None = None # Set during search - - -@dataclass -class MemoryChunk: - """A searchable chunk of memory.""" - - id: str - source: str # filepath - content: str - embedding: list[float] - created_at: str +from timmy.memory.chain import ( # noqa: E402 + _build_search_filters, + _fetch_memory_candidates, + _row_to_entry, + _score_and_filter, + delete_memory, + get_memory_context, + get_memory_stats, + prune_memories, + recall_personal_facts, + recall_personal_facts_with_ids, + store_personal_fact, + update_personal_fact, +) +from timmy.memory.chain import search_memories, store_memory # noqa: E402 +from timmy.memory.consolidation import HotMemory, VaultMemory # noqa: E402, F401 +from timmy.memory.semantic import MemorySearcher, SemanticMemory # noqa: E402, F401 # ─────────────────────────────────────────────────────────────────────────────── -# CRUD Operations +# Reflection functions — use local get_connection() so DB_PATH patches work # ─────────────────────────────────────────────────────────────────────────────── -def store_memory( - content: str, - source: str, - context_type: str = "conversation", - agent_id: str | None = None, - task_id: str | None = None, - session_id: str | None = None, - metadata: dict | None = None, - compute_embedding: bool = True, -) -> MemoryEntry: - """Store a memory entry with optional embedding. - - Args: - content: The text content to store - source: Source of the memory (agent name, user, system) - context_type: Type of context (conversation, document, fact, vault_chunk) - agent_id: Associated agent ID - task_id: Associated task ID - session_id: Session identifier - metadata: Additional structured data - compute_embedding: Whether to compute vector embedding - - Returns: - The stored MemoryEntry - """ - embedding = None - if compute_embedding: - embedding = embed_text(content) - - entry = MemoryEntry( - content=content, - source=source, - context_type=context_type, - agent_id=agent_id, - task_id=task_id, - session_id=session_id, - metadata=metadata, - embedding=embedding, - ) - - with get_connection() as conn: - conn.execute( - """ - INSERT INTO memories - (id, content, memory_type, source, agent_id, task_id, session_id, - metadata, embedding, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, - ( - entry.id, - entry.content, - entry.context_type, # DB column is memory_type - entry.source, - entry.agent_id, - entry.task_id, - entry.session_id, - json.dumps(metadata) if metadata else None, - json.dumps(embedding) if embedding else None, - entry.timestamp, - ), - ) - conn.commit() - - return entry - - -def _build_search_filters( - context_type: str | None, - agent_id: str | None, - session_id: str | None, -) -> tuple[str, list]: - """Build SQL WHERE clause and params from search filters.""" - conditions: list[str] = [] - params: list = [] - - if context_type: - conditions.append("memory_type = ?") - params.append(context_type) - if agent_id: - conditions.append("agent_id = ?") - params.append(agent_id) - if session_id: - conditions.append("session_id = ?") - params.append(session_id) - - where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" - return where_clause, params - - -def _fetch_memory_candidates( - where_clause: str, params: list, candidate_limit: int -) -> list[sqlite3.Row]: - """Fetch candidate memory rows from the database.""" - query_sql = f""" - SELECT * FROM memories - {where_clause} - ORDER BY created_at DESC - LIMIT ? - """ - params.append(candidate_limit) - - with get_connection() as conn: - return conn.execute(query_sql, params).fetchall() - - -def _row_to_entry(row: sqlite3.Row) -> MemoryEntry: - """Convert a database row to a MemoryEntry.""" - return MemoryEntry( - id=row["id"], - content=row["content"], - source=row["source"], - context_type=row["memory_type"], # DB column -> API field - agent_id=row["agent_id"], - task_id=row["task_id"], - session_id=row["session_id"], - metadata=json.loads(row["metadata"]) if row["metadata"] else None, - embedding=json.loads(row["embedding"]) if row["embedding"] else None, - timestamp=row["created_at"], - ) - - -def _score_and_filter( - rows: list[sqlite3.Row], - query: str, - query_embedding: list[float], - min_relevance: float, -) -> list[MemoryEntry]: - """Score candidate rows by similarity and filter by min_relevance.""" - results = [] - for row in rows: - entry = _row_to_entry(row) - - if entry.embedding: - score = cosine_similarity(query_embedding, entry.embedding) - else: - score = _keyword_overlap(query, entry.content) - - entry.relevance_score = score - if score >= min_relevance: - results.append(entry) - - results.sort(key=lambda x: x.relevance_score or 0, reverse=True) - return results - - -def search_memories( - query: str, - limit: int = 10, - context_type: str | None = None, - agent_id: str | None = None, - session_id: str | None = None, - min_relevance: float = 0.0, -) -> list[MemoryEntry]: - """Search for memories by semantic similarity. - - Args: - query: Search query text - limit: Maximum results - context_type: Filter by memory type (maps to DB memory_type column) - agent_id: Filter by agent - session_id: Filter by session - min_relevance: Minimum similarity score (0-1) - - Returns: - List of MemoryEntry objects sorted by relevance - """ - query_embedding = embed_text(query) - where_clause, params = _build_search_filters(context_type, agent_id, session_id) - rows = _fetch_memory_candidates(where_clause, params, limit * 3) - results = _score_and_filter(rows, query, query_embedding, min_relevance) - return results[:limit] - - -def delete_memory(memory_id: str) -> bool: - """Delete a memory entry by ID. 
- - Returns: - True if deleted, False if not found - """ - with get_connection() as conn: - cursor = conn.execute( - "DELETE FROM memories WHERE id = ?", - (memory_id,), - ) - conn.commit() - return cursor.rowcount > 0 - - -def get_memory_stats() -> dict: - """Get statistics about the memory store. - - Returns: - Dict with counts by type, total entries, etc. - """ - with get_connection() as conn: - total = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()["count"] - - by_type = {} - rows = conn.execute( - "SELECT memory_type, COUNT(*) as count FROM memories GROUP BY memory_type" - ).fetchall() - for row in rows: - by_type[row["memory_type"]] = row["count"] - - with_embeddings = conn.execute( - "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL" - ).fetchone()["count"] - - return { - "total_entries": total, - "by_type": by_type, - "with_embeddings": with_embeddings, - "has_embedding_model": _get_embedding_model() is not False, - } - - -def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int: - """Delete old memories to manage storage. - - Args: - older_than_days: Delete memories older than this - keep_facts: Whether to preserve fact-type memories - - Returns: - Number of entries deleted - """ - cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat() - - with get_connection() as conn: - if keep_facts: - cursor = conn.execute( - """ - DELETE FROM memories - WHERE created_at < ? AND memory_type != 'fact' - """, - (cutoff,), - ) - else: - cursor = conn.execute( - "DELETE FROM memories WHERE created_at < ?", - (cutoff,), - ) - - deleted = cursor.rowcount - conn.commit() - - return deleted - - -def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str: - """Get relevant memory context as formatted text for LLM prompts. - - Args: - query: Search query - max_tokens: Approximate maximum tokens to return - **filters: Additional filters (agent_id, session_id, etc.) - - Returns: - Formatted context string for inclusion in prompts - """ - memories = search_memories(query, limit=20, **filters) - - context_parts = [] - total_chars = 0 - max_chars = max_tokens * 4 # Rough approximation - - for mem in memories: - formatted = f"[{mem.source}]: {mem.content}" - if total_chars + len(formatted) > max_chars: - break - context_parts.append(formatted) - total_chars += len(formatted) - - if not context_parts: - return "" - - return "Relevant context from memory:\n" + "\n\n".join(context_parts) - - -# ─────────────────────────────────────────────────────────────────────────────── -# Personal Facts Functions -# ─────────────────────────────────────────────────────────────────────────────── - - -def recall_personal_facts(agent_id: str | None = None) -> list[str]: - """Recall personal facts about the user or system. - - Args: - agent_id: Optional agent filter - - Returns: - List of fact strings - """ - with get_connection() as conn: - if agent_id: - rows = conn.execute( - """ - SELECT content FROM memories - WHERE memory_type = 'fact' AND agent_id = ? 
- ORDER BY created_at DESC - LIMIT 100 - """, - (agent_id,), - ).fetchall() - else: - rows = conn.execute( - """ - SELECT content FROM memories - WHERE memory_type = 'fact' - ORDER BY created_at DESC - LIMIT 100 - """, - ).fetchall() - - return [r["content"] for r in rows] - - -def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]: - """Recall personal facts with their IDs for edit/delete operations.""" - with get_connection() as conn: - if agent_id: - rows = conn.execute( - "SELECT id, content FROM memories WHERE memory_type = 'fact' AND agent_id = ? ORDER BY created_at DESC LIMIT 100", - (agent_id,), - ).fetchall() - else: - rows = conn.execute( - "SELECT id, content FROM memories WHERE memory_type = 'fact' ORDER BY created_at DESC LIMIT 100", - ).fetchall() - return [{"id": r["id"], "content": r["content"]} for r in rows] - - -def update_personal_fact(memory_id: str, new_content: str) -> bool: - """Update a personal fact's content.""" - with get_connection() as conn: - cursor = conn.execute( - "UPDATE memories SET content = ? WHERE id = ? AND memory_type = 'fact'", - (new_content, memory_id), - ) - conn.commit() - return cursor.rowcount > 0 - - -def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: - """Store a personal fact about the user or system. - - Args: - fact: The fact to store - agent_id: Associated agent - - Returns: - The stored MemoryEntry - """ - return store_memory( - content=fact, - source="system", - context_type="fact", - agent_id=agent_id, - metadata={"auto_extracted": False}, - ) - - def store_last_reflection(reflection: str) -> None: """Store the last reflection, replacing any previous one. @@ -691,521 +115,15 @@ def recall_last_reflection() -> str | None: # ─────────────────────────────────────────────────────────────────────────────── -# Hot Memory (computed from DB instead of MEMORY.md) +# Tool Functions # ─────────────────────────────────────────────────────────────────────────────── -class HotMemory: - """Tier 1: Hot memory — computed view of top facts from DB.""" - - def __init__(self) -> None: - self.path = HOT_MEMORY_PATH - self._content: str | None = None - self._last_modified: float | None = None - - def read(self, force_refresh: bool = False) -> str: - """Read hot memory — computed view of top facts + last reflection from DB.""" - try: - facts = recall_personal_facts() - lines = ["# Timmy Hot Memory\n"] - - if facts: - lines.append("## Known Facts\n") - for f in facts[:15]: - lines.append(f"- {f}") - - # Include the last reflection if available - reflection = recall_last_reflection() - if reflection: - lines.append("\n## Last Reflection\n") - lines.append(reflection) - - if len(lines) > 1: - return "\n".join(lines) - except Exception: - logger.debug("DB context read failed, falling back to file") - - # Fallback to file if DB unavailable - if self.path.exists(): - return self.path.read_text() - - return "# Timmy Hot Memory\n\nNo memories stored yet.\n" - - def update_section(self, section: str, content: str) -> None: - """Update a specific section in MEMORY.md. - - DEPRECATED: Hot memory is now computed from the database. - This method is kept for backward compatibility during transition. - Use memory_write() to store facts in the database. - """ - logger.warning( - "HotMemory.update_section() is deprecated. " - "Use memory_write() to store facts in the database." 
- ) - - # Keep file-writing for backward compatibility during transition - # Guard against empty or excessively large writes - if not content or not content.strip(): - logger.warning("HotMemory: Refusing empty write to section '%s'", section) - return - if len(content) > 2000: - logger.warning("HotMemory: Truncating oversized write to section '%s'", section) - content = content[:2000] + "\n... [truncated]" - - if not self.path.exists(): - self._create_default() - - full_content = self.read() - - # Find section - pattern = rf"(## {re.escape(section)}.*?)(?=\n## |\Z)" - match = re.search(pattern, full_content, re.DOTALL) - - if match: - # Replace section - new_section = f"## {section}\n\n{content}\n\n" - full_content = full_content[: match.start()] + new_section + full_content[match.end() :] - else: - # Append section — guard against missing prune marker - insert_point = full_content.rfind("*Prune date:") - new_section = f"## {section}\n\n{content}\n\n" - if insert_point < 0: - # No prune marker — just append at end - full_content = full_content.rstrip() + "\n\n" + new_section - else: - full_content = ( - full_content[:insert_point] + new_section + "\n" + full_content[insert_point:] - ) - - self.path.write_text(full_content) - self._content = full_content - self._last_modified = self.path.stat().st_mtime - logger.info("HotMemory: Updated section '%s'", section) - - def _create_default(self) -> None: - """Create default MEMORY.md if missing. - - DEPRECATED: Hot memory is now computed from the database. - This method is kept for backward compatibility during transition. - """ - logger.debug( - "HotMemory._create_default() - creating default MEMORY.md for backward compatibility" - ) - now = datetime.now(UTC) - content = _DEFAULT_HOT_MEMORY_TEMPLATE.format( - date=now.strftime("%Y-%m-%d"), - prune_date=now.replace(day=25).strftime("%Y-%m-%d"), - ) - self.path.write_text(content) - logger.info("HotMemory: Created default MEMORY.md") - - -# ─────────────────────────────────────────────────────────────────────────────── -# Vault Memory (memory/ directory) -# ─────────────────────────────────────────────────────────────────────────────── - - -class VaultMemory: - """Tier 2: Structured vault (memory/) — append-only markdown.""" - - def __init__(self) -> None: - self.path = VAULT_PATH - self._ensure_structure() - - def _ensure_structure(self) -> None: - """Ensure vault directory structure exists.""" - (self.path / "self").mkdir(parents=True, exist_ok=True) - (self.path / "notes").mkdir(parents=True, exist_ok=True) - (self.path / "aar").mkdir(parents=True, exist_ok=True) - - def write_note(self, name: str, content: str, namespace: str = "notes") -> Path: - """Write a note to the vault.""" - # Add timestamp to filename - timestamp = datetime.now(UTC).strftime("%Y%m%d") - filename = f"{timestamp}_{name}.md" - filepath = self.path / namespace / filename - - # Add header - full_content = f"""# {name.replace("_", " ").title()} - -> Created: {datetime.now(UTC).isoformat()} -> Namespace: {namespace} - ---- - -{content} - ---- - -*Auto-generated by Timmy Memory System* -""" - - filepath.write_text(full_content) - logger.info("VaultMemory: Wrote %s", filepath) - return filepath - - def read_file(self, filepath: Path) -> str: - """Read a file from the vault.""" - if not filepath.exists(): - return "" - return filepath.read_text() - - def update_user_profile(self, key: str, value: str) -> None: - """Update a field in user_profile.md. 
- - DEPRECATED: User profile updates should now use memory_write() to store - facts in the database. This method is kept for backward compatibility. - """ - logger.warning( - "VaultMemory.update_user_profile() is deprecated. " - "Use memory_write() to store user facts in the database." - ) - # Still update the file for backward compatibility during transition - profile_path = self.path / "self" / "user_profile.md" - - if not profile_path.exists(): - self._create_default_profile() - - content = profile_path.read_text() - - pattern = rf"(\*\*{re.escape(key)}:\*\*).*" - if re.search(pattern, content): - safe_value = value.strip() - content = re.sub(pattern, lambda m: f"{m.group(1)} {safe_value}", content) - else: - facts_section = "## Important Facts" - if facts_section in content: - insert_point = content.find(facts_section) + len(facts_section) - content = content[:insert_point] + f"\n- {key}: {value}" + content[insert_point:] - - content = re.sub( - r"\*Last updated:.*\*", - f"*Last updated: {datetime.now(UTC).strftime('%Y-%m-%d')}*", - content, - ) - - profile_path.write_text(content) - logger.info("VaultMemory: Updated user profile: %s = %s", key, value) - - def _create_default_profile(self) -> None: - """Create default user profile.""" - profile_path = self.path / "self" / "user_profile.md" - default = """# User Profile - -> Learned information about the user. - -## Basic Information - -**Name:** (unknown) -**Location:** (unknown) -**Occupation:** (unknown) - -## Interests & Expertise - -- (to be learned) - -## Preferences - -- Response style: concise, technical -- Tool usage: minimal - -## Important Facts - -- (to be extracted) - ---- - -*Last updated: {date}* -""".format(date=datetime.now(UTC).strftime("%Y-%m-%d")) - - profile_path.write_text(default) - - -# ─────────────────────────────────────────────────────────────────────────────── -# Semantic Memory (Vector search) -# ─────────────────────────────────────────────────────────────────────────────── - - -class SemanticMemory: - """Vector-based semantic search over vault content.""" - - def __init__(self) -> None: - self.db_path = DB_PATH - self.vault_path = VAULT_PATH - - @contextmanager - def _get_conn(self) -> Generator[sqlite3.Connection, None, None]: - """Get connection to the instance's db_path (backward compatibility). - - Uses self.db_path if set differently from global DB_PATH, - otherwise uses the global get_connection(). 
- """ - if self.db_path == DB_PATH: - # Use global connection (normal production path) - with get_connection() as conn: - yield conn - else: - # Use instance-specific db_path (test path) - self.db_path.parent.mkdir(parents=True, exist_ok=True) - with closing(sqlite3.connect(str(self.db_path))) as conn: - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") - # Ensure schema exists - conn.execute(""" - CREATE TABLE IF NOT EXISTS memories ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - memory_type TEXT NOT NULL DEFAULT 'fact', - source TEXT NOT NULL DEFAULT 'agent', - embedding TEXT, - metadata TEXT, - source_hash TEXT, - agent_id TEXT, - task_id TEXT, - session_id TEXT, - confidence REAL NOT NULL DEFAULT 0.8, - tags TEXT NOT NULL DEFAULT '[]', - created_at TEXT NOT NULL, - last_accessed TEXT, - access_count INTEGER NOT NULL DEFAULT 0 - ) - """) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)" - ) - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") - conn.commit() - yield conn - - def _init_db(self) -> None: - """Initialize database at self.db_path (backward compatibility). - - This method is kept for backward compatibility with existing code and tests. - Schema creation is handled by _get_conn. - """ - # Trigger schema creation via _get_conn - with self._get_conn(): - pass - - def index_file(self, filepath: Path) -> int: - """Index a single file into semantic memory.""" - if not filepath.exists(): - return 0 - - content = filepath.read_text() - file_hash = hashlib.md5(content.encode()).hexdigest() - - with self._get_conn() as conn: - # Check if already indexed with same hash - cursor = conn.execute( - "SELECT metadata FROM memories WHERE source = ? AND memory_type = 'vault_chunk' LIMIT 1", - (str(filepath),), - ) - existing = cursor.fetchone() - if existing and existing[0]: - try: - meta = json.loads(existing[0]) - if meta.get("source_hash") == file_hash: - return 0 # Already indexed - except json.JSONDecodeError: - pass - - # Delete old chunks for this file - conn.execute( - "DELETE FROM memories WHERE source = ? 
AND memory_type = 'vault_chunk'", - (str(filepath),), - ) - - # Split into chunks (paragraphs) - chunks = self._split_into_chunks(content) - - # Index each chunk - now = datetime.now(UTC).isoformat() - for i, chunk_text in enumerate(chunks): - if len(chunk_text.strip()) < 20: # Skip tiny chunks - continue - - chunk_id = f"{filepath.stem}_{i}" - chunk_embedding = embed_text(chunk_text) - - conn.execute( - """INSERT INTO memories - (id, content, memory_type, source, metadata, embedding, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - ( - chunk_id, - chunk_text, - "vault_chunk", - str(filepath), - json.dumps({"source_hash": file_hash, "chunk_index": i}), - json.dumps(chunk_embedding), - now, - ), - ) - - conn.commit() - - logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks)) - return len(chunks) - - def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]: - """Split text into semantic chunks.""" - # Split by paragraphs first - paragraphs = text.split("\n\n") - chunks = [] - - for para in paragraphs: - para = para.strip() - if not para: - continue - - # If paragraph is small enough, keep as one chunk - if len(para) <= max_chunk_size: - chunks.append(para) - else: - # Split long paragraphs by sentences - sentences = para.replace(". ", ".\n").split("\n") - current_chunk = "" - - for sent in sentences: - if len(current_chunk) + len(sent) < max_chunk_size: - current_chunk += " " + sent if current_chunk else sent - else: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = sent - - if current_chunk: - chunks.append(current_chunk.strip()) - - return chunks - - def index_vault(self) -> int: - """Index entire vault directory.""" - total_chunks = 0 - - for md_file in self.vault_path.rglob("*.md"): - # Skip handoff file (handled separately) - if "last-session-handoff" in md_file.name: - continue - total_chunks += self.index_file(md_file) - - logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks) - return total_chunks - - def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]: - """Search for relevant memory chunks.""" - query_embedding = embed_text(query) - - with self._get_conn() as conn: - conn.row_factory = sqlite3.Row - - # Get all vault chunks - rows = conn.execute( - "SELECT source, content, embedding FROM memories WHERE memory_type = 'vault_chunk'" - ).fetchall() - - # Calculate similarities - scored = [] - for row in rows: - embedding = json.loads(row["embedding"]) - score = cosine_similarity(query_embedding, embedding) - scored.append((row["source"], row["content"], score)) - - # Sort by score descending - scored.sort(key=lambda x: x[2], reverse=True) - - # Return top_k - return [(content, score) for _, content, score in scored[:top_k]] - - def get_relevant_context(self, query: str, max_chars: int = 2000) -> str: - """Get formatted context string for a query.""" - results = self.search(query, top_k=3) - - if not results: - return "" - - parts = [] - total_chars = 0 - - for content, score in results: - if score < 0.3: # Similarity threshold - continue - - chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..." 
- if total_chars + len(chunk) > max_chars: - break - - parts.append(chunk) - total_chars += len(chunk) - - return "\n\n".join(parts) if parts else "" - - def stats(self) -> dict: - """Get indexing statistics.""" - with self._get_conn() as conn: - cursor = conn.execute( - "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE memory_type = 'vault_chunk'" - ) - total_chunks, total_files = cursor.fetchone() - - return { - "total_chunks": total_chunks, - "total_files": total_files, - "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, - } - - -class MemorySearcher: - """High-level interface for memory search.""" - - def __init__(self) -> None: - self.semantic = SemanticMemory() - - def search(self, query: str, tiers: list[str] = None) -> dict: - """Search across memory tiers. - - Args: - query: Search query - tiers: List of tiers to search ["hot", "vault", "semantic"] - - Returns: - Dict with results from each tier - """ - tiers = tiers or ["semantic"] # Default to semantic only - results = {} - - if "semantic" in tiers: - semantic_results = self.semantic.search(query, top_k=5) - results["semantic"] = [ - {"content": content, "score": score} for content, score in semantic_results - ] - - return results - - def get_context_for_query(self, query: str) -> str: - """Get comprehensive context for a user query.""" - # Get semantic context - semantic_context = self.semantic.get_relevant_context(query) - - if semantic_context: - return f"## Relevant Past Context\n\n{semantic_context}" - - return "" - - # Module-level singletons semantic_memory = SemanticMemory() memory_searcher = MemorySearcher() -# ─────────────────────────────────────────────────────────────────────────────── -# Tool Functions -# ─────────────────────────────────────────────────────────────────────────────── - - def memory_search(query: str, limit: int = 10) -> str: """Search past conversations, notes, and stored facts for relevant context. @@ -1373,9 +291,6 @@ def memory_forget(query: str) -> str: # Artifact Tools — "hands" for producing artifacts during conversation # ─────────────────────────────────────────────────────────────────────────────── -NOTES_DIR = Path.home() / ".timmy" / "notes" -DECISION_LOG = Path.home() / ".timmy" / "decisions.md" - def jot_note(title: str, body: str) -> str: """Write a markdown note to Timmy's workspace (~/.timmy/notes/). @@ -1455,7 +370,7 @@ class MemorySystem: """Central memory system coordinating all tiers.""" def __init__(self) -> None: - self.hot = HotMemory() + self.hot = HotMemory(HOT_MEMORY_PATH) self.vault = VaultMemory() self.session_decisions: list[str] = []
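A minimal usage sketch of the relocated CRUD API (illustrative; names and signatures are taken from chain.py above, and the same functions remain importable from timmy.memory_system for backward compatibility):

    from timmy.memory.chain import get_memory_context, search_memories, store_memory

    # Store a fact; the embedding is computed at write time
    entry = store_memory("User prefers concise answers", source="user", context_type="fact")

    # Rank stored memories against a query (cosine similarity, keyword fallback)
    for mem in search_memories("user preferences", limit=5):
        print(mem.relevance_score, mem.content)

    # Ready-to-inject context block for an LLM prompt (~max_tokens budget)
    print(get_memory_context("user preferences", max_tokens=500))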