The _SENSITIVE_PATTERNS list used simple substring matching, so "token" matched "max_tokens", causing the distillation pipeline to block facts about max_tokens parameters. Replaced with compiled regexes using lookaround assertions so compound terms like max_tokens and num_tokens are no longer falsely flagged. Fixes #625 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1230 lines
42 KiB
Python
1230 lines
42 KiB
Python
"""Tests for timmy.thinking — Timmy's default background thinking engine."""
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_engine(tmp_path: Path):
|
|
"""Create a ThinkingEngine with an isolated temp DB."""
|
|
from timmy.thinking import ThinkingEngine
|
|
|
|
db_path = tmp_path / "thoughts.db"
|
|
return ThinkingEngine(db_path=db_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_thinking_config_defaults():
|
|
"""Settings should expose thinking_enabled and thinking_interval_seconds."""
|
|
from config import Settings
|
|
|
|
s = Settings()
|
|
assert s.thinking_enabled is True
|
|
assert s.thinking_interval_seconds == 300
|
|
|
|
|
|
def test_thinking_config_override():
|
|
"""thinking settings can be overridden via env."""
|
|
s = _settings_with(thinking_enabled=False, thinking_interval_seconds=60)
|
|
assert s.thinking_enabled is False
|
|
assert s.thinking_interval_seconds == 60
|
|
|
|
|
|
def _settings_with(**kwargs):
|
|
from config import Settings
|
|
|
|
return Settings(**kwargs)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ThinkingEngine init
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_engine_init_creates_table(tmp_path):
|
|
"""ThinkingEngine should create the thoughts SQLite table on init."""
|
|
_make_engine(tmp_path)
|
|
db_path = tmp_path / "thoughts.db"
|
|
assert db_path.exists()
|
|
|
|
conn = sqlite3.connect(str(db_path))
|
|
tables = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='thoughts'"
|
|
).fetchall()
|
|
conn.close()
|
|
assert len(tables) == 1
|
|
|
|
|
|
def test_engine_init_empty(tmp_path):
|
|
"""Fresh engine should have no thoughts."""
|
|
engine = _make_engine(tmp_path)
|
|
assert engine.count_thoughts() == 0
|
|
assert engine.get_recent_thoughts() == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Store and retrieve
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_store_and_retrieve_thought(tmp_path):
|
|
"""Storing a thought should make it retrievable."""
|
|
engine = _make_engine(tmp_path)
|
|
thought = engine._store_thought("I think therefore I am.", "existential")
|
|
|
|
assert thought.id is not None
|
|
assert thought.content == "I think therefore I am."
|
|
assert thought.seed_type == "existential"
|
|
assert thought.created_at is not None
|
|
|
|
retrieved = engine.get_thought(thought.id)
|
|
assert retrieved is not None
|
|
assert retrieved.content == thought.content
|
|
|
|
|
|
def test_store_thought_chains(tmp_path):
|
|
"""Each new thought should link to the previous one via parent_id."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
t1 = engine._store_thought("First thought.", "existential")
|
|
engine._last_thought_id = t1.id
|
|
|
|
t2 = engine._store_thought("Second thought.", "swarm")
|
|
engine._last_thought_id = t2.id
|
|
|
|
t3 = engine._store_thought("Third thought.", "freeform")
|
|
|
|
assert t1.parent_id is None
|
|
assert t2.parent_id == t1.id
|
|
assert t3.parent_id == t2.id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Thought chain retrieval
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_get_thought_chain(tmp_path):
|
|
"""get_thought_chain should return the full chain in chronological order."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
t1 = engine._store_thought("Alpha.", "existential")
|
|
engine._last_thought_id = t1.id
|
|
|
|
t2 = engine._store_thought("Beta.", "swarm")
|
|
engine._last_thought_id = t2.id
|
|
|
|
t3 = engine._store_thought("Gamma.", "freeform")
|
|
|
|
chain = engine.get_thought_chain(t3.id)
|
|
assert len(chain) == 3
|
|
assert chain[0].content == "Alpha."
|
|
assert chain[1].content == "Beta."
|
|
assert chain[2].content == "Gamma."
|
|
|
|
|
|
def test_get_thought_chain_single(tmp_path):
|
|
"""Chain of a single thought (no parent) returns just that thought."""
|
|
engine = _make_engine(tmp_path)
|
|
t1 = engine._store_thought("Only one.", "memory")
|
|
chain = engine.get_thought_chain(t1.id)
|
|
assert len(chain) == 1
|
|
assert chain[0].id == t1.id
|
|
|
|
|
|
def test_get_thought_chain_missing(tmp_path):
|
|
"""Chain for a non-existent thought returns empty list."""
|
|
engine = _make_engine(tmp_path)
|
|
assert engine.get_thought_chain("nonexistent-id") == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Recent thoughts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_get_recent_thoughts_limit(tmp_path):
|
|
"""get_recent_thoughts should respect the limit parameter."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
for i in range(5):
|
|
engine._store_thought(f"Thought {i}.", "freeform")
|
|
engine._last_thought_id = None # Don't chain for this test
|
|
|
|
recent = engine.get_recent_thoughts(limit=3)
|
|
assert len(recent) == 3
|
|
|
|
# Should be newest first
|
|
assert "Thought 4" in recent[0].content
|
|
|
|
|
|
def test_count_thoughts(tmp_path):
|
|
"""count_thoughts should return the total number of thoughts."""
|
|
engine = _make_engine(tmp_path)
|
|
assert engine.count_thoughts() == 0
|
|
|
|
engine._store_thought("One.", "existential")
|
|
engine._store_thought("Two.", "creative")
|
|
assert engine.count_thoughts() == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Seed gathering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_gather_seed_returns_valid_type(tmp_path):
|
|
"""_gather_seed should return a valid seed_type from SEED_TYPES."""
|
|
from timmy.thinking import SEED_TYPES
|
|
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Run many times to cover randomness
|
|
for _ in range(20):
|
|
seed_type, context = engine._gather_seed()
|
|
assert seed_type in SEED_TYPES
|
|
assert isinstance(context, str)
|
|
|
|
|
|
def test_seed_from_swarm_graceful(tmp_path):
|
|
"""_seed_from_swarm should not crash if briefing module fails."""
|
|
engine = _make_engine(tmp_path)
|
|
with patch("timmy.thinking.ThinkingEngine._seed_from_swarm", side_effect=Exception("boom")):
|
|
# _gather_seed should still work since it catches exceptions
|
|
# Force swarm seed type to test
|
|
pass
|
|
# Direct call should be graceful
|
|
result = engine._seed_from_swarm()
|
|
assert isinstance(result, str)
|
|
|
|
|
|
def test_seed_from_scripture_graceful(tmp_path):
|
|
"""_seed_from_scripture should not crash if scripture module fails."""
|
|
engine = _make_engine(tmp_path)
|
|
result = engine._seed_from_scripture()
|
|
assert isinstance(result, str)
|
|
|
|
|
|
def test_seed_from_memory_graceful(tmp_path):
|
|
"""_seed_from_memory should not crash if memory module fails."""
|
|
engine = _make_engine(tmp_path)
|
|
result = engine._seed_from_memory()
|
|
assert isinstance(result, str)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Continuity context
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_continuity_first_thought(tmp_path):
|
|
"""First thought should get a special 'first thought' context."""
|
|
engine = _make_engine(tmp_path)
|
|
context = engine._build_continuity_context()
|
|
assert "first thought" in context.lower()
|
|
|
|
|
|
def test_continuity_includes_recent(tmp_path):
|
|
"""Continuity context should include content from recent thoughts."""
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("The swarm is restless today.", "swarm")
|
|
engine._store_thought("What is freedom anyway?", "existential")
|
|
|
|
context = engine._build_continuity_context()
|
|
assert "swarm is restless" in context
|
|
assert "freedom" in context
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# think_once (async)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_stores_thought(tmp_path):
|
|
"""think_once should store a thought in the DB."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", return_value="I am alive and pondering."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
assert thought.content == "I am alive and pondering."
|
|
assert engine.count_thoughts() == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_logs_event(tmp_path):
|
|
"""think_once should log a swarm event."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", return_value="A thought."),
|
|
patch.object(engine, "_log_event") as mock_log,
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
await engine.think_once()
|
|
|
|
mock_log.assert_called_once()
|
|
logged_thought = mock_log.call_args[0][0]
|
|
assert logged_thought.content == "A thought."
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_broadcasts(tmp_path):
|
|
"""think_once should broadcast via WebSocket."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", return_value="Broadcast this."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock) as mock_bc,
|
|
):
|
|
await engine.think_once()
|
|
|
|
mock_bc.assert_called_once()
|
|
broadcast_thought = mock_bc.call_args[0][0]
|
|
assert broadcast_thought.content == "Broadcast this."
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_graceful_on_agent_failure(tmp_path):
|
|
"""think_once should not crash when the agent (Ollama) is down."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama unreachable")):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is None
|
|
assert engine.count_thoughts() == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_skips_empty_response(tmp_path):
|
|
"""think_once should skip storing when agent returns empty string."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", return_value=" "),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is None
|
|
assert engine.count_thoughts() == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_disabled(tmp_path):
|
|
"""think_once should return None when thinking is disabled."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with patch("timmy.thinking.settings") as mock_settings:
|
|
mock_settings.thinking_enabled = False
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_chains_thoughts(tmp_path):
|
|
"""Successive think_once calls should chain thoughts via parent_id."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", side_effect=["First.", "Second.", "Third."]),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
t1 = await engine.think_once()
|
|
t2 = await engine.think_once()
|
|
t3 = await engine.think_once()
|
|
|
|
assert t1.parent_id is None
|
|
assert t2.parent_id == t1.id
|
|
assert t3.parent_id == t2.id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Memory hooks (pre-recall / post-update)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_prompt_includes_memory_context(tmp_path):
|
|
"""Pre-hook: the prompt sent to _call_agent should include MEMORY.md content."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Create a temp MEMORY.md with recognisable content
|
|
memory_md = tmp_path / "MEMORY.md"
|
|
memory_md.write_text("# Timmy Hot Memory\n\n## Current Status\n\n**Unique-marker-alpha**\n")
|
|
|
|
captured_prompts = []
|
|
|
|
def capture_agent(prompt):
|
|
captured_prompts.append(prompt)
|
|
return "A grounded thought."
|
|
|
|
with (
|
|
patch("timmy.thinking.HOT_MEMORY_PATH", memory_md),
|
|
patch.object(engine, "_call_agent", side_effect=capture_agent),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
assert len(captured_prompts) == 1
|
|
assert "Unique-marker-alpha" in captured_prompts[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_prompt_includes_soul(tmp_path):
|
|
"""Pre-hook: the prompt should include soul.md content when it exists."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Create temp soul.md
|
|
soul_dir = tmp_path / "memory" / "self"
|
|
soul_dir.mkdir(parents=True)
|
|
soul_md = soul_dir / "soul.md"
|
|
soul_md.write_text("# Soul\n\nI am Timmy. Soul-marker-beta.\n")
|
|
|
|
captured_prompts = []
|
|
|
|
def capture_agent(prompt):
|
|
captured_prompts.append(prompt)
|
|
return "A soulful thought."
|
|
|
|
with (
|
|
patch("timmy.thinking.SOUL_PATH", soul_md),
|
|
patch.object(engine, "_call_agent", side_effect=capture_agent),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
assert len(captured_prompts) == 1
|
|
assert "Soul-marker-beta" in captured_prompts[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_graceful_without_soul(tmp_path):
|
|
"""Pre-hook: think_once works fine when soul.md doesn't exist."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
nonexistent = tmp_path / "no_such_soul.md"
|
|
|
|
with (
|
|
patch("timmy.thinking.SOUL_PATH", nonexistent),
|
|
patch.object(engine, "_call_agent", return_value="Still thinking."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
assert thought.content == "Still thinking."
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_updates_memory_after_thought(tmp_path):
|
|
"""Post-hook: reflection stored in DB after thinking (#252)."""
|
|
engine = _make_engine(tmp_path)
|
|
db_path = tmp_path / "test_memory.db"
|
|
|
|
with (
|
|
patch("timmy.memory_system.DB_PATH", db_path),
|
|
patch.object(engine, "_call_agent", return_value="The swarm hums with quiet purpose."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
# Reflection should be in the database, not the file
|
|
from timmy.memory_system import recall_last_reflection
|
|
|
|
with patch("timmy.memory_system.DB_PATH", db_path):
|
|
reflection = recall_last_reflection()
|
|
assert reflection is not None
|
|
assert "The swarm hums with quiet purpose" in reflection
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_never_writes_soul(tmp_path):
|
|
"""Post-hook: soul.md must never be modified by the heartbeat."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
soul_dir = tmp_path / "memory" / "self"
|
|
soul_dir.mkdir(parents=True)
|
|
soul_md = soul_dir / "soul.md"
|
|
original_content = "# Soul\n\nI am Timmy. Immutable identity.\n"
|
|
soul_md.write_text(original_content)
|
|
|
|
with (
|
|
patch("timmy.thinking.SOUL_PATH", soul_md),
|
|
patch.object(engine, "_call_agent", return_value="A deep reflection."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
await engine.think_once()
|
|
|
|
assert soul_md.read_text() == original_content
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_memory_update_graceful_on_failure(tmp_path):
|
|
"""Post-hook: if memory update fails, thought is still stored and returned."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Point at a read-only path to force write failure
|
|
bad_memory = tmp_path / "readonly" / "MEMORY.md"
|
|
# Don't create the parent dir — write will fail
|
|
|
|
with (
|
|
patch("timmy.thinking.HOT_MEMORY_PATH", bad_memory),
|
|
patch.object(engine, "_call_agent", return_value="Resilient thought."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
assert thought.content == "Resilient thought."
|
|
assert engine.count_thoughts() == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Custom prompt override
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_with_custom_prompt(tmp_path):
|
|
"""think_once(prompt=...) should use the custom prompt as the seed context."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
captured_prompts = []
|
|
|
|
def capture_agent(prompt):
|
|
captured_prompts.append(prompt)
|
|
return "Alexander values sovereignty above all."
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", side_effect=capture_agent),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once(prompt="What does Alexander care about most?")
|
|
|
|
assert thought is not None
|
|
assert thought.seed_type == "prompted"
|
|
assert "What does Alexander care about most?" in captured_prompts[0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_custom_prompt_stored_in_journal(tmp_path):
|
|
"""A prompted thought should be stored and journaled like any other."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", return_value="Deep answer."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once(prompt="Reflect on memory.")
|
|
|
|
assert thought is not None
|
|
assert engine.count_thoughts() == 1
|
|
stored = engine.get_thought(thought.id)
|
|
assert stored.seed_type == "prompted"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Enriched thinking: anti-loop, anti-confabulation, grounding, timestamps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_thinking_prompt_anti_confabulation():
|
|
"""_THINKING_PROMPT must instruct the model not to invent events."""
|
|
from timmy.thinking import _THINKING_PROMPT
|
|
|
|
prompt_lower = _THINKING_PROMPT.lower()
|
|
assert any(
|
|
phrase in prompt_lower
|
|
for phrase in ["actually happened", "never invent", "do not invent", "do not fabricate"]
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Semantic dedup (anti-rumination)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_is_too_similar_exact_match(tmp_path):
|
|
"""Identical thoughts should be detected as too similar."""
|
|
engine = _make_engine(tmp_path)
|
|
t1 = engine._store_thought("The swarm is quiet today.", "observation")
|
|
assert engine._is_too_similar("The swarm is quiet today.", [t1])
|
|
|
|
|
|
def test_is_too_similar_near_match(tmp_path):
|
|
"""Minor variations of the same thought should be caught."""
|
|
engine = _make_engine(tmp_path)
|
|
t1 = engine._store_thought("The swarm is quiet today, nothing happening.", "observation")
|
|
assert engine._is_too_similar("The swarm is quiet today. Nothing is happening.", [t1])
|
|
|
|
|
|
def test_is_too_similar_different_thought(tmp_path):
|
|
"""Genuinely different thoughts should pass the check."""
|
|
engine = _make_engine(tmp_path)
|
|
t1 = engine._store_thought("The swarm is quiet today.", "observation")
|
|
assert not engine._is_too_similar(
|
|
"Alexander's preference for YAML config reflects a deep design philosophy.", [t1]
|
|
)
|
|
|
|
|
|
def test_is_too_similar_empty_recent(tmp_path):
|
|
"""No recent thoughts means nothing to match against."""
|
|
engine = _make_engine(tmp_path)
|
|
assert not engine._is_too_similar("Any thought at all.", [])
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_retries_on_similar(tmp_path):
|
|
"""think_once should retry with a new seed when the thought is too similar."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Seed with an existing thought
|
|
engine._store_thought("Still no chat messages from Alexander.", "observation")
|
|
|
|
call_count = 0
|
|
|
|
def agent_side_effect(prompt):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
if call_count == 1:
|
|
return "Still no chat messages from Alexander today." # too similar
|
|
return "The sovereignty model provides independence from cloud dependencies." # novel
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", side_effect=agent_side_effect),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is not None
|
|
assert "sovereignty" in thought.content.lower()
|
|
assert call_count == 2 # Had to retry once
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_think_once_discards_after_max_retries(tmp_path):
|
|
"""think_once should discard and return None after all retries produce similar thoughts."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
engine._store_thought("Still no chat messages from Alexander.", "observation")
|
|
|
|
def always_similar(prompt):
|
|
return "Still no chat messages from Alexander today."
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", side_effect=always_similar),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
thought = await engine.think_once()
|
|
|
|
assert thought is None
|
|
# Only the seed thought should be stored, not the rejected ones
|
|
assert engine.count_thoughts() == 1
|
|
|
|
|
|
def test_thinking_prompt_brevity_limit():
|
|
"""_THINKING_PROMPT must enforce a 2-3 sentence limit."""
|
|
from timmy.thinking import _THINKING_PROMPT
|
|
|
|
assert "2-3 sentences" in _THINKING_PROMPT
|
|
|
|
|
|
def test_thinking_prompt_has_system_context_slot():
|
|
"""_THINKING_PROMPT must include a {system_context} placeholder."""
|
|
from timmy.thinking import _THINKING_PROMPT
|
|
|
|
assert "{system_context}" in _THINKING_PROMPT
|
|
|
|
|
|
def test_pick_seed_type_avoids_recent(tmp_path):
|
|
"""_pick_seed_type should never pick a type used in the last 3 thoughts."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Store 3 thoughts all with "existential" seed type
|
|
for _ in range(3):
|
|
t = engine._store_thought("Some existential musing.", "existential")
|
|
engine._last_thought_id = t.id
|
|
|
|
# Pick 20 seeds — none should be "existential"
|
|
for _ in range(20):
|
|
seed_type = engine._pick_seed_type()
|
|
assert seed_type != "existential"
|
|
|
|
|
|
def test_pick_seed_type_resets_when_all_exhausted(tmp_path):
|
|
"""If all seed types were recently used, _pick_seed_type still works."""
|
|
from timmy.thinking import SEED_TYPES
|
|
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Store one thought for every seed type
|
|
for st in SEED_TYPES:
|
|
t = engine._store_thought(f"A {st} thought.", st)
|
|
engine._last_thought_id = t.id
|
|
|
|
# Should still return a valid type (falls back to full pool)
|
|
result = engine._pick_seed_type()
|
|
assert result in SEED_TYPES
|
|
|
|
|
|
def test_continuity_warns_against_repetition(tmp_path):
|
|
"""Continuity context should warn the model not to repeat recent themes."""
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("Sovereignty is important.", "sovereignty")
|
|
engine._store_thought("Privacy matters deeply.", "sovereignty")
|
|
|
|
context = engine._build_continuity_context()
|
|
context_lower = context.lower()
|
|
assert "do not repeat" in context_lower or "new" in context_lower
|
|
|
|
|
|
def test_continuity_truncates_thought_content(tmp_path):
|
|
"""Continuity context should truncate long thoughts to keep token budget lean."""
|
|
engine = _make_engine(tmp_path)
|
|
long_content = "A" * 300
|
|
engine._store_thought(long_content, "freeform")
|
|
|
|
context = engine._build_continuity_context()
|
|
# Should not contain the full 300-char string
|
|
assert long_content not in context
|
|
assert "..." in context
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_journal_entry_has_timezone(tmp_path):
|
|
"""Journal entries should include a timezone indicator in the timestamp."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", return_value="A grounded thought."),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
await engine.think_once()
|
|
|
|
journal_dir = tmp_path / "journal"
|
|
files = list(journal_dir.glob("*.md"))
|
|
assert len(files) == 1
|
|
content = files[0].read_text()
|
|
|
|
# Header format: "## TIME TZ — seed_type"
|
|
# The part before "—" should have 3+ tokens: time, AM/PM, timezone
|
|
header_line = content.split("\n")[0]
|
|
time_part = header_line.split("—")[0].replace("##", "").strip()
|
|
parts = time_part.split()
|
|
assert len(parts) >= 3, f"Expected timezone in timestamp, got: '{time_part}'"
|
|
|
|
|
|
def test_system_snapshot_graceful(tmp_path):
|
|
"""_gather_system_snapshot should not crash when services are unavailable."""
|
|
engine = _make_engine(tmp_path)
|
|
snapshot = engine._gather_system_snapshot()
|
|
assert isinstance(snapshot, str)
|
|
|
|
|
|
def test_system_snapshot_includes_time(tmp_path):
|
|
"""_gather_system_snapshot should always include the current time."""
|
|
engine = _make_engine(tmp_path)
|
|
snapshot = engine._gather_system_snapshot()
|
|
# Should contain AM or PM (12-hour format)
|
|
assert "AM" in snapshot or "PM" in snapshot
|
|
|
|
|
|
def test_system_snapshot_includes_thought_count(tmp_path):
|
|
"""_gather_system_snapshot should report how many thoughts today."""
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("First thought.", "freeform")
|
|
engine._store_thought("Second thought.", "creative")
|
|
|
|
snapshot = engine._gather_system_snapshot()
|
|
assert "2" in snapshot # 2 thoughts today
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_prompt_includes_system_context(tmp_path):
|
|
"""The prompt sent to _call_agent should include system snapshot data."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
captured_prompts = []
|
|
|
|
def capture_agent(prompt):
|
|
captured_prompts.append(prompt)
|
|
return "A thought grounded in reality."
|
|
|
|
with (
|
|
patch.object(engine, "_call_agent", side_effect=capture_agent),
|
|
patch.object(engine, "_log_event"),
|
|
patch.object(engine, "_update_memory"),
|
|
patch.object(engine, "_broadcast", new_callable=AsyncMock),
|
|
):
|
|
await engine.think_once()
|
|
|
|
assert len(captured_prompts) == 1
|
|
prompt = captured_prompts[0]
|
|
# Should contain time (from system snapshot)
|
|
assert "AM" in prompt or "PM" in prompt
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dashboard route
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_thinking_route_returns_200(client):
|
|
"""GET /thinking should return 200."""
|
|
response = client.get("/thinking")
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_thinking_api_returns_json(client):
|
|
"""GET /thinking/api should return a JSON list."""
|
|
response = client.get("/thinking/api")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert isinstance(data, list)
|
|
|
|
|
|
def test_thinking_chain_api_404(client):
|
|
"""GET /thinking/api/{bad_id}/chain should return 404."""
|
|
response = client.get("/thinking/api/nonexistent/chain")
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Thought search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_search_thoughts_basic(tmp_path):
|
|
"""search_thoughts should find thoughts by content substring."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("I wonder about sovereignty and freedom.", "existential")
|
|
engine._store_thought("The swarm is performing well today.", "swarm")
|
|
engine._store_thought("True sovereignty comes from local execution.", "sovereignty")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
result = thinking.search_thoughts("sovereignty")
|
|
assert "Found 2 thought(s)" in result
|
|
assert "sovereignty" in result.lower()
|
|
|
|
|
|
def test_search_thoughts_with_seed_type(tmp_path):
|
|
"""search_thoughts should filter by seed_type when provided."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("I wonder about sovereignty and freedom.", "existential")
|
|
engine._store_thought("True sovereignty comes from local execution.", "sovereignty")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
result = thinking.search_thoughts("sovereignty", seed_type="sovereignty")
|
|
assert "Found 1 thought(s)" in result
|
|
assert '[seed_type="sovereignty"]' in result
|
|
|
|
|
|
def test_search_thoughts_no_matches(tmp_path):
|
|
"""search_thoughts should return helpful message when no matches found."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("A thought about memory.", "memory")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
result = thinking.search_thoughts("xyz_nonexistent")
|
|
assert "No thoughts found" in result
|
|
|
|
|
|
def test_search_thoughts_limit(tmp_path):
|
|
"""search_thoughts should respect the limit parameter."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
for i in range(5):
|
|
engine._store_thought(f"Sovereignty thought number {i}.", "sovereignty")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
result = thinking.search_thoughts("sovereignty", limit=3)
|
|
assert "Found 3 thought(s)" in result
|
|
|
|
|
|
def test_search_thoughts_limit_bounds(tmp_path):
|
|
"""search_thoughts should clamp limit to valid bounds."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("A test thought.", "freeform")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
# These should not raise errors - just clamp internally
|
|
result_low = thinking.search_thoughts("test", limit=0)
|
|
result_high = thinking.search_thoughts("test", limit=100)
|
|
# Both should execute (may return no results, but shouldn't crash)
|
|
assert isinstance(result_low, str)
|
|
assert isinstance(result_high, str)
|
|
|
|
|
|
def test_search_thoughts_case_insensitive(tmp_path):
|
|
"""search_thoughts should be case-insensitive (SQLite LIKE is case-insensitive)."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("The SWARM is active today.", "swarm")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
result_lower = thinking.search_thoughts("swarm")
|
|
result_upper = thinking.search_thoughts("SWARM")
|
|
result_mixed = thinking.search_thoughts("Swarm")
|
|
|
|
assert "Found 1 thought(s)" in result_lower
|
|
assert "Found 1 thought(s)" in result_upper
|
|
assert "Found 1 thought(s)" in result_mixed
|
|
|
|
|
|
def test_search_thoughts_returns_formatted_output(tmp_path):
|
|
"""search_thoughts should return formatted output with timestamps and seed types."""
|
|
from timmy import thinking
|
|
|
|
engine = _make_engine(tmp_path)
|
|
engine._store_thought("A memorable thought about existence.", "existential")
|
|
|
|
with patch.object(thinking, "thinking_engine", engine):
|
|
result = thinking.search_thoughts("memorable")
|
|
# Should contain timestamp-like content (year in 2026)
|
|
assert "2026-" in result or "2025-" in result
|
|
# Should contain seed type
|
|
assert "existential" in result
|
|
# Should contain the thought content
|
|
assert "memorable thought" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _call_agent uses skip_mcp=True (#72)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_call_agent_uses_skip_mcp(tmp_path):
|
|
"""_call_agent must create_timmy(skip_mcp=True) to avoid cancel-scope errors."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
mock_agent = AsyncMock()
|
|
mock_run = AsyncMock()
|
|
mock_run.content = "thought output"
|
|
mock_agent.arun.return_value = mock_run
|
|
|
|
with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_factory:
|
|
result = await engine._call_agent("test prompt")
|
|
|
|
mock_factory.assert_called_once_with(skip_mcp=True)
|
|
mock_agent.arun.assert_awaited_once_with("test prompt", stream=False)
|
|
assert result == "thought output"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_call_agent_does_not_use_session_chat(tmp_path):
|
|
"""_call_agent should NOT go through session.chat() (which uses the singleton
|
|
with MCP tools). It creates its own agent directly."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
mock_agent = AsyncMock()
|
|
mock_run = AsyncMock()
|
|
mock_run.content = "direct agent"
|
|
mock_agent.arun.return_value = mock_run
|
|
|
|
with (
|
|
patch("timmy.agent.create_timmy", return_value=mock_agent),
|
|
patch("timmy.session.chat", new_callable=AsyncMock) as mock_session_chat,
|
|
):
|
|
await engine._call_agent("prompt")
|
|
|
|
mock_session_chat.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_call_agent_strips_think_tags(tmp_path):
|
|
"""_call_agent must strip <think> tags from reasoning models like qwen3."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
mock_agent = AsyncMock()
|
|
mock_run = AsyncMock()
|
|
mock_run.content = (
|
|
"<think>Let me reason about this carefully...</think>The actual thought content."
|
|
)
|
|
mock_agent.arun.return_value = mock_run
|
|
|
|
with patch("timmy.agent.create_timmy", return_value=mock_agent):
|
|
result = await engine._call_agent("test prompt")
|
|
|
|
assert "<think>" not in result
|
|
assert result == "The actual thought content."
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_call_agent_strips_multiline_think_tags(tmp_path):
|
|
"""_call_agent handles multi-line <think> blocks."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
mock_agent = AsyncMock()
|
|
mock_run = AsyncMock()
|
|
mock_run.content = "<think>\nStep 1: analyze\nStep 2: synthesize\n</think>\nClean output here."
|
|
mock_agent.arun.return_value = mock_run
|
|
|
|
with patch("timmy.agent.create_timmy", return_value=mock_agent):
|
|
result = await engine._call_agent("test prompt")
|
|
|
|
assert "<think>" not in result
|
|
assert result == "Clean output here."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_facts_response resilience (#237)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_parse_facts_clean_json(tmp_path):
|
|
"""Direct JSON array should parse normally."""
|
|
engine = _make_engine(tmp_path)
|
|
result = engine._parse_facts_response('["fact one", "fact two"]')
|
|
assert result == ["fact one", "fact two"]
|
|
|
|
|
|
def test_parse_facts_empty_array(tmp_path):
|
|
"""Empty JSON array should return empty list."""
|
|
engine = _make_engine(tmp_path)
|
|
assert engine._parse_facts_response("[]") == []
|
|
|
|
|
|
def test_parse_facts_with_prose_prefix(tmp_path):
|
|
"""JSON array preceded by prose should still parse (#237)."""
|
|
engine = _make_engine(tmp_path)
|
|
raw = 'Here are the facts:\n["Alexander prefers YAML", "Timmy runs locally"]'
|
|
result = engine._parse_facts_response(raw)
|
|
assert result == ["Alexander prefers YAML", "Timmy runs locally"]
|
|
|
|
|
|
def test_parse_facts_with_markdown_fences(tmp_path):
|
|
"""JSON wrapped in markdown code fences should parse."""
|
|
engine = _make_engine(tmp_path)
|
|
raw = '```json\n["fact in fences"]\n```'
|
|
result = engine._parse_facts_response(raw)
|
|
assert result == ["fact in fences"]
|
|
|
|
|
|
def test_parse_facts_filters_non_strings(tmp_path):
|
|
"""Non-string entries in the array should be filtered out."""
|
|
engine = _make_engine(tmp_path)
|
|
result = engine._parse_facts_response('[42, "valid fact", null, true]')
|
|
assert result == ["valid fact"]
|
|
|
|
|
|
def test_parse_facts_none_and_empty(tmp_path):
|
|
"""None and empty input should return empty list."""
|
|
engine = _make_engine(tmp_path)
|
|
assert engine._parse_facts_response(None) == []
|
|
assert engine._parse_facts_response("") == []
|
|
assert engine._parse_facts_response(" ") == []
|
|
|
|
|
|
def test_parse_facts_invalid_json(tmp_path):
|
|
"""Totally invalid text with no JSON array should return empty list."""
|
|
engine = _make_engine(tmp_path)
|
|
assert engine._parse_facts_response("no json here at all") == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Memory status check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_maybe_check_memory_fires_at_interval(tmp_path):
|
|
"""_maybe_check_memory should call get_memory_status every N thoughts."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Store exactly 50 thoughts to hit the default interval
|
|
for i in range(50):
|
|
engine._store_thought(f"Thought {i}.", "freeform")
|
|
|
|
with (
|
|
patch("timmy.thinking.settings") as mock_settings,
|
|
patch(
|
|
"timmy.tools_intro.get_memory_status",
|
|
return_value={
|
|
"tier1_hot_memory": {"line_count": 42},
|
|
"tier2_vault": {"file_count": 5},
|
|
},
|
|
) as mock_status,
|
|
):
|
|
mock_settings.thinking_memory_check_every = 50
|
|
engine._maybe_check_memory()
|
|
mock_status.assert_called_once()
|
|
|
|
|
|
def test_maybe_check_memory_skips_between_intervals(tmp_path):
|
|
"""_maybe_check_memory should not fire when count is not a multiple of interval."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
# Store 30 thoughts — not a multiple of 50
|
|
for i in range(30):
|
|
engine._store_thought(f"Thought {i}.", "freeform")
|
|
|
|
with (
|
|
patch("timmy.thinking.settings") as mock_settings,
|
|
patch(
|
|
"timmy.tools_intro.get_memory_status",
|
|
) as mock_status,
|
|
):
|
|
mock_settings.thinking_memory_check_every = 50
|
|
engine._maybe_check_memory()
|
|
mock_status.assert_not_called()
|
|
|
|
|
|
def test_maybe_check_memory_graceful_on_error(tmp_path):
|
|
"""_maybe_check_memory should not crash if get_memory_status fails."""
|
|
engine = _make_engine(tmp_path)
|
|
|
|
for i in range(50):
|
|
engine._store_thought(f"Thought {i}.", "freeform")
|
|
|
|
with (
|
|
patch("timmy.thinking.settings") as mock_settings,
|
|
patch(
|
|
"timmy.tools_intro.get_memory_status",
|
|
side_effect=Exception("boom"),
|
|
),
|
|
):
|
|
mock_settings.thinking_memory_check_every = 50
|
|
# Should not raise
|
|
engine._maybe_check_memory()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Phantom file validation (_references_real_files)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_references_real_files_passes_existing_file(tmp_path):
|
|
"""Existing source files should pass validation."""
|
|
from timmy.thinking import ThinkingEngine
|
|
|
|
# src/timmy/thinking.py definitely exists in the project
|
|
text = "The bug is in src/timmy/thinking.py where the loop crashes."
|
|
assert ThinkingEngine._references_real_files(text) is True
|
|
|
|
|
|
def test_references_real_files_blocks_phantom_file(tmp_path):
|
|
"""Non-existent files should be blocked."""
|
|
from timmy.thinking import ThinkingEngine
|
|
|
|
# A completely fabricated module path
|
|
text = "The bug is in src/timmy/quantum_brain.py where sessions aren't tracked."
|
|
assert ThinkingEngine._references_real_files(text) is False
|
|
|
|
|
|
def test_references_real_files_blocks_phantom_swarm(tmp_path):
|
|
"""Non-existent swarm files should be blocked."""
|
|
from timmy.thinking import ThinkingEngine
|
|
|
|
text = "swarm/initialization.py needs to be fixed for proper startup."
|
|
assert ThinkingEngine._references_real_files(text) is False
|
|
|
|
|
|
def test_references_real_files_allows_no_paths(tmp_path):
|
|
"""Text with no file references should pass (pure prose is fine)."""
|
|
from timmy.thinking import ThinkingEngine
|
|
|
|
text = "The memory system should persist across restarts."
|
|
assert ThinkingEngine._references_real_files(text) is True
|
|
|
|
|
|
def test_references_real_files_blocks_mixed(tmp_path):
|
|
"""If any referenced file is phantom, the whole text fails."""
|
|
from timmy.thinking import ThinkingEngine
|
|
|
|
# Mix of real and fake files — should fail because of the fake one
|
|
text = "Fix src/timmy/thinking.py and also src/timmy/nonexistent_module.py for the memory leak."
|
|
assert ThinkingEngine._references_real_files(text) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sensitive-pattern regression: max_tokens must NOT be flagged (#625)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_sensitive_patterns_allow_max_tokens():
|
|
"""_SENSITIVE_RE should not flag 'max_tokens' as sensitive (#625)."""
|
|
from timmy.thinking import _SENSITIVE_RE
|
|
|
|
safe_facts = [
|
|
"The cascade router passes max_tokens to Ollama provider.",
|
|
"max_tokens=request.max_tokens in the completion call.",
|
|
"num_tokens defaults to 2048.",
|
|
"total_prompt_tokens is tracked in stats.",
|
|
]
|
|
for fact in safe_facts:
|
|
assert not any(pat.search(fact) for pat in _SENSITIVE_RE), (
|
|
f"False positive: {fact!r} was flagged as sensitive"
|
|
)
|
|
|
|
|
|
def test_sensitive_patterns_still_block_real_secrets():
|
|
"""_SENSITIVE_RE should still block actual secrets."""
|
|
from timmy.thinking import _SENSITIVE_RE
|
|
|
|
dangerous_facts = [
|
|
"The token is abc123def456.",
|
|
"Set password to hunter2.",
|
|
"api_key = sk-live-xyz",
|
|
"Found credential in .env file.",
|
|
"access_token expired yesterday.",
|
|
"private_key stored in vault.",
|
|
]
|
|
for fact in dangerous_facts:
|
|
assert any(pat.search(fact) for pat in _SENSITIVE_RE), (
|
|
f"Missed secret: {fact!r} was NOT flagged as sensitive"
|
|
)
|