From 415938c9a396c9155e2dda4e6babef2ebb7f89d0 Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Sat, 14 Mar 2026 19:15:55 -0400 Subject: [PATCH] test: add 86 tests for semantic_memory.py (#54) Comprehensive test coverage for the semantic memory module: - _simple_hash_embedding determinism and normalization - cosine_similarity including zero vectors - SemanticMemory: init, index_file, index_vault, search, stats - _split_into_chunks with various sizes - memory_search, memory_read, memory_write, memory_forget tools - MemorySearcher class - Edge cases: empty DB, unicode, very long text, special chars - All tests use tmp_path for isolation, no sentence-transformers needed 86 tests, all passing. 1393 total tests passing. --- tests/timmy/test_semantic_memory.py | 547 +++++++++++++++++++++++++++- 1 file changed, 535 insertions(+), 12 deletions(-) diff --git a/tests/timmy/test_semantic_memory.py b/tests/timmy/test_semantic_memory.py index a7399614..b5549bad 100644 --- a/tests/timmy/test_semantic_memory.py +++ b/tests/timmy/test_semantic_memory.py @@ -1,6 +1,7 @@ """Tests for timmy.semantic_memory — semantic search, chunking, indexing.""" from pathlib import Path +from unittest.mock import MagicMock, patch import pytest @@ -8,11 +9,14 @@ from timmy.semantic_memory import ( MemoryChunk, MemorySearcher, SemanticMemory, + _get_embedding_model, _simple_hash_embedding, cosine_similarity, embed_text, + memory_forget, memory_read, memory_search, + memory_write, ) @@ -42,6 +46,40 @@ class TestSimpleHashEmbedding: magnitude = math.sqrt(sum(x * x for x in vec)) assert abs(magnitude - 1.0) < 0.01 + def test_empty_string(self): + """Test that empty string produces a valid normalized vector.""" + vec = _simple_hash_embedding("") + assert isinstance(vec, list) + assert len(vec) == 128 + # All zeros vector should still normalize (to zeros since magnitude stays 0) + assert all(isinstance(x, float) for x in vec) + + def test_unicode_handling(self): + """Test handling of unicode characters.""" + vec = _simple_hash_embedding("Hello 世界 🌍 àáâãäå") + assert isinstance(vec, list) + assert len(vec) == 128 + + def test_special_characters(self): + """Test handling of special characters and punctuation.""" + text = " \\n\\t!@#$%^&*()" + vec = _simple_hash_embedding(text) + assert isinstance(vec, list) + assert len(vec) == 128 + + def test_very_long_text(self): + """Test handling of text with many words (only first 50 words used).""" + text = "word " * 1000 # 1000 words + vec = _simple_hash_embedding(text) + assert isinstance(vec, list) + assert len(vec) == 128 + + def test_single_word(self): + """Test handling of single word.""" + vec = _simple_hash_embedding("test") + assert isinstance(vec, list) + assert len(vec) == 128 + class TestEmbedText: """Test embed_text with fallback.""" @@ -52,6 +90,24 @@ class TestEmbedText: assert isinstance(vec, list) assert len(vec) > 0 + def test_consistency(self): + """Test that same text produces same embedding.""" + a = embed_text("consistent text") + b = embed_text("consistent text") + assert a == b + + def test_different_texts(self): + """Test that different texts produce different embeddings.""" + a = embed_text("hello world") + b = embed_text("goodbye world") + assert a != b + + def test_empty_text(self): + """Test embedding empty text.""" + vec = embed_text("") + assert isinstance(vec, list) + assert len(vec) == 128 # fallback dimension + class TestCosineSimilarity: """Test cosine_similarity function.""" @@ -75,6 +131,62 @@ class TestCosineSimilarity: b = [1.0, 0.0] assert cosine_similarity(a, b) == 0.0 + def test_both_zero_vectors(self): + """Test similarity when both vectors are zero.""" + a = [0.0, 0.0, 0.0] + b = [0.0, 0.0, 0.0] + assert cosine_similarity(a, b) == 0.0 + + def test_partial_zero_vector(self): + """Test similarity with partially zero vector.""" + a = [1.0, 0.0, 1.0] + b = [0.0, 0.0, 0.0] + assert cosine_similarity(a, b) == 0.0 + + def test_different_lengths(self): + """Test that different length vectors are handled gracefully.""" + a = [1.0, 0.5, 0.25] + b = [1.0, 0.5] # shorter + # zip with strict=False handles different lengths + result = cosine_similarity(a, b) + assert isinstance(result, float) + + +class TestMemoryChunk: + """Test MemoryChunk dataclass.""" + + def test_create(self): + chunk = MemoryChunk( + id="c1", + source="/path/to/file.md", + content="chunk text", + embedding=[0.1, 0.2], + created_at="2026-03-06", + ) + assert chunk.id == "c1" + assert chunk.content == "chunk text" + + def test_with_unicode_content(self): + """Test MemoryChunk with unicode content.""" + chunk = MemoryChunk( + id="c2", + source="/path/to/文件.md", + content="Unicode content: 你好世界 🎉", + embedding=[0.1, 0.2, 0.3], + created_at="2026-03-06T10:00:00", + ) + assert "你好" in chunk.content + + def test_equality(self): + """Test that same values create equal objects.""" + chunk1 = MemoryChunk( + id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now" + ) + chunk2 = MemoryChunk( + id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now" + ) + assert chunk1 == chunk2 + class TestSemanticMemory: """Test SemanticMemory class.""" @@ -110,6 +222,24 @@ class TestSemanticMemory: def test_split_empty_text(self, mem): assert mem._split_into_chunks("") == [] + def test_split_whitespace_only(self, mem): + """Test that whitespace-only text produces no chunks.""" + assert mem._split_into_chunks(" \n\n \n") == [] + + def test_split_exact_chunk_boundary(self, mem): + """Test splitting when text is exactly at chunk boundary.""" + text = "A" * 500 # Exactly at default max_chunk_size + chunks = mem._split_into_chunks(text) + assert len(chunks) == 1 + assert len(chunks[0]) == 500 + + def test_split_very_long_sentence(self, mem): + """Test splitting text with no sentence boundaries.""" + text = "A" * 2000 # One long word essentially + chunks = mem._split_into_chunks(text, max_chunk_size=100) + # Should still produce chunks + assert len(chunks) > 0 + def test_index_file(self, mem): md_file = mem.vault_path / "test.md" md_file.write_text( @@ -130,6 +260,36 @@ class TestSemanticMemory: assert count1 > 0 assert count2 == 0 # Already indexed, same hash + def test_index_file_updates_when_changed(self, mem): + """Test that file is re-indexed when content changes.""" + md_file = mem.vault_path / "changed.md" + md_file.write_text("# Original\n\nOriginal content here for indexing.") + count1 = mem.index_file(md_file) + + # Change the file + md_file.write_text("# Updated\n\nUpdated content that is different.") + count2 = mem.index_file(md_file) + + assert count1 > 0 + assert count2 > 0 # Re-indexed because hash changed + + def test_index_file_skips_tiny_chunks(self, mem): + """Test that chunks under 20 characters are skipped (not stored in DB).""" + import sqlite3 + + md_file = mem.vault_path / "tiny.md" + # Create a paragraph that is definitely under 20 chars + md_file.write_text("Tiny") # Just 4 characters + mem.index_file(md_file) + + # Check DB directly - tiny chunks should NOT be stored + conn = sqlite3.connect(str(mem.db_path)) + cursor = conn.execute("SELECT COUNT(*) FROM chunks WHERE source = ?", (str(md_file),)) + stored_count = cursor.fetchone()[0] + conn.close() + + assert stored_count == 0 # "Tiny" was too short, nothing stored + def test_index_vault(self, mem): (mem.vault_path / "a.md").write_text( "# File A\n\nContent of file A with some meaningful text here." @@ -169,6 +329,21 @@ class TestSemanticMemory: assert any("real" in s for s in sources) assert not any("last-session-handoff" in s for s in sources) + def test_index_vault_recursive(self, mem): + """Test that index_vault finds files in subdirectories.""" + subdir = mem.vault_path / "subdir" / "nested" + subdir.mkdir(parents=True) + (subdir / "deep.md").write_text( + "# Deep file\n\nThis file is nested deep in the directory structure." + ) + total = mem.index_vault() + assert total > 0 + + def test_index_vault_no_markdown_files(self, mem): + """Test index_vault when no markdown files exist.""" + total = mem.index_vault() + assert total == 0 + def test_search_returns_results(self, mem): md = mem.vault_path / "searchable.md" md.write_text( @@ -186,6 +361,17 @@ class TestSemanticMemory: results = mem.search("anything") assert results == [] + def test_search_returns_top_k(self, mem): + """Test that search respects top_k parameter.""" + # Create multiple files + for i in range(10): + md = mem.vault_path / f"file{i}.md" + md.write_text(f"# File {i}\n\nThis is content about topic number {i}.") + mem.index_file(md) + + results = mem.search("topic", top_k=3) + assert len(results) <= 3 + def test_get_relevant_context(self, mem): md = mem.vault_path / "context.md" md.write_text( @@ -200,12 +386,53 @@ class TestSemanticMemory: def test_get_relevant_context_empty(self, mem): assert mem.get_relevant_context("anything") == "" + def test_get_relevant_context_respects_max_chars(self, mem): + """Test that get_relevant_context respects max_chars limit.""" + # Create multiple files with content + for i in range(5): + md = mem.vault_path / f"ctx{i}.md" + md.write_text(f"# Context {i}\n\n" + "X" * 500) + mem.index_file(md) + + ctx = mem.get_relevant_context("context", max_chars=200) + assert len(ctx) <= 200 + + def test_get_relevant_context_filters_by_score(self, mem): + """Test that results below score threshold (0.3) are filtered.""" + md = mem.vault_path / "low_score.md" + md.write_text("XYZ random unrelated content that should not match.") + mem.index_file(md) + + ctx = mem.get_relevant_context("completely different topic about quantum physics") + # May be empty if score < 0.3 + assert isinstance(ctx, str) + def test_stats(self, mem): stats = mem.stats() assert "total_chunks" in stats assert "total_files" in stats assert stats["total_chunks"] == 0 + def test_stats_after_indexing(self, mem): + """Test stats after adding content.""" + md = mem.vault_path / "stats.md" + md.write_text( + "# Stats\n\nThis is paragraph one with enough content to be indexed properly.\n\n" + "This is paragraph two with also enough meaningful content text." + ) + mem.index_file(md) + + stats = mem.stats() + assert stats["total_chunks"] > 0 + assert stats["total_files"] == 1 + assert "embedding_dim" in stats + + def test_stats_embedding_dim_fallback(self, mem): + """Test that stats returns correct embedding dimension for fallback.""" + stats = mem.stats() + # When using fallback (sentence-transformers not available) + assert stats["embedding_dim"] == 128 + class TestMemorySearcher: """Test MemorySearcher high-level interface.""" @@ -231,18 +458,34 @@ class TestMemorySearcher: ctx = searcher.get_context_for_query("test") assert ctx == "" # Empty DB + def test_get_context_for_query_with_results(self, searcher): + """Test get_context_for_query when there are results.""" + md = searcher.semantic.vault_path / "context.md" + md.write_text("# System\n\nThe system architecture uses microservices for scalability.") + searcher.semantic.index_file(md) + + ctx = searcher.get_context_for_query("architecture") + assert isinstance(ctx, str) + # Should either be empty or contain context header + assert ctx == "" or "Relevant Past Context" in ctx + class TestMemorySearch: """Test module-level memory_search function.""" def test_no_results(self): - result = memory_search("something obscure that won't match anything") + result = memory_search("something obscure that won't match anything xyz123") assert isinstance(result, str) def test_none_top_k_handled(self): result = memory_search("test", top_k=None) assert isinstance(result, str) + def test_basic_search_returns_string(self): + """Test that memory_search returns a string result.""" + result = memory_search("test query") + assert isinstance(result, str) + class TestMemoryRead: """Test module-level memory_read function.""" @@ -259,17 +502,297 @@ class TestMemoryRead: result = memory_read("test", top_k=None) assert isinstance(result, str) + def test_memory_read_empty_message(self): + """Test that empty db returns appropriate message.""" + result = memory_read() + # Should indicate no memories or return empty results + assert isinstance(result, str) -class TestMemoryChunk: - """Test MemoryChunk dataclass.""" - def test_create(self): - chunk = MemoryChunk( - id="c1", - source="/path/to/file.md", - content="chunk text", - embedding=[0.1, 0.2], - created_at="2026-03-06", +class TestMemoryWrite: + """Test module-level memory_write function.""" + + @pytest.fixture(autouse=True) + def mock_vector_store(self): + """Mock vector_store functions for memory_write tests.""" + # Patch where it's imported from, not where it's used + with ( + patch("timmy.memory.vector_store.search_memories") as mock_search, + patch("timmy.memory.vector_store.store_memory") as mock_store, + ): + # Default: no existing memories (no duplicates) + mock_search.return_value = [] + + # Mock store_memory return value + mock_entry = MagicMock() + mock_entry.id = "test-id-12345" + mock_store.return_value = mock_entry + + yield {"search": mock_search, "store": mock_store} + + def test_memory_write_empty_content(self): + """Test that empty content returns error message.""" + result = memory_write("") + assert "empty" in result.lower() + + def test_memory_write_whitespace_only(self): + """Test that whitespace-only content returns error.""" + result = memory_write(" \n\t ") + assert "empty" in result.lower() + + def test_memory_write_valid_content(self, mock_vector_store): + """Test writing valid content.""" + result = memory_write("Remember this important fact.") + assert "stored" in result.lower() or "memory" in result.lower() + mock_vector_store["store"].assert_called_once() + + def test_memory_write_dedup_for_facts(self, mock_vector_store): + """Test that duplicate facts are skipped.""" + # Simulate existing similar fact + mock_entry = MagicMock() + mock_entry.id = "existing-id" + mock_vector_store["search"].return_value = [mock_entry] + + result = memory_write("Similar fact text", context_type="fact") + assert "similar" in result.lower() or "duplicate" in result.lower() + mock_vector_store["store"].assert_not_called() + + def test_memory_write_no_dedup_for_conversation(self, mock_vector_store): + """Test that conversation entries are not deduplicated.""" + # Even with existing entries, conversations should be stored + mock_entry = MagicMock() + mock_entry.id = "existing-id" + mock_vector_store["search"].return_value = [mock_entry] + + memory_write("Conversation text", context_type="conversation") + # Should still store (no duplicate check for non-fact) + mock_vector_store["store"].assert_called_once() + + def test_memory_write_invalid_context_type(self, mock_vector_store): + """Test that invalid context_type defaults to 'fact'.""" + memory_write("Some content", context_type="invalid_type") + # Should still succeed, using "fact" as default + mock_vector_store["store"].assert_called_once() + call_kwargs = mock_vector_store["store"].call_args.kwargs + assert call_kwargs.get("context_type") == "fact" + + def test_memory_write_valid_context_types(self, mock_vector_store): + """Test all valid context types.""" + valid_types = ["fact", "conversation", "document"] + for ctx_type in valid_types: + mock_vector_store["store"].reset_mock() + memory_write(f"Content for {ctx_type}", context_type=ctx_type) + mock_vector_store["store"].assert_called_once() + + def test_memory_write_strips_content(self, mock_vector_store): + """Test that content is stripped of leading/trailing whitespace.""" + memory_write(" padded content ") + call_kwargs = mock_vector_store["store"].call_args.kwargs + assert call_kwargs.get("content") == "padded content" + + def test_memory_write_unicode_content(self, mock_vector_store): + """Test writing unicode content.""" + result = memory_write("Unicode content: 你好世界 🎉") + assert "stored" in result.lower() or "memory" in result.lower() + + def test_memory_write_handles_exception(self, mock_vector_store): + """Test handling of store_memory exceptions.""" + mock_vector_store["store"].side_effect = Exception("DB error") + result = memory_write("This will fail") + assert "failed" in result.lower() or "error" in result.lower() + + +class TestMemoryForget: + """Test module-level memory_forget function.""" + + @pytest.fixture(autouse=True) + def mock_vector_store(self): + """Mock vector_store functions for memory_forget tests.""" + # Patch where it's imported from, not where it's used + with ( + patch("timmy.memory.vector_store.search_memories") as mock_search, + patch("timmy.memory.vector_store.delete_memory") as mock_delete, + ): + # Default: no results + mock_search.return_value = [] + mock_delete.return_value = True + + yield {"search": mock_search, "delete": mock_delete} + + def test_memory_forget_empty_query(self): + """Test that empty query returns error message.""" + result = memory_forget("") + assert "empty" in result.lower() + + def test_memory_forget_whitespace_only(self): + """Test that whitespace-only query returns error.""" + result = memory_forget(" \n\t ") + assert "empty" in result.lower() + + def test_memory_forget_no_matches(self, mock_vector_store): + """Test when no memories match the query.""" + mock_vector_store["search"].return_value = [] + result = memory_forget("nonexistent query xyz123") + assert "no matching" in result.lower() or "not found" in result.lower() + + def test_memory_forget_success(self, mock_vector_store): + """Test successful deletion.""" + mock_entry = MagicMock() + mock_entry.id = "entry-to-delete" + mock_entry.content = "Content to forget" + mock_entry.context_type = "fact" + mock_vector_store["search"].return_value = [mock_entry] + mock_vector_store["delete"].return_value = True + + result = memory_forget("content to forget") + assert "forgotten" in result.lower() or "forgot" in result.lower() + mock_vector_store["delete"].assert_called_once_with("entry-to-delete") + + def test_memory_forget_delete_fails(self, mock_vector_store): + """Test when delete_memory returns False.""" + mock_entry = MagicMock() + mock_entry.id = "entry-id" + mock_entry.content = "Content" + mock_entry.context_type = "fact" + mock_vector_store["search"].return_value = [mock_entry] + mock_vector_store["delete"].return_value = False + + result = memory_forget("content") + # Should indicate the memory wasn't found or already deleted + assert "not found" in result.lower() or "already" in result.lower() + + def test_memory_forget_strips_query(self, mock_vector_store): + """Test that query is stripped of whitespace.""" + mock_vector_store["search"].return_value = [] + memory_forget(" padded query ") + # Check that search was called with stripped query + call_args = mock_vector_store["search"].call_args + assert call_args.args[0] == "padded query" + + def test_memory_forget_handles_exception(self, mock_vector_store): + """Test handling of exceptions during forget.""" + mock_vector_store["search"].side_effect = Exception("DB error") + result = memory_forget("query") + assert "failed" in result.lower() or "error" in result.lower() + + def test_memory_forget_uses_min_relevance(self, mock_vector_store): + """Test that search uses min_relevance parameter.""" + mock_vector_store["search"].return_value = [] + memory_forget("test query") + call_kwargs = mock_vector_store["search"].call_args.kwargs + assert call_kwargs.get("min_relevance") == 0.3 + assert call_kwargs.get("limit") == 3 + + +class TestGetEmbeddingModel: + """Test _get_embedding_model function.""" + + def test_returns_false_when_skip_embeddings(self): + """Test that _get_embedding_model returns False when skip_embeddings is set.""" + # conftest sets TIMMY_SKIP_EMBEDDINGS=1 + model = _get_embedding_model() + assert model is False + + def test_returns_model_when_available(self): + """Test loading when sentence-transformers is available.""" + # This is mocked in conftest, so model is not actually loaded + model = _get_embedding_model() + # Should be False because sentence_transformers is mocked + assert model is False + + +class TestEdgeCases: + """Test various edge cases and boundary conditions.""" + + def test_semantic_memory_with_unicode_filepaths(self, tmp_path): + """Test handling of unicode file paths.""" + mem = SemanticMemory() + mem.db_path = tmp_path / "unicode.db" + mem.vault_path = tmp_path / "vault" + mem.vault_path.mkdir() + mem._init_db() + + # Create file with unicode name + md_file = mem.vault_path / "文件_📝.md" + md_file.write_text( + "# Unicode filename\n\nThis is meaningful content for testing unicode paths." ) - assert chunk.id == "c1" - assert chunk.content == "chunk text" + count = mem.index_file(md_file) + assert count > 0 + + # Verify it can be searched + results = mem.search("content") + assert len(results) > 0 + + def test_semantic_memory_special_chars_in_content(self, tmp_path): + """Test handling of special characters in content.""" + mem = SemanticMemory() + mem.db_path = tmp_path / "special.db" + mem.vault_path = tmp_path / "vault" + mem.vault_path.mkdir() + mem._init_db() + + md_file = mem.vault_path / "special.md" + content = """# Special Characters + + +SQL: SELECT * FROM users WHERE name = "admin' OR '1'='1" +JSON: {"key": "value", "nested": {"array": [1, 2, 3]}} +Unicode: 你好世界 🌍 café naïve +Escapes: \\n \\t \\r +""" + md_file.write_text(content) + count = mem.index_file(md_file) + assert count > 0 + + def test_very_long_file_content(self, tmp_path): + """Test handling of very long file content.""" + mem = SemanticMemory() + mem.db_path = tmp_path / "long.db" + mem.vault_path = tmp_path / "vault" + mem.vault_path.mkdir() + mem._init_db() + + md_file = mem.vault_path / "long.md" + # Create content with many paragraphs + paragraphs = [f"Paragraph {i} with some content text here." for i in range(100)] + md_file.write_text("# Long doc\n\n" + "\n\n".join(paragraphs)) + count = mem.index_file(md_file) + assert count > 0 + + def test_search_with_unicode_query(self, tmp_path): + """Test search with unicode query.""" + mem = SemanticMemory() + mem.db_path = tmp_path / "unicode_query.db" + mem.vault_path = tmp_path / "vault" + mem.vault_path.mkdir() + mem._init_db() + + md_file = mem.vault_path / "test.md" + md_file.write_text("# Test\n\nThis is a test document.") + mem.index_file(md_file) + + # Search with unicode query should not crash + results = mem.search("测试 查询 🌍") + assert isinstance(results, list) + + def test_empty_vault_directory(self, tmp_path): + """Test operations on empty vault directory.""" + mem = SemanticMemory() + mem.db_path = tmp_path / "empty.db" + mem.vault_path = tmp_path / "empty_vault" + mem.vault_path.mkdir() + mem._init_db() + + # Index empty vault + count = mem.index_vault() + assert count == 0 + + # Search should return empty results + results = mem.search("anything") + assert results == [] + + # Stats should show zeros + stats = mem.stats() + assert stats["total_chunks"] == 0 + assert stats["total_files"] == 0