Integrates Ollama embedding for semantic indexing of research outputs. Refactors memory_search and memory_store tools to align with issue requirements. - Added and to . - Modified to use and for generating embeddings via Ollama, with a fallback to . - Renamed to in , adjusting its signature to . - Updated in to use as default and pass confidence scoring. - Created to demonstrate indexing of research documents. Fixes #976
819 lines
30 KiB
Python
819 lines
30 KiB
Python
"""Tests for timmy.semantic_memory — semantic search, chunking, indexing."""
|
|
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from timmy.memory_system import (
|
|
MemoryChunk,
|
|
MemorySearcher,
|
|
SemanticMemory,
|
|
_get_embedding_model,
|
|
_simple_hash_embedding,
|
|
cosine_similarity,
|
|
embed_text,
|
|
memory_forget,
|
|
memory_read,
|
|
memory_search,
|
|
memory_store,
|
|
)
|
|
|
|
|
|
class TestSimpleHashEmbedding:
    """Exercise the hash-based fallback embedding."""

    def test_returns_list_of_floats(self):
        embedding = _simple_hash_embedding("hello world")
        assert isinstance(embedding, list)
        assert len(embedding) == 128
        for value in embedding:
            assert isinstance(value, float)

    def test_deterministic(self):
        first = _simple_hash_embedding("same text")
        second = _simple_hash_embedding("same text")
        assert first == second

    def test_different_texts_differ(self):
        assert _simple_hash_embedding("hello world") != _simple_hash_embedding("goodbye universe")

    def test_normalized(self):
        import math

        embedding = _simple_hash_embedding("test normalization")
        magnitude = math.sqrt(math.fsum(value * value for value in embedding))
        assert abs(magnitude - 1.0) < 0.01

    def test_empty_string(self):
        """An empty string still yields a well-formed 128-dim vector."""
        embedding = _simple_hash_embedding("")
        assert isinstance(embedding, list)
        assert len(embedding) == 128
        # A zero-magnitude vector stays all zeros rather than being divided by 0.
        for value in embedding:
            assert isinstance(value, float)

    def test_unicode_handling(self):
        """Unicode input must not break the embedding."""
        embedding = _simple_hash_embedding("Hello 世界 🌍 àáâãäå")
        assert isinstance(embedding, list)
        assert len(embedding) == 128

    def test_special_characters(self):
        """Markup-like input and punctuation are handled."""
        tricky = "<script>alert('xss')</script> \\n\\t!@#$%^&*()"
        embedding = _simple_hash_embedding(tricky)
        assert isinstance(embedding, list)
        assert len(embedding) == 128

    def test_very_long_text(self):
        """Very long input (only a prefix of words is hashed) still works."""
        embedding = _simple_hash_embedding("word " * 1000)
        assert isinstance(embedding, list)
        assert len(embedding) == 128

    def test_single_word(self):
        """A single word embeds fine."""
        embedding = _simple_hash_embedding("test")
        assert isinstance(embedding, list)
        assert len(embedding) == 128
|
|
|
|
|
|
class TestEmbedText:
    """Exercise embed_text and its fallback path."""

    def test_returns_embedding(self):
        # conftest sets TIMMY_SKIP_EMBEDDINGS=1, forcing the hash fallback.
        embedding = embed_text("test text")
        assert isinstance(embedding, list)
        assert embedding  # non-empty

    def test_consistency(self):
        """Identical input text yields an identical embedding."""
        assert embed_text("consistent text") == embed_text("consistent text")

    def test_different_texts(self):
        """Distinct input texts yield distinct embeddings."""
        assert embed_text("hello world") != embed_text("goodbye world")

    def test_empty_text(self):
        """Empty input still embeds to the fallback's 128 dimensions."""
        embedding = embed_text("")
        assert isinstance(embedding, list)
        assert len(embedding) == 128  # fallback dimension
|
|
|
|
|
|
class TestCosineSimilarity:
    """Exercise the cosine_similarity helper."""

    def test_identical_vectors(self):
        vector = [1.0, 0.0, 0.0]
        assert cosine_similarity(vector, vector) == pytest.approx(1.0)

    def test_orthogonal_vectors(self):
        assert cosine_similarity([1.0, 0.0], [0.0, 1.0]) == pytest.approx(0.0)

    def test_opposite_vectors(self):
        assert cosine_similarity([1.0, 0.0], [-1.0, 0.0]) == pytest.approx(-1.0)

    def test_zero_vector(self):
        # A zero-magnitude operand short-circuits to 0.0 instead of dividing by 0.
        assert cosine_similarity([0.0, 0.0], [1.0, 0.0]) == 0.0

    def test_both_zero_vectors(self):
        """Two zero vectors compare as 0.0, not NaN."""
        left = [0.0, 0.0, 0.0]
        right = [0.0, 0.0, 0.0]
        assert cosine_similarity(left, right) == 0.0

    def test_partial_zero_vector(self):
        """A non-zero vector against an all-zero one scores 0.0."""
        assert cosine_similarity([1.0, 0.0, 1.0], [0.0, 0.0, 0.0]) == 0.0

    def test_different_lengths(self):
        """Mismatched vector lengths are tolerated (zip truncates to the shorter)."""
        score = cosine_similarity([1.0, 0.5, 0.25], [1.0, 0.5])
        assert isinstance(score, float)
|
|
|
|
|
|
class TestMemoryChunk:
    """Exercise the MemoryChunk dataclass."""

    def test_create(self):
        fields = {
            "id": "c1",
            "source": "/path/to/file.md",
            "content": "chunk text",
            "embedding": [0.1, 0.2],
            "created_at": "2026-03-06",
        }
        chunk = MemoryChunk(**fields)
        assert chunk.id == "c1"
        assert chunk.content == "chunk text"

    def test_with_unicode_content(self):
        """Unicode survives in both the source path and the content."""
        chunk = MemoryChunk(
            id="c2",
            source="/path/to/文件.md",
            content="Unicode content: 你好世界 🎉",
            embedding=[0.1, 0.2, 0.3],
            created_at="2026-03-06T10:00:00",
        )
        assert "你好" in chunk.content

    def test_equality(self):
        """Dataclass equality is value-based: equal fields mean equal objects."""

        def build():
            return MemoryChunk(
                id="c1", source="/a.md", content="text", embedding=[0.1], created_at="now"
            )

        assert build() == build()
|
|
|
|
|
|
class TestSemanticMemory:
    """Test SemanticMemory class."""

    @pytest.fixture
    def mem(self, tmp_path):
        # Fresh SemanticMemory whose DB and vault live under tmp_path, so
        # every test is fully isolated from real user data.
        sm = SemanticMemory()
        sm.db_path = tmp_path / "test_semantic.db"
        sm.vault_path = tmp_path / "vault"
        sm.vault_path.mkdir()
        sm._init_db()
        return sm

    def test_init_creates_db(self, mem):
        """Calling _init_db again is safe and leaves a DB file on disk."""
        # After consolidation, _init_db ensures schema is ready
        # The DB file is created lazily; verify by checking we can get a connection

        mem._init_db()
        # If we get here without error, the DB is initialized
        assert mem.db_path.exists()

    def test_split_into_chunks_short(self, mem):
        """A short paragraph becomes exactly one chunk, unchanged."""
        text = "Short paragraph."
        chunks = mem._split_into_chunks(text)
        assert len(chunks) == 1
        assert chunks[0] == "Short paragraph."

    def test_split_into_chunks_multiple_paragraphs(self, mem):
        """Blank-line-separated paragraphs split into one chunk each."""
        text = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph."
        chunks = mem._split_into_chunks(text)
        assert len(chunks) == 3

    def test_split_into_chunks_long_paragraph(self, mem):
        """A paragraph longer than max_chunk_size is split further."""
        text = ". ".join([f"Sentence {i}" for i in range(50)])
        chunks = mem._split_into_chunks(text, max_chunk_size=100)
        assert len(chunks) > 1

    def test_split_empty_text(self, mem):
        """Empty input yields no chunks."""
        assert mem._split_into_chunks("") == []

    def test_split_whitespace_only(self, mem):
        """Test that whitespace-only text produces no chunks."""
        assert mem._split_into_chunks(" \n\n \n") == []

    def test_split_exact_chunk_boundary(self, mem):
        """Test splitting when text is exactly at chunk boundary."""
        text = "A" * 500  # Exactly at default max_chunk_size
        chunks = mem._split_into_chunks(text)
        assert len(chunks) == 1
        assert len(chunks[0]) == 500

    def test_split_very_long_sentence(self, mem):
        """Test splitting text with no sentence boundaries."""
        text = "A" * 2000  # One long word essentially
        chunks = mem._split_into_chunks(text, max_chunk_size=100)
        # Should still produce chunks
        assert len(chunks) > 0

    def test_index_file(self, mem):
        """A markdown file with substantive content indexes to > 0 chunks."""
        md_file = mem.vault_path / "test.md"
        md_file.write_text(
            "# Title\n\nThis is a test document with enough content to index properly.\n\nAnother paragraph with more content here."
        )
        count = mem.index_file(md_file)
        assert count > 0

    def test_index_nonexistent_file(self, mem):
        """Indexing a missing path is a no-op that returns 0."""
        count = mem.index_file(Path("/nonexistent/file.md"))
        assert count == 0

    def test_index_file_skips_already_indexed(self, mem):
        """Re-indexing an unchanged file is skipped (content-hash match)."""
        md_file = mem.vault_path / "cached.md"
        md_file.write_text("# Cached\n\nContent that should only be indexed once if unchanged.")
        count1 = mem.index_file(md_file)
        count2 = mem.index_file(md_file)
        assert count1 > 0
        assert count2 == 0  # Already indexed, same hash

    def test_index_file_updates_when_changed(self, mem):
        """Test that file is re-indexed when content changes."""
        md_file = mem.vault_path / "changed.md"
        md_file.write_text("# Original\n\nOriginal content here for indexing.")
        count1 = mem.index_file(md_file)

        # Change the file
        md_file.write_text("# Updated\n\nUpdated content that is different.")
        count2 = mem.index_file(md_file)

        assert count1 > 0
        assert count2 > 0  # Re-indexed because hash changed

    def test_index_file_skips_tiny_chunks(self, mem):
        """Test that chunks under 20 characters are skipped (not stored in DB)."""
        import sqlite3

        md_file = mem.vault_path / "tiny.md"
        # Create a paragraph that is definitely under 20 chars
        md_file.write_text("Tiny")  # Just 4 characters
        mem.index_file(md_file)

        # Check DB directly - tiny chunks should NOT be stored
        # After consolidation: chunks are stored in 'memories' table with memory_type='vault_chunk'
        conn = sqlite3.connect(str(mem.db_path))
        cursor = conn.execute(
            "SELECT COUNT(*) FROM memories WHERE source = ? AND memory_type = 'vault_chunk'",
            (str(md_file),),
        )
        stored_count = cursor.fetchone()[0]
        conn.close()

        assert stored_count == 0  # "Tiny" was too short, nothing stored

    def test_index_vault(self, mem):
        """index_vault picks up every markdown file in the vault."""
        (mem.vault_path / "a.md").write_text(
            "# File A\n\nContent of file A with some meaningful text here."
        )
        (mem.vault_path / "b.md").write_text(
            "# File B\n\nContent of file B with different meaningful text."
        )
        total = mem.index_vault()
        assert total >= 2

    def test_index_vault_skips_handoff(self, mem):
        """Verify handoff files are excluded from indexing."""
        handoff = mem.vault_path / "last-session-handoff.md"
        handoff.write_text("# Handoff\n\nThis should be skipped completely from indexing.")
        real = mem.vault_path / "real.md"
        real.write_text("# Real\n\nThis should be indexed with enough meaningful content.")

        # index_file on the handoff file should NOT skip it
        # (that's only index_vault logic), so test the vault logic directly
        count = mem.index_file(handoff)
        assert count > 0  # index_file indexes everything

        # Wipe and re-test via index_vault
        import sqlite3

        conn = sqlite3.connect(str(mem.db_path))
        # After consolidation: chunks are stored in 'memories' table with memory_type='vault_chunk'
        conn.execute("DELETE FROM memories WHERE memory_type = 'vault_chunk'")
        conn.commit()
        conn.close()

        mem.index_vault()
        conn = sqlite3.connect(str(mem.db_path))
        rows = conn.execute(
            "SELECT DISTINCT source FROM memories WHERE memory_type = 'vault_chunk'"
        ).fetchall()
        conn.close()
        sources = [r[0] for r in rows]
        # Only the real file should be indexed, not the handoff
        assert any("real" in s for s in sources)
        assert not any("last-session-handoff" in s for s in sources)

    def test_index_vault_recursive(self, mem):
        """Test that index_vault finds files in subdirectories."""
        subdir = mem.vault_path / "subdir" / "nested"
        subdir.mkdir(parents=True)
        (subdir / "deep.md").write_text(
            "# Deep file\n\nThis file is nested deep in the directory structure."
        )
        total = mem.index_vault()
        assert total > 0

    def test_index_vault_no_markdown_files(self, mem):
        """Test index_vault when no markdown files exist."""
        total = mem.index_vault()
        assert total == 0

    def test_search_returns_results(self, mem):
        """Search over indexed content yields (content, score) tuples."""
        md = mem.vault_path / "searchable.md"
        md.write_text(
            "# Python\n\nPython is a programming language used for web development and data science."
        )
        mem.index_file(md)

        results = mem.search("programming language")
        assert len(results) > 0
        # Each result is (content, score)
        assert isinstance(results[0], tuple)
        assert len(results[0]) == 2

    def test_search_empty_db(self, mem):
        """Searching before anything is indexed returns an empty list."""
        results = mem.search("anything")
        assert results == []

    def test_search_returns_top_k(self, mem):
        """Test that search respects top_k parameter."""
        # Create multiple files
        for i in range(10):
            md = mem.vault_path / f"file{i}.md"
            md.write_text(f"# File {i}\n\nThis is content about topic number {i}.")
            mem.index_file(md)

        results = mem.search("topic", top_k=3)
        assert len(results) <= 3

    def test_get_relevant_context(self, mem):
        """get_relevant_context returns a string for an indexed vault."""
        md = mem.vault_path / "context.md"
        md.write_text(
            "# Important\n\nThis is very important information about the system architecture."
        )
        mem.index_file(md)

        ctx = mem.get_relevant_context("architecture")
        # May or may not match depending on hash-based similarity
        assert isinstance(ctx, str)

    def test_get_relevant_context_empty(self, mem):
        """No indexed content means no context string."""
        assert mem.get_relevant_context("anything") == ""

    def test_get_relevant_context_respects_max_chars(self, mem):
        """Test that get_relevant_context respects max_chars limit."""
        # Create multiple files with content
        for i in range(5):
            md = mem.vault_path / f"ctx{i}.md"
            md.write_text(f"# Context {i}\n\n" + "X" * 500)
            mem.index_file(md)

        ctx = mem.get_relevant_context("context", max_chars=200)
        assert len(ctx) <= 200

    def test_get_relevant_context_filters_by_score(self, mem):
        """Test that results below score threshold (0.3) are filtered."""
        md = mem.vault_path / "low_score.md"
        md.write_text("XYZ random unrelated content that should not match.")
        mem.index_file(md)

        ctx = mem.get_relevant_context("completely different topic about quantum physics")
        # May be empty if score < 0.3
        assert isinstance(ctx, str)

    def test_stats(self, mem):
        """A fresh store reports zero chunks and exposes the expected keys."""
        stats = mem.stats()
        assert "total_chunks" in stats
        assert "total_files" in stats
        assert stats["total_chunks"] == 0

    def test_stats_after_indexing(self, mem):
        """Test stats after adding content."""
        md = mem.vault_path / "stats.md"
        md.write_text(
            "# Stats\n\nThis is paragraph one with enough content to be indexed properly.\n\n"
            "This is paragraph two with also enough meaningful content text."
        )
        mem.index_file(md)

        stats = mem.stats()
        assert stats["total_chunks"] > 0
        assert stats["total_files"] == 1
        assert "embedding_dim" in stats

    def test_stats_embedding_dim_fallback(self, mem):
        """Test that stats returns correct embedding dimension for fallback."""
        stats = mem.stats()
        # When using fallback (sentence-transformers not available)
        assert stats["embedding_dim"] == 128
|
|
|
|
|
|
class TestMemorySearcher:
    """Exercise the MemorySearcher high-level interface."""

    @pytest.fixture
    def searcher(self, tmp_path):
        """A MemorySearcher whose backing store lives in a temp directory."""
        instance = MemorySearcher()
        instance.semantic.db_path = tmp_path / "searcher.db"
        instance.semantic.vault_path = tmp_path / "vault"
        instance.semantic.vault_path.mkdir()
        instance.semantic._init_db()
        return instance

    def test_search_semantic_tier(self, searcher):
        outcome = searcher.search("test query", tiers=["semantic"])
        assert "semantic" in outcome

    def test_search_defaults_to_semantic(self, searcher):
        # No tiers argument: the semantic tier is searched by default.
        outcome = searcher.search("test")
        assert "semantic" in outcome

    def test_get_context_for_query_empty(self, searcher):
        # Nothing indexed yet, so no context can be assembled.
        assert searcher.get_context_for_query("test") == ""

    def test_get_context_for_query_with_results(self, searcher):
        """With content indexed, the assembled context is a string."""
        doc = searcher.semantic.vault_path / "context.md"
        doc.write_text("# System\n\nThe system architecture uses microservices for scalability.")
        searcher.semantic.index_file(doc)

        context = searcher.get_context_for_query("architecture")
        assert isinstance(context, str)
        # Hash-based similarity may or may not cross the relevance threshold.
        assert context == "" or "Relevant Past Context" in context
|
|
|
|
|
|
class TestMemorySearch:
    """Exercise the module-level memory_search function."""

    def test_no_results(self):
        outcome = memory_search("something obscure that won't match anything xyz123")
        assert isinstance(outcome, str)

    def test_none_top_k_handled(self):
        # limit=None must be tolerated rather than raising.
        outcome = memory_search("test", limit=None)
        assert isinstance(outcome, str)

    def test_basic_search_returns_string(self):
        """memory_search always yields a string."""
        assert isinstance(memory_search("test query"), str)
|
|
|
|
|
|
class TestMemoryRead:
    """Exercise the module-level memory_read function."""

    def test_memory_read_returns_string(self):
        assert isinstance(memory_read(), str)

    def test_memory_read_with_query(self):
        assert isinstance(memory_read("some query"), str)

    def test_memory_read_none_top_k(self):
        # top_k=None must be tolerated rather than raising.
        assert isinstance(memory_read("test", top_k=None), str)

    def test_memory_read_empty_message(self):
        """An empty DB still yields a string (a no-memories message or '')."""
        assert isinstance(memory_read(), str)
|
|
|
|
|
|
class TestMemoryStore:
    """Test module-level memory_store function."""

    @pytest.fixture(autouse=True)
    def mock_vector_store(self):
        """Mock vector_store functions for memory_store tests."""
        # Patch where it's imported from, not where it's used
        with (
            patch("timmy.memory_system.search_memories") as mock_search,
            patch("timmy.memory_system.store_memory") as mock_store,
        ):
            # Default: no existing memories (no duplicates)
            mock_search.return_value = []

            # Mock store_memory return value
            mock_entry = MagicMock()
            mock_entry.id = "test-id-12345"
            mock_store.return_value = mock_entry

            yield {"search": mock_search, "store": mock_store}

    def test_memory_store_empty_report(self):
        """Test that empty report returns error message."""
        result = memory_store(topic="test", report="")
        assert "empty" in result.lower()

    def test_memory_store_whitespace_only(self):
        """Test that whitespace-only report returns error."""
        result = memory_store(topic="test", report=" \n\t ")
        assert "empty" in result.lower()

    def test_memory_store_valid_content(self, mock_vector_store):
        """Test writing valid content."""
        result = memory_store(topic="fact about Timmy", report="Remember this important fact.")
        assert "stored" in result.lower() or "memory" in result.lower()
        # Exactly one store call for a valid, non-duplicate memory.
        mock_vector_store["store"].assert_called_once()

    def test_memory_store_dedup_for_facts_or_research(self, mock_vector_store):
        """Test that duplicate facts or research are skipped."""
        # Simulate existing similar fact
        mock_entry = MagicMock()
        mock_entry.id = "existing-id"
        mock_vector_store["search"].return_value = [mock_entry]

        # Test with 'fact'
        result = memory_store(topic="Similar fact", report="Similar fact text", type="fact")
        assert "similar" in result.lower() or "duplicate" in result.lower()
        mock_vector_store["store"].assert_not_called()

        mock_vector_store["store"].reset_mock()
        # Test with 'research'
        result = memory_store(topic="Similar research", report="Similar research content", type="research")
        assert "similar" in result.lower() or "duplicate" in result.lower()
        mock_vector_store["store"].assert_not_called()

    def test_memory_store_no_dedup_for_conversation(self, mock_vector_store):
        """Test that conversation entries are not deduplicated."""
        # Even with existing entries, conversations should be stored
        mock_entry = MagicMock()
        mock_entry.id = "existing-id"
        mock_vector_store["search"].return_value = [mock_entry]

        memory_store(topic="Conversation", report="Conversation text", type="conversation")
        # Should still store (no duplicate check for non-fact)
        mock_vector_store["store"].assert_called_once()

    def test_memory_store_invalid_type_defaults_to_research(self, mock_vector_store):
        """Test that invalid type defaults to 'research'."""
        memory_store(topic="Invalid type test", report="Some content", type="invalid_type")
        # Should still succeed, using "research" as default
        mock_vector_store["store"].assert_called_once()
        call_kwargs = mock_vector_store["store"].call_args.kwargs
        assert call_kwargs.get("context_type") == "research"

    def test_memory_store_valid_types(self, mock_vector_store):
        """Test all valid context types."""
        valid_types = ["fact", "conversation", "document", "research"]
        for ctx_type in valid_types:
            # Reset between iterations so assert_called_once is per-type.
            mock_vector_store["store"].reset_mock()
            memory_store(topic=f"Topic for {ctx_type}", report=f"Content for {ctx_type}", type=ctx_type)
            mock_vector_store["store"].assert_called_once()

    def test_memory_store_strips_report_and_adds_topic(self, mock_vector_store):
        """Test that report is stripped of leading/trailing whitespace and combined with topic."""
        memory_store(topic=" My Topic ", report=" padded content ")
        call_kwargs = mock_vector_store["store"].call_args.kwargs
        # Content combines stripped topic and report; metadata keeps the raw topic.
        assert call_kwargs.get("content") == "Topic: My Topic\n\nReport: padded content"
        assert call_kwargs.get("metadata") == {"topic": " My Topic "}

    def test_memory_store_unicode_report(self, mock_vector_store):
        """Test writing unicode content."""
        result = memory_store(topic="Unicode", report="Unicode content: 你好世界 🎉")
        assert "stored" in result.lower() or "memory" in result.lower()

    def test_memory_store_handles_exception(self, mock_vector_store):
        """Test handling of store_memory exceptions."""
        mock_vector_store["store"].side_effect = Exception("DB error")
        result = memory_store(topic="Failing", report="This will fail")
        # Errors surface as a message, not as a raised exception.
        assert "failed" in result.lower() or "error" in result.lower()
|
|
|
|
|
|
class TestMemoryForget:
    """Test module-level memory_forget function."""

    @pytest.fixture(autouse=True)
    def mock_vector_store(self):
        """Mock vector_store functions for memory_forget tests."""
        # Patch where it's imported from, not where it's used
        with (
            patch("timmy.memory_system.search_memories") as mock_search,
            patch("timmy.memory_system.delete_memory") as mock_delete,
        ):
            # Default: no results
            mock_search.return_value = []
            mock_delete.return_value = True

            yield {"search": mock_search, "delete": mock_delete}

    def test_memory_forget_empty_query(self):
        """Test that empty query returns error message."""
        result = memory_forget("")
        assert "empty" in result.lower()

    def test_memory_forget_whitespace_only(self):
        """Test that whitespace-only query returns error."""
        result = memory_forget(" \n\t ")
        assert "empty" in result.lower()

    def test_memory_forget_no_matches(self, mock_vector_store):
        """Test when no memories match the query."""
        mock_vector_store["search"].return_value = []
        result = memory_forget("nonexistent query xyz123")
        assert "no matching" in result.lower() or "not found" in result.lower()

    def test_memory_forget_success(self, mock_vector_store):
        """Test successful deletion."""
        # One matching entry that delete_memory will report as removed.
        mock_entry = MagicMock()
        mock_entry.id = "entry-to-delete"
        mock_entry.content = "Content to forget"
        mock_entry.context_type = "fact"
        mock_vector_store["search"].return_value = [mock_entry]
        mock_vector_store["delete"].return_value = True

        result = memory_forget("content to forget")
        assert "forgotten" in result.lower() or "forgot" in result.lower()
        # Deletion is keyed by the matched entry's id.
        mock_vector_store["delete"].assert_called_once_with("entry-to-delete")

    def test_memory_forget_delete_fails(self, mock_vector_store):
        """Test when delete_memory returns False."""
        mock_entry = MagicMock()
        mock_entry.id = "entry-id"
        mock_entry.content = "Content"
        mock_entry.context_type = "fact"
        mock_vector_store["search"].return_value = [mock_entry]
        mock_vector_store["delete"].return_value = False

        result = memory_forget("content")
        # Should indicate the memory wasn't found or already deleted
        assert "not found" in result.lower() or "already" in result.lower()

    def test_memory_forget_strips_query(self, mock_vector_store):
        """Test that query is stripped of whitespace."""
        mock_vector_store["search"].return_value = []
        memory_forget(" padded query ")
        # Check that search was called with stripped query
        call_args = mock_vector_store["search"].call_args
        assert call_args.args[0] == "padded query"

    def test_memory_forget_handles_exception(self, mock_vector_store):
        """Test handling of exceptions during forget."""
        mock_vector_store["search"].side_effect = Exception("DB error")
        result = memory_forget("query")
        # Errors surface as a message, not as a raised exception.
        assert "failed" in result.lower() or "error" in result.lower()

    def test_memory_forget_uses_min_relevance(self, mock_vector_store):
        """Test that search uses min_relevance parameter."""
        mock_vector_store["search"].return_value = []
        memory_forget("test query")
        call_kwargs = mock_vector_store["search"].call_args.kwargs
        assert call_kwargs.get("min_relevance") == 0.3
        assert call_kwargs.get("limit") == 3
|
|
|
|
|
|
class TestGetEmbeddingModel:
    """Exercise the _get_embedding_model loader."""

    def test_returns_false_when_skip_embeddings(self):
        """With TIMMY_SKIP_EMBEDDINGS set (conftest), no model is loaded."""
        assert _get_embedding_model() is False

    def test_returns_model_when_available(self):
        """Loading path with sentence-transformers present (mocked in conftest)."""
        # sentence_transformers is mocked out, so the loader still reports False.
        assert _get_embedding_model() is False
|
|
|
|
|
|
class TestEdgeCases:
    """Test various edge cases and boundary conditions."""

    @pytest.fixture
    def mem(self, tmp_path):
        """A SemanticMemory isolated in a per-test temp directory.

        Replaces the identical setup block previously repeated in every test,
        matching the fixture convention used by TestSemanticMemory. tmp_path
        is unique per test, so a single DB filename keeps tests isolated.
        """
        sm = SemanticMemory()
        sm.db_path = tmp_path / "edge.db"
        sm.vault_path = tmp_path / "vault"
        sm.vault_path.mkdir()
        sm._init_db()
        return sm

    def test_semantic_memory_with_unicode_filepaths(self, mem):
        """Test handling of unicode file paths."""
        # Create file with unicode name
        md_file = mem.vault_path / "文件_📝.md"
        md_file.write_text(
            "# Unicode filename\n\nThis is meaningful content for testing unicode paths."
        )
        count = mem.index_file(md_file)
        assert count > 0

        # Verify it can be searched
        results = mem.search("content")
        assert len(results) > 0

    def test_semantic_memory_special_chars_in_content(self, mem):
        """Test handling of special characters in content."""
        md_file = mem.vault_path / "special.md"
        content = """# Special Characters

<script>alert('xss')</script>
SQL: SELECT * FROM users WHERE name = "admin' OR '1'='1"
JSON: {"key": "value", "nested": {"array": [1, 2, 3]}}
Unicode: 你好世界 🌍 café naïve
Escapes: \\n \\t \\r
"""
        md_file.write_text(content)
        count = mem.index_file(md_file)
        assert count > 0

    def test_very_long_file_content(self, mem):
        """Test handling of very long file content."""
        md_file = mem.vault_path / "long.md"
        # Create content with many paragraphs
        paragraphs = [f"Paragraph {i} with some content text here." for i in range(100)]
        md_file.write_text("# Long doc\n\n" + "\n\n".join(paragraphs))
        count = mem.index_file(md_file)
        assert count > 0

    def test_search_with_unicode_query(self, mem):
        """Test search with unicode query."""
        md_file = mem.vault_path / "test.md"
        md_file.write_text("# Test\n\nThis is a test document.")
        mem.index_file(md_file)

        # Search with unicode query should not crash
        results = mem.search("测试 查询 🌍")
        assert isinstance(results, list)

    def test_empty_vault_directory(self, mem):
        """Test operations on empty vault directory."""
        # Index empty vault
        count = mem.index_vault()
        assert count == 0

        # Search should return empty results
        results = mem.search("anything")
        assert results == []

        # Stats should show zeros
        stats = mem.stats()
        assert stats["total_chunks"] == 0
        assert stats["total_files"] == 0
|