From f361893fdd2250e8a47a6270228ba10a0528e614 Mon Sep 17 00:00:00 2001 From: Timmy Time Date: Thu, 19 Mar 2026 21:11:02 -0400 Subject: [PATCH] [loop-cycle-951] refactor: break up _migrate_schema() (#552) (#558) --- src/timmy/memory_system.py | 143 +++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 69 deletions(-) diff --git a/src/timmy/memory_system.py b/src/timmy/memory_system.py index 59caef1c..d9be52a1 100644 --- a/src/timmy/memory_system.py +++ b/src/timmy/memory_system.py @@ -98,6 +98,73 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: return {row[1] for row in cursor.fetchall()} +def _migrate_episodes(conn: sqlite3.Connection) -> None: + """Migrate episodes table rows into the unified memories table.""" + logger.info("Migration: Converting episodes table to memories") + try: + cols = _get_table_columns(conn, "episodes") + context_type_col = "context_type" if "context_type" in cols else "'conversation'" + + conn.execute(f""" + INSERT INTO memories ( + id, content, memory_type, source, embedding, + metadata, agent_id, task_id, session_id, + created_at, access_count, last_accessed + ) + SELECT + id, content, + COALESCE({context_type_col}, 'conversation'), + COALESCE(source, 'agent'), + embedding, + metadata, agent_id, task_id, session_id, + COALESCE(timestamp, datetime('now')), 0, NULL + FROM episodes + """) + conn.execute("DROP TABLE episodes") + logger.info("Migration: Migrated episodes to memories") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to migrate episodes: %s", exc) + + +def _migrate_chunks(conn: sqlite3.Connection) -> None: + """Migrate chunks table rows into the unified memories table.""" + logger.info("Migration: Converting chunks table to memories") + try: + cols = _get_table_columns(conn, "chunks") + + id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" + content_col = "content" if "content" in cols else "text" + source_col = ( + "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") + ) + embedding_col = "embedding" if "embedding" in cols else "NULL" + created_col = "created_at" if "created_at" in cols else "datetime('now')" + + conn.execute(f""" + INSERT INTO memories ( + id, content, memory_type, source, embedding, + created_at, access_count + ) + SELECT + {id_col}, {content_col}, 'vault_chunk', {source_col}, + {embedding_col}, {created_col}, 0 + FROM chunks + """) + conn.execute("DROP TABLE chunks") + logger.info("Migration: Migrated chunks to memories") + except sqlite3.Error as exc: + logger.warning("Migration: Failed to migrate chunks: %s", exc) + + +def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None: + """Drop a legacy table if it exists.""" + try: + conn.execute(f"DROP TABLE {table}") # noqa: S608 + logger.info("Migration: Dropped old %s table", table) + except sqlite3.Error as exc: + logger.warning("Migration: Failed to drop %s: %s", table, exc) + + def _migrate_schema(conn: sqlite3.Connection) -> None: """Migrate from old three-table schema to unified memories table. @@ -110,78 +177,16 @@ def _migrate_schema(conn: sqlite3.Connection) -> None: tables = {row[0] for row in cursor.fetchall()} has_memories = "memories" in tables - has_episodes = "episodes" in tables - has_chunks = "chunks" in tables - has_facts = "facts" in tables - # Check if we need to migrate (old schema exists) - if not has_memories and (has_episodes or has_chunks or has_facts): + if not has_memories and (tables & {"episodes", "chunks", "facts"}): logger.info("Migration: Creating unified memories table") - # Schema will be created by _ensure_schema above - # Migrate episodes -> memories - if has_episodes and has_memories: - logger.info("Migration: Converting episodes table to memories") - try: - cols = _get_table_columns(conn, "episodes") - context_type_col = "context_type" if "context_type" in cols else "'conversation'" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - metadata, agent_id, task_id, session_id, - created_at, access_count, last_accessed - ) - SELECT - id, content, - COALESCE({context_type_col}, 'conversation'), - COALESCE(source, 'agent'), - embedding, - metadata, agent_id, task_id, session_id, - COALESCE(timestamp, datetime('now')), 0, NULL - FROM episodes - """) - conn.execute("DROP TABLE episodes") - logger.info("Migration: Migrated episodes to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate episodes: %s", exc) - - # Migrate chunks -> memories as vault_chunk - if has_chunks and has_memories: - logger.info("Migration: Converting chunks table to memories") - try: - cols = _get_table_columns(conn, "chunks") - - id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)" - content_col = "content" if "content" in cols else "text" - source_col = ( - "filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'") - ) - embedding_col = "embedding" if "embedding" in cols else "NULL" - created_col = "created_at" if "created_at" in cols else "datetime('now')" - - conn.execute(f""" - INSERT INTO memories ( - id, content, memory_type, source, embedding, - created_at, access_count - ) - SELECT - {id_col}, {content_col}, 'vault_chunk', {source_col}, - {embedding_col}, {created_col}, 0 - FROM chunks - """) - conn.execute("DROP TABLE chunks") - logger.info("Migration: Migrated chunks to memories") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to migrate chunks: %s", exc) - - # Drop old tables - if has_facts: - try: - conn.execute("DROP TABLE facts") - logger.info("Migration: Dropped old facts table") - except sqlite3.Error as exc: - logger.warning("Migration: Failed to drop facts: %s", exc) + if "episodes" in tables and has_memories: + _migrate_episodes(conn) + if "chunks" in tables and has_memories: + _migrate_chunks(conn) + if "facts" in tables: + _drop_legacy_table(conn, "facts") conn.commit()