[loop-cycle-54] refactor: consolidate three memory stores into single table (#37) (#223)

This commit is contained in:
2026-03-15 13:33:24 -04:00
parent 4a68f6cb8b
commit b4cb3e9975
14 changed files with 1425 additions and 1084 deletions

View File

@@ -1,28 +1,29 @@
"""Unified memory database — single SQLite DB for all memory types.
"""Unified memory schema and connection management.
Consolidates three previously separate stores into one:
- **facts**: Long-term knowledge (user preferences, learned patterns)
- **chunks**: Indexed vault documents (markdown files from memory/)
- **episodes**: Runtime memories (conversations, agent observations)
All three tables live in ``data/memory.db``. Existing APIs in
``vector_store.py`` and ``semantic_memory.py`` are updated to point here.
This module provides the central database schema for Timmy's consolidated
memory system. All memory types (facts, conversations, documents, vault chunks)
are stored in a single `memories` table with a `memory_type` discriminator.
"""
import logging
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
DB_PATH = Path(__file__).parent.parent.parent.parent / "data" / "memory.db"
# Paths
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
DB_PATH = PROJECT_ROOT / "data" / "memory.db"
@contextmanager
def get_connection() -> Generator[sqlite3.Connection, None, None]:
"""Open (and lazily create) the unified memory database."""
"""Get database connection to unified memory database."""
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
@@ -33,56 +34,166 @@ def get_connection() -> Generator[sqlite3.Connection, None, None]:
def _ensure_schema(conn: sqlite3.Connection) -> None:
"""Create the three core tables and indexes if they don't exist."""
# --- facts ---------------------------------------------------------------
"""Create the unified memories table and indexes if they don't exist."""
conn.execute("""
CREATE TABLE IF NOT EXISTS facts (
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
category TEXT NOT NULL DEFAULT 'general',
content TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 0.8,
memory_type TEXT NOT NULL DEFAULT 'fact',
source TEXT NOT NULL DEFAULT 'agent',
embedding TEXT,
metadata TEXT,
source_hash TEXT,
agent_id TEXT,
task_id TEXT,
session_id TEXT,
confidence REAL NOT NULL DEFAULT 0.8,
tags TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
last_accessed TEXT,
access_count INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_confidence ON facts(confidence)")
# Create indexes for efficient querying
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_time ON memories(created_at)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_session ON memories(session_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_source ON memories(source)")
conn.commit()
# --- chunks (vault document fragments) -----------------------------------
conn.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
content TEXT NOT NULL,
embedding TEXT NOT NULL,
created_at TEXT NOT NULL,
source_hash TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)")
# Run migration if needed
_migrate_schema(conn)
# --- episodes (runtime memory entries) -----------------------------------
conn.execute("""
CREATE TABLE IF NOT EXISTS episodes (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
source TEXT NOT NULL,
context_type TEXT NOT NULL DEFAULT 'conversation',
embedding TEXT,
metadata TEXT,
agent_id TEXT,
task_id TEXT,
session_id TEXT,
timestamp TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_type ON episodes(context_type)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_time ON episodes(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_episodes_agent ON episodes(agent_id)")
def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table.
Migration paths:
- episodes table -> memories (context_type -> memory_type)
- chunks table -> memories with memory_type='vault_chunk'
- facts table -> dropped (unused, 0 rows expected)
"""
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists but new one doesn't fully)
if not has_memories:
logger.info("Migration: Creating unified memories table")
# Schema will be created above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old facts table
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
conn.commit()
def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
"""Get the column names for a table."""
cursor = conn.execute(f"PRAGMA table_info({table_name})")
return {row[1] for row in cursor.fetchall()}
# Backward compatibility aliases
get_conn = get_connection
@dataclass
class MemoryEntry:
"""A memory entry with vector embedding.
Note: The DB column is `memory_type` but this field is named `context_type`
for backward API compatibility.
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
content: str = "" # The actual text content
source: str = "" # Where it came from (agent, user, system)
context_type: str = "conversation" # API field name; DB column is memory_type
agent_id: str | None = None
task_id: str | None = None
session_id: str | None = None
metadata: dict | None = None
embedding: list[float] | None = None
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
relevance_score: float | None = None # Set during search
@dataclass
class MemoryChunk:
"""A searchable chunk of memory."""
id: str
source: str # filepath
content: str
embedding: list[float]
created_at: str
# Note: Functions are available via memory_system module directly
# from timmy.memory_system import store_memory, search_memories, etc.