diff --git a/tests/timmy/test_semantic_memory.py b/tests/timmy/test_semantic_memory.py
index a739961..b5549ba 100644
--- a/tests/timmy/test_semantic_memory.py
+++ b/tests/timmy/test_semantic_memory.py
@@ -1,6 +1,7 @@
"""Tests for timmy.semantic_memory — semantic search, chunking, indexing."""
from pathlib import Path
+from unittest.mock import MagicMock, patch
import pytest
@@ -8,11 +9,14 @@ from timmy.semantic_memory import (
MemoryChunk,
MemorySearcher,
SemanticMemory,
+ _get_embedding_model,
_simple_hash_embedding,
cosine_similarity,
embed_text,
+ memory_forget,
memory_read,
memory_search,
+ memory_write,
)
@@ -42,6 +46,40 @@ class TestSimpleHashEmbedding:
magnitude = math.sqrt(sum(x * x for x in vec))
assert abs(magnitude - 1.0) < 0.01
+ def test_empty_string(self):
+ """Test that empty string produces a 128-element list of floats."""
+ vec = _simple_hash_embedding("")
+ assert isinstance(vec, list)
+ assert len(vec) == 128
+ # All zeros vector should still normalize (to zeros since magnitude stays 0)
+ assert all(isinstance(x, float) for x in vec)
+
+ def test_unicode_handling(self):
+ """Test handling of unicode characters."""
+ vec = _simple_hash_embedding("Hello 世界 🌍 àáâãäå")
+ assert isinstance(vec, list)
+ assert len(vec) == 128
+
+ def test_special_characters(self):
+ """Test handling of special characters and punctuation."""
+ text = " \\n\\t!@#$%^&*()"
+ vec = _simple_hash_embedding(text)
+ assert isinstance(vec, list)
+ assert len(vec) == 128
+
+ def test_very_long_text(self):
+ """Test handling of text with many words (only first 50 words used)."""
+ text = "word " * 1000 # 1000 words
+ vec = _simple_hash_embedding(text)
+ assert isinstance(vec, list)
+ assert len(vec) == 128
+
+ def test_single_word(self):
+ """Test handling of single word."""
+ vec = _simple_hash_embedding("test")
+ assert isinstance(vec, list)
+ assert len(vec) == 128
+
class TestEmbedText:
"""Test embed_text with fallback."""
@@ -52,6 +90,24 @@ class TestEmbedText:
assert isinstance(vec, list)
assert len(vec) > 0
+ def test_consistency(self):
+ """Test that same text produces same embedding."""
+ a = embed_text("consistent text")
+ b = embed_text("consistent text")
+ assert a == b
+
+ def test_different_texts(self):
+ """Test that different texts produce different embeddings."""
+ a = embed_text("hello world")
+ b = embed_text("goodbye world")
+ assert a != b
+
+ def test_empty_text(self):
+ """Test embedding empty text."""
+ vec = embed_text("")
+ assert isinstance(vec, list)
+ assert len(vec) == 128 # fallback dimension
+
class TestCosineSimilarity:
"""Test cosine_similarity function."""
@@ -75,6 +131,62 @@ class TestCosineSimilarity:
b = [1.0, 0.0]
assert cosine_similarity(a, b) == 0.0
+ def test_both_zero_vectors(self):
+ """Test similarity when both vectors are zero."""
+ a = [0.0, 0.0, 0.0]
+ b = [0.0, 0.0, 0.0]
+ assert cosine_similarity(a, b) == 0.0
+
+ def test_partial_zero_vector(self):
+ """Test similarity between a non-zero vector and an all-zero vector."""
+ a = [1.0, 0.0, 1.0]
+ b = [0.0, 0.0, 0.0]
+ assert cosine_similarity(a, b) == 0.0
+
+ def test_different_lengths(self):
+ """Test that different length vectors are handled gracefully."""
+ a = [1.0, 0.5, 0.25]
+ b = [1.0, 0.5] # shorter
+ # zip with strict=False handles different lengths
+ result = cosine_similarity(a, b)
+ assert isinstance(result, float)
+
+
+class TestMemoryChunk:
+ """Test MemoryChunk dataclass."""
+
+ def test_create(self):
+ chunk = MemoryChunk(
+ id="c1",
+ source="/path/to/file.md",
+ content="chunk text",
+ embedding=[0.1, 0.2],
+ created_at="2026-03-06",
+ )
+ assert chunk.id == "c1"
+ assert chunk.content == "chunk text"
+
+ def test_with_unicode_content(self):
+ """Test MemoryChunk with unicode content."""
+ chunk = MemoryChunk(
+ id="c2",
+ source="/path/to/文件.md",
+ content="Unicode content: 你好世界 🎉",
+ embedding=[0.1, 0.2, 0.3],
+ created_at="2026-03-06T10:00:00",
+ )
+ assert "你好" in chunk.content
+
+ def test_equality(self):
+ """Test that same values create equal objects."""
+ chunk1 = MemoryChunk(
+ id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now"
+ )
+ chunk2 = MemoryChunk(
+ id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now"
+ )
+ assert chunk1 == chunk2
+
class TestSemanticMemory:
"""Test SemanticMemory class."""
@@ -110,6 +222,24 @@ class TestSemanticMemory:
def test_split_empty_text(self, mem):
assert mem._split_into_chunks("") == []
+ def test_split_whitespace_only(self, mem):
+ """Test that whitespace-only text produces no chunks."""
+ assert mem._split_into_chunks(" \n\n \n") == []
+
+ def test_split_exact_chunk_boundary(self, mem):
+ """Test splitting when text is exactly at chunk boundary."""
+ text = "A" * 500 # Exactly at default max_chunk_size
+ chunks = mem._split_into_chunks(text)
+ assert len(chunks) == 1
+ assert len(chunks[0]) == 500
+
+ def test_split_very_long_sentence(self, mem):
+ """Test splitting text with no sentence boundaries."""
+ text = "A" * 2000 # One long word essentially
+ chunks = mem._split_into_chunks(text, max_chunk_size=100)
+ # Should still produce chunks
+ assert len(chunks) > 0
+
def test_index_file(self, mem):
md_file = mem.vault_path / "test.md"
md_file.write_text(
@@ -130,6 +260,36 @@ class TestSemanticMemory:
assert count1 > 0
assert count2 == 0 # Already indexed, same hash
+ def test_index_file_updates_when_changed(self, mem):
+ """Test that file is re-indexed when content changes."""
+ md_file = mem.vault_path / "changed.md"
+ md_file.write_text("# Original\n\nOriginal content here for indexing.")
+ count1 = mem.index_file(md_file)
+
+ # Change the file
+ md_file.write_text("# Updated\n\nUpdated content that is different.")
+ count2 = mem.index_file(md_file)
+
+ assert count1 > 0
+ assert count2 > 0 # Re-indexed because hash changed
+
+    def test_index_file_skips_tiny_chunks(self, mem):
+        """Test that chunks under 20 characters are skipped (not stored in DB)."""
+        import sqlite3
+        from contextlib import closing
+
+        md_file = mem.vault_path / "tiny.md"
+        md_file.write_text("Tiny")  # Just 4 characters, well under the 20-char minimum
+        mem.index_file(md_file)
+
+        # Query the DB directly; closing() guarantees the connection is released even if
+        # the query raises (sqlite3's own context manager only commits, it does not close).
+        with closing(sqlite3.connect(str(mem.db_path))) as conn:
+            cursor = conn.execute("SELECT COUNT(*) FROM chunks WHERE source = ?", (str(md_file),))
+            stored_count = cursor.fetchone()[0]
+
+        assert stored_count == 0  # "Tiny" was too short, nothing stored
+
def test_index_vault(self, mem):
(mem.vault_path / "a.md").write_text(
"# File A\n\nContent of file A with some meaningful text here."
@@ -169,6 +329,21 @@ class TestSemanticMemory:
assert any("real" in s for s in sources)
assert not any("last-session-handoff" in s for s in sources)
+ def test_index_vault_recursive(self, mem):
+ """Test that index_vault finds files in subdirectories."""
+ subdir = mem.vault_path / "subdir" / "nested"
+ subdir.mkdir(parents=True)
+ (subdir / "deep.md").write_text(
+ "# Deep file\n\nThis file is nested deep in the directory structure."
+ )
+ total = mem.index_vault()
+ assert total > 0
+
+ def test_index_vault_no_markdown_files(self, mem):
+ """Test index_vault when no markdown files exist."""
+ total = mem.index_vault()
+ assert total == 0
+
def test_search_returns_results(self, mem):
md = mem.vault_path / "searchable.md"
md.write_text(
@@ -186,6 +361,17 @@ class TestSemanticMemory:
results = mem.search("anything")
assert results == []
+ def test_search_returns_top_k(self, mem):
+ """Test that search respects top_k parameter."""
+ # Create multiple files
+ for i in range(10):
+ md = mem.vault_path / f"file{i}.md"
+ md.write_text(f"# File {i}\n\nThis is content about topic number {i}.")
+ mem.index_file(md)
+
+ results = mem.search("topic", top_k=3)
+ assert len(results) <= 3
+
def test_get_relevant_context(self, mem):
md = mem.vault_path / "context.md"
md.write_text(
@@ -200,12 +386,53 @@ class TestSemanticMemory:
def test_get_relevant_context_empty(self, mem):
assert mem.get_relevant_context("anything") == ""
+ def test_get_relevant_context_respects_max_chars(self, mem):
+ """Test that get_relevant_context respects max_chars limit."""
+ # Create multiple files with content
+ for i in range(5):
+ md = mem.vault_path / f"ctx{i}.md"
+ md.write_text(f"# Context {i}\n\n" + "X" * 500)
+ mem.index_file(md)
+
+ ctx = mem.get_relevant_context("context", max_chars=200)
+ assert len(ctx) <= 200
+
+ def test_get_relevant_context_filters_by_score(self, mem):
+ """Smoke-test score filtering (threshold 0.3): an unrelated query returns a string."""
+ md = mem.vault_path / "low_score.md"
+ md.write_text("XYZ random unrelated content that should not match.")
+ mem.index_file(md)
+
+ ctx = mem.get_relevant_context("completely different topic about quantum physics")
+ # May be empty if score < 0.3
+ assert isinstance(ctx, str)
+
def test_stats(self, mem):
stats = mem.stats()
assert "total_chunks" in stats
assert "total_files" in stats
assert stats["total_chunks"] == 0
+ def test_stats_after_indexing(self, mem):
+ """Test stats after adding content."""
+ md = mem.vault_path / "stats.md"
+ md.write_text(
+ "# Stats\n\nThis is paragraph one with enough content to be indexed properly.\n\n"
+ "This is paragraph two with also enough meaningful content text."
+ )
+ mem.index_file(md)
+
+ stats = mem.stats()
+ assert stats["total_chunks"] > 0
+ assert stats["total_files"] == 1
+ assert "embedding_dim" in stats
+
+ def test_stats_embedding_dim_fallback(self, mem):
+ """Test that stats returns correct embedding dimension for fallback."""
+ stats = mem.stats()
+ # When using fallback (sentence-transformers not available)
+ assert stats["embedding_dim"] == 128
+
class TestMemorySearcher:
"""Test MemorySearcher high-level interface."""
@@ -231,18 +458,34 @@ class TestMemorySearcher:
ctx = searcher.get_context_for_query("test")
assert ctx == "" # Empty DB
+ def test_get_context_for_query_with_results(self, searcher):
+ """Test get_context_for_query after indexing a file (result may still be empty)."""
+ md = searcher.semantic.vault_path / "context.md"
+ md.write_text("# System\n\nThe system architecture uses microservices for scalability.")
+ searcher.semantic.index_file(md)
+
+ ctx = searcher.get_context_for_query("architecture")
+ assert isinstance(ctx, str)
+ # Should either be empty or contain context header
+ assert ctx == "" or "Relevant Past Context" in ctx
+
class TestMemorySearch:
"""Test module-level memory_search function."""
def test_no_results(self):
- result = memory_search("something obscure that won't match anything")
+ result = memory_search("something obscure that won't match anything xyz123")
assert isinstance(result, str)
def test_none_top_k_handled(self):
result = memory_search("test", top_k=None)
assert isinstance(result, str)
+ def test_basic_search_returns_string(self):
+ """Test that memory_search returns a string result."""
+ result = memory_search("test query")
+ assert isinstance(result, str)
+
class TestMemoryRead:
"""Test module-level memory_read function."""
@@ -259,17 +502,297 @@ class TestMemoryRead:
result = memory_read("test", top_k=None)
assert isinstance(result, str)
+ def test_memory_read_empty_message(self):
+ """Test that memory_read() with no arguments returns a string."""
+ result = memory_read()
+ # Should indicate no memories or return empty results
+ assert isinstance(result, str)
-class TestMemoryChunk:
- """Test MemoryChunk dataclass."""
- def test_create(self):
- chunk = MemoryChunk(
- id="c1",
- source="/path/to/file.md",
- content="chunk text",
- embedding=[0.1, 0.2],
- created_at="2026-03-06",
+class TestMemoryWrite:
+ """Test module-level memory_write function."""
+
+ @pytest.fixture(autouse=True)
+ def mock_vector_store(self):
+ """Mock vector_store functions for memory_write tests."""
+ # Patch the attributes on timmy.memory.vector_store itself, not on the module that calls them
+ with (
+ patch("timmy.memory.vector_store.search_memories") as mock_search,
+ patch("timmy.memory.vector_store.store_memory") as mock_store,
+ ):
+ # Default: no existing memories (no duplicates)
+ mock_search.return_value = []
+
+ # Mock store_memory return value
+ mock_entry = MagicMock()
+ mock_entry.id = "test-id-12345"
+ mock_store.return_value = mock_entry
+
+ yield {"search": mock_search, "store": mock_store}
+
+ def test_memory_write_empty_content(self):
+ """Test that empty content returns error message."""
+ result = memory_write("")
+ assert "empty" in result.lower()
+
+ def test_memory_write_whitespace_only(self):
+ """Test that whitespace-only content returns error."""
+ result = memory_write(" \n\t ")
+ assert "empty" in result.lower()
+
+ def test_memory_write_valid_content(self, mock_vector_store):
+ """Test writing valid content."""
+ result = memory_write("Remember this important fact.")
+ assert "stored" in result.lower() or "memory" in result.lower()
+ mock_vector_store["store"].assert_called_once()
+
+ def test_memory_write_dedup_for_facts(self, mock_vector_store):
+ """Test that duplicate facts are skipped."""
+ # Simulate existing similar fact
+ mock_entry = MagicMock()
+ mock_entry.id = "existing-id"
+ mock_vector_store["search"].return_value = [mock_entry]
+
+ result = memory_write("Similar fact text", context_type="fact")
+ assert "similar" in result.lower() or "duplicate" in result.lower()
+ mock_vector_store["store"].assert_not_called()
+
+ def test_memory_write_no_dedup_for_conversation(self, mock_vector_store):
+ """Test that conversation entries are not deduplicated."""
+ # Even with existing entries, conversations should be stored
+ mock_entry = MagicMock()
+ mock_entry.id = "existing-id"
+ mock_vector_store["search"].return_value = [mock_entry]
+
+ memory_write("Conversation text", context_type="conversation")
+ # Should still store (no duplicate check for non-fact)
+ mock_vector_store["store"].assert_called_once()
+
+ def test_memory_write_invalid_context_type(self, mock_vector_store):
+ """Test that invalid context_type defaults to 'fact'."""
+ memory_write("Some content", context_type="invalid_type")
+ # Should still succeed, using "fact" as default
+ mock_vector_store["store"].assert_called_once()
+ call_kwargs = mock_vector_store["store"].call_args.kwargs
+ assert call_kwargs.get("context_type") == "fact"
+
+ def test_memory_write_valid_context_types(self, mock_vector_store):
+ """Test all valid context types."""
+ valid_types = ["fact", "conversation", "document"]
+ for ctx_type in valid_types:
+ mock_vector_store["store"].reset_mock()
+ memory_write(f"Content for {ctx_type}", context_type=ctx_type)
+ mock_vector_store["store"].assert_called_once()
+
+ def test_memory_write_strips_content(self, mock_vector_store):
+ """Test that content is stripped of leading/trailing whitespace."""
+ memory_write(" padded content ")
+ call_kwargs = mock_vector_store["store"].call_args.kwargs
+ assert call_kwargs.get("content") == "padded content"
+
+ def test_memory_write_unicode_content(self, mock_vector_store):
+ """Test writing unicode content."""
+ result = memory_write("Unicode content: 你好世界 🎉")
+ assert "stored" in result.lower() or "memory" in result.lower()
+
+ def test_memory_write_handles_exception(self, mock_vector_store):
+ """Test handling of store_memory exceptions."""
+ mock_vector_store["store"].side_effect = Exception("DB error")
+ result = memory_write("This will fail")
+ assert "failed" in result.lower() or "error" in result.lower()
+
+
+class TestMemoryForget:
+ """Test module-level memory_forget function."""
+
+ @pytest.fixture(autouse=True)
+ def mock_vector_store(self):
+ """Mock vector_store functions for memory_forget tests."""
+ # Patch the attributes on timmy.memory.vector_store itself, not on the module that calls them
+ with (
+ patch("timmy.memory.vector_store.search_memories") as mock_search,
+ patch("timmy.memory.vector_store.delete_memory") as mock_delete,
+ ):
+ # Default: no results
+ mock_search.return_value = []
+ mock_delete.return_value = True
+
+ yield {"search": mock_search, "delete": mock_delete}
+
+ def test_memory_forget_empty_query(self):
+ """Test that empty query returns error message."""
+ result = memory_forget("")
+ assert "empty" in result.lower()
+
+ def test_memory_forget_whitespace_only(self):
+ """Test that whitespace-only query returns error."""
+ result = memory_forget(" \n\t ")
+ assert "empty" in result.lower()
+
+ def test_memory_forget_no_matches(self, mock_vector_store):
+ """Test when no memories match the query."""
+ mock_vector_store["search"].return_value = []
+ result = memory_forget("nonexistent query xyz123")
+ assert "no matching" in result.lower() or "not found" in result.lower()
+
+ def test_memory_forget_success(self, mock_vector_store):
+ """Test successful deletion."""
+ mock_entry = MagicMock()
+ mock_entry.id = "entry-to-delete"
+ mock_entry.content = "Content to forget"
+ mock_entry.context_type = "fact"
+ mock_vector_store["search"].return_value = [mock_entry]
+ mock_vector_store["delete"].return_value = True
+
+ result = memory_forget("content to forget")
+ assert "forgotten" in result.lower() or "forgot" in result.lower()
+ mock_vector_store["delete"].assert_called_once_with("entry-to-delete")
+
+ def test_memory_forget_delete_fails(self, mock_vector_store):
+ """Test when delete_memory returns False."""
+ mock_entry = MagicMock()
+ mock_entry.id = "entry-id"
+ mock_entry.content = "Content"
+ mock_entry.context_type = "fact"
+ mock_vector_store["search"].return_value = [mock_entry]
+ mock_vector_store["delete"].return_value = False
+
+ result = memory_forget("content")
+ # Should indicate the memory wasn't found or already deleted
+ assert "not found" in result.lower() or "already" in result.lower()
+
+ def test_memory_forget_strips_query(self, mock_vector_store):
+ """Test that query is stripped of whitespace."""
+ mock_vector_store["search"].return_value = []
+ memory_forget(" padded query ")
+ # Check that search was called with stripped query
+ call_args = mock_vector_store["search"].call_args
+ assert call_args.args[0] == "padded query"
+
+ def test_memory_forget_handles_exception(self, mock_vector_store):
+ """Test handling of exceptions during forget."""
+ mock_vector_store["search"].side_effect = Exception("DB error")
+ result = memory_forget("query")
+ assert "failed" in result.lower() or "error" in result.lower()
+
+ def test_memory_forget_uses_min_relevance(self, mock_vector_store):
+ """Test that search is called with min_relevance=0.3 and limit=3."""
+ mock_vector_store["search"].return_value = []
+ memory_forget("test query")
+ call_kwargs = mock_vector_store["search"].call_args.kwargs
+ assert call_kwargs.get("min_relevance") == 0.3
+ assert call_kwargs.get("limit") == 3
+
+
+class TestGetEmbeddingModel:
+ """Test _get_embedding_model function."""
+
+ def test_returns_false_when_skip_embeddings(self):
+ """Test that _get_embedding_model returns False when skip_embeddings is set."""
+ # conftest sets TIMMY_SKIP_EMBEDDINGS=1
+ model = _get_embedding_model()
+ assert model is False
+
+ def test_returns_model_when_available(self):
+ """Test that the model is still False here, since sentence-transformers is mocked."""
+ # This is mocked in conftest, so model is not actually loaded
+ model = _get_embedding_model()
+ # Should be False because sentence_transformers is mocked
+ assert model is False
+
+
+class TestEdgeCases:
+ """Test various edge cases and boundary conditions."""
+
+ def test_semantic_memory_with_unicode_filepaths(self, tmp_path):
+ """Test handling of unicode file paths."""
+ mem = SemanticMemory()
+ mem.db_path = tmp_path / "unicode.db"
+ mem.vault_path = tmp_path / "vault"
+ mem.vault_path.mkdir()
+ mem._init_db()
+
+ # Create file with unicode name
+ md_file = mem.vault_path / "文件_📝.md"
+ md_file.write_text(
+ "# Unicode filename\n\nThis is meaningful content for testing unicode paths."
)
- assert chunk.id == "c1"
- assert chunk.content == "chunk text"
+ count = mem.index_file(md_file)
+ assert count > 0
+
+ # Verify it can be searched
+ results = mem.search("content")
+ assert len(results) > 0
+
+ def test_semantic_memory_special_chars_in_content(self, tmp_path):
+ """Test handling of special characters in content."""
+ mem = SemanticMemory()
+ mem.db_path = tmp_path / "special.db"
+ mem.vault_path = tmp_path / "vault"
+ mem.vault_path.mkdir()
+ mem._init_db()
+
+ md_file = mem.vault_path / "special.md"
+ content = """# Special Characters
+
+
+SQL: SELECT * FROM users WHERE name = "admin' OR '1'='1"
+JSON: {"key": "value", "nested": {"array": [1, 2, 3]}}
+Unicode: 你好世界 🌍 café naïve
+Escapes: \\n \\t \\r
+"""
+ md_file.write_text(content)
+ count = mem.index_file(md_file)
+ assert count > 0
+
+ def test_very_long_file_content(self, tmp_path):
+ """Test handling of very long file content."""
+ mem = SemanticMemory()
+ mem.db_path = tmp_path / "long.db"
+ mem.vault_path = tmp_path / "vault"
+ mem.vault_path.mkdir()
+ mem._init_db()
+
+ md_file = mem.vault_path / "long.md"
+ # Create content with many paragraphs
+ paragraphs = [f"Paragraph {i} with some content text here." for i in range(100)]
+ md_file.write_text("# Long doc\n\n" + "\n\n".join(paragraphs))
+ count = mem.index_file(md_file)
+ assert count > 0
+
+ def test_search_with_unicode_query(self, tmp_path):
+ """Test search with unicode query."""
+ mem = SemanticMemory()
+ mem.db_path = tmp_path / "unicode_query.db"
+ mem.vault_path = tmp_path / "vault"
+ mem.vault_path.mkdir()
+ mem._init_db()
+
+ md_file = mem.vault_path / "test.md"
+ md_file.write_text("# Test\n\nThis is a test document.")
+ mem.index_file(md_file)
+
+ # Search with unicode query should not crash
+ results = mem.search("测试 查询 🌍")
+ assert isinstance(results, list)
+
+ def test_empty_vault_directory(self, tmp_path):
+ """Test operations on empty vault directory."""
+ mem = SemanticMemory()
+ mem.db_path = tmp_path / "empty.db"
+ mem.vault_path = tmp_path / "empty_vault"
+ mem.vault_path.mkdir()
+ mem._init_db()
+
+ # Index empty vault
+ count = mem.index_vault()
+ assert count == 0
+
+ # Search should return empty results
+ results = mem.search("anything")
+ assert results == []
+
+ # Stats should show zeros
+ stats = mem.stats()
+ assert stats["total_chunks"] == 0
+ assert stats["total_files"] == 0