test: add unit tests for memory/unified.py (#353)
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit was merged in pull request #353.
This commit is contained in:
460
tests/unit/test_memory_unified.py
Normal file
460
tests/unit/test_memory_unified.py
Normal file
@@ -0,0 +1,460 @@
|
||||
"""Unit tests for timmy.memory.unified — schema, migration, and dataclasses."""
|
||||
|
||||
import sqlite3
|
||||
import uuid
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.memory.unified import (
|
||||
MemoryChunk,
|
||||
MemoryEntry,
|
||||
_ensure_schema,
|
||||
_get_table_columns,
|
||||
_migrate_schema,
|
||||
get_conn,
|
||||
get_connection,
|
||||
)
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_conn() -> sqlite3.Connection:
|
||||
"""Return an in-memory SQLite connection with row_factory set."""
|
||||
conn = sqlite3.connect(":memory:")
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
# ── get_connection / get_conn ────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGetConnection:
|
||||
"""Tests for get_connection context manager."""
|
||||
|
||||
def test_returns_connection(self, tmp_path):
|
||||
"""get_connection yields a usable sqlite3 connection."""
|
||||
db = tmp_path / "mem.db"
|
||||
with patch("timmy.memory.unified.DB_PATH", db):
|
||||
with get_connection() as conn:
|
||||
assert isinstance(conn, sqlite3.Connection)
|
||||
# Schema should already be created
|
||||
cur = conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='memories'"
|
||||
)
|
||||
assert cur.fetchone() is not None
|
||||
|
||||
def test_creates_parent_directory(self, tmp_path):
|
||||
"""get_connection creates parent dirs if missing."""
|
||||
db = tmp_path / "sub" / "dir" / "mem.db"
|
||||
with patch("timmy.memory.unified.DB_PATH", db):
|
||||
with get_connection() as conn:
|
||||
conn.execute("SELECT 1")
|
||||
assert db.parent.exists()
|
||||
|
||||
def test_get_conn_alias(self):
|
||||
"""get_conn is the same object as get_connection."""
|
||||
assert get_conn is get_connection
|
||||
|
||||
def test_connection_has_row_factory(self, tmp_path):
|
||||
"""Connection should use sqlite3.Row factory."""
|
||||
db = tmp_path / "mem.db"
|
||||
with patch("timmy.memory.unified.DB_PATH", db):
|
||||
with get_connection() as conn:
|
||||
assert conn.row_factory is sqlite3.Row
|
||||
|
||||
def test_wal_mode_enabled(self, tmp_path):
|
||||
"""Connection enables WAL journal mode."""
|
||||
db = tmp_path / "mem.db"
|
||||
with patch("timmy.memory.unified.DB_PATH", db):
|
||||
with get_connection() as conn:
|
||||
cur = conn.execute("PRAGMA journal_mode")
|
||||
mode = cur.fetchone()[0]
|
||||
assert mode == "wal"
|
||||
|
||||
|
||||
# ── _ensure_schema ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestEnsureSchema:
|
||||
"""Tests for _ensure_schema."""
|
||||
|
||||
def test_creates_memories_table(self):
|
||||
"""Memories table should be created with correct columns."""
|
||||
conn = _make_conn()
|
||||
_ensure_schema(conn)
|
||||
cols = _get_table_columns(conn, "memories")
|
||||
expected = {
|
||||
"id",
|
||||
"content",
|
||||
"memory_type",
|
||||
"source",
|
||||
"embedding",
|
||||
"metadata",
|
||||
"source_hash",
|
||||
"agent_id",
|
||||
"task_id",
|
||||
"session_id",
|
||||
"confidence",
|
||||
"tags",
|
||||
"created_at",
|
||||
"last_accessed",
|
||||
"access_count",
|
||||
}
|
||||
assert cols == expected
|
||||
conn.close()
|
||||
|
||||
def test_creates_indexes(self):
|
||||
"""Expected indexes should exist after schema creation."""
|
||||
conn = _make_conn()
|
||||
_ensure_schema(conn)
|
||||
cur = conn.execute("SELECT name FROM sqlite_master WHERE type='index'")
|
||||
indexes = {row[0] for row in cur.fetchall()}
|
||||
for idx in [
|
||||
"idx_memories_type",
|
||||
"idx_memories_time",
|
||||
"idx_memories_session",
|
||||
"idx_memories_agent",
|
||||
"idx_memories_source",
|
||||
]:
|
||||
assert idx in indexes, f"Missing index: {idx}"
|
||||
conn.close()
|
||||
|
||||
def test_idempotent(self):
|
||||
"""Calling _ensure_schema twice should not error."""
|
||||
conn = _make_conn()
|
||||
_ensure_schema(conn)
|
||||
_ensure_schema(conn) # no error
|
||||
conn.close()
|
||||
|
||||
def test_default_values(self):
|
||||
"""Inserted row should have correct defaults."""
|
||||
conn = _make_conn()
|
||||
_ensure_schema(conn)
|
||||
conn.execute(
|
||||
"INSERT INTO memories (id, content, created_at) VALUES (?, ?, ?)",
|
||||
("test-1", "hello", "2025-01-01T00:00:00"),
|
||||
)
|
||||
conn.commit()
|
||||
row = conn.execute("SELECT * FROM memories WHERE id='test-1'").fetchone()
|
||||
assert row["memory_type"] == "fact"
|
||||
assert row["source"] == "agent"
|
||||
assert row["confidence"] == 0.8
|
||||
assert row["tags"] == "[]"
|
||||
assert row["access_count"] == 0
|
||||
conn.close()
|
||||
|
||||
|
||||
# ── _get_table_columns ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGetTableColumns:
|
||||
"""Tests for _get_table_columns helper."""
|
||||
|
||||
def test_returns_column_names(self):
|
||||
conn = _make_conn()
|
||||
conn.execute("CREATE TABLE t (a TEXT, b INTEGER, c REAL)")
|
||||
assert _get_table_columns(conn, "t") == {"a", "b", "c"}
|
||||
conn.close()
|
||||
|
||||
def test_empty_for_missing_table(self):
|
||||
"""PRAGMA table_info on non-existent table returns empty set."""
|
||||
conn = _make_conn()
|
||||
assert _get_table_columns(conn, "nonexistent") == set()
|
||||
conn.close()
|
||||
|
||||
|
||||
# ── _migrate_schema ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMigrateSchema:
|
||||
"""Tests for _migrate_schema — old table migration."""
|
||||
|
||||
def _setup_memories_table(self, conn):
|
||||
"""Create the unified memories table for migration tests."""
|
||||
_ensure_schema(conn)
|
||||
|
||||
def test_no_old_tables_is_noop(self):
|
||||
"""Migration with only memories table does nothing."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
# Should not raise
|
||||
_migrate_schema(conn)
|
||||
conn.close()
|
||||
|
||||
def test_migrates_episodes_with_context_type(self):
|
||||
"""Episodes rows migrate to memories with context_type as memory_type."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
conn.execute("""
|
||||
CREATE TABLE episodes (
|
||||
id TEXT PRIMARY KEY,
|
||||
content TEXT,
|
||||
context_type TEXT,
|
||||
source TEXT,
|
||||
embedding TEXT,
|
||||
metadata TEXT,
|
||||
agent_id TEXT,
|
||||
task_id TEXT,
|
||||
session_id TEXT,
|
||||
timestamp TEXT
|
||||
)
|
||||
""")
|
||||
conn.execute(
|
||||
"INSERT INTO episodes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
"ep-1",
|
||||
"test content",
|
||||
"conversation",
|
||||
"agent",
|
||||
None,
|
||||
None,
|
||||
"a1",
|
||||
"t1",
|
||||
"s1",
|
||||
"2025-01-01T00:00:00",
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
_migrate_schema(conn)
|
||||
|
||||
# Episodes table should be dropped
|
||||
tables = {
|
||||
r[0]
|
||||
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
|
||||
}
|
||||
assert "episodes" not in tables
|
||||
|
||||
# Data should be in memories
|
||||
row = conn.execute("SELECT * FROM memories WHERE id='ep-1'").fetchone()
|
||||
assert row is not None
|
||||
assert row["content"] == "test content"
|
||||
assert row["memory_type"] == "conversation"
|
||||
assert row["source"] == "agent"
|
||||
conn.close()
|
||||
|
||||
def test_migrates_episodes_without_context_type(self):
|
||||
"""Episodes without context_type column default to 'conversation'."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
conn.execute("""
|
||||
CREATE TABLE episodes (
|
||||
id TEXT PRIMARY KEY,
|
||||
content TEXT,
|
||||
source TEXT,
|
||||
embedding TEXT,
|
||||
metadata TEXT,
|
||||
agent_id TEXT,
|
||||
task_id TEXT,
|
||||
session_id TEXT,
|
||||
timestamp TEXT
|
||||
)
|
||||
""")
|
||||
conn.execute(
|
||||
"INSERT INTO episodes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
("ep-2", "no type", "user", None, None, None, None, None, "2025-02-01"),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
_migrate_schema(conn)
|
||||
row = conn.execute("SELECT * FROM memories WHERE id='ep-2'").fetchone()
|
||||
assert row["memory_type"] == "conversation"
|
||||
conn.close()
|
||||
|
||||
def test_migrates_chunks(self):
|
||||
"""Chunks table migrates to memories as vault_chunk type."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
conn.execute("""
|
||||
CREATE TABLE chunks (
|
||||
id TEXT PRIMARY KEY,
|
||||
content TEXT,
|
||||
filepath TEXT,
|
||||
embedding TEXT,
|
||||
created_at TEXT
|
||||
)
|
||||
""")
|
||||
conn.execute(
|
||||
"INSERT INTO chunks VALUES (?, ?, ?, ?, ?)",
|
||||
("ch-1", "chunk text", "/vault/note.md", None, "2025-03-01"),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
_migrate_schema(conn)
|
||||
|
||||
tables = {
|
||||
r[0]
|
||||
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
|
||||
}
|
||||
assert "chunks" not in tables
|
||||
|
||||
row = conn.execute("SELECT * FROM memories WHERE id='ch-1'").fetchone()
|
||||
assert row is not None
|
||||
assert row["memory_type"] == "vault_chunk"
|
||||
assert row["source"] == "/vault/note.md"
|
||||
conn.close()
|
||||
|
||||
def test_migrates_chunks_minimal_columns(self):
|
||||
"""Chunks with minimal columns (text instead of content, no id)."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
conn.execute("CREATE TABLE chunks (text TEXT, source TEXT)")
|
||||
conn.execute("INSERT INTO chunks VALUES (?, ?)", ("minimal chunk", "vault"))
|
||||
conn.commit()
|
||||
|
||||
_migrate_schema(conn)
|
||||
|
||||
rows = conn.execute("SELECT * FROM memories WHERE memory_type='vault_chunk'").fetchall()
|
||||
assert len(rows) == 1
|
||||
assert rows[0]["content"] == "minimal chunk"
|
||||
assert rows[0]["source"] == "vault"
|
||||
conn.close()
|
||||
|
||||
def test_drops_facts_table(self):
|
||||
"""Old facts table is dropped during migration."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
conn.execute("CREATE TABLE facts (id TEXT PRIMARY KEY, content TEXT)")
|
||||
conn.commit()
|
||||
|
||||
_migrate_schema(conn)
|
||||
|
||||
tables = {
|
||||
r[0]
|
||||
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
|
||||
}
|
||||
assert "facts" not in tables
|
||||
conn.close()
|
||||
|
||||
def test_migration_handles_episode_error_gracefully(self):
|
||||
"""If episode migration fails, it logs warning but continues."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
# Create episodes with a column that will cause a conflict
|
||||
conn.execute("CREATE TABLE episodes (id TEXT PRIMARY KEY, content TEXT)")
|
||||
# Insert a row with the same ID already in memories to trigger conflict
|
||||
conn.execute(
|
||||
"INSERT INTO memories (id, content, created_at) VALUES (?, ?, ?)",
|
||||
("dup-id", "existing", "2025-01-01"),
|
||||
)
|
||||
conn.execute("INSERT INTO episodes VALUES (?, ?)", ("dup-id", "duplicate"))
|
||||
conn.commit()
|
||||
|
||||
# Should not raise — logs warning instead
|
||||
_migrate_schema(conn)
|
||||
conn.close()
|
||||
|
||||
def test_full_migration_all_old_tables(self):
|
||||
"""All three old tables migrate in a single pass."""
|
||||
conn = _make_conn()
|
||||
self._setup_memories_table(conn)
|
||||
|
||||
conn.execute("""
|
||||
CREATE TABLE episodes (
|
||||
id TEXT, content TEXT, context_type TEXT, source TEXT,
|
||||
embedding TEXT, metadata TEXT, agent_id TEXT, task_id TEXT,
|
||||
session_id TEXT, timestamp TEXT
|
||||
)
|
||||
""")
|
||||
conn.execute(
|
||||
"CREATE TABLE chunks (id TEXT, content TEXT, filepath TEXT, embedding TEXT, created_at TEXT)"
|
||||
)
|
||||
conn.execute("CREATE TABLE facts (id TEXT, content TEXT)")
|
||||
conn.execute(
|
||||
"INSERT INTO episodes VALUES ('e1','ep','conv','agent',NULL,NULL,NULL,NULL,NULL,'2025-01-01')"
|
||||
)
|
||||
conn.execute("INSERT INTO chunks VALUES ('c1','chunk','/f.md',NULL,'2025-01-01')")
|
||||
conn.commit()
|
||||
|
||||
_migrate_schema(conn)
|
||||
|
||||
tables = {
|
||||
r[0]
|
||||
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
|
||||
}
|
||||
assert "episodes" not in tables
|
||||
assert "chunks" not in tables
|
||||
assert "facts" not in tables
|
||||
assert "memories" in tables
|
||||
|
||||
assert conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0] >= 2
|
||||
conn.close()
|
||||
|
||||
|
||||
# ── MemoryEntry dataclass ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMemoryEntry:
|
||||
"""Tests for the MemoryEntry dataclass."""
|
||||
|
||||
def test_defaults(self):
|
||||
entry = MemoryEntry()
|
||||
assert entry.content == ""
|
||||
assert entry.source == ""
|
||||
assert entry.context_type == "conversation"
|
||||
assert entry.agent_id is None
|
||||
assert entry.task_id is None
|
||||
assert entry.session_id is None
|
||||
assert entry.metadata is None
|
||||
assert entry.embedding is None
|
||||
assert entry.relevance_score is None
|
||||
# ID should be a valid UUID
|
||||
uuid.UUID(entry.id)
|
||||
# Timestamp should be ISO format
|
||||
assert "T" in entry.timestamp
|
||||
|
||||
def test_custom_values(self):
|
||||
entry = MemoryEntry(
|
||||
id="custom-id",
|
||||
content="hello world",
|
||||
source="user",
|
||||
context_type="fact",
|
||||
agent_id="agent-1",
|
||||
task_id="task-1",
|
||||
session_id="sess-1",
|
||||
metadata={"key": "value"},
|
||||
embedding=[0.1, 0.2, 0.3],
|
||||
timestamp="2025-06-01T00:00:00",
|
||||
relevance_score=0.95,
|
||||
)
|
||||
assert entry.id == "custom-id"
|
||||
assert entry.content == "hello world"
|
||||
assert entry.source == "user"
|
||||
assert entry.context_type == "fact"
|
||||
assert entry.agent_id == "agent-1"
|
||||
assert entry.metadata == {"key": "value"}
|
||||
assert entry.embedding == [0.1, 0.2, 0.3]
|
||||
assert entry.relevance_score == 0.95
|
||||
|
||||
def test_unique_ids(self):
|
||||
"""Each MemoryEntry gets a unique default ID."""
|
||||
a = MemoryEntry()
|
||||
b = MemoryEntry()
|
||||
assert a.id != b.id
|
||||
|
||||
|
||||
# ── MemoryChunk dataclass ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMemoryChunk:
|
||||
"""Tests for the MemoryChunk dataclass."""
|
||||
|
||||
def test_fields(self):
|
||||
chunk = MemoryChunk(
|
||||
id="ch-1",
|
||||
source="/vault/file.md",
|
||||
content="some text",
|
||||
embedding=[0.5, 0.6],
|
||||
created_at="2025-01-01T00:00:00",
|
||||
)
|
||||
assert chunk.id == "ch-1"
|
||||
assert chunk.source == "/vault/file.md"
|
||||
assert chunk.content == "some text"
|
||||
assert chunk.embedding == [0.5, 0.6]
|
||||
assert chunk.created_at == "2025-01-01T00:00:00"
|
||||
|
||||
def test_required_fields(self):
|
||||
"""MemoryChunk requires all fields — no defaults."""
|
||||
with pytest.raises(TypeError):
|
||||
MemoryChunk() # type: ignore[call-arg]
|
||||
Reference in New Issue
Block a user