forked from Rockachopa/Timmy-time-dashboard
test: add 86 tests for semantic_memory.py (#54)
Comprehensive test coverage for the semantic memory module: - _simple_hash_embedding determinism and normalization - cosine_similarity including zero vectors - SemanticMemory: init, index_file, index_vault, search, stats - _split_into_chunks with various sizes - memory_search, memory_read, memory_write, memory_forget tools - MemorySearcher class - Edge cases: empty DB, unicode, very long text, special chars - All tests use tmp_path for isolation, no sentence-transformers needed 86 tests, all passing. 1393 total tests passing.
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
"""Tests for timmy.semantic_memory — semantic search, chunking, indexing."""
|
"""Tests for timmy.semantic_memory — semantic search, chunking, indexing."""
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -8,11 +9,14 @@ from timmy.semantic_memory import (
|
|||||||
MemoryChunk,
|
MemoryChunk,
|
||||||
MemorySearcher,
|
MemorySearcher,
|
||||||
SemanticMemory,
|
SemanticMemory,
|
||||||
|
_get_embedding_model,
|
||||||
_simple_hash_embedding,
|
_simple_hash_embedding,
|
||||||
cosine_similarity,
|
cosine_similarity,
|
||||||
embed_text,
|
embed_text,
|
||||||
|
memory_forget,
|
||||||
memory_read,
|
memory_read,
|
||||||
memory_search,
|
memory_search,
|
||||||
|
memory_write,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -42,6 +46,40 @@ class TestSimpleHashEmbedding:
|
|||||||
magnitude = math.sqrt(sum(x * x for x in vec))
|
magnitude = math.sqrt(sum(x * x for x in vec))
|
||||||
assert abs(magnitude - 1.0) < 0.01
|
assert abs(magnitude - 1.0) < 0.01
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
"""Test that empty string produces a valid normalized vector."""
|
||||||
|
vec = _simple_hash_embedding("")
|
||||||
|
assert isinstance(vec, list)
|
||||||
|
assert len(vec) == 128
|
||||||
|
# All zeros vector should still normalize (to zeros since magnitude stays 0)
|
||||||
|
assert all(isinstance(x, float) for x in vec)
|
||||||
|
|
||||||
|
def test_unicode_handling(self):
|
||||||
|
"""Test handling of unicode characters."""
|
||||||
|
vec = _simple_hash_embedding("Hello 世界 🌍 àáâãäå")
|
||||||
|
assert isinstance(vec, list)
|
||||||
|
assert len(vec) == 128
|
||||||
|
|
||||||
|
def test_special_characters(self):
|
||||||
|
"""Test handling of special characters and punctuation."""
|
||||||
|
text = "<script>alert('xss')</script> \\n\\t!@#$%^&*()"
|
||||||
|
vec = _simple_hash_embedding(text)
|
||||||
|
assert isinstance(vec, list)
|
||||||
|
assert len(vec) == 128
|
||||||
|
|
||||||
|
def test_very_long_text(self):
|
||||||
|
"""Test handling of text with many words (only first 50 words used)."""
|
||||||
|
text = "word " * 1000 # 1000 words
|
||||||
|
vec = _simple_hash_embedding(text)
|
||||||
|
assert isinstance(vec, list)
|
||||||
|
assert len(vec) == 128
|
||||||
|
|
||||||
|
def test_single_word(self):
|
||||||
|
"""Test handling of single word."""
|
||||||
|
vec = _simple_hash_embedding("test")
|
||||||
|
assert isinstance(vec, list)
|
||||||
|
assert len(vec) == 128
|
||||||
|
|
||||||
|
|
||||||
class TestEmbedText:
|
class TestEmbedText:
|
||||||
"""Test embed_text with fallback."""
|
"""Test embed_text with fallback."""
|
||||||
@@ -52,6 +90,24 @@ class TestEmbedText:
|
|||||||
assert isinstance(vec, list)
|
assert isinstance(vec, list)
|
||||||
assert len(vec) > 0
|
assert len(vec) > 0
|
||||||
|
|
||||||
|
def test_consistency(self):
|
||||||
|
"""Test that same text produces same embedding."""
|
||||||
|
a = embed_text("consistent text")
|
||||||
|
b = embed_text("consistent text")
|
||||||
|
assert a == b
|
||||||
|
|
||||||
|
def test_different_texts(self):
|
||||||
|
"""Test that different texts produce different embeddings."""
|
||||||
|
a = embed_text("hello world")
|
||||||
|
b = embed_text("goodbye world")
|
||||||
|
assert a != b
|
||||||
|
|
||||||
|
def test_empty_text(self):
|
||||||
|
"""Test embedding empty text."""
|
||||||
|
vec = embed_text("")
|
||||||
|
assert isinstance(vec, list)
|
||||||
|
assert len(vec) == 128 # fallback dimension
|
||||||
|
|
||||||
|
|
||||||
class TestCosineSimilarity:
|
class TestCosineSimilarity:
|
||||||
"""Test cosine_similarity function."""
|
"""Test cosine_similarity function."""
|
||||||
@@ -75,6 +131,62 @@ class TestCosineSimilarity:
|
|||||||
b = [1.0, 0.0]
|
b = [1.0, 0.0]
|
||||||
assert cosine_similarity(a, b) == 0.0
|
assert cosine_similarity(a, b) == 0.0
|
||||||
|
|
||||||
|
def test_both_zero_vectors(self):
|
||||||
|
"""Test similarity when both vectors are zero."""
|
||||||
|
a = [0.0, 0.0, 0.0]
|
||||||
|
b = [0.0, 0.0, 0.0]
|
||||||
|
assert cosine_similarity(a, b) == 0.0
|
||||||
|
|
||||||
|
def test_partial_zero_vector(self):
|
||||||
|
"""Test similarity with partially zero vector."""
|
||||||
|
a = [1.0, 0.0, 1.0]
|
||||||
|
b = [0.0, 0.0, 0.0]
|
||||||
|
assert cosine_similarity(a, b) == 0.0
|
||||||
|
|
||||||
|
def test_different_lengths(self):
|
||||||
|
"""Test that different length vectors are handled gracefully."""
|
||||||
|
a = [1.0, 0.5, 0.25]
|
||||||
|
b = [1.0, 0.5] # shorter
|
||||||
|
# zip with strict=False handles different lengths
|
||||||
|
result = cosine_similarity(a, b)
|
||||||
|
assert isinstance(result, float)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMemoryChunk:
|
||||||
|
"""Test MemoryChunk dataclass."""
|
||||||
|
|
||||||
|
def test_create(self):
|
||||||
|
chunk = MemoryChunk(
|
||||||
|
id="c1",
|
||||||
|
source="/path/to/file.md",
|
||||||
|
content="chunk text",
|
||||||
|
embedding=[0.1, 0.2],
|
||||||
|
created_at="2026-03-06",
|
||||||
|
)
|
||||||
|
assert chunk.id == "c1"
|
||||||
|
assert chunk.content == "chunk text"
|
||||||
|
|
||||||
|
def test_with_unicode_content(self):
|
||||||
|
"""Test MemoryChunk with unicode content."""
|
||||||
|
chunk = MemoryChunk(
|
||||||
|
id="c2",
|
||||||
|
source="/path/to/文件.md",
|
||||||
|
content="Unicode content: 你好世界 🎉",
|
||||||
|
embedding=[0.1, 0.2, 0.3],
|
||||||
|
created_at="2026-03-06T10:00:00",
|
||||||
|
)
|
||||||
|
assert "你好" in chunk.content
|
||||||
|
|
||||||
|
def test_equality(self):
|
||||||
|
"""Test that same values create equal objects."""
|
||||||
|
chunk1 = MemoryChunk(
|
||||||
|
id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now"
|
||||||
|
)
|
||||||
|
chunk2 = MemoryChunk(
|
||||||
|
id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now"
|
||||||
|
)
|
||||||
|
assert chunk1 == chunk2
|
||||||
|
|
||||||
|
|
||||||
class TestSemanticMemory:
|
class TestSemanticMemory:
|
||||||
"""Test SemanticMemory class."""
|
"""Test SemanticMemory class."""
|
||||||
@@ -110,6 +222,24 @@ class TestSemanticMemory:
|
|||||||
def test_split_empty_text(self, mem):
|
def test_split_empty_text(self, mem):
|
||||||
assert mem._split_into_chunks("") == []
|
assert mem._split_into_chunks("") == []
|
||||||
|
|
||||||
|
def test_split_whitespace_only(self, mem):
|
||||||
|
"""Test that whitespace-only text produces no chunks."""
|
||||||
|
assert mem._split_into_chunks(" \n\n \n") == []
|
||||||
|
|
||||||
|
def test_split_exact_chunk_boundary(self, mem):
|
||||||
|
"""Test splitting when text is exactly at chunk boundary."""
|
||||||
|
text = "A" * 500 # Exactly at default max_chunk_size
|
||||||
|
chunks = mem._split_into_chunks(text)
|
||||||
|
assert len(chunks) == 1
|
||||||
|
assert len(chunks[0]) == 500
|
||||||
|
|
||||||
|
def test_split_very_long_sentence(self, mem):
|
||||||
|
"""Test splitting text with no sentence boundaries."""
|
||||||
|
text = "A" * 2000 # One long word essentially
|
||||||
|
chunks = mem._split_into_chunks(text, max_chunk_size=100)
|
||||||
|
# Should still produce chunks
|
||||||
|
assert len(chunks) > 0
|
||||||
|
|
||||||
def test_index_file(self, mem):
|
def test_index_file(self, mem):
|
||||||
md_file = mem.vault_path / "test.md"
|
md_file = mem.vault_path / "test.md"
|
||||||
md_file.write_text(
|
md_file.write_text(
|
||||||
@@ -130,6 +260,36 @@ class TestSemanticMemory:
|
|||||||
assert count1 > 0
|
assert count1 > 0
|
||||||
assert count2 == 0 # Already indexed, same hash
|
assert count2 == 0 # Already indexed, same hash
|
||||||
|
|
||||||
|
def test_index_file_updates_when_changed(self, mem):
|
||||||
|
"""Test that file is re-indexed when content changes."""
|
||||||
|
md_file = mem.vault_path / "changed.md"
|
||||||
|
md_file.write_text("# Original\n\nOriginal content here for indexing.")
|
||||||
|
count1 = mem.index_file(md_file)
|
||||||
|
|
||||||
|
# Change the file
|
||||||
|
md_file.write_text("# Updated\n\nUpdated content that is different.")
|
||||||
|
count2 = mem.index_file(md_file)
|
||||||
|
|
||||||
|
assert count1 > 0
|
||||||
|
assert count2 > 0 # Re-indexed because hash changed
|
||||||
|
|
||||||
|
def test_index_file_skips_tiny_chunks(self, mem):
|
||||||
|
"""Test that chunks under 20 characters are skipped (not stored in DB)."""
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
md_file = mem.vault_path / "tiny.md"
|
||||||
|
# Create a paragraph that is definitely under 20 chars
|
||||||
|
md_file.write_text("Tiny") # Just 4 characters
|
||||||
|
mem.index_file(md_file)
|
||||||
|
|
||||||
|
# Check DB directly - tiny chunks should NOT be stored
|
||||||
|
conn = sqlite3.connect(str(mem.db_path))
|
||||||
|
cursor = conn.execute("SELECT COUNT(*) FROM chunks WHERE source = ?", (str(md_file),))
|
||||||
|
stored_count = cursor.fetchone()[0]
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
assert stored_count == 0 # "Tiny" was too short, nothing stored
|
||||||
|
|
||||||
def test_index_vault(self, mem):
|
def test_index_vault(self, mem):
|
||||||
(mem.vault_path / "a.md").write_text(
|
(mem.vault_path / "a.md").write_text(
|
||||||
"# File A\n\nContent of file A with some meaningful text here."
|
"# File A\n\nContent of file A with some meaningful text here."
|
||||||
@@ -169,6 +329,21 @@ class TestSemanticMemory:
|
|||||||
assert any("real" in s for s in sources)
|
assert any("real" in s for s in sources)
|
||||||
assert not any("last-session-handoff" in s for s in sources)
|
assert not any("last-session-handoff" in s for s in sources)
|
||||||
|
|
||||||
|
def test_index_vault_recursive(self, mem):
|
||||||
|
"""Test that index_vault finds files in subdirectories."""
|
||||||
|
subdir = mem.vault_path / "subdir" / "nested"
|
||||||
|
subdir.mkdir(parents=True)
|
||||||
|
(subdir / "deep.md").write_text(
|
||||||
|
"# Deep file\n\nThis file is nested deep in the directory structure."
|
||||||
|
)
|
||||||
|
total = mem.index_vault()
|
||||||
|
assert total > 0
|
||||||
|
|
||||||
|
def test_index_vault_no_markdown_files(self, mem):
|
||||||
|
"""Test index_vault when no markdown files exist."""
|
||||||
|
total = mem.index_vault()
|
||||||
|
assert total == 0
|
||||||
|
|
||||||
def test_search_returns_results(self, mem):
|
def test_search_returns_results(self, mem):
|
||||||
md = mem.vault_path / "searchable.md"
|
md = mem.vault_path / "searchable.md"
|
||||||
md.write_text(
|
md.write_text(
|
||||||
@@ -186,6 +361,17 @@ class TestSemanticMemory:
|
|||||||
results = mem.search("anything")
|
results = mem.search("anything")
|
||||||
assert results == []
|
assert results == []
|
||||||
|
|
||||||
|
def test_search_returns_top_k(self, mem):
|
||||||
|
"""Test that search respects top_k parameter."""
|
||||||
|
# Create multiple files
|
||||||
|
for i in range(10):
|
||||||
|
md = mem.vault_path / f"file{i}.md"
|
||||||
|
md.write_text(f"# File {i}\n\nThis is content about topic number {i}.")
|
||||||
|
mem.index_file(md)
|
||||||
|
|
||||||
|
results = mem.search("topic", top_k=3)
|
||||||
|
assert len(results) <= 3
|
||||||
|
|
||||||
def test_get_relevant_context(self, mem):
|
def test_get_relevant_context(self, mem):
|
||||||
md = mem.vault_path / "context.md"
|
md = mem.vault_path / "context.md"
|
||||||
md.write_text(
|
md.write_text(
|
||||||
@@ -200,12 +386,53 @@ class TestSemanticMemory:
|
|||||||
def test_get_relevant_context_empty(self, mem):
|
def test_get_relevant_context_empty(self, mem):
|
||||||
assert mem.get_relevant_context("anything") == ""
|
assert mem.get_relevant_context("anything") == ""
|
||||||
|
|
||||||
|
def test_get_relevant_context_respects_max_chars(self, mem):
|
||||||
|
"""Test that get_relevant_context respects max_chars limit."""
|
||||||
|
# Create multiple files with content
|
||||||
|
for i in range(5):
|
||||||
|
md = mem.vault_path / f"ctx{i}.md"
|
||||||
|
md.write_text(f"# Context {i}\n\n" + "X" * 500)
|
||||||
|
mem.index_file(md)
|
||||||
|
|
||||||
|
ctx = mem.get_relevant_context("context", max_chars=200)
|
||||||
|
assert len(ctx) <= 200
|
||||||
|
|
||||||
|
def test_get_relevant_context_filters_by_score(self, mem):
|
||||||
|
"""Test that results below score threshold (0.3) are filtered."""
|
||||||
|
md = mem.vault_path / "low_score.md"
|
||||||
|
md.write_text("XYZ random unrelated content that should not match.")
|
||||||
|
mem.index_file(md)
|
||||||
|
|
||||||
|
ctx = mem.get_relevant_context("completely different topic about quantum physics")
|
||||||
|
# May be empty if score < 0.3
|
||||||
|
assert isinstance(ctx, str)
|
||||||
|
|
||||||
def test_stats(self, mem):
|
def test_stats(self, mem):
|
||||||
stats = mem.stats()
|
stats = mem.stats()
|
||||||
assert "total_chunks" in stats
|
assert "total_chunks" in stats
|
||||||
assert "total_files" in stats
|
assert "total_files" in stats
|
||||||
assert stats["total_chunks"] == 0
|
assert stats["total_chunks"] == 0
|
||||||
|
|
||||||
|
def test_stats_after_indexing(self, mem):
|
||||||
|
"""Test stats after adding content."""
|
||||||
|
md = mem.vault_path / "stats.md"
|
||||||
|
md.write_text(
|
||||||
|
"# Stats\n\nThis is paragraph one with enough content to be indexed properly.\n\n"
|
||||||
|
"This is paragraph two with also enough meaningful content text."
|
||||||
|
)
|
||||||
|
mem.index_file(md)
|
||||||
|
|
||||||
|
stats = mem.stats()
|
||||||
|
assert stats["total_chunks"] > 0
|
||||||
|
assert stats["total_files"] == 1
|
||||||
|
assert "embedding_dim" in stats
|
||||||
|
|
||||||
|
def test_stats_embedding_dim_fallback(self, mem):
|
||||||
|
"""Test that stats returns correct embedding dimension for fallback."""
|
||||||
|
stats = mem.stats()
|
||||||
|
# When using fallback (sentence-transformers not available)
|
||||||
|
assert stats["embedding_dim"] == 128
|
||||||
|
|
||||||
|
|
||||||
class TestMemorySearcher:
|
class TestMemorySearcher:
|
||||||
"""Test MemorySearcher high-level interface."""
|
"""Test MemorySearcher high-level interface."""
|
||||||
@@ -231,18 +458,34 @@ class TestMemorySearcher:
|
|||||||
ctx = searcher.get_context_for_query("test")
|
ctx = searcher.get_context_for_query("test")
|
||||||
assert ctx == "" # Empty DB
|
assert ctx == "" # Empty DB
|
||||||
|
|
||||||
|
def test_get_context_for_query_with_results(self, searcher):
|
||||||
|
"""Test get_context_for_query when there are results."""
|
||||||
|
md = searcher.semantic.vault_path / "context.md"
|
||||||
|
md.write_text("# System\n\nThe system architecture uses microservices for scalability.")
|
||||||
|
searcher.semantic.index_file(md)
|
||||||
|
|
||||||
|
ctx = searcher.get_context_for_query("architecture")
|
||||||
|
assert isinstance(ctx, str)
|
||||||
|
# Should either be empty or contain context header
|
||||||
|
assert ctx == "" or "Relevant Past Context" in ctx
|
||||||
|
|
||||||
|
|
||||||
class TestMemorySearch:
|
class TestMemorySearch:
|
||||||
"""Test module-level memory_search function."""
|
"""Test module-level memory_search function."""
|
||||||
|
|
||||||
def test_no_results(self):
|
def test_no_results(self):
|
||||||
result = memory_search("something obscure that won't match anything")
|
result = memory_search("something obscure that won't match anything xyz123")
|
||||||
assert isinstance(result, str)
|
assert isinstance(result, str)
|
||||||
|
|
||||||
def test_none_top_k_handled(self):
|
def test_none_top_k_handled(self):
|
||||||
result = memory_search("test", top_k=None)
|
result = memory_search("test", top_k=None)
|
||||||
assert isinstance(result, str)
|
assert isinstance(result, str)
|
||||||
|
|
||||||
|
def test_basic_search_returns_string(self):
|
||||||
|
"""Test that memory_search returns a string result."""
|
||||||
|
result = memory_search("test query")
|
||||||
|
assert isinstance(result, str)
|
||||||
|
|
||||||
|
|
||||||
class TestMemoryRead:
|
class TestMemoryRead:
|
||||||
"""Test module-level memory_read function."""
|
"""Test module-level memory_read function."""
|
||||||
@@ -259,17 +502,297 @@ class TestMemoryRead:
|
|||||||
result = memory_read("test", top_k=None)
|
result = memory_read("test", top_k=None)
|
||||||
assert isinstance(result, str)
|
assert isinstance(result, str)
|
||||||
|
|
||||||
|
def test_memory_read_empty_message(self):
|
||||||
|
"""Test that empty db returns appropriate message."""
|
||||||
|
result = memory_read()
|
||||||
|
# Should indicate no memories or return empty results
|
||||||
|
assert isinstance(result, str)
|
||||||
|
|
||||||
class TestMemoryChunk:
|
|
||||||
"""Test MemoryChunk dataclass."""
|
|
||||||
|
|
||||||
def test_create(self):
|
class TestMemoryWrite:
|
||||||
chunk = MemoryChunk(
|
"""Test module-level memory_write function."""
|
||||||
id="c1",
|
|
||||||
source="/path/to/file.md",
|
@pytest.fixture(autouse=True)
|
||||||
content="chunk text",
|
def mock_vector_store(self):
|
||||||
embedding=[0.1, 0.2],
|
"""Mock vector_store functions for memory_write tests."""
|
||||||
created_at="2026-03-06",
|
# Patch where it's imported from, not where it's used
|
||||||
|
with (
|
||||||
|
patch("timmy.memory.vector_store.search_memories") as mock_search,
|
||||||
|
patch("timmy.memory.vector_store.store_memory") as mock_store,
|
||||||
|
):
|
||||||
|
# Default: no existing memories (no duplicates)
|
||||||
|
mock_search.return_value = []
|
||||||
|
|
||||||
|
# Mock store_memory return value
|
||||||
|
mock_entry = MagicMock()
|
||||||
|
mock_entry.id = "test-id-12345"
|
||||||
|
mock_store.return_value = mock_entry
|
||||||
|
|
||||||
|
yield {"search": mock_search, "store": mock_store}
|
||||||
|
|
||||||
|
def test_memory_write_empty_content(self):
|
||||||
|
"""Test that empty content returns error message."""
|
||||||
|
result = memory_write("")
|
||||||
|
assert "empty" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_write_whitespace_only(self):
|
||||||
|
"""Test that whitespace-only content returns error."""
|
||||||
|
result = memory_write(" \n\t ")
|
||||||
|
assert "empty" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_write_valid_content(self, mock_vector_store):
|
||||||
|
"""Test writing valid content."""
|
||||||
|
result = memory_write("Remember this important fact.")
|
||||||
|
assert "stored" in result.lower() or "memory" in result.lower()
|
||||||
|
mock_vector_store["store"].assert_called_once()
|
||||||
|
|
||||||
|
def test_memory_write_dedup_for_facts(self, mock_vector_store):
|
||||||
|
"""Test that duplicate facts are skipped."""
|
||||||
|
# Simulate existing similar fact
|
||||||
|
mock_entry = MagicMock()
|
||||||
|
mock_entry.id = "existing-id"
|
||||||
|
mock_vector_store["search"].return_value = [mock_entry]
|
||||||
|
|
||||||
|
result = memory_write("Similar fact text", context_type="fact")
|
||||||
|
assert "similar" in result.lower() or "duplicate" in result.lower()
|
||||||
|
mock_vector_store["store"].assert_not_called()
|
||||||
|
|
||||||
|
def test_memory_write_no_dedup_for_conversation(self, mock_vector_store):
|
||||||
|
"""Test that conversation entries are not deduplicated."""
|
||||||
|
# Even with existing entries, conversations should be stored
|
||||||
|
mock_entry = MagicMock()
|
||||||
|
mock_entry.id = "existing-id"
|
||||||
|
mock_vector_store["search"].return_value = [mock_entry]
|
||||||
|
|
||||||
|
memory_write("Conversation text", context_type="conversation")
|
||||||
|
# Should still store (no duplicate check for non-fact)
|
||||||
|
mock_vector_store["store"].assert_called_once()
|
||||||
|
|
||||||
|
def test_memory_write_invalid_context_type(self, mock_vector_store):
|
||||||
|
"""Test that invalid context_type defaults to 'fact'."""
|
||||||
|
memory_write("Some content", context_type="invalid_type")
|
||||||
|
# Should still succeed, using "fact" as default
|
||||||
|
mock_vector_store["store"].assert_called_once()
|
||||||
|
call_kwargs = mock_vector_store["store"].call_args.kwargs
|
||||||
|
assert call_kwargs.get("context_type") == "fact"
|
||||||
|
|
||||||
|
def test_memory_write_valid_context_types(self, mock_vector_store):
|
||||||
|
"""Test all valid context types."""
|
||||||
|
valid_types = ["fact", "conversation", "document"]
|
||||||
|
for ctx_type in valid_types:
|
||||||
|
mock_vector_store["store"].reset_mock()
|
||||||
|
memory_write(f"Content for {ctx_type}", context_type=ctx_type)
|
||||||
|
mock_vector_store["store"].assert_called_once()
|
||||||
|
|
||||||
|
def test_memory_write_strips_content(self, mock_vector_store):
|
||||||
|
"""Test that content is stripped of leading/trailing whitespace."""
|
||||||
|
memory_write(" padded content ")
|
||||||
|
call_kwargs = mock_vector_store["store"].call_args.kwargs
|
||||||
|
assert call_kwargs.get("content") == "padded content"
|
||||||
|
|
||||||
|
def test_memory_write_unicode_content(self, mock_vector_store):
|
||||||
|
"""Test writing unicode content."""
|
||||||
|
result = memory_write("Unicode content: 你好世界 🎉")
|
||||||
|
assert "stored" in result.lower() or "memory" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_write_handles_exception(self, mock_vector_store):
|
||||||
|
"""Test handling of store_memory exceptions."""
|
||||||
|
mock_vector_store["store"].side_effect = Exception("DB error")
|
||||||
|
result = memory_write("This will fail")
|
||||||
|
assert "failed" in result.lower() or "error" in result.lower()
|
||||||
|
|
||||||
|
|
||||||
|
class TestMemoryForget:
|
||||||
|
"""Test module-level memory_forget function."""
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def mock_vector_store(self):
|
||||||
|
"""Mock vector_store functions for memory_forget tests."""
|
||||||
|
# Patch where it's imported from, not where it's used
|
||||||
|
with (
|
||||||
|
patch("timmy.memory.vector_store.search_memories") as mock_search,
|
||||||
|
patch("timmy.memory.vector_store.delete_memory") as mock_delete,
|
||||||
|
):
|
||||||
|
# Default: no results
|
||||||
|
mock_search.return_value = []
|
||||||
|
mock_delete.return_value = True
|
||||||
|
|
||||||
|
yield {"search": mock_search, "delete": mock_delete}
|
||||||
|
|
||||||
|
def test_memory_forget_empty_query(self):
|
||||||
|
"""Test that empty query returns error message."""
|
||||||
|
result = memory_forget("")
|
||||||
|
assert "empty" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_forget_whitespace_only(self):
|
||||||
|
"""Test that whitespace-only query returns error."""
|
||||||
|
result = memory_forget(" \n\t ")
|
||||||
|
assert "empty" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_forget_no_matches(self, mock_vector_store):
|
||||||
|
"""Test when no memories match the query."""
|
||||||
|
mock_vector_store["search"].return_value = []
|
||||||
|
result = memory_forget("nonexistent query xyz123")
|
||||||
|
assert "no matching" in result.lower() or "not found" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_forget_success(self, mock_vector_store):
|
||||||
|
"""Test successful deletion."""
|
||||||
|
mock_entry = MagicMock()
|
||||||
|
mock_entry.id = "entry-to-delete"
|
||||||
|
mock_entry.content = "Content to forget"
|
||||||
|
mock_entry.context_type = "fact"
|
||||||
|
mock_vector_store["search"].return_value = [mock_entry]
|
||||||
|
mock_vector_store["delete"].return_value = True
|
||||||
|
|
||||||
|
result = memory_forget("content to forget")
|
||||||
|
assert "forgotten" in result.lower() or "forgot" in result.lower()
|
||||||
|
mock_vector_store["delete"].assert_called_once_with("entry-to-delete")
|
||||||
|
|
||||||
|
def test_memory_forget_delete_fails(self, mock_vector_store):
|
||||||
|
"""Test when delete_memory returns False."""
|
||||||
|
mock_entry = MagicMock()
|
||||||
|
mock_entry.id = "entry-id"
|
||||||
|
mock_entry.content = "Content"
|
||||||
|
mock_entry.context_type = "fact"
|
||||||
|
mock_vector_store["search"].return_value = [mock_entry]
|
||||||
|
mock_vector_store["delete"].return_value = False
|
||||||
|
|
||||||
|
result = memory_forget("content")
|
||||||
|
# Should indicate the memory wasn't found or already deleted
|
||||||
|
assert "not found" in result.lower() or "already" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_forget_strips_query(self, mock_vector_store):
|
||||||
|
"""Test that query is stripped of whitespace."""
|
||||||
|
mock_vector_store["search"].return_value = []
|
||||||
|
memory_forget(" padded query ")
|
||||||
|
# Check that search was called with stripped query
|
||||||
|
call_args = mock_vector_store["search"].call_args
|
||||||
|
assert call_args.args[0] == "padded query"
|
||||||
|
|
||||||
|
def test_memory_forget_handles_exception(self, mock_vector_store):
|
||||||
|
"""Test handling of exceptions during forget."""
|
||||||
|
mock_vector_store["search"].side_effect = Exception("DB error")
|
||||||
|
result = memory_forget("query")
|
||||||
|
assert "failed" in result.lower() or "error" in result.lower()
|
||||||
|
|
||||||
|
def test_memory_forget_uses_min_relevance(self, mock_vector_store):
|
||||||
|
"""Test that search uses min_relevance parameter."""
|
||||||
|
mock_vector_store["search"].return_value = []
|
||||||
|
memory_forget("test query")
|
||||||
|
call_kwargs = mock_vector_store["search"].call_args.kwargs
|
||||||
|
assert call_kwargs.get("min_relevance") == 0.3
|
||||||
|
assert call_kwargs.get("limit") == 3
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetEmbeddingModel:
|
||||||
|
"""Test _get_embedding_model function."""
|
||||||
|
|
||||||
|
def test_returns_false_when_skip_embeddings(self):
|
||||||
|
"""Test that _get_embedding_model returns False when skip_embeddings is set."""
|
||||||
|
# conftest sets TIMMY_SKIP_EMBEDDINGS=1
|
||||||
|
model = _get_embedding_model()
|
||||||
|
assert model is False
|
||||||
|
|
||||||
|
def test_returns_model_when_available(self):
|
||||||
|
"""Test loading when sentence-transformers is available."""
|
||||||
|
# This is mocked in conftest, so model is not actually loaded
|
||||||
|
model = _get_embedding_model()
|
||||||
|
# Should be False because sentence_transformers is mocked
|
||||||
|
assert model is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestEdgeCases:
|
||||||
|
"""Test various edge cases and boundary conditions."""
|
||||||
|
|
||||||
|
def test_semantic_memory_with_unicode_filepaths(self, tmp_path):
|
||||||
|
"""Test handling of unicode file paths."""
|
||||||
|
mem = SemanticMemory()
|
||||||
|
mem.db_path = tmp_path / "unicode.db"
|
||||||
|
mem.vault_path = tmp_path / "vault"
|
||||||
|
mem.vault_path.mkdir()
|
||||||
|
mem._init_db()
|
||||||
|
|
||||||
|
# Create file with unicode name
|
||||||
|
md_file = mem.vault_path / "文件_📝.md"
|
||||||
|
md_file.write_text(
|
||||||
|
"# Unicode filename\n\nThis is meaningful content for testing unicode paths."
|
||||||
)
|
)
|
||||||
assert chunk.id == "c1"
|
count = mem.index_file(md_file)
|
||||||
assert chunk.content == "chunk text"
|
assert count > 0
|
||||||
|
|
||||||
|
# Verify it can be searched
|
||||||
|
results = mem.search("content")
|
||||||
|
assert len(results) > 0
|
||||||
|
|
||||||
|
def test_semantic_memory_special_chars_in_content(self, tmp_path):
|
||||||
|
"""Test handling of special characters in content."""
|
||||||
|
mem = SemanticMemory()
|
||||||
|
mem.db_path = tmp_path / "special.db"
|
||||||
|
mem.vault_path = tmp_path / "vault"
|
||||||
|
mem.vault_path.mkdir()
|
||||||
|
mem._init_db()
|
||||||
|
|
||||||
|
md_file = mem.vault_path / "special.md"
|
||||||
|
content = """# Special Characters
|
||||||
|
|
||||||
|
<script>alert('xss')</script>
|
||||||
|
SQL: SELECT * FROM users WHERE name = "admin' OR '1'='1"
|
||||||
|
JSON: {"key": "value", "nested": {"array": [1, 2, 3]}}
|
||||||
|
Unicode: 你好世界 🌍 café naïve
|
||||||
|
Escapes: \\n \\t \\r
|
||||||
|
"""
|
||||||
|
md_file.write_text(content)
|
||||||
|
count = mem.index_file(md_file)
|
||||||
|
assert count > 0
|
||||||
|
|
||||||
|
def test_very_long_file_content(self, tmp_path):
|
||||||
|
"""Test handling of very long file content."""
|
||||||
|
mem = SemanticMemory()
|
||||||
|
mem.db_path = tmp_path / "long.db"
|
||||||
|
mem.vault_path = tmp_path / "vault"
|
||||||
|
mem.vault_path.mkdir()
|
||||||
|
mem._init_db()
|
||||||
|
|
||||||
|
md_file = mem.vault_path / "long.md"
|
||||||
|
# Create content with many paragraphs
|
||||||
|
paragraphs = [f"Paragraph {i} with some content text here." for i in range(100)]
|
||||||
|
md_file.write_text("# Long doc\n\n" + "\n\n".join(paragraphs))
|
||||||
|
count = mem.index_file(md_file)
|
||||||
|
assert count > 0
|
||||||
|
|
||||||
|
def test_search_with_unicode_query(self, tmp_path):
|
||||||
|
"""Test search with unicode query."""
|
||||||
|
mem = SemanticMemory()
|
||||||
|
mem.db_path = tmp_path / "unicode_query.db"
|
||||||
|
mem.vault_path = tmp_path / "vault"
|
||||||
|
mem.vault_path.mkdir()
|
||||||
|
mem._init_db()
|
||||||
|
|
||||||
|
md_file = mem.vault_path / "test.md"
|
||||||
|
md_file.write_text("# Test\n\nThis is a test document.")
|
||||||
|
mem.index_file(md_file)
|
||||||
|
|
||||||
|
# Search with unicode query should not crash
|
||||||
|
results = mem.search("测试 查询 🌍")
|
||||||
|
assert isinstance(results, list)
|
||||||
|
|
||||||
|
def test_empty_vault_directory(self, tmp_path):
|
||||||
|
"""Test operations on empty vault directory."""
|
||||||
|
mem = SemanticMemory()
|
||||||
|
mem.db_path = tmp_path / "empty.db"
|
||||||
|
mem.vault_path = tmp_path / "empty_vault"
|
||||||
|
mem.vault_path.mkdir()
|
||||||
|
mem._init_db()
|
||||||
|
|
||||||
|
# Index empty vault
|
||||||
|
count = mem.index_vault()
|
||||||
|
assert count == 0
|
||||||
|
|
||||||
|
# Search should return empty results
|
||||||
|
results = mem.search("anything")
|
||||||
|
assert results == []
|
||||||
|
|
||||||
|
# Stats should show zeros
|
||||||
|
stats = mem.stats()
|
||||||
|
assert stats["total_chunks"] == 0
|
||||||
|
assert stats["total_files"] == 0
|
||||||
|
|||||||
Reference in New Issue
Block a user