"""Unit tests for timmy.memory.unified — schema, migration, and dataclasses."""

import sqlite3
import uuid
from unittest.mock import patch

import pytest

from timmy.memory.unified import (
    MemoryChunk,
    MemoryEntry,
    _ensure_schema,
    _get_table_columns,
    _migrate_schema,
    get_conn,
    get_connection,
)

# ── Helpers ──────────────────────────────────────────────────────────────────


def _make_conn() -> sqlite3.Connection:
    """Return an in-memory SQLite connection with row_factory set."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    return conn


def _table_names(conn: sqlite3.Connection) -> set:
    """Return the set of user-table names currently present in *conn*.

    Shared by the migration tests, which repeatedly check which old tables
    survived (or were dropped by) _migrate_schema.
    """
    cur = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
    return {row[0] for row in cur.fetchall()}


# ── get_connection / get_conn ────────────────────────────────────────────────


class TestGetConnection:
    """Tests for get_connection context manager."""

    def test_returns_connection(self, tmp_path):
        """get_connection yields a usable sqlite3 connection."""
        db = tmp_path / "mem.db"
        with patch("timmy.memory.unified.DB_PATH", db):
            with get_connection() as conn:
                assert isinstance(conn, sqlite3.Connection)
                # Schema should already be created
                cur = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='memories'"
                )
                assert cur.fetchone() is not None

    def test_creates_parent_directory(self, tmp_path):
        """get_connection creates parent dirs if missing."""
        db = tmp_path / "sub" / "dir" / "mem.db"
        with patch("timmy.memory.unified.DB_PATH", db):
            with get_connection() as conn:
                conn.execute("SELECT 1")
        assert db.parent.exists()

    def test_get_conn_alias(self):
        """get_conn is the same object as get_connection."""
        assert get_conn is get_connection

    def test_connection_has_row_factory(self, tmp_path):
        """Connection should use sqlite3.Row factory."""
        db = tmp_path / "mem.db"
        with patch("timmy.memory.unified.DB_PATH", db):
            with get_connection() as conn:
                assert conn.row_factory is sqlite3.Row

    def test_wal_mode_enabled(self, tmp_path):
        """Connection enables WAL journal mode."""
        db = tmp_path / "mem.db"
        with patch("timmy.memory.unified.DB_PATH", db):
            with get_connection() as conn:
                cur = conn.execute("PRAGMA journal_mode")
                mode = cur.fetchone()[0]
                assert mode == "wal"


# ── _ensure_schema ───────────────────────────────────────────────────────────


class TestEnsureSchema:
    """Tests for _ensure_schema."""

    def test_creates_memories_table(self):
        """Memories table should be created with correct columns."""
        conn = _make_conn()
        _ensure_schema(conn)
        cols = _get_table_columns(conn, "memories")
        expected = {
            "id",
            "content",
            "memory_type",
            "source",
            "embedding",
            "metadata",
            "source_hash",
            "agent_id",
            "task_id",
            "session_id",
            "confidence",
            "tags",
            "created_at",
            "last_accessed",
            "access_count",
        }
        assert cols == expected
        conn.close()

    def test_creates_indexes(self):
        """Expected indexes should exist after schema creation."""
        conn = _make_conn()
        _ensure_schema(conn)
        cur = conn.execute("SELECT name FROM sqlite_master WHERE type='index'")
        indexes = {row[0] for row in cur.fetchall()}
        for idx in [
            "idx_memories_type",
            "idx_memories_time",
            "idx_memories_session",
            "idx_memories_agent",
            "idx_memories_source",
        ]:
            assert idx in indexes, f"Missing index: {idx}"
        conn.close()

    def test_idempotent(self):
        """Calling _ensure_schema twice should not error."""
        conn = _make_conn()
        _ensure_schema(conn)
        _ensure_schema(conn)  # no error
        conn.close()

    def test_default_values(self):
        """Inserted row should have correct defaults."""
        conn = _make_conn()
        _ensure_schema(conn)
        conn.execute(
            "INSERT INTO memories (id, content, created_at) VALUES (?, ?, ?)",
            ("test-1", "hello", "2025-01-01T00:00:00"),
        )
        conn.commit()
        row = conn.execute("SELECT * FROM memories WHERE id='test-1'").fetchone()
        assert row["memory_type"] == "fact"
        assert row["source"] == "agent"
        assert row["confidence"] == 0.8
        assert row["tags"] == "[]"
        assert row["access_count"] == 0
        conn.close()


# ── _get_table_columns ───────────────────────────────────────────────────────


class TestGetTableColumns:
    """Tests for _get_table_columns helper."""

    def test_returns_column_names(self):
        conn = _make_conn()
        conn.execute("CREATE TABLE t (a TEXT, b INTEGER, c REAL)")
        assert _get_table_columns(conn, "t") == {"a", "b", "c"}
        conn.close()

    def test_empty_for_missing_table(self):
        """PRAGMA table_info on non-existent table returns empty set."""
        conn = _make_conn()
        assert _get_table_columns(conn, "nonexistent") == set()
        conn.close()


# ── _migrate_schema ──────────────────────────────────────────────────────────


class TestMigrateSchema:
    """Tests for _migrate_schema — old table migration."""

    def _setup_memories_table(self, conn):
        """Create the unified memories table for migration tests."""
        _ensure_schema(conn)

    def test_no_old_tables_is_noop(self):
        """Migration with only memories table does nothing."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        # Should not raise
        _migrate_schema(conn)
        conn.close()

    def test_migrates_episodes_with_context_type(self):
        """Episodes rows migrate to memories with context_type as memory_type."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        conn.execute("""
            CREATE TABLE episodes (
                id TEXT PRIMARY KEY,
                content TEXT,
                context_type TEXT,
                source TEXT,
                embedding TEXT,
                metadata TEXT,
                agent_id TEXT,
                task_id TEXT,
                session_id TEXT,
                timestamp TEXT
            )
        """)
        conn.execute(
            "INSERT INTO episodes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                "ep-1",
                "test content",
                "conversation",
                "agent",
                None,
                None,
                "a1",
                "t1",
                "s1",
                "2025-01-01T00:00:00",
            ),
        )
        conn.commit()

        _migrate_schema(conn)

        # Episodes table should be dropped
        assert "episodes" not in _table_names(conn)

        # Data should be in memories
        row = conn.execute("SELECT * FROM memories WHERE id='ep-1'").fetchone()
        assert row is not None
        assert row["content"] == "test content"
        assert row["memory_type"] == "conversation"
        assert row["source"] == "agent"
        conn.close()

    def test_migrates_episodes_without_context_type(self):
        """Episodes without context_type column default to 'conversation'."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        conn.execute("""
            CREATE TABLE episodes (
                id TEXT PRIMARY KEY,
                content TEXT,
                source TEXT,
                embedding TEXT,
                metadata TEXT,
                agent_id TEXT,
                task_id TEXT,
                session_id TEXT,
                timestamp TEXT
            )
        """)
        conn.execute(
            "INSERT INTO episodes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            ("ep-2", "no type", "user", None, None, None, None, None, "2025-02-01"),
        )
        conn.commit()

        _migrate_schema(conn)
        row = conn.execute("SELECT * FROM memories WHERE id='ep-2'").fetchone()
        assert row["memory_type"] == "conversation"
        conn.close()

    def test_migrates_chunks(self):
        """Chunks table migrates to memories as vault_chunk type."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        conn.execute("""
            CREATE TABLE chunks (
                id TEXT PRIMARY KEY,
                content TEXT,
                filepath TEXT,
                embedding TEXT,
                created_at TEXT
            )
        """)
        conn.execute(
            "INSERT INTO chunks VALUES (?, ?, ?, ?, ?)",
            ("ch-1", "chunk text", "/vault/note.md", None, "2025-03-01"),
        )
        conn.commit()

        _migrate_schema(conn)

        assert "chunks" not in _table_names(conn)

        row = conn.execute("SELECT * FROM memories WHERE id='ch-1'").fetchone()
        assert row is not None
        assert row["memory_type"] == "vault_chunk"
        assert row["source"] == "/vault/note.md"
        conn.close()

    def test_migrates_chunks_minimal_columns(self):
        """Chunks with minimal columns (text instead of content, no id)."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        conn.execute("CREATE TABLE chunks (text TEXT, source TEXT)")
        conn.execute("INSERT INTO chunks VALUES (?, ?)", ("minimal chunk", "vault"))
        conn.commit()

        _migrate_schema(conn)

        rows = conn.execute("SELECT * FROM memories WHERE memory_type='vault_chunk'").fetchall()
        assert len(rows) == 1
        assert rows[0]["content"] == "minimal chunk"
        assert rows[0]["source"] == "vault"
        conn.close()

    def test_drops_facts_table(self):
        """Old facts table is dropped during migration."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        conn.execute("CREATE TABLE facts (id TEXT PRIMARY KEY, content TEXT)")
        conn.commit()

        _migrate_schema(conn)

        assert "facts" not in _table_names(conn)
        conn.close()

    def test_migration_handles_episode_error_gracefully(self):
        """If episode migration fails, it logs warning but continues."""
        conn = _make_conn()
        self._setup_memories_table(conn)
        # Create episodes with a column that will cause a conflict
        conn.execute("CREATE TABLE episodes (id TEXT PRIMARY KEY, content TEXT)")
        # Insert a row with the same ID already in memories to trigger conflict
        conn.execute(
            "INSERT INTO memories (id, content, created_at) VALUES (?, ?, ?)",
            ("dup-id", "existing", "2025-01-01"),
        )
        conn.execute("INSERT INTO episodes VALUES (?, ?)", ("dup-id", "duplicate"))
        conn.commit()

        # Should not raise — logs warning instead
        _migrate_schema(conn)
        conn.close()

    def test_full_migration_all_old_tables(self):
        """All three old tables migrate in a single pass."""
        conn = _make_conn()
        self._setup_memories_table(conn)

        conn.execute("""
            CREATE TABLE episodes (
                id TEXT, content TEXT, context_type TEXT, source TEXT,
                embedding TEXT, metadata TEXT, agent_id TEXT, task_id TEXT,
                session_id TEXT, timestamp TEXT
            )
        """)
        conn.execute(
            "CREATE TABLE chunks (id TEXT, content TEXT, filepath TEXT, embedding TEXT, created_at TEXT)"
        )
        conn.execute("CREATE TABLE facts (id TEXT, content TEXT)")
        conn.execute(
            "INSERT INTO episodes VALUES ('e1','ep','conv','agent',NULL,NULL,NULL,NULL,NULL,'2025-01-01')"
        )
        conn.execute("INSERT INTO chunks VALUES ('c1','chunk','/f.md',NULL,'2025-01-01')")
        conn.commit()

        _migrate_schema(conn)

        tables = _table_names(conn)
        assert "episodes" not in tables
        assert "chunks" not in tables
        assert "facts" not in tables
        assert "memories" in tables

        assert conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0] >= 2
        conn.close()


# ── MemoryEntry dataclass ────────────────────────────────────────────────────


class TestMemoryEntry:
    """Tests for the MemoryEntry dataclass."""

    def test_defaults(self):
        entry = MemoryEntry()
        assert entry.content == ""
        assert entry.source == ""
        assert entry.context_type == "conversation"
        assert entry.agent_id is None
        assert entry.task_id is None
        assert entry.session_id is None
        assert entry.metadata is None
        assert entry.embedding is None
        assert entry.relevance_score is None
        # ID should be a valid UUID
        uuid.UUID(entry.id)
        # Timestamp should be ISO format
        assert "T" in entry.timestamp

    def test_custom_values(self):
        entry = MemoryEntry(
            id="custom-id",
            content="hello world",
            source="user",
            context_type="fact",
            agent_id="agent-1",
            task_id="task-1",
            session_id="sess-1",
            metadata={"key": "value"},
            embedding=[0.1, 0.2, 0.3],
            timestamp="2025-06-01T00:00:00",
            relevance_score=0.95,
        )
        assert entry.id == "custom-id"
        assert entry.content == "hello world"
        assert entry.source == "user"
        assert entry.context_type == "fact"
        assert entry.agent_id == "agent-1"
        assert entry.metadata == {"key": "value"}
        assert entry.embedding == [0.1, 0.2, 0.3]
        assert entry.relevance_score == 0.95

    def test_unique_ids(self):
        """Each MemoryEntry gets a unique default ID."""
        a = MemoryEntry()
        b = MemoryEntry()
        assert a.id != b.id


# ── MemoryChunk dataclass ────────────────────────────────────────────────────


class TestMemoryChunk:
    """Tests for the MemoryChunk dataclass."""

    def test_fields(self):
        chunk = MemoryChunk(
            id="ch-1",
            source="/vault/file.md",
            content="some text",
            embedding=[0.5, 0.6],
            created_at="2025-01-01T00:00:00",
        )
        assert chunk.id == "ch-1"
        assert chunk.source == "/vault/file.md"
        assert chunk.content == "some text"
        assert chunk.embedding == [0.5, 0.6]
        assert chunk.created_at == "2025-01-01T00:00:00"

    def test_required_fields(self):
        """MemoryChunk requires all fields — no defaults."""
        with pytest.raises(TypeError):
            MemoryChunk()  # type: ignore[call-arg]