"""Database connection, schema, migrations, path constants, and data classes. This module contains the lowest-level database primitives for Timmy's memory system — connection management, schema creation / migration, path constants, and the core data classes (MemoryEntry, MemoryChunk). """ import logging import sqlite3 import uuid from collections.abc import Generator from contextlib import closing, contextmanager from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path from config import settings logger = logging.getLogger(__name__) # ── Path constants ─────────────────────────────────────────────────────────── PROJECT_ROOT = Path(__file__).parent.parent.parent.parent HOT_MEMORY_PATH = PROJECT_ROOT / "MEMORY.md" VAULT_PATH = PROJECT_ROOT / "memory" SOUL_PATH = VAULT_PATH / "self" / "soul.md" DB_PATH = PROJECT_ROOT / "data" / "memory.db" # ── Database connection ────────────────────────────────────────────────────── @contextmanager def get_connection() -> Generator[sqlite3.Connection, None, None]: """Get database connection to unified memory database.""" DB_PATH.parent.mkdir(parents=True, exist_ok=True) with closing(sqlite3.connect(str(DB_PATH))) as conn: conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}") _ensure_schema(conn) yield conn def _ensure_schema(conn: sqlite3.Connection) -> None: """Create the unified memories table and indexes if they don't exist.""" conn.execute(""" CREATE TABLE IF NOT EXISTS memories ( id TEXT PRIMARY KEY, content TEXT NOT NULL, memory_type TEXT NOT NULL DEFAULT 'fact', source TEXT NOT NULL DEFAULT 'agent', embedding TEXT, metadata TEXT, source_hash TEXT, agent_id TEXT, task_id TEXT, session_id TEXT, confidence REAL NOT NULL DEFAULT 0.8, tags TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL, last_accessed TEXT, access_count INTEGER NOT NULL DEFAULT 0 ) """) # Create indexes for efficient querying conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)") conn.commit() # Run migration if needed _migrate_schema(conn) def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: """Get the column names for a table.""" cursor = conn.execute(f"PRAGMA table_info({table_name})") return {row[1] for row in cursor.fetchall()} def _migrate_episodes(conn: sqlite3.Connection) -> None: """Migrate episodes table rows into the unified memories table.""" logger.info("Migration: Converting episodes table to memories") try: cols = _get_table_columns(conn, "episodes") context_type_col = "context_type" if "context_type" in cols else "'conversation'" conn.execute(f""" INSERT INTO memories ( id, content, memory_type, source, embedding, metadata, agent_id, task_id, session_id, created_at, access_count, last_accessed ) SELECT id, content, COALESCE({context_type_col}, 'conversation'), COALESCE(source, 'agent'), embedding, metadata, agent_id, task_id, session_id, COALESCE(timestamp, datetime('now')), 0, NULL FROM episodes """) conn.execute("DROP TABLE episodes") logger.info("Migration: Migrated episodes to memories") except sqlite3.Error as exc: logger.warning("Migration: Failed 


def _ensure_schema(conn: sqlite3.Connection) -> None:
    """Create the unified memories table and indexes if they don't exist."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS memories (
            id TEXT PRIMARY KEY,
            content TEXT NOT NULL,
            memory_type TEXT NOT NULL DEFAULT 'fact',
            source TEXT NOT NULL DEFAULT 'agent',
            embedding TEXT,
            metadata TEXT,
            source_hash TEXT,
            agent_id TEXT,
            task_id TEXT,
            session_id TEXT,
            confidence REAL NOT NULL DEFAULT 0.8,
            tags TEXT NOT NULL DEFAULT '[]',
            created_at TEXT NOT NULL,
            last_accessed TEXT,
            access_count INTEGER NOT NULL DEFAULT 0
        )
    """)
    # Create indexes for efficient querying
    conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)")
    conn.commit()

    # Run migration if needed
    _migrate_schema(conn)


def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
    """Get the column names for a table."""
    cursor = conn.execute(f"PRAGMA table_info({table_name})")
    return {row[1] for row in cursor.fetchall()}


def _migrate_episodes(conn: sqlite3.Connection) -> None:
    """Migrate episodes table rows into the unified memories table."""
    logger.info("Migration: Converting episodes table to memories")
    try:
        cols = _get_table_columns(conn, "episodes")
        context_type_col = "context_type" if "context_type" in cols else "'conversation'"
        conn.execute(f"""
            INSERT INTO memories (
                id, content, memory_type, source, embedding, metadata,
                agent_id, task_id, session_id, created_at, access_count, last_accessed
            )
            SELECT
                id, content,
                COALESCE({context_type_col}, 'conversation'),
                COALESCE(source, 'agent'),
                embedding, metadata, agent_id, task_id, session_id,
                COALESCE(timestamp, datetime('now')),
                0, NULL
            FROM episodes
        """)
        conn.execute("DROP TABLE episodes")
        logger.info("Migration: Migrated episodes to memories")
    except sqlite3.Error as exc:
        logger.warning("Migration: Failed to migrate episodes: %s", exc)


def _migrate_chunks(conn: sqlite3.Connection) -> None:
    """Migrate chunks table rows into the unified memories table."""
    logger.info("Migration: Converting chunks table to memories")
    try:
        cols = _get_table_columns(conn, "chunks")
        id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
        content_col = "content" if "content" in cols else "text"
        source_col = (
            "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
        )
        embedding_col = "embedding" if "embedding" in cols else "NULL"
        created_col = "created_at" if "created_at" in cols else "datetime('now')"
        conn.execute(f"""
            INSERT INTO memories (
                id, content, memory_type, source, embedding, created_at, access_count
            )
            SELECT
                {id_col}, {content_col}, 'vault_chunk', {source_col},
                {embedding_col}, {created_col}, 0
            FROM chunks
        """)
        conn.execute("DROP TABLE chunks")
        logger.info("Migration: Migrated chunks to memories")
    except sqlite3.Error as exc:
        logger.warning("Migration: Failed to migrate chunks: %s", exc)


def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
    """Drop a legacy table if it exists."""
    try:
        conn.execute(f"DROP TABLE {table}")  # noqa: S608
        logger.info("Migration: Dropped old %s table", table)
    except sqlite3.Error as exc:
        logger.warning("Migration: Failed to drop %s: %s", table, exc)


def _migrate_schema(conn: sqlite3.Connection) -> None:
    """Migrate from old three-table schema to unified memories table.

    Migration paths:
    - episodes table -> memories (context_type -> memory_type)
    - chunks table -> memories with memory_type='vault_chunk'
    - facts table -> dropped (unused, 0 rows expected)
    """
    cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = {row[0] for row in cursor.fetchall()}
    has_memories = "memories" in tables

    if not has_memories and (tables & {"episodes", "chunks", "facts"}):
        logger.info("Migration: Creating unified memories table")

    if "episodes" in tables and has_memories:
        _migrate_episodes(conn)
    if "chunks" in tables and has_memories:
        _migrate_chunks(conn)
    if "facts" in tables:
        _drop_legacy_table(conn, "facts")
    conn.commit()


# Alias for backward compatibility
get_conn = get_connection

# ── Data classes ─────────────────────────────────────────────────────────────


@dataclass
class MemoryEntry:
    """A memory entry with vector embedding.

    Note: The DB column is `memory_type` but this field is named
    `context_type` for backward API compatibility.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content: str = ""  # The actual text content
    source: str = ""  # Where it came from (agent, user, system)
    context_type: str = "conversation"  # API field name; DB column is memory_type
    agent_id: str | None = None
    task_id: str | None = None
    session_id: str | None = None
    metadata: dict | None = None
    embedding: list[float] | None = None
    timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    relevance_score: float | None = None  # Set during search


@dataclass
class MemoryChunk:
    """A searchable chunk of memory."""

    id: str
    source: str  # filepath
    content: str
    embedding: list[float]
    created_at: str
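
# Illustrative write-path sketch (an assumption, not an existing helper in
# this module): how a MemoryEntry is expected to land in the unified table,
# with the dataclass's `context_type` field written to the `memory_type`
# column and `timestamp` written to `created_at`. Omitted columns fall back
# to their schema defaults (confidence, tags, access_count).
#
#     entry = MemoryEntry(content="User prefers dark mode", source="user")
#     with get_connection() as conn:
#         conn.execute(
#             "INSERT INTO memories (id, content, memory_type, source, created_at) "
#             "VALUES (?, ?, ?, ?, ?)",
#             (entry.id, entry.content, entry.context_type, entry.source, entry.timestamp),
#         )
#         conn.commit()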