forked from Rockachopa/Timmy-time-dashboard
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
329 lines
12 KiB
Python
329 lines
12 KiB
Python
"""Tests for timmy.memory_system — Memory system context injection."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from timmy.memory_system import (
|
|
HotMemory,
|
|
MemorySystem,
|
|
jot_note,
|
|
log_decision,
|
|
reset_memory_system,
|
|
store_last_reflection,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_singleton():
|
|
"""Reset the memory system singleton before each test."""
|
|
reset_memory_system()
|
|
yield
|
|
reset_memory_system()
|
|
|
|
|
|
class TestGetSystemContext:
|
|
"""Tests for get_system_context() soul injection."""
|
|
|
|
def test_includes_soul_content_when_exists(self, tmp_path):
|
|
"""get_system_context() includes soul content when soul.md exists."""
|
|
# Create temp soul.md
|
|
soul_dir = tmp_path / "memory" / "self"
|
|
soul_dir.mkdir(parents=True)
|
|
soul_md = soul_dir / "soul.md"
|
|
soul_md.write_text("# Soul\n\nI am Timmy. Unique-soul-marker.\n")
|
|
|
|
# Create temp MEMORY.md
|
|
memory_md = tmp_path / "MEMORY.md"
|
|
memory_md.write_text("# Timmy Hot Memory\n\n## Current Status\n\nOperational\n")
|
|
|
|
with (
|
|
patch("timmy.memory_system.SOUL_PATH", soul_md),
|
|
patch("timmy.memory_system.HOT_MEMORY_PATH", memory_md),
|
|
):
|
|
memory = MemorySystem()
|
|
context = memory.get_system_context()
|
|
|
|
assert "## Soul Identity" in context
|
|
assert "Unique-soul-marker" in context
|
|
|
|
def test_works_without_soul_file(self, tmp_path):
|
|
"""get_system_context() works fine when soul.md is missing."""
|
|
# Point to non-existent soul.md
|
|
nonexistent_soul = tmp_path / "memory" / "self" / "soul.md"
|
|
|
|
# Create temp MEMORY.md
|
|
memory_md = tmp_path / "MEMORY.md"
|
|
memory_md.write_text("# Timmy Hot Memory\n\n## Current Status\n\nOperational\n")
|
|
|
|
with (
|
|
patch("timmy.memory_system.SOUL_PATH", nonexistent_soul),
|
|
patch("timmy.memory_system.HOT_MEMORY_PATH", memory_md),
|
|
):
|
|
memory = MemorySystem()
|
|
context = memory.get_system_context()
|
|
|
|
# Should not contain soul section but should still have hot memory
|
|
assert "## Soul Identity" not in context
|
|
assert "## Hot Memory" in context
|
|
assert "Operational" in context
|
|
|
|
def test_soul_comes_first_in_context(self, tmp_path):
|
|
"""Soul identity should be the first section in context."""
|
|
# Create temp soul.md
|
|
soul_dir = tmp_path / "memory" / "self"
|
|
soul_dir.mkdir(parents=True)
|
|
soul_md = soul_dir / "soul.md"
|
|
soul_md.write_text("# Soul\n\nI am Timmy.\n")
|
|
|
|
# Create temp MEMORY.md
|
|
memory_md = tmp_path / "MEMORY.md"
|
|
memory_md.write_text("# Timmy Hot Memory\n\n## Current Status\n\nOperational\n")
|
|
|
|
with (
|
|
patch("timmy.memory_system.SOUL_PATH", soul_md),
|
|
patch("timmy.memory_system.HOT_MEMORY_PATH", memory_md),
|
|
):
|
|
memory = MemorySystem()
|
|
context = memory.get_system_context()
|
|
|
|
# Soul should appear before hot memory
|
|
soul_pos = context.find("## Soul Identity")
|
|
hot_pos = context.find("## Hot Memory")
|
|
assert soul_pos < hot_pos, "Soul Identity should come before Hot Memory"
|
|
|
|
|
|
class TestReadSoul:
|
|
"""Tests for read_soul() method."""
|
|
|
|
def test_read_soul_returns_content_when_exists(self, tmp_path):
|
|
"""read_soul() returns content when soul.md exists."""
|
|
soul_dir = tmp_path / "memory" / "self"
|
|
soul_dir.mkdir(parents=True)
|
|
soul_md = soul_dir / "soul.md"
|
|
soul_md.write_text("# Soul\n\nTest content.\n")
|
|
|
|
with patch("timmy.memory_system.SOUL_PATH", soul_md):
|
|
memory = MemorySystem()
|
|
content = memory.read_soul()
|
|
|
|
assert "Test content" in content
|
|
|
|
def test_read_soul_returns_empty_when_missing(self, tmp_path):
|
|
"""read_soul() returns empty string when soul.md doesn't exist."""
|
|
nonexistent = tmp_path / "no_such_soul.md"
|
|
|
|
with patch("timmy.memory_system.SOUL_PATH", nonexistent):
|
|
memory = MemorySystem()
|
|
content = memory.read_soul()
|
|
|
|
assert content == ""
|
|
|
|
|
|
class TestHotMemoryCorruptionGuard:
|
|
"""Tests for MEMORY.md corruption prevention (#252)."""
|
|
|
|
def test_update_section_no_truncation_without_prune_marker(self, tmp_path):
|
|
"""update_section must not corrupt content when *Prune date:* marker is absent.
|
|
|
|
Regression test: rfind("*Prune date:") returning -1 caused
|
|
full_content[:-1] to chop the last character.
|
|
"""
|
|
mem_file = tmp_path / "MEMORY.md"
|
|
mem_file.write_text("# Timmy Hot Memory\n\n## Known Facts\n\n- fact one\n- fact two\n")
|
|
|
|
hot = HotMemory()
|
|
hot.path = mem_file
|
|
|
|
# Patch read() to return file content (simulating DB unavailable)
|
|
with patch.object(hot, "read", return_value=mem_file.read_text()):
|
|
hot.update_section("Last Reflection", "**Time:** 2026-03-15\n**Seed:** freeform")
|
|
|
|
result = mem_file.read_text()
|
|
# Must NOT have any truncation — all original content intact
|
|
assert "- fact one" in result
|
|
assert "- fact two" in result
|
|
assert "## Last Reflection" in result
|
|
assert "freeform" in result
|
|
# The old bug: last char would get sliced off
|
|
assert result.count("fact tw") == 1 # "fact two" not "fact tw"
|
|
|
|
def test_update_section_replaces_existing_section(self, tmp_path):
|
|
"""update_section correctly replaces an existing section."""
|
|
content = (
|
|
"# Timmy Hot Memory\n\n"
|
|
"## Known Facts\n\n- fact one\n\n"
|
|
"## Last Reflection\n\nold reflection\n\n"
|
|
)
|
|
mem_file = tmp_path / "MEMORY.md"
|
|
mem_file.write_text(content)
|
|
|
|
hot = HotMemory()
|
|
hot.path = mem_file
|
|
|
|
with patch.object(hot, "read", return_value=mem_file.read_text()):
|
|
hot.update_section("Last Reflection", "new reflection")
|
|
|
|
result = mem_file.read_text()
|
|
assert "new reflection" in result
|
|
assert "old reflection" not in result
|
|
assert "- fact one" in result
|
|
|
|
def test_update_section_rejects_empty_content(self, tmp_path):
|
|
"""update_section refuses to write empty content."""
|
|
mem_file = tmp_path / "MEMORY.md"
|
|
mem_file.write_text("# Timmy Hot Memory\n\noriginal\n")
|
|
|
|
hot = HotMemory()
|
|
hot.path = mem_file
|
|
hot.update_section("Test", "")
|
|
|
|
# File should be unchanged
|
|
assert mem_file.read_text() == "# Timmy Hot Memory\n\noriginal\n"
|
|
|
|
def test_update_section_truncates_oversized_content(self, tmp_path):
|
|
"""update_section caps content at 2000 chars to prevent bloat."""
|
|
mem_file = tmp_path / "MEMORY.md"
|
|
mem_file.write_text("# Timmy Hot Memory\n\n")
|
|
|
|
hot = HotMemory()
|
|
hot.path = mem_file
|
|
|
|
huge = "x" * 3000
|
|
with patch.object(hot, "read", return_value=mem_file.read_text()):
|
|
hot.update_section("Huge", huge)
|
|
|
|
result = mem_file.read_text()
|
|
assert "[truncated]" in result
|
|
|
|
|
|
class TestStoreLastReflection:
|
|
"""Tests for store_last_reflection / recall_last_reflection (#252)."""
|
|
|
|
def test_store_and_recall_reflection(self, tmp_path):
|
|
"""Reflection round-trips through the database."""
|
|
db_path = tmp_path / "test_memory.db"
|
|
|
|
with patch("timmy.memory_system.DB_PATH", db_path):
|
|
from timmy.memory_system import recall_last_reflection
|
|
|
|
store_last_reflection("**Time:** 2026-03-15\n**Seed:** freeform\n**Thought:** test")
|
|
result = recall_last_reflection()
|
|
|
|
assert result is not None
|
|
assert "freeform" in result
|
|
assert "test" in result
|
|
|
|
def test_store_replaces_previous_reflection(self, tmp_path):
|
|
"""Only the latest reflection is kept — no accumulation."""
|
|
db_path = tmp_path / "test_memory.db"
|
|
|
|
with patch("timmy.memory_system.DB_PATH", db_path):
|
|
from timmy.memory_system import recall_last_reflection
|
|
|
|
store_last_reflection("first reflection")
|
|
store_last_reflection("second reflection")
|
|
result = recall_last_reflection()
|
|
|
|
assert "second reflection" in result
|
|
# DB should only have one reflection row
|
|
import sqlite3
|
|
|
|
conn = sqlite3.connect(str(db_path))
|
|
count = conn.execute(
|
|
"SELECT COUNT(*) FROM memories WHERE memory_type = 'reflection'"
|
|
).fetchone()[0]
|
|
conn.close()
|
|
assert count == 1
|
|
|
|
def test_store_empty_reflection_is_noop(self, tmp_path):
|
|
"""Empty reflections are silently ignored."""
|
|
db_path = tmp_path / "test_memory.db"
|
|
|
|
with patch("timmy.memory_system.DB_PATH", db_path):
|
|
from timmy.memory_system import recall_last_reflection
|
|
|
|
store_last_reflection("")
|
|
store_last_reflection(" ")
|
|
result = recall_last_reflection()
|
|
|
|
assert result is None
|
|
|
|
|
|
class TestJotNote:
|
|
"""Tests for jot_note() artifact tool."""
|
|
|
|
def test_saves_note_file(self, tmp_path):
|
|
"""jot_note creates a markdown file with title and body."""
|
|
notes_dir = tmp_path / "notes"
|
|
with patch("timmy.memory_system.NOTES_DIR", notes_dir):
|
|
result = jot_note("My Title", "Some body text")
|
|
|
|
assert "Note saved:" in result
|
|
files = list(notes_dir.glob("*.md"))
|
|
assert len(files) == 1
|
|
content = files[0].read_text()
|
|
assert "# My Title" in content
|
|
assert "Some body text" in content
|
|
assert "Created:" in content
|
|
|
|
def test_slug_in_filename(self, tmp_path):
|
|
"""Filename contains a slug derived from the title."""
|
|
notes_dir = tmp_path / "notes"
|
|
with patch("timmy.memory_system.NOTES_DIR", notes_dir):
|
|
jot_note("Hello World!", "body")
|
|
|
|
files = list(notes_dir.glob("*.md"))
|
|
assert "hello-world" in files[0].name
|
|
|
|
def test_rejects_empty_title(self):
|
|
"""jot_note rejects empty title."""
|
|
assert "title is empty" in jot_note("", "body")
|
|
assert "title is empty" in jot_note(" ", "body")
|
|
|
|
def test_rejects_empty_body(self):
|
|
"""jot_note rejects empty body."""
|
|
assert "body is empty" in jot_note("title", "")
|
|
assert "body is empty" in jot_note("title", " ")
|
|
|
|
|
|
class TestLogDecision:
|
|
"""Tests for log_decision() artifact tool."""
|
|
|
|
def test_creates_decision_log(self, tmp_path):
|
|
"""log_decision creates the log file and appends an entry."""
|
|
log_file = tmp_path / "decisions.md"
|
|
with patch("timmy.memory_system.DECISION_LOG", log_file):
|
|
result = log_decision("Use SQLite for storage")
|
|
|
|
assert "Decision logged:" in result
|
|
content = log_file.read_text()
|
|
assert "# Decision Log" in content
|
|
assert "Use SQLite for storage" in content
|
|
|
|
def test_appends_multiple_decisions(self, tmp_path):
|
|
"""Multiple decisions are appended to the same file."""
|
|
log_file = tmp_path / "decisions.md"
|
|
with patch("timmy.memory_system.DECISION_LOG", log_file):
|
|
log_decision("First decision")
|
|
log_decision("Second decision")
|
|
|
|
content = log_file.read_text()
|
|
assert "First decision" in content
|
|
assert "Second decision" in content
|
|
|
|
def test_includes_rationale(self, tmp_path):
|
|
"""Rationale is included when provided."""
|
|
log_file = tmp_path / "decisions.md"
|
|
with patch("timmy.memory_system.DECISION_LOG", log_file):
|
|
log_decision("Use Redis", "Fast in-memory cache")
|
|
|
|
content = log_file.read_text()
|
|
assert "Use Redis" in content
|
|
assert "Fast in-memory cache" in content
|
|
|
|
def test_rejects_empty_decision(self):
|
|
"""log_decision rejects empty decision string."""
|
|
assert "decision is empty" in log_decision("")
|
|
assert "decision is empty" in log_decision(" ")
|