From 1a8b8ecaedbfbef1cd561ba46f560d50bc351870 Mon Sep 17 00:00:00 2001 From: Timmy Time Date: Fri, 20 Mar 2026 12:07:15 -0400 Subject: [PATCH] [loop-cycle-1235] refactor: break up _migrate_schema() into focused helpers (#591) (#595) --- src/timmy/memory/unified.py | 149 +++++++++++++++++++----------------- 1 file changed, 77 insertions(+), 72 deletions(-) diff --git a/src/timmy/memory/unified.py b/src/timmy/memory/unified.py index 18dae52..97fd888 100644 --- a/src/timmy/memory/unified.py +++ b/src/timmy/memory/unified.py @@ -78,83 +78,88 @@ def _migrate_schema(conn: sqlite3.Connection) -> None: cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = {row[0] for row in cursor.fetchall()} - has_memories = "memories" in tables - has_episodes = "episodes" in tables - has_chunks = "chunks" in tables - has_facts = "facts" in tables - - # Check if we need to migrate (old schema exists but new one doesn't fully) - if not has_memories: + if "memories" not in tables: logger.info("Migration: Creating unified memories table") - # Schema will be created above - - # Migrate episodes -> memories - if has_episodes and has_memories: - logger.info("Migration: Converting episodes table to memories") - try: - cols = _get_table_columns(conn, "episodes") - context_type_col = "context_type" if "context_type" in cols else "'conversation'" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - metadata, agent_id, task_id, session_id, - created_at, access_count, last_accessed - ) - SELECT - id, content, - COALESCE({context_type_col}, 'conversation'), - COALESCE(source, 'agent'), - embedding, - metadata, agent_id, task_id, session_id, - COALESCE(timestamp, datetime('now')), 0, NULL - FROM episodes - """) - conn.execute("DROP TABLE episodes") - logger.info("Migration: Migrated episodes to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate episodes: %s", exc) - - # Migrate chunks -> memories as vault_chunk - if has_chunks and has_memories: - logger.info("Migration: Converting chunks table to memories") - try: - cols = _get_table_columns(conn, "chunks") - - id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" - content_col = "content" if "content" in cols else "text" - source_col = ( - "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") - ) - embedding_col = "embedding" if "embedding" in cols else "NULL" - created_col = "created_at" if "created_at" in cols else "datetime('now')" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - created_at, access_count - ) - SELECT - {id_col}, {content_col}, 'vault_chunk', {source_col}, - {embedding_col}, {created_col}, 0 - FROM chunks - """) - conn.execute("DROP TABLE chunks") - logger.info("Migration: Migrated chunks to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate chunks: %s", exc) - - # Drop old facts table - if has_facts: - try: - conn.execute("DROP TABLE facts") - logger.info("Migration: Dropped old facts table") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to drop facts: %s", exc) + # Schema will be created by _ensure_schema above + conn.commit() + return + _migrate_episodes(conn, tables) + _migrate_chunks(conn, tables) + _drop_legacy_tables(conn, tables) conn.commit() +def _migrate_episodes(conn: sqlite3.Connection, tables: set[str]) -> None: + """Migrate episodes table rows into the unified memories table.""" + if "episodes" not in tables: + return + logger.info("Migration: Converting episodes table to memories") + try: + cols = _get_table_columns(conn, "episodes") + context_type_col = "context_type" if "context_type" in cols else "'conversation'" + conn.execute(f""" + INSERT INTO memories ( + id, content, memory_type, source, embedding, + metadata, agent_id, task_id, session_id, + created_at, access_count, last_accessed + ) + SELECT + id, content, + COALESCE({context_type_col}, 'conversation'), + COALESCE(source, 'agent'), + embedding, + metadata, agent_id, task_id, session_id, + COALESCE(timestamp, datetime('now')), 0, NULL + FROM episodes + """) + conn.execute("DROP TABLE episodes") + logger.info("Migration: Migrated episodes to memories") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to migrate episodes: %s", exc) + + +def _migrate_chunks(conn: sqlite3.Connection, tables: set[str]) -> None: + """Migrate chunks table rows into the unified memories table as vault_chunk.""" + if "chunks" not in tables: + return + logger.info("Migration: Converting chunks table to memories") + try: + cols = _get_table_columns(conn, "chunks") + id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" + content_col = "content" if "content" in cols else "text" + source_col = ( + "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") + ) + embedding_col = "embedding" if "embedding" in cols else "NULL" + created_col = "created_at" if "created_at" in cols else "datetime('now')" + conn.execute(f""" + INSERT INTO memories ( + id, content, memory_type, source, embedding, + created_at, access_count + ) + SELECT + {id_col}, {content_col}, 'vault_chunk', {source_col}, + {embedding_col}, {created_col}, 0 + FROM chunks + """) + conn.execute("DROP TABLE chunks") + logger.info("Migration: Migrated chunks to memories") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to migrate chunks: %s", exc) + + +def _drop_legacy_tables(conn: sqlite3.Connection, tables: set[str]) -> None: + """Drop old facts table if it exists.""" + if "facts" not in tables: + return + try: + conn.execute("DROP TABLE facts") + logger.info("Migration: Dropped old facts table") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to drop facts: %s", exc) + + def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: """Get the column names for a table.""" cursor = conn.execute(f"PRAGMA table_info({table_name})")