Files
Timmy-time-dashboard/tests/unit/test_memory_unified.py
Kimi Agent db7220db5a
Some checks failed
Tests / lint (push) Successful in 4s
Tests / test (push) Has been cancelled
test: add unit tests for memory/unified.py (#353)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-18 21:23:03 -04:00

461 lines
16 KiB
Python

"""Unit tests for timmy.memory.unified — schema, migration, and dataclasses."""
import sqlite3
import uuid
from unittest.mock import patch
import pytest
from timmy.memory.unified import (
MemoryChunk,
MemoryEntry,
_ensure_schema,
_get_table_columns,
_migrate_schema,
get_conn,
get_connection,
)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_conn() -> sqlite3.Connection:
"""Return an in-memory SQLite connection with row_factory set."""
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
return conn
# ── get_connection / get_conn ────────────────────────────────────────────────
class TestGetConnection:
"""Tests for get_connection context manager."""
def test_returns_connection(self, tmp_path):
"""get_connection yields a usable sqlite3 connection."""
db = tmp_path / "mem.db"
with patch("timmy.memory.unified.DB_PATH", db):
with get_connection() as conn:
assert isinstance(conn, sqlite3.Connection)
# Schema should already be created
cur = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='memories'"
)
assert cur.fetchone() is not None
def test_creates_parent_directory(self, tmp_path):
"""get_connection creates parent dirs if missing."""
db = tmp_path / "sub" / "dir" / "mem.db"
with patch("timmy.memory.unified.DB_PATH", db):
with get_connection() as conn:
conn.execute("SELECT 1")
assert db.parent.exists()
def test_get_conn_alias(self):
"""get_conn is the same object as get_connection."""
assert get_conn is get_connection
def test_connection_has_row_factory(self, tmp_path):
"""Connection should use sqlite3.Row factory."""
db = tmp_path / "mem.db"
with patch("timmy.memory.unified.DB_PATH", db):
with get_connection() as conn:
assert conn.row_factory is sqlite3.Row
def test_wal_mode_enabled(self, tmp_path):
"""Connection enables WAL journal mode."""
db = tmp_path / "mem.db"
with patch("timmy.memory.unified.DB_PATH", db):
with get_connection() as conn:
cur = conn.execute("PRAGMA journal_mode")
mode = cur.fetchone()[0]
assert mode == "wal"
# ── _ensure_schema ───────────────────────────────────────────────────────────
class TestEnsureSchema:
"""Tests for _ensure_schema."""
def test_creates_memories_table(self):
"""Memories table should be created with correct columns."""
conn = _make_conn()
_ensure_schema(conn)
cols = _get_table_columns(conn, "memories")
expected = {
"id",
"content",
"memory_type",
"source",
"embedding",
"metadata",
"source_hash",
"agent_id",
"task_id",
"session_id",
"confidence",
"tags",
"created_at",
"last_accessed",
"access_count",
}
assert cols == expected
conn.close()
def test_creates_indexes(self):
"""Expected indexes should exist after schema creation."""
conn = _make_conn()
_ensure_schema(conn)
cur = conn.execute("SELECT name FROM sqlite_master WHERE type='index'")
indexes = {row[0] for row in cur.fetchall()}
for idx in [
"idx_memories_type",
"idx_memories_time",
"idx_memories_session",
"idx_memories_agent",
"idx_memories_source",
]:
assert idx in indexes, f"Missing index: {idx}"
conn.close()
def test_idempotent(self):
"""Calling _ensure_schema twice should not error."""
conn = _make_conn()
_ensure_schema(conn)
_ensure_schema(conn) # no error
conn.close()
def test_default_values(self):
"""Inserted row should have correct defaults."""
conn = _make_conn()
_ensure_schema(conn)
conn.execute(
"INSERT INTO memories (id, content, created_at) VALUES (?, ?, ?)",
("test-1", "hello", "2025-01-01T00:00:00"),
)
conn.commit()
row = conn.execute("SELECT * FROM memories WHERE id='test-1'").fetchone()
assert row["memory_type"] == "fact"
assert row["source"] == "agent"
assert row["confidence"] == 0.8
assert row["tags"] == "[]"
assert row["access_count"] == 0
conn.close()
# ── _get_table_columns ───────────────────────────────────────────────────────
class TestGetTableColumns:
"""Tests for _get_table_columns helper."""
def test_returns_column_names(self):
conn = _make_conn()
conn.execute("CREATE TABLE t (a TEXT, b INTEGER, c REAL)")
assert _get_table_columns(conn, "t") == {"a", "b", "c"}
conn.close()
def test_empty_for_missing_table(self):
"""PRAGMA table_info on non-existent table returns empty set."""
conn = _make_conn()
assert _get_table_columns(conn, "nonexistent") == set()
conn.close()
# ── _migrate_schema ──────────────────────────────────────────────────────────
class TestMigrateSchema:
"""Tests for _migrate_schema — old table migration."""
def _setup_memories_table(self, conn):
"""Create the unified memories table for migration tests."""
_ensure_schema(conn)
def test_no_old_tables_is_noop(self):
"""Migration with only memories table does nothing."""
conn = _make_conn()
self._setup_memories_table(conn)
# Should not raise
_migrate_schema(conn)
conn.close()
def test_migrates_episodes_with_context_type(self):
"""Episodes rows migrate to memories with context_type as memory_type."""
conn = _make_conn()
self._setup_memories_table(conn)
conn.execute("""
CREATE TABLE episodes (
id TEXT PRIMARY KEY,
content TEXT,
context_type TEXT,
source TEXT,
embedding TEXT,
metadata TEXT,
agent_id TEXT,
task_id TEXT,
session_id TEXT,
timestamp TEXT
)
""")
conn.execute(
"INSERT INTO episodes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
"ep-1",
"test content",
"conversation",
"agent",
None,
None,
"a1",
"t1",
"s1",
"2025-01-01T00:00:00",
),
)
conn.commit()
_migrate_schema(conn)
# Episodes table should be dropped
tables = {
r[0]
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
}
assert "episodes" not in tables
# Data should be in memories
row = conn.execute("SELECT * FROM memories WHERE id='ep-1'").fetchone()
assert row is not None
assert row["content"] == "test content"
assert row["memory_type"] == "conversation"
assert row["source"] == "agent"
conn.close()
def test_migrates_episodes_without_context_type(self):
"""Episodes without context_type column default to 'conversation'."""
conn = _make_conn()
self._setup_memories_table(conn)
conn.execute("""
CREATE TABLE episodes (
id TEXT PRIMARY KEY,
content TEXT,
source TEXT,
embedding TEXT,
metadata TEXT,
agent_id TEXT,
task_id TEXT,
session_id TEXT,
timestamp TEXT
)
""")
conn.execute(
"INSERT INTO episodes VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
("ep-2", "no type", "user", None, None, None, None, None, "2025-02-01"),
)
conn.commit()
_migrate_schema(conn)
row = conn.execute("SELECT * FROM memories WHERE id='ep-2'").fetchone()
assert row["memory_type"] == "conversation"
conn.close()
def test_migrates_chunks(self):
"""Chunks table migrates to memories as vault_chunk type."""
conn = _make_conn()
self._setup_memories_table(conn)
conn.execute("""
CREATE TABLE chunks (
id TEXT PRIMARY KEY,
content TEXT,
filepath TEXT,
embedding TEXT,
created_at TEXT
)
""")
conn.execute(
"INSERT INTO chunks VALUES (?, ?, ?, ?, ?)",
("ch-1", "chunk text", "/vault/note.md", None, "2025-03-01"),
)
conn.commit()
_migrate_schema(conn)
tables = {
r[0]
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
}
assert "chunks" not in tables
row = conn.execute("SELECT * FROM memories WHERE id='ch-1'").fetchone()
assert row is not None
assert row["memory_type"] == "vault_chunk"
assert row["source"] == "/vault/note.md"
conn.close()
def test_migrates_chunks_minimal_columns(self):
"""Chunks with minimal columns (text instead of content, no id)."""
conn = _make_conn()
self._setup_memories_table(conn)
conn.execute("CREATE TABLE chunks (text TEXT, source TEXT)")
conn.execute("INSERT INTO chunks VALUES (?, ?)", ("minimal chunk", "vault"))
conn.commit()
_migrate_schema(conn)
rows = conn.execute("SELECT * FROM memories WHERE memory_type='vault_chunk'").fetchall()
assert len(rows) == 1
assert rows[0]["content"] == "minimal chunk"
assert rows[0]["source"] == "vault"
conn.close()
def test_drops_facts_table(self):
"""Old facts table is dropped during migration."""
conn = _make_conn()
self._setup_memories_table(conn)
conn.execute("CREATE TABLE facts (id TEXT PRIMARY KEY, content TEXT)")
conn.commit()
_migrate_schema(conn)
tables = {
r[0]
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
}
assert "facts" not in tables
conn.close()
def test_migration_handles_episode_error_gracefully(self):
"""If episode migration fails, it logs warning but continues."""
conn = _make_conn()
self._setup_memories_table(conn)
# Create episodes with a column that will cause a conflict
conn.execute("CREATE TABLE episodes (id TEXT PRIMARY KEY, content TEXT)")
# Insert a row with the same ID already in memories to trigger conflict
conn.execute(
"INSERT INTO memories (id, content, created_at) VALUES (?, ?, ?)",
("dup-id", "existing", "2025-01-01"),
)
conn.execute("INSERT INTO episodes VALUES (?, ?)", ("dup-id", "duplicate"))
conn.commit()
# Should not raise — logs warning instead
_migrate_schema(conn)
conn.close()
def test_full_migration_all_old_tables(self):
"""All three old tables migrate in a single pass."""
conn = _make_conn()
self._setup_memories_table(conn)
conn.execute("""
CREATE TABLE episodes (
id TEXT, content TEXT, context_type TEXT, source TEXT,
embedding TEXT, metadata TEXT, agent_id TEXT, task_id TEXT,
session_id TEXT, timestamp TEXT
)
""")
conn.execute(
"CREATE TABLE chunks (id TEXT, content TEXT, filepath TEXT, embedding TEXT, created_at TEXT)"
)
conn.execute("CREATE TABLE facts (id TEXT, content TEXT)")
conn.execute(
"INSERT INTO episodes VALUES ('e1','ep','conv','agent',NULL,NULL,NULL,NULL,NULL,'2025-01-01')"
)
conn.execute("INSERT INTO chunks VALUES ('c1','chunk','/f.md',NULL,'2025-01-01')")
conn.commit()
_migrate_schema(conn)
tables = {
r[0]
for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
}
assert "episodes" not in tables
assert "chunks" not in tables
assert "facts" not in tables
assert "memories" in tables
assert conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0] >= 2
conn.close()
# ── MemoryEntry dataclass ────────────────────────────────────────────────────
class TestMemoryEntry:
"""Tests for the MemoryEntry dataclass."""
def test_defaults(self):
entry = MemoryEntry()
assert entry.content == ""
assert entry.source == ""
assert entry.context_type == "conversation"
assert entry.agent_id is None
assert entry.task_id is None
assert entry.session_id is None
assert entry.metadata is None
assert entry.embedding is None
assert entry.relevance_score is None
# ID should be a valid UUID
uuid.UUID(entry.id)
# Timestamp should be ISO format
assert "T" in entry.timestamp
def test_custom_values(self):
entry = MemoryEntry(
id="custom-id",
content="hello world",
source="user",
context_type="fact",
agent_id="agent-1",
task_id="task-1",
session_id="sess-1",
metadata={"key": "value"},
embedding=[0.1, 0.2, 0.3],
timestamp="2025-06-01T00:00:00",
relevance_score=0.95,
)
assert entry.id == "custom-id"
assert entry.content == "hello world"
assert entry.source == "user"
assert entry.context_type == "fact"
assert entry.agent_id == "agent-1"
assert entry.metadata == {"key": "value"}
assert entry.embedding == [0.1, 0.2, 0.3]
assert entry.relevance_score == 0.95
def test_unique_ids(self):
"""Each MemoryEntry gets a unique default ID."""
a = MemoryEntry()
b = MemoryEntry()
assert a.id != b.id
# ── MemoryChunk dataclass ────────────────────────────────────────────────────
class TestMemoryChunk:
"""Tests for the MemoryChunk dataclass."""
def test_fields(self):
chunk = MemoryChunk(
id="ch-1",
source="/vault/file.md",
content="some text",
embedding=[0.5, 0.6],
created_at="2025-01-01T00:00:00",
)
assert chunk.id == "ch-1"
assert chunk.source == "/vault/file.md"
assert chunk.content == "some text"
assert chunk.embedding == [0.5, 0.6]
assert chunk.created_at == "2025-01-01T00:00:00"
def test_required_fields(self):
"""MemoryChunk requires all fields — no defaults."""
with pytest.raises(TypeError):
MemoryChunk() # type: ignore[call-arg]