forked from Rockachopa/Timmy-time-dashboard
## Thinking Engine Tests (#1314) - New: tests/timmy/test_thinking_engine.py — 117 tests across 21 test classes - Covers ThinkingEngine core + all 4 mixin classes: - engine.py: init, idle detection, store/retrieve, pruning, dedup, continuity, context assembly, novel thought generation, think_once, journal, broadcast - _distillation.py: should_distill, build_distill_prompt, parse_facts_response, filter_and_store_facts, maybe_distill - _issue_filing.py: references_real_files, get_recent_thoughts_for_issues, build_issue_classify_prompt, parse_issue_items, file_single_issue - _seeds_mixin.py: pick_seed_type, gather_seed, all seed sources, check_workspace - _snapshot.py: system snapshot, memory context, update_memory - _db.py: get_conn, row_to_thought, Thought dataclass - seeds.py: constants, prompt template, think tag regex - Targets 80%+ coverage of engine.py's 430 lines ## Stack Manifest (#986) - New: docs/stack_manifest.json — 8 categories, 40+ tools with pinned versions - LLM Inference, Coding Agents, Image Gen, Music/Voice, Orchestration, Nostr+Lightning+Bitcoin, Memory/KG, Streaming/Content - Schema: {tool, version, role, install_command, license, status} - New: src/timmy/stack_manifest.py — query_stack() runtime tool - Category and tool filtering (case-insensitive, partial match) - Manifest caching, graceful error handling - New: tests/timmy/test_stack_manifest.py — 24 tests - Registered query_stack in tool registry + tool catalog - Total: 141 new tests, all passing
1357 lines
53 KiB
Python
"""Comprehensive tests for timmy.thinking.engine — ThinkingEngine class.
|
|
|
|
Targets 80%+ line coverage of engine.py (430 lines). Tests every public method,
|
|
key private methods, and all four mixin classes:
|
|
- engine.py: ThinkingEngine core
|
|
- _db.py: Thought dataclass, DB helpers
|
|
- _distillation.py: _DistillationMixin (fact extraction, memory monitoring)
|
|
- _issue_filing.py: _IssueFilingMixin (phantom file validation, issue creation)
|
|
- _seeds_mixin.py: _SeedsMixin (seed selection, context gathering)
|
|
- _snapshot.py: _SnapshotMixin (system snapshot, memory context)
|
|
|
|
Issue: #1314
|
|
"""
|
|
|
|
import sqlite3
|
|
import time
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_engine(tmp_path: Path):
    """Build a ThinkingEngine backed by a throwaway SQLite DB under *tmp_path*."""
    from timmy.thinking import ThinkingEngine

    return ThinkingEngine(db_path=tmp_path / "thoughts.db")
|
|
|
|
|
|
def _seed_thoughts(engine, n: int, seed_type: str = "freeform") -> list:
    """Insert *n* chained thoughts into *engine* and return them oldest-first."""
    stored = []
    for idx in range(n):
        thought = engine._store_thought(f"Thought #{idx}: some content.", seed_type)
        # Advance the chain so each new thought parents the next one.
        engine._last_thought_id = thought.id
        stored.append(thought)
    return stored
|
|
|
|
|
|
# ===========================================================================
|
|
# 1. ThinkingEngine.__init__ — chain continuity on restart
|
|
# ===========================================================================
|
|
|
|
|
|
class TestInit:
    """ThinkingEngine.__init__ — DB creation and chain resume."""

    def test_init_creates_db(self, tmp_path):
        """Constructing the engine must materialise the SQLite file on disk."""
        _make_engine(tmp_path)
        assert (tmp_path / "thoughts.db").exists()

    def test_init_fresh_has_no_last_thought(self, tmp_path):
        """A brand-new engine starts with no chain tail."""
        assert _make_engine(tmp_path)._last_thought_id is None

    def test_init_resumes_chain_from_existing_thoughts(self, tmp_path):
        """With pre-existing rows, __init__ picks up the newest thought id."""
        from timmy.thinking._db import _get_conn

        db_file = tmp_path / "thoughts.db"
        # Populate the DB ahead of engine construction.
        with _get_conn(db_file) as conn:
            conn.execute(
                "INSERT INTO thoughts (id, content, seed_type, parent_id, created_at) "
                "VALUES (?, ?, ?, ?, ?)",
                ("abc-123", "pre-existing thought", "freeform", None, "2026-01-01T00:00:00"),
            )
            conn.commit()

        from timmy.thinking import ThinkingEngine

        resumed = ThinkingEngine(db_path=db_file)
        assert resumed._last_thought_id == "abc-123"

    def test_init_graceful_on_corrupt_db(self, tmp_path):
        """A corrupt DB file must not raise — the engine simply starts fresh."""
        db_file = tmp_path / "thoughts.db"
        db_file.write_text("not a valid sqlite database")

        from timmy.thinking import ThinkingEngine

        fresh = ThinkingEngine(db_path=db_file)
        assert fresh._last_thought_id is None
|
|
|
|
|
|
# ===========================================================================
|
|
# 2. record_user_input and _is_idle
|
|
# ===========================================================================
|
|
|
|
|
|
class TestIdleDetection:
    """User activity tracking and idle timeout."""

    def test_record_user_input_resets_timer(self, tmp_path):
        """record_user_input must advance _last_input_time."""
        engine = _make_engine(tmp_path)
        previous = engine._last_input_time
        time.sleep(0.01)
        engine.record_user_input()
        assert engine._last_input_time > previous

    def test_is_idle_returns_false_after_recent_input(self, tmp_path):
        """Fresh input means the engine is not idle."""
        engine = _make_engine(tmp_path)
        engine.record_user_input()
        with patch("timmy.thinking.engine.settings") as mock_settings:
            mock_settings.thinking_idle_timeout_minutes = 5
            assert engine._is_idle() is False

    def test_is_idle_returns_true_after_timeout(self, tmp_path):
        """Once the timeout window has passed, the engine reports idle."""
        engine = _make_engine(tmp_path)
        # Pretend the last input happened an hour ago.
        engine._last_input_time = datetime.now(UTC) - timedelta(minutes=60)
        with patch("timmy.thinking.engine.settings") as mock_settings:
            mock_settings.thinking_idle_timeout_minutes = 5
            assert engine._is_idle() is True

    def test_is_idle_disabled_when_timeout_zero(self, tmp_path):
        """A zero or negative timeout turns idle detection off entirely."""
        engine = _make_engine(tmp_path)
        engine._last_input_time = datetime.now(UTC) - timedelta(hours=24)
        with patch("timmy.thinking.engine.settings") as mock_settings:
            for disabled_value in (0, -1):
                mock_settings.thinking_idle_timeout_minutes = disabled_value
                assert engine._is_idle() is False
|
|
|
|
|
|
# ===========================================================================
|
|
# 3. _store_thought — persistence
|
|
# ===========================================================================
|
|
|
|
|
|
class TestStoreThought:
    """_store_thought — SQLite persistence."""

    def test_store_thought_assigns_uuid(self, tmp_path):
        """Every stored thought receives a 36-character UUID id."""
        engine = _make_engine(tmp_path)
        stored = engine._store_thought("Hello world.", "creative")
        assert len(stored.id) == 36  # UUID format: 8-4-4-4-12

    def test_store_thought_persists_to_sqlite(self, tmp_path):
        """Rows must be readable straight out of the SQLite file."""
        engine = _make_engine(tmp_path)
        stored = engine._store_thought("Persisted thought.", "swarm")
        conn = sqlite3.connect(str(tmp_path / "thoughts.db"))
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "SELECT * FROM thoughts WHERE id = ?", (stored.id,)
        ).fetchone()
        conn.close()
        assert row is not None
        assert row["content"] == "Persisted thought."
        assert row["seed_type"] == "swarm"

    def test_store_thought_with_parent_id(self, tmp_path):
        """_last_thought_id becomes the parent of the next stored thought."""
        engine = _make_engine(tmp_path)
        first = engine._store_thought("First.", "freeform")
        engine._last_thought_id = first.id
        second = engine._store_thought("Second.", "freeform")
        assert second.parent_id == first.id

    def test_store_thought_with_custom_arrived_at(self, tmp_path):
        """An explicit arrived_at overrides the default created_at stamp."""
        engine = _make_engine(tmp_path)
        fixed_ts = "2026-01-15T12:00:00+00:00"
        stored = engine._store_thought("Timestamped.", "memory", arrived_at=fixed_ts)
        assert stored.created_at == fixed_ts

    def test_store_thought_default_timestamp(self, tmp_path):
        """Without arrived_at, created_at lands at roughly the current time."""
        engine = _make_engine(tmp_path)
        lower_bound = datetime.now(UTC).isoformat()
        stored = engine._store_thought("Now-ish.", "freeform")
        assert stored.created_at >= lower_bound
|
|
|
|
|
|
# ===========================================================================
|
|
# 4. get_thought, get_recent_thoughts, count_thoughts, get_thought_chain
|
|
# ===========================================================================
|
|
|
|
|
|
class TestQueryMethods:
    """Read-path methods for thought retrieval."""

    def test_get_thought_found(self, tmp_path):
        """get_thought returns the stored thought by id."""
        engine = _make_engine(tmp_path)
        stored = engine._store_thought("Find me.", "existential")
        fetched = engine.get_thought(stored.id)
        assert fetched is not None
        assert fetched.content == "Find me."

    def test_get_thought_not_found(self, tmp_path):
        """Unknown ids yield None."""
        assert _make_engine(tmp_path).get_thought("nonexistent") is None

    def test_get_recent_thoughts_ordering(self, tmp_path):
        """get_recent_thoughts is newest-first and respects the limit."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 5)
        latest = engine.get_recent_thoughts(limit=3)
        assert len(latest) == 3
        # The most recent insert (Thought #4) leads the result.
        assert "Thought #4" in latest[0].content

    def test_count_thoughts_accuracy(self, tmp_path):
        """count_thoughts tracks the number of rows exactly."""
        engine = _make_engine(tmp_path)
        assert engine.count_thoughts() == 0
        _seed_thoughts(engine, 7)
        assert engine.count_thoughts() == 7

    def test_get_thought_chain_max_depth(self, tmp_path):
        """The chain walk stops after max_depth links."""
        engine = _make_engine(tmp_path)
        seeded = _seed_thoughts(engine, 10)
        chain = engine.get_thought_chain(seeded[-1].id, max_depth=3)
        assert len(chain) == 3

    def test_get_thought_chain_chronological_order(self, tmp_path):
        """Chains come back oldest-first."""
        engine = _make_engine(tmp_path)
        seeded = _seed_thoughts(engine, 4)
        chain = engine.get_thought_chain(seeded[-1].id)
        assert chain[0].content == seeded[0].content
        assert chain[-1].content == seeded[-1].content
|
|
|
|
|
|
# ===========================================================================
|
|
# 5. prune_old_thoughts
|
|
# ===========================================================================
|
|
|
|
|
|
class TestPruning:
    """prune_old_thoughts — retention policy enforcement."""

    @staticmethod
    def _insert_aged_rows(engine, count: int, days_old: int) -> None:
        """Insert *count* rows stamped *days_old* days in the past."""
        from timmy.thinking._db import _get_conn

        stale_ts = (datetime.now(UTC) - timedelta(days=days_old)).isoformat()
        with _get_conn(engine._db_path) as conn:
            for idx in range(count):
                conn.execute(
                    "INSERT INTO thoughts (id, content, seed_type, parent_id, created_at) "
                    "VALUES (?, ?, ?, ?, ?)",
                    (f"old-{idx}", f"Old thought {idx}", "freeform", None, stale_ts),
                )
            conn.commit()

    def test_prune_deletes_old_thoughts(self, tmp_path):
        """Rows older than keep_days are removed; newer ones survive."""
        engine = _make_engine(tmp_path)
        self._insert_aged_rows(engine, 10, days_old=100)
        for idx in range(5):
            engine._store_thought(f"Recent {idx}", "freeform")

        assert engine.count_thoughts() == 15
        removed = engine.prune_old_thoughts(keep_days=90, keep_min=5)
        assert removed == 10
        assert engine.count_thoughts() == 5

    def test_prune_respects_keep_min(self, tmp_path):
        """keep_min is a hard floor even when every row is ancient."""
        engine = _make_engine(tmp_path)
        self._insert_aged_rows(engine, 5, days_old=200)

        # Total=5, keep_min=200 → should not prune at all.
        removed = engine.prune_old_thoughts(keep_days=1, keep_min=200)
        assert removed == 0
        assert engine.count_thoughts() == 5

    def test_prune_returns_zero_when_nothing_to_prune(self, tmp_path):
        """Only fresh thoughts → nothing deleted."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 3)
        assert engine.prune_old_thoughts(keep_days=90, keep_min=1) == 0

    def test_prune_graceful_on_empty_db(self, tmp_path):
        """An empty DB prunes to zero without raising."""
        engine = _make_engine(tmp_path)
        assert engine.prune_old_thoughts() == 0
|
|
|
|
|
|
# ===========================================================================
|
|
# 6. _is_too_similar — deduplication
|
|
# ===========================================================================
|
|
|
|
|
|
class TestDeduplication:
    """Semantic deduplication checks."""

    def test_identical_content_is_similar(self, tmp_path):
        """Byte-identical content is flagged as too similar."""
        engine = _make_engine(tmp_path)
        existing = engine._store_thought("The sky is blue.", "observation")
        assert engine._is_too_similar("The sky is blue.", [existing]) is True

    def test_very_different_content_passes(self, tmp_path):
        """Unrelated content clears the similarity gate."""
        engine = _make_engine(tmp_path)
        existing = engine._store_thought("The sky is blue.", "observation")
        candidate = "Quantum entanglement challenges our understanding of locality."
        assert engine._is_too_similar(candidate, [existing]) is False

    def test_empty_recent_list_always_passes(self, tmp_path):
        """With no prior thoughts there is nothing to be similar to."""
        engine = _make_engine(tmp_path)
        assert engine._is_too_similar("Anything at all.", []) is False

    def test_case_insensitive_comparison(self, tmp_path):
        """Similarity comparison ignores letter case."""
        engine = _make_engine(tmp_path)
        existing = engine._store_thought("THE SKY IS BLUE.", "observation")
        assert engine._is_too_similar("the sky is blue.", [existing]) is True

    def test_threshold_constant(self, tmp_path):
        """Pin the dedup tuning constants on the class."""
        from timmy.thinking.engine import ThinkingEngine

        assert ThinkingEngine._SIMILARITY_THRESHOLD == 0.6
        assert ThinkingEngine._MAX_DEDUP_RETRIES == 2
|
|
|
|
|
|
# ===========================================================================
|
|
# 7. _build_continuity_context
|
|
# ===========================================================================
|
|
|
|
|
|
class TestContinuityContext:
    """Continuity context for anti-repetition."""

    def test_first_thought_context(self, tmp_path):
        """With an empty DB the context flags this as the first thought."""
        context = _make_engine(tmp_path)._build_continuity_context()
        assert "first thought" in context.lower()

    def test_includes_recent_thoughts(self, tmp_path):
        """Recent thought snippets must appear in the context."""
        engine = _make_engine(tmp_path)
        engine._store_thought("Sovereignty is the foundation.", "sovereignty")
        assert "Sovereignty" in engine._build_continuity_context()

    def test_includes_seed_type_labels(self, tmp_path):
        """Each snippet carries its [seed_type] label."""
        engine = _make_engine(tmp_path)
        engine._store_thought("A creative musing.", "creative")
        assert "[creative]" in engine._build_continuity_context()

    def test_truncates_long_thoughts(self, tmp_path):
        """Snippets over 100 chars are shortened with an ellipsis."""
        engine = _make_engine(tmp_path)
        engine._store_thought("X" * 200, "freeform")
        context = engine._build_continuity_context()
        assert "..." in context
        assert "X" * 200 not in context

    def test_anti_repetition_header(self, tmp_path):
        """The context carries anti-repetition guidance for the model."""
        engine = _make_engine(tmp_path)
        engine._store_thought("Something.", "freeform")
        context = engine._build_continuity_context()
        assert "do not repeat" in context.lower() or "new" in context.lower()
|
|
|
|
|
|
# ===========================================================================
|
|
# 8. _build_thinking_context
|
|
# ===========================================================================
|
|
|
|
|
|
class TestBuildThinkingContext:
    """Full context assembly for a thinking cycle."""

    def test_returns_three_tuple(self, tmp_path):
        """Shape check: (memory_context, system_context, recent_thoughts)."""
        bundle = _make_engine(tmp_path)._build_thinking_context()
        assert isinstance(bundle, tuple)
        assert len(bundle) == 3

    def test_recent_thoughts_are_thought_objects(self, tmp_path):
        """The third element holds Thought instances."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 3)
        _, _, recents = engine._build_thinking_context()
        assert len(recents) == 3
        from timmy.thinking._db import Thought

        assert all(isinstance(item, Thought) for item in recents)
|
|
|
|
|
|
# ===========================================================================
|
|
# 9. _generate_novel_thought (async)
|
|
# ===========================================================================
|
|
|
|
|
|
class TestGenerateNovelThought:
    """The dedup-retry loop for producing novel thoughts."""

    @pytest.mark.asyncio
    async def test_returns_novel_thought(self, tmp_path):
        """A novel agent response is returned verbatim."""
        engine = _make_engine(tmp_path)
        with patch.object(engine, "_call_agent", return_value="A brand new idea."):
            content, seed_type = await engine._generate_novel_thought(None, "", "", [])
        assert content == "A brand new idea."

    @pytest.mark.asyncio
    async def test_returns_none_on_agent_failure(self, tmp_path):
        """Agent exceptions surface as a None thought."""
        engine = _make_engine(tmp_path)
        with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
            content, _ = await engine._generate_novel_thought(None, "", "", [])
        assert content is None

    @pytest.mark.asyncio
    async def test_returns_none_on_empty_response(self, tmp_path):
        """Whitespace-only agent output counts as no thought."""
        engine = _make_engine(tmp_path)
        with patch.object(engine, "_call_agent", return_value=" "):
            content, _ = await engine._generate_novel_thought(None, "", "", [])
        assert content is None

    @pytest.mark.asyncio
    async def test_retries_on_similar_then_succeeds(self, tmp_path):
        """A too-similar first attempt triggers one retry that succeeds."""
        engine = _make_engine(tmp_path)
        prior = engine._store_thought("The swarm is quiet.", "observation")
        attempts = {"count": 0}

        def fake_agent(prompt):
            attempts["count"] += 1
            if attempts["count"] == 1:
                return "The swarm is quiet today."  # too similar
            return "A completely different and novel insight."

        with patch.object(engine, "_call_agent", side_effect=fake_agent):
            content, _ = await engine._generate_novel_thought(None, "", "", [prior])
        assert content is not None
        assert "novel" in content.lower()
        assert attempts["count"] == 2

    @pytest.mark.asyncio
    async def test_discards_after_max_retries(self, tmp_path):
        """Persistently similar output is dropped after the retry budget."""
        engine = _make_engine(tmp_path)
        prior = engine._store_thought("The swarm is quiet.", "observation")

        with patch.object(
            engine, "_call_agent", return_value="The swarm is quiet today."
        ):
            content, _ = await engine._generate_novel_thought(None, "", "", [prior])
        assert content is None

    @pytest.mark.asyncio
    async def test_prompted_mode_sets_seed_type(self, tmp_path):
        """An explicit prompt flags the result as seed_type 'prompted'."""
        engine = _make_engine(tmp_path)
        with patch.object(engine, "_call_agent", return_value="Answer to prompt."):
            content, seed_type = await engine._generate_novel_thought(
                "What is sovereignty?", "", "", []
            )
        assert seed_type == "prompted"
        assert content == "Answer to prompt."
|
|
|
|
|
|
# ===========================================================================
|
|
# 10. _process_thinking_result (async)
|
|
# ===========================================================================
|
|
|
|
|
|
class TestProcessThinkingResult:
    """Post-hooks after a thought is stored."""

    @pytest.mark.asyncio
    async def test_calls_all_post_hooks(self, tmp_path):
        """Every post-hook fires exactly once per processed thought."""
        engine = _make_engine(tmp_path)
        thought = engine._store_thought("Test thought.", "freeform")

        with (
            patch.object(engine, "_maybe_check_memory") as check_memory,
            patch.object(engine, "_maybe_distill", new_callable=AsyncMock) as distill,
            patch.object(engine, "_maybe_file_issues", new_callable=AsyncMock) as file_issues,
            patch.object(engine, "_check_workspace", new_callable=AsyncMock) as workspace,
            patch.object(engine, "_maybe_check_memory_status") as memory_status,
            patch.object(engine, "_update_memory") as update_memory,
            patch.object(engine, "_log_event") as log_event,
            patch.object(engine, "_write_journal") as write_journal,
            patch.object(engine, "_broadcast", new_callable=AsyncMock) as broadcast,
        ):
            await engine._process_thinking_result(thought)

        check_memory.assert_called_once()
        distill.assert_called_once()
        file_issues.assert_called_once()
        workspace.assert_called_once()
        memory_status.assert_called_once()
        update_memory.assert_called_once_with(thought)
        log_event.assert_called_once_with(thought)
        write_journal.assert_called_once_with(thought)
        broadcast.assert_called_once_with(thought)
|
|
|
|
|
|
# ===========================================================================
|
|
# 11. think_once (async) — full integration
|
|
# ===========================================================================
|
|
|
|
|
|
class TestThinkOnce:
    """Full thinking cycle integration."""

    @pytest.mark.asyncio
    async def test_returns_none_when_disabled(self, tmp_path):
        """thinking_enabled=False short-circuits the cycle."""
        engine = _make_engine(tmp_path)
        with patch("timmy.thinking.engine.settings") as cfg:
            cfg.thinking_enabled = False
            assert await engine.think_once() is None

    @pytest.mark.asyncio
    async def test_returns_none_when_idle_no_prompt(self, tmp_path):
        """An idle engine with no prompt skips the cycle."""
        engine = _make_engine(tmp_path)
        engine._last_input_time = datetime.now(UTC) - timedelta(hours=2)
        with patch("timmy.thinking.engine.settings") as cfg:
            cfg.thinking_enabled = True
            cfg.thinking_idle_timeout_minutes = 5
            assert await engine.think_once() is None

    @pytest.mark.asyncio
    async def test_prompt_overrides_idle_skip(self, tmp_path):
        """A supplied prompt bypasses the idle check."""
        engine = _make_engine(tmp_path)
        engine._last_input_time = datetime.now(UTC) - timedelta(hours=2)
        with (
            patch("timmy.thinking.engine.settings") as cfg,
            patch.object(engine, "_call_agent", return_value="Prompted thought."),
            patch.object(engine, "_log_event"),
            patch.object(engine, "_write_journal"),
            patch.object(engine, "_update_memory"),
            patch.object(engine, "_broadcast", new_callable=AsyncMock),
        ):
            cfg.thinking_enabled = True
            cfg.thinking_idle_timeout_minutes = 5
            produced = await engine.think_once(prompt="What matters most?")
        assert produced is not None
        assert produced.seed_type == "prompted"

    @pytest.mark.asyncio
    async def test_stores_thought_and_updates_chain(self, tmp_path):
        """A successful cycle persists the thought and advances the chain."""
        engine = _make_engine(tmp_path)
        with (
            patch.object(engine, "_call_agent", return_value="A real thought."),
            patch.object(engine, "_log_event"),
            patch.object(engine, "_write_journal"),
            patch.object(engine, "_update_memory"),
            patch.object(engine, "_broadcast", new_callable=AsyncMock),
        ):
            produced = await engine.think_once()
        assert produced is not None
        assert engine._last_thought_id == produced.id
        assert engine.count_thoughts() == 1

    @pytest.mark.asyncio
    async def test_captures_arrival_timestamp(self, tmp_path):
        """created_at reflects cycle start, not when the LLM returned."""
        engine = _make_engine(tmp_path)
        cycle_start = datetime.now(UTC).isoformat()
        with (
            patch.object(engine, "_call_agent", return_value="Timestamped."),
            patch.object(engine, "_log_event"),
            patch.object(engine, "_write_journal"),
            patch.object(engine, "_update_memory"),
            patch.object(engine, "_broadcast", new_callable=AsyncMock),
        ):
            produced = await engine.think_once()
        assert produced.created_at >= cycle_start
|
|
|
|
|
|
# ===========================================================================
|
|
# 12. _log_event
|
|
# ===========================================================================
|
|
|
|
|
|
class TestLogEvent:
    """_log_event — swarm event logging."""

    def test_log_event_calls_swarm(self, tmp_path):
        """Routing a thought through the swarm event log must not blow up."""
        engine = _make_engine(tmp_path)
        thought = engine._store_thought("Logged thought.", "freeform")

        with patch("swarm.event_log.log_event", MagicMock()):
            engine._log_event(thought)
        # swarm.event_log is stubbed in conftest, so just verify no crash.
        # The actual call may or may not happen depending on stub behavior.

    def test_log_event_graceful_on_failure(self, tmp_path):
        """A failing swarm logger is swallowed silently."""
        engine = _make_engine(tmp_path)
        thought = engine._store_thought("Graceful log.", "freeform")
        with patch("swarm.event_log.log_event", side_effect=Exception("boom")):
            engine._log_event(thought)  # Should not raise
|
|
|
|
|
|
# ===========================================================================
|
|
# 13. _write_journal
|
|
# ===========================================================================
|
|
|
|
|
|
class TestWriteJournal:
    """_write_journal — daily markdown journal."""

    def test_creates_journal_directory(self, tmp_path):
        """The first write creates the journal directory."""
        engine = _make_engine(tmp_path)
        entry = engine._store_thought("Journal entry.", "creative")
        engine._write_journal(entry)
        assert (tmp_path / "journal").exists()

    def test_creates_daily_journal_file(self, tmp_path):
        """One YYYY-MM-DD.md file appears for the day."""
        engine = _make_engine(tmp_path)
        entry = engine._store_thought("Daily note.", "memory")
        engine._write_journal(entry)
        md_files = list((tmp_path / "journal").glob("*.md"))
        assert len(md_files) == 1
        assert md_files[0].name.endswith(".md")

    def test_appends_to_existing_journal(self, tmp_path):
        """Same-day thoughts accumulate in a single file."""
        engine = _make_engine(tmp_path)
        first = engine._store_thought("First entry.", "freeform")
        second = engine._store_thought("Second entry.", "creative")
        engine._write_journal(first)
        engine._write_journal(second)
        md_files = list((tmp_path / "journal").glob("*.md"))
        assert len(md_files) == 1
        body = md_files[0].read_text()
        assert "First entry." in body
        assert "Second entry." in body

    def test_journal_entry_format(self, tmp_path):
        """Entries carry a '## TIME — seed_type' header, content, and separator."""
        engine = _make_engine(tmp_path)
        entry = engine._store_thought("Formatted entry.", "sovereignty")
        engine._write_journal(entry)
        body = next(iter((tmp_path / "journal").glob("*.md"))).read_text()
        assert "## " in body
        assert "sovereignty" in body
        assert "Formatted entry." in body
        assert "---" in body  # separator

    def test_journal_graceful_on_failure(self, tmp_path):
        """An unparseable created_at must not crash the journal writer."""
        engine = _make_engine(tmp_path)
        entry = engine._store_thought("Bad journal.", "freeform")
        # Rebuild the thought with a bogus timestamp to force a parse error.
        entry = type(entry)(
            id=entry.id,
            content=entry.content,
            seed_type=entry.seed_type,
            parent_id=entry.parent_id,
            created_at="not-a-date",
        )
        engine._write_journal(entry)  # Should not raise
|
|
|
|
|
|
# ===========================================================================
|
|
# 14. _broadcast (async)
|
|
# ===========================================================================
|
|
|
|
|
|
class TestBroadcast:
    """_broadcast — WebSocket broadcasting."""

    @pytest.mark.asyncio
    async def test_broadcast_sends_thought_data(self, tmp_path):
        """Broadcasting runs the ws_manager path without error."""
        engine = _make_engine(tmp_path)
        thought = engine._store_thought("Broadcast me.", "swarm")

        fake_ws = AsyncMock()
        with patch("infrastructure.ws_manager.handler.ws_manager", fake_ws):
            await engine._broadcast(thought)
        # ws_manager is stubbed, verify graceful execution.

    @pytest.mark.asyncio
    async def test_broadcast_graceful_on_failure(self, tmp_path):
        """A broken WebSocket manager is tolerated."""
        engine = _make_engine(tmp_path)
        thought = engine._store_thought("Failed broadcast.", "freeform")
        failing_ws = patch(
            "infrastructure.ws_manager.handler.ws_manager",
            side_effect=Exception("ws down"),
        )
        with failing_ws:
            await engine._broadcast(thought)  # Should not raise
|
|
|
|
|
|
# ===========================================================================
|
|
# 15. _DistillationMixin
|
|
# ===========================================================================
|
|
|
|
|
|
class TestDistillation:
    """Fact distillation from thoughts (_DistillationMixin).

    Covers interval gating (_should_distill), distill-prompt construction,
    facts-response parsing, the sensitive/meta-fact filter, and the full
    _maybe_distill flow.

    NOTE: tests that stub ``timmy.memory_system.memory_write`` use
    ``patch.object(..., create=True)`` so the module's original attribute is
    restored (or the injected one removed) on exit.  The previous
    ``_ms.memory_write = MagicMock()`` / ``del _ms.memory_write`` pattern
    would permanently delete a pre-existing ``memory_write`` function,
    corrupting module state for every subsequent test in the session.
    """

    def test_should_distill_at_interval(self, tmp_path):
        """_should_distill returns True when count is a multiple of interval."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 10)
        with patch("timmy.thinking._distillation.settings") as s:
            s.thinking_distill_every = 10
            assert engine._should_distill() is True

    def test_should_distill_not_at_interval(self, tmp_path):
        """_should_distill returns False between intervals."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 7)
        with patch("timmy.thinking._distillation.settings") as s:
            s.thinking_distill_every = 10
            assert engine._should_distill() is False

    def test_should_distill_disabled(self, tmp_path):
        """_should_distill returns False when interval is 0."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 10)
        with patch("timmy.thinking._distillation.settings") as s:
            s.thinking_distill_every = 0
            assert engine._should_distill() is False

    def test_build_distill_prompt_includes_thoughts(self, tmp_path):
        """Distill prompt should include thought content."""
        engine = _make_engine(tmp_path)
        thoughts = _seed_thoughts(engine, 3)
        prompt = engine._build_distill_prompt(thoughts)
        assert "Thought #0" in prompt
        assert "JSON array" in prompt

    def test_parse_facts_response_clean(self, tmp_path):
        """A bare JSON array parses to the list of facts."""
        engine = _make_engine(tmp_path)
        assert engine._parse_facts_response('["fact1", "fact2"]') == ["fact1", "fact2"]

    def test_parse_facts_response_empty(self, tmp_path):
        """Empty array, empty string, and None all yield []."""
        engine = _make_engine(tmp_path)
        assert engine._parse_facts_response("[]") == []
        assert engine._parse_facts_response("") == []
        assert engine._parse_facts_response(None) == []

    def test_parse_facts_response_with_prose(self, tmp_path):
        """Surrounding prose is tolerated; the embedded array is extracted."""
        engine = _make_engine(tmp_path)
        result = engine._parse_facts_response('Here are facts:\n["fact1"]')
        assert result == ["fact1"]

    def test_parse_facts_response_markdown_fences(self, tmp_path):
        """Markdown code fences around the JSON are stripped."""
        engine = _make_engine(tmp_path)
        result = engine._parse_facts_response('```json\n["fenced"]\n```')
        assert result == ["fenced"]

    def test_parse_facts_filters_non_strings(self, tmp_path):
        """Non-string entries (numbers, nulls) are dropped."""
        engine = _make_engine(tmp_path)
        result = engine._parse_facts_response('[42, "valid", null]')
        assert result == ["valid"]

    def test_filter_and_store_facts_blocks_sensitive(self, tmp_path):
        """Sensitive facts (containing 'token', 'password', etc.) should be blocked."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        with patch.object(_ms, "memory_write", MagicMock(), create=True) as mw:
            engine._filter_and_store_facts(["The API token is abc123"])
            mw.assert_not_called()

    def test_filter_and_store_facts_blocks_meta(self, tmp_path):
        """Meta-observations about thinking should be blocked."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        with patch.object(_ms, "memory_write", MagicMock(), create=True) as mw:
            engine._filter_and_store_facts(["my thinking process is recursive"])
            mw.assert_not_called()

    def test_filter_and_store_facts_stores_valid(self, tmp_path):
        """Valid facts should be stored via memory_write."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        with patch.object(
            _ms, "memory_write", MagicMock(return_value="stored ok"), create=True
        ) as mw:
            engine._filter_and_store_facts(
                ["Alexander prefers YAML configuration over code changes"]
            )
            mw.assert_called_once()

    def test_filter_and_store_facts_caps_at_three(self, tmp_path):
        """Should store at most 3 facts."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        with patch.object(
            _ms, "memory_write", MagicMock(return_value="stored"), create=True
        ) as mw:
            engine._filter_and_store_facts([
                "Fact one is valid and long enough.",
                "Fact two is valid and long enough.",
                "Fact three is valid and long enough.",
                "Fact four should be skipped.",
                "Fact five should be skipped.",
            ])
            assert mw.call_count == 3

    def test_filter_and_store_facts_skips_short(self, tmp_path):
        """Facts shorter than 10 chars should be skipped."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        with patch.object(_ms, "memory_write", MagicMock(), create=True) as mw:
            engine._filter_and_store_facts(["short"])
            mw.assert_not_called()

    @pytest.mark.asyncio
    async def test_maybe_distill_calls_agent(self, tmp_path):
        """_maybe_distill should call _call_agent at the right interval."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 10)
        with (
            patch.object(_ms, "memory_write", MagicMock(return_value="ok"), create=True),
            patch("timmy.thinking._distillation.settings") as s,
            patch.object(engine, "_call_agent", return_value='["Valid fact for distillation."]'),
        ):
            s.thinking_distill_every = 10
            await engine._maybe_distill()

    @pytest.mark.asyncio
    async def test_maybe_distill_graceful_on_error(self, tmp_path):
        """_maybe_distill should not crash on errors."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 10)
        with (
            patch("timmy.thinking._distillation.settings") as s,
            patch.object(engine, "_call_agent", side_effect=Exception("LLM down")),
        ):
            s.thinking_distill_every = 10
            await engine._maybe_distill()  # Should not raise
|
|
|
|
|
|
# ===========================================================================
|
|
# 16. _IssueFilingMixin
|
|
# ===========================================================================
|
|
|
|
|
|
class TestIssueFiling:
    """_IssueFilingMixin — phantom-file validation and issue creation."""

    def test_references_real_files_no_paths(self, tmp_path):
        """Prose containing no file paths passes validation."""
        from timmy.thinking.engine import ThinkingEngine

        assert ThinkingEngine._references_real_files("Pure prose about sovereignty.") is True

    def test_references_real_files_blocks_phantom(self, tmp_path):
        """A path to a file that does not exist fails validation."""
        from timmy.thinking.engine import ThinkingEngine

        verdict = ThinkingEngine._references_real_files(
            "Fix src/timmy/quantum_brain.py for better reasoning."
        )
        assert verdict is False

    def test_references_real_files_allows_real(self, tmp_path):
        """A path to an actual project file passes validation."""
        from timmy.thinking.engine import ThinkingEngine

        # engine.py definitely exists
        verdict = ThinkingEngine._references_real_files(
            "The bug is in src/timmy/thinking/engine.py"
        )
        assert verdict is True

    def test_get_recent_thoughts_for_issues_disabled(self, tmp_path):
        """A zero interval disables issue filing entirely."""
        eng = _make_engine(tmp_path)
        with patch("timmy.thinking._issue_filing.settings") as cfg:
            cfg.thinking_issue_every = 0
            assert eng._get_recent_thoughts_for_issues() is None

    def test_get_recent_thoughts_for_issues_wrong_count(self, tmp_path):
        """Counts that are not multiples of the interval return None."""
        eng = _make_engine(tmp_path)
        _seed_thoughts(eng, 7)
        with patch("timmy.thinking._issue_filing.settings") as cfg:
            cfg.thinking_issue_every = 10
            assert eng._get_recent_thoughts_for_issues() is None

    def test_get_recent_thoughts_for_issues_no_gitea(self, tmp_path):
        """Without Gitea enabled (and no token), nothing is returned."""
        eng = _make_engine(tmp_path)
        _seed_thoughts(eng, 10)
        with patch("timmy.thinking._issue_filing.settings") as cfg:
            cfg.thinking_issue_every = 10
            cfg.gitea_enabled = False
            cfg.gitea_token = ""
            assert eng._get_recent_thoughts_for_issues() is None

    def test_build_issue_classify_prompt(self, tmp_path):
        """Classification prompt carries thought text plus JSON instructions."""
        eng = _make_engine(tmp_path)
        seeded = _seed_thoughts(eng, 3)
        prompt = eng._build_issue_classify_prompt(seeded)
        for needle in ("Thought #0", "JSON array", "category"):
            assert needle in prompt

    def test_parse_issue_items_valid(self, tmp_path):
        """A well-formed JSON array parses into issue dicts."""
        from timmy.thinking.engine import ThinkingEngine

        parsed = ThinkingEngine._parse_issue_items(
            '[{"title": "Bug fix", "body": "Details", "category": "bug"}]'
        )
        assert parsed is not None
        assert len(parsed) == 1
        assert parsed[0]["title"] == "Bug fix"

    def test_parse_issue_items_empty(self, tmp_path):
        """Empty array, empty string, and None all parse to None."""
        from timmy.thinking.engine import ThinkingEngine

        for raw in ("[]", "", None):
            assert ThinkingEngine._parse_issue_items(raw) is None

    def test_parse_issue_items_markdown_fences(self, tmp_path):
        """Markdown fences around the JSON payload are tolerated."""
        from timmy.thinking.engine import ThinkingEngine

        parsed = ThinkingEngine._parse_issue_items(
            '```json\n[{"title": "Feature req", "body": "Add X", "category": "feature"}]\n```'
        )
        assert parsed is not None
        assert len(parsed) == 1

    @pytest.mark.asyncio
    async def test_file_single_issue_validates_title(self, tmp_path):
        """Titles that are too short never reach the create function."""
        eng = _make_engine(tmp_path)
        create_issue = AsyncMock()
        await eng._file_single_issue({"title": "short", "body": "x"}, create_issue)
        create_issue.assert_not_called()

    @pytest.mark.asyncio
    async def test_file_single_issue_checks_phantom_files(self, tmp_path):
        """An issue referencing a non-existent file is dropped silently."""
        eng = _make_engine(tmp_path)
        create_issue = AsyncMock()
        phantom_item = {
            "title": "[Bug] Fix src/timmy/quantum_brain.py crash",
            "body": "The quantum brain module is failing.",
            "category": "bug",
        }
        await eng._file_single_issue(phantom_item, create_issue)
        create_issue.assert_not_called()

    @pytest.mark.asyncio
    async def test_file_single_issue_creates_issue(self, tmp_path):
        """A fully valid item is forwarded to the create function once."""
        eng = _make_engine(tmp_path)
        create_issue = AsyncMock(return_value="Created issue #42")
        valid_item = {
            "title": "[Memory] MEMORY.md timestamp not updating correctly",
            "body": "The last reflection timestamp is stale.",
            "category": "bug",
        }
        await eng._file_single_issue(valid_item, create_issue)
        create_issue.assert_called_once()

    @pytest.mark.asyncio
    async def test_file_single_issue_skips_non_dict(self, tmp_path):
        """Anything that is not a dict is ignored without error."""
        eng = _make_engine(tmp_path)
        create_issue = AsyncMock()
        await eng._file_single_issue("not a dict", create_issue)
        create_issue.assert_not_called()
|
|
|
|
|
|
# ===========================================================================
|
|
# 17. _SeedsMixin
|
|
# ===========================================================================
|
|
|
|
|
|
class TestSeedsMixin:
    """_SeedsMixin — seed-type rotation and seed-context gathering."""

    def test_pick_seed_type_valid(self, tmp_path):
        """Every pick must come from the SEED_TYPES tuple."""
        from timmy.thinking.seeds import SEED_TYPES

        eng = _make_engine(tmp_path)
        picks = [eng._pick_seed_type() for _ in range(30)]
        assert all(p in SEED_TYPES for p in picks)

    def test_pick_seed_type_avoids_recent(self, tmp_path):
        """Types used by the last 3 stored thoughts are excluded from picks."""
        eng = _make_engine(tmp_path)
        for _ in range(3):
            stored = eng._store_thought("Existential musing.", "existential")
            eng._last_thought_id = stored.id

        picks = [eng._pick_seed_type() for _ in range(20)]
        assert "existential" not in picks

    def test_gather_seed_returns_tuple(self, tmp_path):
        """_gather_seed yields a (seed_type, context_string) pair."""
        eng = _make_engine(tmp_path)
        seed_type, seed_ctx = eng._gather_seed()
        assert isinstance(seed_type, str)
        assert isinstance(seed_ctx, str)

    def test_seed_from_swarm_graceful(self, tmp_path):
        """_seed_from_swarm degrades to a string rather than raising."""
        eng = _make_engine(tmp_path)
        assert isinstance(eng._seed_from_swarm(), str)

    def test_seed_from_scripture_returns_string(self, tmp_path):
        """Scripture seed source always yields a string."""
        eng = _make_engine(tmp_path)
        assert isinstance(eng._seed_from_scripture(), str)

    def test_seed_from_memory_graceful(self, tmp_path):
        """Memory seed source degrades to a string rather than raising."""
        eng = _make_engine(tmp_path)
        assert isinstance(eng._seed_from_memory(), str)

    def test_seed_from_observation_returns_string(self, tmp_path):
        """Observation seed yields a string mentioning an observation prompt."""
        eng = _make_engine(tmp_path)
        seed = eng._seed_from_observation()
        assert isinstance(seed, str)
        assert "Observation" in seed or "prompt" in seed.lower()

    def test_seed_from_workspace_returns_string(self, tmp_path):
        """Workspace seed source always yields a string."""
        eng = _make_engine(tmp_path)
        assert isinstance(eng._seed_from_workspace(), str)

    @pytest.mark.asyncio
    async def test_check_workspace_graceful(self, tmp_path):
        """_check_workspace should not crash when workspace module fails."""
        eng = _make_engine(tmp_path)
        await eng._check_workspace()  # Should not raise
|
|
|
|
|
|
# ===========================================================================
|
|
# 18. _SnapshotMixin
|
|
# ===========================================================================
|
|
|
|
|
|
class TestSnapshotMixin:
    """_SnapshotMixin — system snapshot and memory-context assembly.

    Covers the snapshot sub-collectors (thought count, chat activity, task
    queue, workspace), memory-context loading/truncation, and the graceful
    failure path of _update_memory.
    """

    def test_gather_system_snapshot_includes_time(self, tmp_path):
        """Snapshot includes a 12-hour clock reading (AM/PM)."""
        engine = _make_engine(tmp_path)
        snap = engine._gather_system_snapshot()
        assert "AM" in snap or "PM" in snap

    def test_snap_thought_count(self, tmp_path):
        """Thought-count line reflects the number of stored thoughts."""
        engine = _make_engine(tmp_path)
        _seed_thoughts(engine, 3)
        result = engine._snap_thought_count(datetime.now().astimezone())
        assert result is not None
        assert "3" in result

    def test_snap_chat_activity_graceful(self, tmp_path):
        """Chat-activity snapshot degrades to a list without raising."""
        engine = _make_engine(tmp_path)
        result = engine._snap_chat_activity()
        assert isinstance(result, list)

    def test_snap_task_queue_graceful(self, tmp_path):
        """Task-queue snapshot never raises; None or string are both valid."""
        engine = _make_engine(tmp_path)
        result = engine._snap_task_queue()
        # May return None or a string, both are valid
        assert result is None or isinstance(result, str)

    def test_snap_workspace_graceful(self, tmp_path):
        """Workspace snapshot degrades to a list without raising."""
        engine = _make_engine(tmp_path)
        result = engine._snap_workspace()
        assert isinstance(result, list)

    def test_load_memory_context_empty(self, tmp_path):
        """Should return empty string when no memory files exist."""
        engine = _make_engine(tmp_path)
        with (
            patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", tmp_path / "no-memory.md"),
            patch("timmy.thinking._snapshot.SOUL_PATH", tmp_path / "no-soul.md"),
        ):
            result = engine._load_memory_context()
        assert result == ""

    def test_load_memory_context_with_files(self, tmp_path):
        """Should load and combine MEMORY.md and soul.md content."""
        engine = _make_engine(tmp_path)
        mem = tmp_path / "MEMORY.md"
        mem.write_text("Hot memory content here.")
        soul = tmp_path / "soul.md"
        soul.write_text("Soul content here.")

        with (
            patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", mem),
            patch("timmy.thinking._snapshot.SOUL_PATH", soul),
        ):
            result = engine._load_memory_context()
        assert "Hot memory content" in result
        assert "Soul content" in result

    def test_load_memory_context_truncates(self, tmp_path):
        """Content longer than 1500 chars should be truncated."""
        engine = _make_engine(tmp_path)
        mem = tmp_path / "MEMORY.md"
        mem.write_text("A" * 2000)

        with (
            patch("timmy.thinking._snapshot.HOT_MEMORY_PATH", mem),
            patch("timmy.thinking._snapshot.SOUL_PATH", tmp_path / "no-soul.md"),
        ):
            result = engine._load_memory_context()
        assert len(result) < 1600
        assert "truncated" in result.lower()

    def test_update_memory_graceful(self, tmp_path):
        """_update_memory should not crash on errors."""
        import timmy.memory_system as _ms

        engine = _make_engine(tmp_path)
        t = engine._store_thought("Memory update test.", "freeform")
        # patch.object guarantees the real store_last_reflection is restored
        # even if the call below raises — safer than manual save/restore.
        with patch.object(
            _ms, "store_last_reflection", MagicMock(side_effect=Exception("fail"))
        ):
            engine._update_memory(t)  # Should not raise
|
|
|
|
|
|
# ===========================================================================
|
|
# 19. _call_agent
|
|
# ===========================================================================
|
|
|
|
|
|
class TestCallAgent:
    """_call_agent — LLM invocation with agent caching and <think>-tag stripping."""

    @staticmethod
    def _stub_agent(content):
        """Build an AsyncMock agent whose ``arun`` result carries ``content``."""
        agent = AsyncMock()
        run_result = AsyncMock()
        run_result.content = content
        agent.arun.return_value = run_result
        return agent

    @pytest.mark.asyncio
    async def test_creates_agent_with_skip_mcp(self, tmp_path):
        """The engine constructs its agent with skip_mcp=True."""
        eng = _make_engine(tmp_path)
        agent = self._stub_agent("result")
        with patch("timmy.agent.create_timmy", return_value=agent) as factory:
            await eng._call_agent("test")
            factory.assert_called_once_with(skip_mcp=True)

    @pytest.mark.asyncio
    async def test_caches_agent_across_calls(self, tmp_path):
        """A second _call_agent reuses the agent built for the first."""
        eng = _make_engine(tmp_path)
        agent = self._stub_agent("cached")
        with patch("timmy.agent.create_timmy", return_value=agent) as factory:
            await eng._call_agent("call 1")
            await eng._call_agent("call 2")
            factory.assert_called_once()  # Only created once

    @pytest.mark.asyncio
    async def test_strips_think_tags(self, tmp_path):
        """<think>...</think> blocks are removed from the agent's output."""
        eng = _make_engine(tmp_path)
        agent = self._stub_agent("<think>reasoning</think>Clean output.")
        with patch("timmy.agent.create_timmy", return_value=agent):
            out = await eng._call_agent("prompt")
        assert "<think>" not in out
        assert out == "Clean output."

    @pytest.mark.asyncio
    async def test_timeout_returns_empty(self, tmp_path):
        """Should return empty string on timeout."""
        import asyncio

        eng = _make_engine(tmp_path)
        agent = AsyncMock()

        async def never_returns(*args, **kwargs):
            await asyncio.sleep(999)

        agent.arun = never_returns

        with (
            patch("timmy.agent.create_timmy", return_value=agent),
            patch("asyncio.timeout", side_effect=TimeoutError),
        ):
            out = await eng._call_agent("prompt")
        assert out == ""
|
|
|
|
|
|
# ===========================================================================
|
|
# 20. _db.py helpers
|
|
# ===========================================================================
|
|
|
|
|
|
class TestDbHelpers:
    """Database utility functions in timmy.thinking._db."""

    @staticmethod
    def _schema_entries(conn, kind, name):
        """Return sqlite_master rows of the given type with the given name."""
        query = "SELECT name FROM sqlite_master WHERE type=? AND name=?"
        return conn.execute(query, (kind, name)).fetchall()

    def test_get_conn_creates_table(self, tmp_path):
        """Opening a fresh DB creates the 'thoughts' table."""
        from timmy.thinking._db import _get_conn

        with _get_conn(tmp_path / "test.db") as conn:
            rows = self._schema_entries(conn, "table", "thoughts")
        assert len(rows) == 1

    def test_get_conn_creates_index(self, tmp_path):
        """Opening a fresh DB creates the time index on thoughts."""
        from timmy.thinking._db import _get_conn

        with _get_conn(tmp_path / "test.db") as conn:
            rows = self._schema_entries(conn, "index", "idx_thoughts_time")
        assert len(rows) == 1

    def test_row_to_thought(self, tmp_path):
        """A raw DB row round-trips into a Thought instance."""
        from timmy.thinking._db import Thought, _get_conn, _row_to_thought

        with _get_conn(tmp_path / "test.db") as conn:
            conn.execute(
                "INSERT INTO thoughts (id, content, seed_type, parent_id, created_at) "
                "VALUES (?, ?, ?, ?, ?)",
                ("test-id", "test content", "freeform", None, "2026-01-01"),
            )
            row = conn.execute("SELECT * FROM thoughts WHERE id = ?", ("test-id",)).fetchone()
        thought = _row_to_thought(row)
        assert isinstance(thought, Thought)
        assert thought.id == "test-id"
        assert thought.content == "test content"
        assert thought.parent_id is None

    def test_thought_dataclass_fields(self):
        """Thought stores all five fields exactly as given."""
        from timmy.thinking._db import Thought

        thought = Thought(
            id="abc", content="Hello", seed_type="creative",
            parent_id="parent-1", created_at="2026-01-01"
        )
        assert thought.id == "abc"
        assert thought.content == "Hello"
        assert thought.seed_type == "creative"
        assert thought.parent_id == "parent-1"
        assert thought.created_at == "2026-01-01"
|
|
|
|
|
|
# ===========================================================================
|
|
# 21. seeds.py constants
|
|
# ===========================================================================
|
|
|
|
|
|
class TestSeedsConstants:
    """seeds.py module constants and the master thinking-prompt template."""

    def test_seed_types_tuple(self):
        """SEED_TYPES is an immutable tuple with the expected members."""
        from timmy.thinking.seeds import SEED_TYPES

        assert isinstance(SEED_TYPES, tuple)
        assert len(SEED_TYPES) >= 8
        for expected in ("freeform", "sovereignty"):
            assert expected in SEED_TYPES

    def test_thinking_prompt_has_required_slots(self):
        """The prompt template exposes all four format placeholders."""
        from timmy.thinking.seeds import _THINKING_PROMPT

        slots = (
            "{memory_context}",
            "{system_context}",
            "{seed_context}",
            "{continuity_context}",
        )
        for slot in slots:
            assert slot in _THINKING_PROMPT

    def test_think_tag_regex(self):
        """The regex strips a <think>...</think> block cleanly."""
        from timmy.thinking.seeds import _THINK_TAG_RE

        tagged = "<think>reasoning here</think>Clean output."
        assert _THINK_TAG_RE.sub("", tagged) == "Clean output."

    def test_sensitive_patterns_exist(self):
        """The sensitive-word list covers the core credential terms."""
        from timmy.thinking.seeds import _SENSITIVE_PATTERNS

        assert "token" in _SENSITIVE_PATTERNS
        assert "password" in _SENSITIVE_PATTERNS

    def test_meta_observation_phrases_exist(self):
        """The meta-observation blocklist covers self-referential phrasing."""
        from timmy.thinking.seeds import _META_OBSERVATION_PHRASES

        assert "my thinking" in _META_OBSERVATION_PHRASES