diff --git a/src/config.py b/src/config.py index 9c06abbf..6bb7abc6 100644 --- a/src/config.py +++ b/src/config.py @@ -1,12 +1,12 @@ import logging as _logging import os import sys +from datetime import UTC +from datetime import datetime as _datetime from typing import Literal from pydantic_settings import BaseSettings, SettingsConfigDict -from datetime import UTC, datetime as _datetime - APP_START_TIME: _datetime = _datetime.now(UTC) diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 932d15f5..2538e540 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -305,7 +305,7 @@ async def lifespan(app: FastAPI): # Auto-prune old vector store memories on startup if settings.memory_prune_days > 0: try: - from timmy.memory.vector_store import prune_memories + from timmy.memory_system import prune_memories pruned = prune_memories( older_than_days=settings.memory_prune_days, diff --git a/src/dashboard/routes/memory.py b/src/dashboard/routes/memory.py index 22574564..97ce6d7c 100644 --- a/src/dashboard/routes/memory.py +++ b/src/dashboard/routes/memory.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, Form, HTTPException, Request from fastapi.responses import HTMLResponse, JSONResponse from dashboard.templating import templates -from timmy.memory.vector_store import ( +from timmy.memory_system import ( delete_memory, get_memory_stats, recall_personal_facts_with_ids, diff --git a/src/dashboard/routes/system.py b/src/dashboard/routes/system.py index bbb9480b..9beb3dc9 100644 --- a/src/dashboard/routes/system.py +++ b/src/dashboard/routes/system.py @@ -180,7 +180,7 @@ async def api_briefing_status(): @router.get("/api/memory/status", response_class=JSONResponse) async def api_memory_status(): """Return memory database status including file info and indexed files count.""" - from timmy.memory.vector_store import get_memory_stats + from timmy.memory_system import get_memory_stats db_path = Path(settings.repo_root) / "data" / "memory.db" db_exists = 
db_path.exists() diff --git a/src/timmy/memory/unified.py b/src/timmy/memory/unified.py index bd8633fa..6157ae7f 100644 --- a/src/timmy/memory/unified.py +++ b/src/timmy/memory/unified.py @@ -1,88 +1,5 @@ -"""Unified memory database — single SQLite DB for all memory types. +"""Backward compatibility — schema lives in memory_system now.""" -Consolidates three previously separate stores into one: -- **facts**: Long-term knowledge (user preferences, learned patterns) -- **chunks**: Indexed vault documents (markdown files from memory/) -- **episodes**: Runtime memories (conversations, agent observations) +from timmy.memory_system import DB_PATH, get_connection -All three tables live in ``data/memory.db``. Existing APIs in -``vector_store.py`` and ``semantic_memory.py`` are updated to point here. -""" - -import logging -import sqlite3 -from collections.abc import Generator -from contextlib import closing, contextmanager -from pathlib import Path - -logger = logging.getLogger(__name__) - -DB_PATH = Path(__file__).parent.parent.parent.parent / "data" / "memory.db" - - -@contextmanager -def get_connection() -> Generator[sqlite3.Connection, None, None]: - """Open (and lazily create) the unified memory database.""" - DB_PATH.parent.mkdir(parents=True, exist_ok=True) - with closing(sqlite3.connect(str(DB_PATH))) as conn: - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA busy_timeout=5000") - _ensure_schema(conn) - yield conn - - -def _ensure_schema(conn: sqlite3.Connection) -> None: - """Create the three core tables and indexes if they don't exist.""" - - # --- facts --------------------------------------------------------------- - conn.execute(""" - CREATE TABLE IF NOT EXISTS facts ( - id TEXT PRIMARY KEY, - category TEXT NOT NULL DEFAULT 'general', - content TEXT NOT NULL, - confidence REAL NOT NULL DEFAULT 0.8, - source TEXT NOT NULL DEFAULT 'agent', - tags TEXT NOT NULL DEFAULT '[]', - created_at TEXT NOT NULL, - 
last_accessed TEXT, - access_count INTEGER NOT NULL DEFAULT 0 - ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_confidence ON facts(confidence)") - - # --- chunks (vault document fragments) ----------------------------------- - conn.execute(""" - CREATE TABLE IF NOT EXISTS chunks ( - id TEXT PRIMARY KEY, - source TEXT NOT NULL, - content TEXT NOT NULL, - embedding TEXT NOT NULL, - created_at TEXT NOT NULL, - source_hash TEXT NOT NULL - ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)") - - # --- episodes (runtime memory entries) ----------------------------------- - conn.execute(""" - CREATE TABLE IF NOT EXISTS episodes ( - id TEXT PRIMARY KEY, - content TEXT NOT NULL, - source TEXT NOT NULL, - context_type TEXT NOT NULL DEFAULT 'conversation', - embedding TEXT, - metadata TEXT, - agent_id TEXT, - task_id TEXT, - session_id TEXT, - timestamp TEXT NOT NULL - ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_type ON episodes(context_type)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_time ON episodes(timestamp)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_agent ON episodes(agent_id)") - - conn.commit() +__all__ = ["DB_PATH", "get_connection"] diff --git a/src/timmy/memory/vector_store.py b/src/timmy/memory/vector_store.py index b828c05f..24ce317d 100644 --- a/src/timmy/memory/vector_store.py +++ b/src/timmy/memory/vector_store.py @@ -1,425 +1,37 @@ -"""Vector store for semantic memory using sqlite-vss. - -Provides embedding-based similarity search for the Echo agent -to retrieve relevant context from conversation history. 
-""" - -import json -import logging -import sqlite3 -import uuid -from collections.abc import Generator -from contextlib import contextmanager -from dataclasses import dataclass, field -from datetime import UTC, datetime - -logger = logging.getLogger(__name__) - - -def _check_embedding_model() -> bool | None: - """Check if the canonical embedding model is available.""" - try: - from timmy.semantic_memory import _get_embedding_model - - model = _get_embedding_model() - return model is not None and model is not False - except (ImportError, AttributeError) as exc: - logger.debug("Embedding model check failed: %s", exc) - return None - - -def _compute_embedding(text: str) -> list[float]: - """Compute embedding vector for text. - - Delegates to the canonical embedding provider in semantic_memory - to avoid loading the model multiple times. - """ - from timmy.semantic_memory import embed_text - - return embed_text(text) - - -@dataclass -class MemoryEntry: - """A memory entry with vector embedding.""" - - id: str = field(default_factory=lambda: str(uuid.uuid4())) - content: str = "" # The actual text content - source: str = "" # Where it came from (agent, user, system) - context_type: str = "conversation" # conversation, document, fact, etc. 
- agent_id: str | None = None - task_id: str | None = None - session_id: str | None = None - metadata: dict | None = None - embedding: list[float] | None = None - timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) - relevance_score: float | None = None # Set during search - - -@contextmanager -def _get_conn() -> Generator[sqlite3.Connection, None, None]: - """Get database connection to unified memory.db.""" - from timmy.memory.unified import get_connection - - with get_connection() as conn: - yield conn - - -def store_memory( - content: str, - source: str, - context_type: str = "conversation", - agent_id: str | None = None, - task_id: str | None = None, - session_id: str | None = None, - metadata: dict | None = None, - compute_embedding: bool = True, -) -> MemoryEntry: - """Store a memory entry with optional embedding. - - Args: - content: The text content to store - source: Source of the memory (agent name, user, system) - context_type: Type of context (conversation, document, fact) - agent_id: Associated agent ID - task_id: Associated task ID - session_id: Session identifier - metadata: Additional structured data - compute_embedding: Whether to compute vector embedding - - Returns: - The stored MemoryEntry - """ - embedding = None - if compute_embedding: - embedding = _compute_embedding(content) - - entry = MemoryEntry( - content=content, - source=source, - context_type=context_type, - agent_id=agent_id, - task_id=task_id, - session_id=session_id, - metadata=metadata, - embedding=embedding, - ) - - with _get_conn() as conn: - conn.execute( - """ - INSERT INTO episodes - (id, content, source, context_type, agent_id, task_id, session_id, - metadata, embedding, timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, - ( - entry.id, - entry.content, - entry.source, - entry.context_type, - entry.agent_id, - entry.task_id, - entry.session_id, - json.dumps(metadata) if metadata else None, - json.dumps(embedding) if embedding else None, - entry.timestamp, - ), - ) - conn.commit() - - return entry - - -def search_memories( - query: str, - limit: int = 10, - context_type: str | None = None, - agent_id: str | None = None, - session_id: str | None = None, - min_relevance: float = 0.0, -) -> list[MemoryEntry]: - """Search for memories by semantic similarity. - - Args: - query: Search query text - limit: Maximum results - context_type: Filter by context type - agent_id: Filter by agent - session_id: Filter by session - min_relevance: Minimum similarity score (0-1) - - Returns: - List of MemoryEntry objects sorted by relevance - """ - query_embedding = _compute_embedding(query) - - # Build query with filters - conditions = [] - params = [] - - if context_type: - conditions.append("context_type = ?") - params.append(context_type) - if agent_id: - conditions.append("agent_id = ?") - params.append(agent_id) - if session_id: - conditions.append("session_id = ?") - params.append(session_id) - - where_clause = "WHERE " + " AND ".join(conditions) if conditions else "" - - # Fetch candidates (we'll do in-memory similarity for now) - # For production with sqlite-vss, this would use vector similarity index - query_sql = f""" - SELECT * FROM episodes - {where_clause} - ORDER BY timestamp DESC - LIMIT ? 
- """ - params.append(limit * 3) # Get more candidates for ranking - - with _get_conn() as conn: - rows = conn.execute(query_sql, params).fetchall() - - # Compute similarity scores - results = [] - for row in rows: - entry = MemoryEntry( - id=row["id"], - content=row["content"], - source=row["source"], - context_type=row["context_type"], - agent_id=row["agent_id"], - task_id=row["task_id"], - session_id=row["session_id"], - metadata=json.loads(row["metadata"]) if row["metadata"] else None, - embedding=json.loads(row["embedding"]) if row["embedding"] else None, - timestamp=row["timestamp"], - ) - - if entry.embedding: - # Cosine similarity - score = _cosine_similarity(query_embedding, entry.embedding) - entry.relevance_score = score - if score >= min_relevance: - results.append(entry) - else: - # Fallback: check for keyword overlap - score = _keyword_overlap(query, entry.content) - entry.relevance_score = score - if score >= min_relevance: - results.append(entry) - - # Sort by relevance and return top results - results.sort(key=lambda x: x.relevance_score or 0, reverse=True) - return results[:limit] - - -def _cosine_similarity(a: list[float], b: list[float]) -> float: - """Compute cosine similarity between two vectors.""" - dot = sum(x * y for x, y in zip(a, b, strict=False)) - norm_a = sum(x * x for x in a) ** 0.5 - norm_b = sum(x * x for x in b) ** 0.5 - if norm_a == 0 or norm_b == 0: - return 0.0 - return dot / (norm_a * norm_b) - - -def _keyword_overlap(query: str, content: str) -> float: - """Simple keyword overlap score as fallback.""" - query_words = set(query.lower().split()) - content_words = set(content.lower().split()) - if not query_words: - return 0.0 - overlap = len(query_words & content_words) - return overlap / len(query_words) - - -def get_memory_context(query: str, max_tokens: int = 2000, **filters) -> str: - """Get relevant memory context as formatted text for LLM prompts. 
- - Args: - query: Search query - max_tokens: Approximate maximum tokens to return - **filters: Additional filters (agent_id, session_id, etc.) - - Returns: - Formatted context string for inclusion in prompts - """ - memories = search_memories(query, limit=20, **filters) - - context_parts = [] - total_chars = 0 - max_chars = max_tokens * 4 # Rough approximation - - for mem in memories: - formatted = f"[{mem.source}]: {mem.content}" - if total_chars + len(formatted) > max_chars: - break - context_parts.append(formatted) - total_chars += len(formatted) - - if not context_parts: - return "" - - return "Relevant context from memory:\n" + "\n\n".join(context_parts) - - -def recall_personal_facts(agent_id: str | None = None) -> list[str]: - """Recall personal facts about the user or system. - - Args: - agent_id: Optional agent filter - - Returns: - List of fact strings - """ - with _get_conn() as conn: - if agent_id: - rows = conn.execute( - """ - SELECT content FROM episodes - WHERE context_type = 'fact' AND agent_id = ? - ORDER BY timestamp DESC - LIMIT 100 - """, - (agent_id,), - ).fetchall() - else: - rows = conn.execute( - """ - SELECT content FROM episodes - WHERE context_type = 'fact' - ORDER BY timestamp DESC - LIMIT 100 - """, - ).fetchall() - - return [r["content"] for r in rows] - - -def recall_personal_facts_with_ids(agent_id: str | None = None) -> list[dict]: - """Recall personal facts with their IDs for edit/delete operations.""" - with _get_conn() as conn: - if agent_id: - rows = conn.execute( - "SELECT id, content FROM episodes WHERE context_type = 'fact' AND agent_id = ? 
ORDER BY timestamp DESC LIMIT 100", - (agent_id,), - ).fetchall() - else: - rows = conn.execute( - "SELECT id, content FROM episodes WHERE context_type = 'fact' ORDER BY timestamp DESC LIMIT 100", - ).fetchall() - return [{"id": r["id"], "content": r["content"]} for r in rows] - - -def update_personal_fact(memory_id: str, new_content: str) -> bool: - """Update a personal fact's content.""" - with _get_conn() as conn: - cursor = conn.execute( - "UPDATE episodes SET content = ? WHERE id = ? AND context_type = 'fact'", - (new_content, memory_id), - ) - conn.commit() - updated = cursor.rowcount > 0 - return updated - - -def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: - """Store a personal fact about the user or system. - - Args: - fact: The fact to store - agent_id: Associated agent - - Returns: - The stored MemoryEntry - """ - return store_memory( - content=fact, - source="system", - context_type="fact", - agent_id=agent_id, - metadata={"auto_extracted": False}, - ) - - -def delete_memory(memory_id: str) -> bool: - """Delete a memory entry by ID. - - Returns: - True if deleted, False if not found - """ - with _get_conn() as conn: - cursor = conn.execute( - "DELETE FROM episodes WHERE id = ?", - (memory_id,), - ) - conn.commit() - deleted = cursor.rowcount > 0 - return deleted - - -def get_memory_stats() -> dict: - """Get statistics about the memory store. - - Returns: - Dict with counts by type, total entries, etc. 
- """ - with _get_conn() as conn: - total = conn.execute("SELECT COUNT(*) as count FROM episodes").fetchone()["count"] - - by_type = {} - rows = conn.execute( - "SELECT context_type, COUNT(*) as count FROM episodes GROUP BY context_type" - ).fetchall() - for row in rows: - by_type[row["context_type"]] = row["count"] - - with_embeddings = conn.execute( - "SELECT COUNT(*) as count FROM episodes WHERE embedding IS NOT NULL" - ).fetchone()["count"] - - return { - "total_entries": total, - "by_type": by_type, - "with_embeddings": with_embeddings, - "has_embedding_model": _check_embedding_model(), - } - - -def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int: - """Delete old memories to manage storage. - - Args: - older_than_days: Delete memories older than this - keep_facts: Whether to preserve fact-type memories - - Returns: - Number of entries deleted - """ - from datetime import timedelta - - cutoff = (datetime.now(UTC) - timedelta(days=older_than_days)).isoformat() - - with _get_conn() as conn: - if keep_facts: - cursor = conn.execute( - """ - DELETE FROM episodes - WHERE timestamp < ? 
AND context_type != 'fact' - """, - (cutoff,), - ) - else: - cursor = conn.execute( - "DELETE FROM episodes WHERE timestamp < ?", - (cutoff,), - ) - - deleted = cursor.rowcount - conn.commit() - - return deleted +"""Backward compatibility — all memory functions live in memory_system now.""" + +from timmy.memory_system import ( + DB_PATH, + MemoryEntry, + _cosine_similarity, + _keyword_overlap, + delete_memory, + get_memory_context, + get_memory_stats, + get_memory_system, + prune_memories, + recall_personal_facts, + recall_personal_facts_with_ids, + search_memories, + store_memory, + store_personal_fact, + update_personal_fact, +) + +__all__ = [ + "DB_PATH", + "MemoryEntry", + "delete_memory", + "get_memory_context", + "get_memory_stats", + "get_memory_system", + "prune_memories", + "recall_personal_facts", + "recall_personal_facts_with_ids", + "search_memories", + "store_memory", + "store_personal_fact", + "update_personal_fact", + "_cosine_similarity", + "_keyword_overlap", +] diff --git a/src/timmy/memory_system.py b/src/timmy/memory_system.py index e792eb09..6e6b2536 100644 --- a/src/timmy/memory_system.py +++ b/src/timmy/memory_system.py @@ -1,14 +1,24 @@ -"""Three-tier memory system for Timmy. +"""Consolidated memory system for Timmy — single source of truth. Architecture: -- Tier 1 (Hot): MEMORY.md — always loaded, ~300 lines -- Tier 2 (Vault): memory/ — structured markdown, append-only -- Tier 3 (Semantic): Vector search over vault (optional) +- Database: Single `memories` table with unified schema +- Embeddings: Local sentence-transformers with hash fallback +- CRUD: store_memory, search_memories, delete_memory, etc. 
def _get_embedding_model():
    """Return the process-wide embedding model, resolving it on first call.

    The result is cached in the module-level ``EMBEDDING_MODEL``:

    - ``None`` means "not resolved yet" and triggers the lookup below.
    - ``False`` means embeddings are unavailable (disabled through
      ``settings.timmy_skip_embeddings`` or ``sentence_transformers`` could
      not be imported); callers are expected to use the hash fallback.
    - Anything else is the loaded ``SentenceTransformer`` instance.
    """
    global EMBEDDING_MODEL

    # Already resolved on an earlier call (model instance or False sentinel).
    if EMBEDDING_MODEL is not None:
        return EMBEDDING_MODEL

    # A missing ``config`` module simply means "no opt-out configured".
    skip_requested = False
    try:
        from config import settings

        skip_requested = bool(settings.timmy_skip_embeddings)
    except ImportError:
        pass

    if skip_requested:
        EMBEDDING_MODEL = False
        return EMBEDDING_MODEL

    try:
        from sentence_transformers import SentenceTransformer

        EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
        logger.info("MemorySystem: Loaded embedding model")
    except ImportError:
        logger.warning("MemorySystem: sentence-transformers not installed, using fallback")
        EMBEDDING_MODEL = False  # Use fallback

    return EMBEDDING_MODEL
for j in range(8): + idx = (i * 8 + j) % 128 + vec[idx] += int(h[j * 2 : j * 2 + 2], 16) / 255.0 + # Normalize + mag = math.sqrt(sum(x * x for x in vec)) or 1.0 + return [x / mag for x in vec] + + +def embed_text(text: str) -> list[float]: + """Generate embedding for text.""" + model = _get_embedding_model() + if model and model is not False: + embedding = model.encode(text) + return embedding.tolist() + return _simple_hash_embedding(text) + + +def cosine_similarity(a: list[float], b: list[float]) -> float: + """Calculate cosine similarity between two vectors.""" + dot = sum(x * y for x, y in zip(a, b, strict=False)) + mag_a = math.sqrt(sum(x * x for x in a)) + mag_b = math.sqrt(sum(x * x for x in b)) + if mag_a == 0 or mag_b == 0: + return 0.0 + return dot / (mag_a * mag_b) + + +# Alias for backward compatibility +_cosine_similarity = cosine_similarity + + +def _keyword_overlap(query: str, content: str) -> float: + """Simple keyword overlap score as fallback.""" + query_words = set(query.lower().split()) + content_words = set(content.lower().split()) + if not query_words: + return 0.0 + overlap = len(query_words & content_words) + return overlap / len(query_words) + + +# ─────────────────────────────────────────────────────────────────────────────── +# Database Connection +# ─────────────────────────────────────────────────────────────────────────────── + + +@contextmanager +def get_connection() -> Generator[sqlite3.Connection, None, None]: + """Get database connection to unified memory database.""" + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(DB_PATH))) as conn: + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=5000") + _ensure_schema(conn) + yield conn + + +def _ensure_schema(conn: sqlite3.Connection) -> None: + """Create the unified memories table and indexes if they don't exist.""" + conn.execute(""" + CREATE TABLE IF NOT EXISTS memories ( + id TEXT PRIMARY 
def _migrate_schema(conn: sqlite3.Connection) -> None:
    """Fold the legacy three-table layout into the unified ``memories`` table.

    Rows from a legacy ``episodes`` table are copied into ``memories`` and
    the table is then dropped. Legacy ``chunks`` and ``facts`` tables are
    dropped without copying.

    The copy must not be skipped just because ``memories`` already exists:
    the schema bootstrap creates ``memories`` before calling this function,
    so that condition is always true and skipping would discard every
    episode row.

    Args:
        conn: Open connection to the memory database.
    """
    tables = {
        row[0]
        for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
    }

    if "episodes" in tables:
        logger.info("Migration: Converting episodes table to memories")
        try:
            # OR IGNORE makes a re-run after a partial copy harmless: rows
            # whose id is already present in memories are skipped.
            conn.execute("""
                INSERT OR IGNORE INTO memories (
                    id, content, source, context_type, agent_id, task_id, session_id,
                    metadata, embedding, timestamp, access_count, last_accessed
                )
                SELECT
                    id, content, source, context_type, agent_id, task_id, session_id,
                    metadata, embedding, timestamp, 0, NULL
                FROM episodes
            """)
            # Only drop the source once the copy statement has succeeded.
            conn.execute("DROP TABLE episodes")
            logger.info("Migration: Migrated episodes to memories")
        except sqlite3.Error as exc:
            # Leave the episodes table in place so no data is lost; the
            # migration is attempted again on the next call.
            conn.rollback()
            logger.warning("Migration: Failed to migrate episodes: %s", exc)

    # Drop old tables that we don't migrate
    if "chunks" in tables:
        conn.execute("DROP TABLE chunks")
        logger.info("Migration: Dropped chunks table (vault can be re-indexed)")
    if "facts" in tables:
        conn.execute("DROP TABLE facts")
        logger.info("Migration: Dropped facts table (0 rows expected)")

    conn.commit()
def search_memories(
    query: str,
    limit: int = 10,
    context_type: str | None = None,
    agent_id: str | None = None,
    session_id: str | None = None,
    min_relevance: float = 0.0,
) -> list[MemoryEntry]:
    """Rank stored memories against ``query`` and return the best matches.

    The ``limit * 3`` most recent rows that satisfy the optional filters are
    loaded and scored in Python: rows with a stored embedding use cosine
    similarity against the query embedding, rows without one fall back to
    keyword overlap.

    Args:
        query: Search query text
        limit: Maximum results
        context_type: Filter by context type
        agent_id: Filter by agent
        session_id: Filter by session
        min_relevance: Minimum score an entry needs to be returned

    Returns:
        Up to ``limit`` MemoryEntry objects, highest relevance first, each
        with ``relevance_score`` populated.
    """
    query_vector = embed_text(query)

    # Each optional filter contributes one equality clause when it is truthy.
    candidate_filters = (
        ("context_type = ?", context_type),
        ("agent_id = ?", agent_id),
        ("session_id = ?", session_id),
    )
    active_filters = [(clause, value) for clause, value in candidate_filters if value]
    clauses = [clause for clause, _ in active_filters]
    sql_args = [value for _, value in active_filters]

    where_clause = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    # Similarity is computed in memory, so over-fetch to leave room for ranking.
    query_sql = f"""
        SELECT * FROM memories
        {where_clause}
        ORDER BY timestamp DESC
        LIMIT ?
    """
    sql_args.append(limit * 3)

    with get_connection() as conn:
        rows = conn.execute(query_sql, sql_args).fetchall()

    matches = []
    for row in rows:
        raw_metadata = row["metadata"]
        raw_embedding = row["embedding"]
        entry = MemoryEntry(
            id=row["id"],
            content=row["content"],
            source=row["source"],
            context_type=row["context_type"],
            agent_id=row["agent_id"],
            task_id=row["task_id"],
            session_id=row["session_id"],
            metadata=json.loads(raw_metadata) if raw_metadata else None,
            embedding=json.loads(raw_embedding) if raw_embedding else None,
            timestamp=row["timestamp"],
        )

        if entry.embedding:
            score = cosine_similarity(query_vector, entry.embedding)
        else:
            # No stored vector: fall back to keyword overlap.
            score = _keyword_overlap(query, entry.content)

        entry.relevance_score = score
        if score >= min_relevance:
            matches.append(entry)

    ranked = sorted(matches, key=lambda item: item.relevance_score or 0, reverse=True)
    return ranked[:limit]
def get_memory_stats() -> dict:
    """Summarize the contents of the ``memories`` table.

    Returns:
        Dict with ``total_entries`` (row count), ``by_type`` (row count per
        ``context_type``), ``with_embeddings`` (rows whose embedding is not
        NULL) and ``has_embedding_model`` (whether the embedding model lookup
        returned something other than ``False``).
    """
    with get_connection() as conn:
        total_row = conn.execute("SELECT COUNT(*) as count FROM memories").fetchone()
        type_rows = conn.execute(
            "SELECT context_type, COUNT(*) as count FROM memories GROUP BY context_type"
        ).fetchall()
        embedded_row = conn.execute(
            "SELECT COUNT(*) as count FROM memories WHERE embedding IS NOT NULL"
        ).fetchone()

    counts_by_type = {row["context_type"]: row["count"] for row in type_rows}
    model_available = _get_embedding_model() is not False

    return {
        "total_entries": total_row["count"],
        "by_type": counts_by_type,
        "with_embeddings": embedded_row["count"],
        "has_embedding_model": model_available,
    }
def recall_personal_facts(agent_id: str | None = None) -> list[str]:
    """Return the text of the most recent fact-type memories.

    Args:
        agent_id: When truthy, only facts stored for this agent are returned.

    Returns:
        Up to 100 fact strings, newest first.
    """
    # Choose the statement and its parameters first, then run it once.
    if agent_id:
        statement = """
                SELECT content FROM memories
                WHERE context_type = 'fact' AND agent_id = ?
                ORDER BY timestamp DESC
                LIMIT 100
                """
        bindings = (agent_id,)
    else:
        statement = """
                SELECT content FROM memories
                WHERE context_type = 'fact'
                ORDER BY timestamp DESC
                LIMIT 100
                """
        bindings = ()

    with get_connection() as conn:
        fact_rows = conn.execute(statement, bindings).fetchall()

    return [fact_row["content"] for fact_row in fact_rows]
ORDER BY timestamp DESC LIMIT 100", + (agent_id,), + ).fetchall() + else: + rows = conn.execute( + "SELECT id, content FROM memories WHERE context_type = 'fact' ORDER BY timestamp DESC LIMIT 100", + ).fetchall() + return [{"id": r["id"], "content": r["content"]} for r in rows] + + +def update_personal_fact(memory_id: str, new_content: str) -> bool: + """Update a personal fact's content.""" + with get_connection() as conn: + cursor = conn.execute( + "UPDATE memories SET content = ? WHERE id = ? AND context_type = 'fact'", + (new_content, memory_id), + ) + conn.commit() + return cursor.rowcount > 0 + + +def store_personal_fact(fact: str, agent_id: str | None = None) -> MemoryEntry: + """Store a personal fact about the user or system. + + Args: + fact: The fact to store + agent_id: Associated agent + + Returns: + The stored MemoryEntry + """ + return store_memory( + content=fact, + source="system", + context_type="fact", + agent_id=agent_id, + metadata={"auto_extracted": False}, + ) + + +# ─────────────────────────────────────────────────────────────────────────────── +# Hot Memory (MEMORY.md) +# ─────────────────────────────────────────────────────────────────────────────── class HotMemory: @@ -141,6 +719,11 @@ class HotMemory: logger.info("HotMemory: Created default MEMORY.md") +# ─────────────────────────────────────────────────────────────────────────────── +# Vault Memory (memory/ directory) +# ─────────────────────────────────────────────────────────────────────────────── + + class VaultMemory: """Tier 2: Structured vault (memory/) — append-only markdown.""" @@ -252,6 +835,451 @@ class VaultMemory: profile_path.write_text(default) +# ─────────────────────────────────────────────────────────────────────────────── +# Semantic Memory (Vector search) +# ─────────────────────────────────────────────────────────────────────────────── + + +class SemanticMemory: + """Vector-based semantic search over vault content.""" + + def __init__(self) -> None: + self.db_path = 
DB_PATH + self.vault_path = VAULT_PATH + + @contextmanager + def _get_conn(self) -> Generator[sqlite3.Connection, None, None]: + """Get connection to the instance's db_path (backward compatibility). + + Uses self.db_path if set differently from global DB_PATH, + otherwise uses the global get_connection(). + """ + if self.db_path == DB_PATH: + # Use global connection (normal production path) + with get_connection() as conn: + yield conn + else: + # Use instance-specific db_path (test path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + with closing(sqlite3.connect(str(self.db_path))) as conn: + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=5000") + # Ensure schema exists + conn.execute(""" + CREATE TABLE IF NOT EXISTS memories ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'agent', + context_type TEXT NOT NULL DEFAULT 'conversation', + agent_id TEXT, + task_id TEXT, + session_id TEXT, + metadata TEXT, + embedding TEXT, + timestamp TEXT NOT NULL, + access_count INTEGER NOT NULL DEFAULT 0, + last_accessed TEXT + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(context_type)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_timestamp ON memories(timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") + conn.commit() + yield conn + + def _init_db(self) -> None: + """Initialize database at self.db_path (backward compatibility). + + This method is kept for backward compatibility with existing code and tests. + Creates the memories table schema at the instance's db_path. 
+ """ + # Trigger schema creation via _get_conn + with self._get_conn(): + pass + + def index_file(self, filepath: Path) -> int: + """Index a single file into semantic memory.""" + if not filepath.exists(): + return 0 + + content = filepath.read_text() + file_hash = hashlib.md5(content.encode()).hexdigest() + + with self._get_conn() as conn: + # Check if already indexed with same hash + cursor = conn.execute( + "SELECT metadata FROM memories WHERE source = ? AND context_type = 'chunk' LIMIT 1", + (str(filepath),) + ) + existing = cursor.fetchone() + if existing and existing[0]: + try: + meta = json.loads(existing[0]) + if meta.get("source_hash") == file_hash: + return 0 # Already indexed + except json.JSONDecodeError: + pass + + # Delete old chunks for this file + conn.execute( + "DELETE FROM memories WHERE source = ? AND context_type = 'chunk'", + (str(filepath),) + ) + + # Split into chunks (paragraphs) + chunks = self._split_into_chunks(content) + + # Index each chunk + now = datetime.now(UTC).isoformat() + for i, chunk_text in enumerate(chunks): + if len(chunk_text.strip()) < 20: # Skip tiny chunks + continue + + chunk_id = f"{filepath.stem}_{i}" + chunk_embedding = embed_text(chunk_text) + + conn.execute( + """INSERT INTO memories + (id, content, source, context_type, metadata, embedding, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + ( + chunk_id, + chunk_text, + str(filepath), + "chunk", + json.dumps({"source_hash": file_hash, "chunk_index": i}), + json.dumps(chunk_embedding), + now, + ), + ) + + conn.commit() + + logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks)) + return len(chunks) + + def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]: + """Split text into semantic chunks.""" + # Split by paragraphs first + paragraphs = text.split("\n\n") + chunks = [] + + for para in paragraphs: + para = para.strip() + if not para: + continue + + # If paragraph is small enough, keep as one chunk + if len(para) 
<= max_chunk_size: + chunks.append(para) + else: + # Split long paragraphs by sentences + sentences = para.replace(". ", ".\n").split("\n") + current_chunk = "" + + for sent in sentences: + if len(current_chunk) + len(sent) < max_chunk_size: + current_chunk += " " + sent if current_chunk else sent + else: + if current_chunk: + chunks.append(current_chunk.strip()) + current_chunk = sent + + if current_chunk: + chunks.append(current_chunk.strip()) + + return chunks + + def index_vault(self) -> int: + """Index entire vault directory.""" + total_chunks = 0 + + for md_file in self.vault_path.rglob("*.md"): + # Skip handoff file (handled separately) + if "last-session-handoff" in md_file.name: + continue + total_chunks += self.index_file(md_file) + + logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks) + return total_chunks + + def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]: + """Search for relevant memory chunks.""" + query_embedding = embed_text(query) + + with self._get_conn() as conn: + conn.row_factory = sqlite3.Row + + # Get all chunks + rows = conn.execute( + "SELECT source, content, embedding FROM memories WHERE context_type = 'chunk'" + ).fetchall() + + # Calculate similarities + scored = [] + for row in rows: + embedding = json.loads(row["embedding"]) + score = cosine_similarity(query_embedding, embedding) + scored.append((row["source"], row["content"], score)) + + # Sort by score descending + scored.sort(key=lambda x: x[2], reverse=True) + + # Return top_k + return [(content, score) for _, content, score in scored[:top_k]] + + def get_relevant_context(self, query: str, max_chars: int = 2000) -> str: + """Get formatted context string for a query.""" + results = self.search(query, top_k=3) + + if not results: + return "" + + parts = [] + total_chars = 0 + + for content, score in results: + if score < 0.3: # Similarity threshold + continue + + chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..." 
+ if total_chars + len(chunk) > max_chars: + break + + parts.append(chunk) + total_chars += len(chunk) + + return "\n\n".join(parts) if parts else "" + + def stats(self) -> dict: + """Get indexing statistics.""" + with self._get_conn() as conn: + cursor = conn.execute( + "SELECT COUNT(*), COUNT(DISTINCT source) FROM memories WHERE context_type = 'chunk'" + ) + total_chunks, total_files = cursor.fetchone() + + return { + "total_chunks": total_chunks, + "total_files": total_files, + "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, + } + + +class MemorySearcher: + """High-level interface for memory search.""" + + def __init__(self) -> None: + self.semantic = SemanticMemory() + + def search(self, query: str, tiers: list[str] = None) -> dict: + """Search across memory tiers. + + Args: + query: Search query + tiers: List of tiers to search ["hot", "vault", "semantic"] + + Returns: + Dict with results from each tier + """ + tiers = tiers or ["semantic"] # Default to semantic only + results = {} + + if "semantic" in tiers: + semantic_results = self.semantic.search(query, top_k=5) + results["semantic"] = [ + {"content": content, "score": score} for content, score in semantic_results + ] + + return results + + def get_context_for_query(self, query: str) -> str: + """Get comprehensive context for a user query.""" + # Get semantic context + semantic_context = self.semantic.get_relevant_context(query) + + if semantic_context: + return f"## Relevant Past Context\n\n{semantic_context}" + + return "" + + +# Module-level singletons +semantic_memory = SemanticMemory() +memory_searcher = MemorySearcher() + + +# ─────────────────────────────────────────────────────────────────────────────── +# Tool Functions +# ─────────────────────────────────────────────────────────────────────────────── + + +def memory_search(query: str, top_k: int = 5) -> str: + """Search past conversations, notes, and stored facts for relevant context. 
def memory_read(query: str = "", top_k: int = 5) -> str:
    """Read from persistent memory — search facts, notes, and past conversations.

    Personal facts matching the query are listed first. When a query is
    given, the combined vault / runtime results of ``memory_search`` are
    appended under a separate heading.

    Args:
        query: Optional search term. Leave empty to list facts only.
        top_k: Maximum results to return (default 5). ``None`` is treated
            as 5.

    Returns:
        Formatted string of memory contents, or a hint to use memory_write
        when nothing was found.
    """
    # Guard: the caller sometimes passes None for top_k
    if top_k is None:
        top_k = 5

    parts: list[str] = []

    # Always include personal facts first
    try:
        # Restrict the search itself to facts so that `limit` counts facts
        # only. Filtering after an unrestricted search drops facts whenever
        # non-fact memories rank higher.
        facts = search_memories(
            query or "", limit=top_k, context_type="fact", min_relevance=0.0
        )
        # Defensive re-check of the type on whatever the search returned.
        fact_entries = [e for e in facts if (e.context_type or "") == "fact"]
        if fact_entries:
            parts.append("## Personal Facts")
            for entry in fact_entries[:top_k]:
                parts.append(f"- {entry.content[:300]}")
    except Exception as exc:
        # Best effort: an unavailable store must not break the tool call.
        logger.debug("Vector store unavailable for memory_read: %s", exc)

    # If a query was provided, also do semantic search
    if query:
        search_result = memory_search(query, top_k)
        if search_result and search_result != "No relevant memories found.":
            parts.append("\n## Search Results")
            parts.append(search_result)

    if not parts:
        return "No memories stored yet. Use memory_write to store information."
    return "\n".join(parts)
def memory_forget(query: str) -> str:
    """Delete the stored memory that best matches a description.

    Looks up at most three memories with relevance of at least 0.3 for the
    query and deletes the first one returned.

    Args:
        query: Description of the memory to forget (e.g. "my phone number",
            "the old server address").

    Returns:
        A confirmation quoting the first 80 characters of the deleted
        memory, or a message explaining why nothing was deleted.
    """
    needle = query.strip() if query else ""
    if not needle:
        return "Nothing to forget — query is empty."

    try:
        candidates = search_memories(needle, limit=3, min_relevance=0.3)
        if not candidates:
            return "No matching memories found to forget."

        # The first candidate is treated as the closest match.
        target = candidates[0]
        if not delete_memory(target.id):
            return "Memory not found (may have already been deleted)."
        return f'Forgotten: "{target.content[:80]}" (type={target.context_type})'
    except Exception as exc:
        logger.error("Failed to forget memory: %s", exc)
        return f"Failed to forget: {exc}"
- -Architecture: -- Indexes all markdown files in memory/ nightly or on-demand -- Uses sentence-transformers (local, no API calls) -- Stores vectors in SQLite (no external vector DB needed) -- memory_search() retrieves relevant context by similarity -""" - -import hashlib -import json -import logging -import sqlite3 -from contextlib import closing -from dataclasses import dataclass -from datetime import UTC, datetime -from pathlib import Path - -logger = logging.getLogger(__name__) - -# Paths -PROJECT_ROOT = Path(__file__).parent.parent.parent -VAULT_PATH = PROJECT_ROOT / "memory" -SEMANTIC_DB_PATH = PROJECT_ROOT / "data" / "memory.db" - -# Embedding model - small, fast, local -# Using 'all-MiniLM-L6-v2' (~80MB) or fallback to simple keyword matching -EMBEDDING_MODEL = None -EMBEDDING_DIM = 384 # MiniLM dimension - - -def _get_embedding_model(): - """Lazy-load embedding model.""" - global EMBEDDING_MODEL - if EMBEDDING_MODEL is None: - from config import settings - - if settings.timmy_skip_embeddings: - EMBEDDING_MODEL = False - return EMBEDDING_MODEL - try: - from sentence_transformers import SentenceTransformer - - EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2") - logger.info("SemanticMemory: Loaded embedding model") - except ImportError: - logger.warning("SemanticMemory: sentence-transformers not installed, using fallback") - EMBEDDING_MODEL = False # Use fallback - return EMBEDDING_MODEL - - -def _simple_hash_embedding(text: str) -> list[float]: - """Fallback: Simple hash-based embedding when transformers unavailable.""" - # Create a deterministic pseudo-embedding from word hashes - words = text.lower().split() - vec = [0.0] * 128 - for i, word in enumerate(words[:50]): # First 50 words - h = hashlib.md5(word.encode()).hexdigest() - for j in range(8): - idx = (i * 8 + j) % 128 - vec[idx] += int(h[j * 2 : j * 2 + 2], 16) / 255.0 - # Normalize - import math - - mag = math.sqrt(sum(x * x for x in vec)) or 1.0 - return [x / mag for x in vec] - - -def 
embed_text(text: str) -> list[float]: - """Generate embedding for text.""" - model = _get_embedding_model() - if model and model is not False: - embedding = model.encode(text) - return embedding.tolist() - else: - return _simple_hash_embedding(text) - - -def cosine_similarity(a: list[float], b: list[float]) -> float: - """Calculate cosine similarity between two vectors.""" - import math - - dot = sum(x * y for x, y in zip(a, b, strict=False)) - mag_a = math.sqrt(sum(x * x for x in a)) - mag_b = math.sqrt(sum(x * x for x in b)) - if mag_a == 0 or mag_b == 0: - return 0.0 - return dot / (mag_a * mag_b) - - -@dataclass -class MemoryChunk: - """A searchable chunk of memory.""" - - id: str - source: str # filepath - content: str - embedding: list[float] - created_at: str - - -class SemanticMemory: - """Vector-based semantic search over vault content.""" - - def __init__(self) -> None: - self.db_path = SEMANTIC_DB_PATH - self.vault_path = VAULT_PATH - self._init_db() - - def _init_db(self) -> None: - """Initialize SQLite with vector storage.""" - self.db_path.parent.mkdir(parents=True, exist_ok=True) - with closing(sqlite3.connect(str(self.db_path))) as conn: - conn.execute(""" - CREATE TABLE IF NOT EXISTS chunks ( - id TEXT PRIMARY KEY, - source TEXT NOT NULL, - content TEXT NOT NULL, - embedding TEXT NOT NULL, - created_at TEXT NOT NULL, - source_hash TEXT NOT NULL - ) - """) - conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)") - conn.commit() - - def index_file(self, filepath: Path) -> int: - """Index a single file into semantic memory.""" - if not filepath.exists(): - return 0 - - content = filepath.read_text() - file_hash = hashlib.md5(content.encode()).hexdigest() - - with closing(sqlite3.connect(str(self.db_path))) as conn: - # Check if already indexed with same hash - cursor = conn.execute( - "SELECT source_hash FROM chunks WHERE source = ? 
LIMIT 1", (str(filepath),) - ) - existing = cursor.fetchone() - if existing and existing[0] == file_hash: - return 0 # Already indexed - - # Delete old chunks for this file - conn.execute("DELETE FROM chunks WHERE source = ?", (str(filepath),)) - - # Split into chunks (paragraphs) - chunks = self._split_into_chunks(content) - - # Index each chunk - now = datetime.now(UTC).isoformat() - for i, chunk_text in enumerate(chunks): - if len(chunk_text.strip()) < 20: # Skip tiny chunks - continue - - chunk_id = f"{filepath.stem}_{i}" - embedding = embed_text(chunk_text) - - conn.execute( - """INSERT INTO chunks (id, source, content, embedding, created_at, source_hash) - VALUES (?, ?, ?, ?, ?, ?)""", - (chunk_id, str(filepath), chunk_text, json.dumps(embedding), now, file_hash), - ) - - conn.commit() - - logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks)) - return len(chunks) - - def _split_into_chunks(self, text: str, max_chunk_size: int = 500) -> list[str]: - """Split text into semantic chunks.""" - # Split by paragraphs first - paragraphs = text.split("\n\n") - chunks = [] - - for para in paragraphs: - para = para.strip() - if not para: - continue - - # If paragraph is small enough, keep as one chunk - if len(para) <= max_chunk_size: - chunks.append(para) - else: - # Split long paragraphs by sentences - sentences = para.replace(". 
", ".\n").split("\n") - current_chunk = "" - - for sent in sentences: - if len(current_chunk) + len(sent) < max_chunk_size: - current_chunk += " " + sent if current_chunk else sent - else: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = sent - - if current_chunk: - chunks.append(current_chunk.strip()) - - return chunks - - def index_vault(self) -> int: - """Index entire vault directory.""" - total_chunks = 0 - - for md_file in self.vault_path.rglob("*.md"): - # Skip handoff file (handled separately) - if "last-session-handoff" in md_file.name: - continue - total_chunks += self.index_file(md_file) - - logger.info("SemanticMemory: Indexed vault (%d total chunks)", total_chunks) - return total_chunks - - def search(self, query: str, top_k: int = 5) -> list[tuple[str, float]]: - """Search for relevant memory chunks.""" - query_embedding = embed_text(query) - - with closing(sqlite3.connect(str(self.db_path))) as conn: - conn.row_factory = sqlite3.Row - - # Get all chunks (in production, use vector index) - rows = conn.execute("SELECT source, content, embedding FROM chunks").fetchall() - - # Calculate similarities - scored = [] - for row in rows: - embedding = json.loads(row["embedding"]) - score = cosine_similarity(query_embedding, embedding) - scored.append((row["source"], row["content"], score)) - - # Sort by score descending - scored.sort(key=lambda x: x[2], reverse=True) - - # Return top_k - return [(content, score) for _, content, score in scored[:top_k]] - - def get_relevant_context(self, query: str, max_chars: int = 2000) -> str: - """Get formatted context string for a query.""" - results = self.search(query, top_k=3) - - if not results: - return "" - - parts = [] - total_chars = 0 - - for content, score in results: - if score < 0.3: # Similarity threshold - continue - - chunk = f"[Relevant memory - score {score:.2f}]: {content[:400]}..." 
- if total_chars + len(chunk) > max_chars: - break - - parts.append(chunk) - total_chars += len(chunk) - - return "\n\n".join(parts) if parts else "" - - def stats(self) -> dict: - """Get indexing statistics.""" - with closing(sqlite3.connect(str(self.db_path))) as conn: - cursor = conn.execute("SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks") - total_chunks, total_files = cursor.fetchone() - - return { - "total_chunks": total_chunks, - "total_files": total_files, - "embedding_dim": EMBEDDING_DIM if _get_embedding_model() else 128, - } - - -class MemorySearcher: - """High-level interface for memory search.""" - - def __init__(self) -> None: - self.semantic = SemanticMemory() - - def search(self, query: str, tiers: list[str] = None) -> dict: - """Search across memory tiers. - - Args: - query: Search query - tiers: List of tiers to search ["hot", "vault", "semantic"] - - Returns: - Dict with results from each tier - """ - tiers = tiers or ["semantic"] # Default to semantic only - results = {} - - if "semantic" in tiers: - semantic_results = self.semantic.search(query, top_k=5) - results["semantic"] = [ - {"content": content, "score": score} for content, score in semantic_results - ] - - return results - - def get_context_for_query(self, query: str) -> str: - """Get comprehensive context for a user query.""" - # Get semantic context - semantic_context = self.semantic.get_relevant_context(query) - - if semantic_context: - return f"## Relevant Past Context\n\n{semantic_context}" - - return "" - - -# Module-level singleton -semantic_memory = SemanticMemory() -memory_searcher = MemorySearcher() - - -def memory_search(query: str, top_k: int = 5) -> str: - """Search past conversations, notes, and stored facts for relevant context. - - Searches across both the vault (indexed markdown files) and the - runtime memory store (facts and conversation fragments stored via - memory_write). - - Args: - query: What to search for (e.g. "Bitcoin strategy", "server setup"). 
- top_k: Number of results to return (default 5). - - Returns: - Formatted string of relevant memory results. - """ - # Guard: model sometimes passes None for top_k - if top_k is None: - top_k = 5 - - parts: list[str] = [] - - # 1. Search semantic vault (indexed markdown files) - vault_results = semantic_memory.search(query, top_k) - for content, score in vault_results: - if score < 0.2: - continue - parts.append(f"[vault score {score:.2f}] {content[:300]}") - - # 2. Search runtime vector store (stored facts/conversations) - try: - from timmy.memory.vector_store import search_memories - - runtime_results = search_memories(query, limit=top_k, min_relevance=0.2) - for entry in runtime_results: - label = entry.context_type or "memory" - parts.append(f"[{label}] {entry.content[:300]}") - except Exception as exc: - logger.debug("Vector store search unavailable: %s", exc) - - if not parts: - return "No relevant memories found." - return "\n\n".join(parts) - - -def memory_read(query: str = "", top_k: int = 5) -> str: - """Read from persistent memory — search facts, notes, and past conversations. - - This is the primary tool for recalling stored information. If no query - is given, returns the most recent personal facts. With a query, it - searches semantically across all stored memories. - - Args: - query: Optional search term. Leave empty to list recent facts. - top_k: Maximum results to return (default 5). - - Returns: - Formatted string of memory contents. 
- """ - if top_k is None: - top_k = 5 - - parts: list[str] = [] - - # Always include personal facts first - try: - from timmy.memory.vector_store import search_memories - - facts = search_memories(query or "", limit=top_k, min_relevance=0.0) - fact_entries = [e for e in facts if (e.context_type or "") == "fact"] - if fact_entries: - parts.append("## Personal Facts") - for entry in fact_entries[:top_k]: - parts.append(f"- {entry.content[:300]}") - except Exception as exc: - logger.debug("Vector store unavailable for memory_read: %s", exc) - - # If a query was provided, also do semantic search - if query: - search_result = memory_search(query, top_k) - if search_result and search_result != "No relevant memories found.": - parts.append("\n## Search Results") - parts.append(search_result) - - if not parts: - return "No memories stored yet. Use memory_write to store information." - return "\n".join(parts) - - -def memory_write(content: str, context_type: str = "fact") -> str: - """Store a piece of information in persistent memory. - - Use this tool when the user explicitly asks you to remember something. - Stored memories are searchable via memory_search across all channels - (web GUI, Discord, Telegram, etc.). - - Args: - content: The information to remember (e.g. a phrase, fact, or note). - context_type: Type of memory — "fact" for permanent facts, - "conversation" for conversation context, - "document" for document fragments. - - Returns: - Confirmation that the memory was stored. - """ - if not content or not content.strip(): - return "Nothing to store — content is empty." 
- - valid_types = ("fact", "conversation", "document") - if context_type not in valid_types: - context_type = "fact" - - try: - from timmy.memory.vector_store import search_memories, store_memory - - # Dedup check for facts — skip if a similar fact already exists - # Threshold 0.75 catches paraphrases (was 0.9 which only caught near-exact) - if context_type == "fact": - existing = search_memories( - content.strip(), limit=3, context_type="fact", min_relevance=0.75 - ) - if existing: - return f"Similar fact already stored (id={existing[0].id[:8]}). Skipping duplicate." - - entry = store_memory( - content=content.strip(), - source="agent", - context_type=context_type, - ) - return f"Stored in memory (type={context_type}, id={entry.id[:8]}). This is now searchable across all channels." - except Exception as exc: - logger.error("Failed to write memory: %s", exc) - return f"Failed to store memory: {exc}" - - -def memory_forget(query: str) -> str: - """Remove a stored memory that is outdated, incorrect, or no longer relevant. - - Searches for memories matching the query and deletes the closest match. - Use this when the user says to forget something or when stored information - has changed. - - Args: - query: Description of the memory to forget (e.g. "my phone number", - "the old server address"). - - Returns: - Confirmation of what was forgotten, or a message if nothing matched. - """ - if not query or not query.strip(): - return "Nothing to forget — query is empty." - - try: - from timmy.memory.vector_store import delete_memory, search_memories - - results = search_memories(query.strip(), limit=3, min_relevance=0.3) - if not results: - return "No matching memories found to forget." - - # Delete the closest match - best = results[0] - deleted = delete_memory(best.id) - if deleted: - return f'Forgotten: "{best.content[:80]}" (type={best.context_type})' - return "Memory not found (may have already been deleted)." 
- except Exception as exc: - logger.error("Failed to forget memory: %s", exc) - return f"Failed to forget: {exc}" +"""Backward compatibility — all memory functions live in memory_system now.""" + +from timmy.memory_system import ( + DB_PATH, + EMBEDDING_DIM, + EMBEDDING_MODEL, + MemoryChunk, + MemoryEntry, + MemorySearcher, + SemanticMemory, + _get_embedding_model, + _simple_hash_embedding, + cosine_similarity, + embed_text, + memory_forget, + memory_read, + memory_search, + memory_searcher, + memory_write, + semantic_memory, +) + +__all__ = [ + "DB_PATH", + "EMBEDDING_DIM", + "EMBEDDING_MODEL", + "MemoryChunk", + "MemoryEntry", + "MemorySearcher", + "SemanticMemory", + "_get_embedding_model", + "_simple_hash_embedding", + "cosine_similarity", + "embed_text", + "memory_forget", + "memory_read", + "memory_search", + "memory_searcher", + "memory_write", + "semantic_memory", +] diff --git a/src/timmy/thinking.py b/src/timmy/thinking.py index a11b2939..e39a80d1 100644 --- a/src/timmy/thinking.py +++ b/src/timmy/thinking.py @@ -462,7 +462,7 @@ class ThinkingEngine: Args: facts: List of fact strings to filter and store. 
""" - from timmy.semantic_memory import memory_write + from timmy.memory_system import memory_write for fact in facts[:3]: # Safety cap if not isinstance(fact, str) or len(fact.strip()) <= 10: diff --git a/src/timmy/tools.py b/src/timmy/tools.py index 36479820..d0d3c9f3 100644 --- a/src/timmy/tools.py +++ b/src/timmy/tools.py @@ -508,7 +508,7 @@ def _register_grok_tool(toolkit: Toolkit) -> None: def _register_memory_tools(toolkit: Toolkit) -> None: """Register memory search, write, and forget tools.""" try: - from timmy.semantic_memory import memory_forget, memory_read, memory_search, memory_write + from timmy.memory_system import memory_forget, memory_read, memory_search, memory_write toolkit.register(memory_search, name="memory_search") toolkit.register(memory_write, name="memory_write") diff --git a/tests/conftest.py b/tests/conftest.py index 5c2be15a..f1c84382 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -94,7 +94,8 @@ def clean_database(tmp_path): "infrastructure.models.registry", ] _memory_db_modules = [ - "timmy.memory.unified", + "timmy.memory_system", # Canonical location + "timmy.memory.unified", # Backward compat ] _spark_db_modules = [ "spark.memory", @@ -122,14 +123,8 @@ def clean_database(tmp_path): except Exception: pass - # Redirect semantic memory DB path (uses SEMANTIC_DB_PATH, not DB_PATH) - try: - import timmy.semantic_memory as _sem_mod - - originals[("timmy.semantic_memory", "SEMANTIC_DB_PATH")] = _sem_mod.SEMANTIC_DB_PATH - _sem_mod.SEMANTIC_DB_PATH = tmp_memory_db - except Exception: - pass + # Note: semantic_memory now re-exports from memory_system, + # so DB_PATH is already patched via _memory_db_modules above for mod_name in _spark_db_modules: try: diff --git a/tests/dashboard/test_chat_persistence.py b/tests/dashboard/test_chat_persistence.py index ddb7b7e3..b0da86f6 100644 --- a/tests/dashboard/test_chat_persistence.py +++ b/tests/dashboard/test_chat_persistence.py @@ -1,7 +1,7 @@ """Tests for SQLite-backed chat persistence 
(issue #46).""" -from dashboard.store import Message, MessageLog import infrastructure.chat_store as _chat_store +from dashboard.store import Message, MessageLog def test_persistence_across_instances(tmp_path): diff --git a/tests/timmy/test_semantic_memory.py b/tests/timmy/test_semantic_memory.py index b5549bad..2620b711 100644 --- a/tests/timmy/test_semantic_memory.py +++ b/tests/timmy/test_semantic_memory.py @@ -5,7 +5,7 @@ from unittest.mock import MagicMock, patch import pytest -from timmy.semantic_memory import ( +from timmy.memory_system import ( MemoryChunk, MemorySearcher, SemanticMemory, @@ -201,6 +201,11 @@ class TestSemanticMemory: return sm def test_init_creates_db(self, mem): + # After consolidation, _init_db ensures schema is ready + # The DB file is created lazily; verify by checking we can get a connection + + mem._init_db() + # If we get here without error, the DB is initialized assert mem.db_path.exists() def test_split_into_chunks_short(self, mem): @@ -283,8 +288,12 @@ class TestSemanticMemory: mem.index_file(md_file) # Check DB directly - tiny chunks should NOT be stored + # After consolidation: chunks are stored in 'memories' table with context_type='chunk' conn = sqlite3.connect(str(mem.db_path)) - cursor = conn.execute("SELECT COUNT(*) FROM chunks WHERE source = ?", (str(md_file),)) + cursor = conn.execute( + "SELECT COUNT(*) FROM memories WHERE source = ? 
AND context_type = 'chunk'", + (str(md_file),) + ) stored_count = cursor.fetchone()[0] conn.close() @@ -316,13 +325,16 @@ class TestSemanticMemory: import sqlite3 conn = sqlite3.connect(str(mem.db_path)) - conn.execute("DELETE FROM chunks") + # After consolidation: chunks are stored in 'memories' table with context_type='chunk' + conn.execute("DELETE FROM memories WHERE context_type = 'chunk'") conn.commit() conn.close() mem.index_vault() conn = sqlite3.connect(str(mem.db_path)) - rows = conn.execute("SELECT DISTINCT source FROM chunks").fetchall() + rows = conn.execute( + "SELECT DISTINCT source FROM memories WHERE context_type = 'chunk'" + ).fetchall() conn.close() sources = [r[0] for r in rows] # Only the real file should be indexed, not the handoff @@ -517,8 +529,8 @@ class TestMemoryWrite: """Mock vector_store functions for memory_write tests.""" # Patch where it's imported from, not where it's used with ( - patch("timmy.memory.vector_store.search_memories") as mock_search, - patch("timmy.memory.vector_store.store_memory") as mock_store, + patch("timmy.memory_system.search_memories") as mock_search, + patch("timmy.memory_system.store_memory") as mock_store, ): # Default: no existing memories (no duplicates) mock_search.return_value = [] @@ -610,8 +622,8 @@ class TestMemoryForget: """Mock vector_store functions for memory_forget tests.""" # Patch where it's imported from, not where it's used with ( - patch("timmy.memory.vector_store.search_memories") as mock_search, - patch("timmy.memory.vector_store.delete_memory") as mock_delete, + patch("timmy.memory_system.search_memories") as mock_search, + patch("timmy.memory_system.delete_memory") as mock_delete, ): # Default: no results mock_search.return_value = [] diff --git a/tests/timmy/test_vector_store.py b/tests/timmy/test_vector_store.py index 16b79130..495554be 100644 --- a/tests/timmy/test_vector_store.py +++ b/tests/timmy/test_vector_store.py @@ -2,7 +2,7 @@ import pytest -from timmy.memory.vector_store 
import ( +from timmy.memory_system import ( _cosine_similarity, _keyword_overlap, delete_memory,