Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
3de7db770f refactor: break up think_once() into _generate_thought() and _finalize_thought()
Extracts the dedup retry loop and post-hook/broadcast pipeline into
focused helpers, reducing think_once() from 118 lines to ~20.

Fixes #513

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:41:28 -04:00
4 changed files with 216 additions and 297 deletions

View File

@@ -197,113 +197,6 @@ def _resolve_backend(requested: str | None) -> str:
return "ollama"
def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list:
"""Build the Agno tools list (toolkit + optional MCP servers).
Args:
use_tools: Whether the model supports tool calling.
skip_mcp: If True, omit MCP tool servers.
Returns:
List of Toolkit / MCPTools, possibly empty.
"""
if not use_tools:
logger.info("Tools disabled (model too small for reliable tool calling)")
return []
toolkit = create_full_toolkit()
tools_list: list = [toolkit]
# Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
# scopes that conflict with asyncio background task cancellation (#72).
if not skip_mcp:
try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
gitea_mcp = create_gitea_mcp_tools()
if gitea_mcp:
tools_list.append(gitea_mcp)
fs_mcp = create_filesystem_mcp_tools()
if fs_mcp:
tools_list.append(fs_mcp)
except Exception as exc:
logger.debug("MCP tools unavailable: %s", exc)
return tools_list
def _build_prompt(use_tools: bool, session_id: str) -> str:
"""Build the full system prompt with optional memory context.
Args:
use_tools: Whether tools are enabled (affects prompt tier and context budget).
session_id: Session identifier for the prompt.
Returns:
Complete system prompt string.
"""
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
try:
from timmy.memory_system import memory_system
memory_context = memory_system.get_system_context()
if memory_context:
# Truncate if too long — smaller budget for small models
# since the expanded prompt (roster, guardrails) uses more tokens
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
return (
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
except Exception as exc:
logger.warning("Failed to load memory context: %s", exc)
return base_prompt
def _create_ollama_agent(
model_name: str,
db_file: str,
tools_list: list,
full_prompt: str,
use_tools: bool,
) -> Agent:
"""Construct the Agno Agent with an Ollama model.
Args:
model_name: Resolved Ollama model name.
db_file: SQLite file for conversation memory.
tools_list: Pre-built tools list (may be empty).
full_prompt: Complete system prompt.
use_tools: Whether tools are enabled.
Returns:
Configured Agno Agent.
"""
model_kwargs = {}
if settings.ollama_num_ctx > 0:
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
return Agent(
name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
db=SqliteDb(db_file=db_file),
description=full_prompt,
add_history_to_context=True,
num_history_runs=20,
markdown=False,
tools=tools_list if tools_list else None,
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)
def create_timmy(
db_file: str = "timmy.db",
backend: str | None = None,
@@ -345,12 +238,16 @@ def create_timmy(
return TimmyAirLLMAgent(model_size=size)
# Default: Ollama via Agno.
# Resolve model with automatic pulling and fallback
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,
require_vision=False,
auto_pull=True,
)
# If Ollama is completely unreachable, fail loudly.
# Sovereignty: never silently send data to a cloud API.
# Use --backend claude explicitly if you want cloud inference.
if not _check_model_available(model_name):
logger.error(
"Ollama unreachable and no local models available. "
@@ -361,9 +258,74 @@ def create_timmy(
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
tools_list = _build_tools_list(use_tools, skip_mcp)
full_prompt = _build_prompt(use_tools, session_id)
agent = _create_ollama_agent(model_name, db_file, tools_list, full_prompt, use_tools)
# Conditionally include tools — small models get none
toolkit = create_full_toolkit() if use_tools else None
if not use_tools:
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
tools_list: list = []
if toolkit:
tools_list.append(toolkit)
# Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
# scopes that conflict with asyncio background task cancellation (#72).
if use_tools and not skip_mcp:
try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
gitea_mcp = create_gitea_mcp_tools()
if gitea_mcp:
tools_list.append(gitea_mcp)
fs_mcp = create_filesystem_mcp_tools()
if fs_mcp:
tools_list.append(fs_mcp)
except Exception as exc:
logger.debug("MCP tools unavailable: %s", exc)
# Select prompt tier based on tool capability
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
# Try to load memory context
try:
from timmy.memory_system import memory_system
memory_context = memory_system.get_system_context()
if memory_context:
# Truncate if too long — smaller budget for small models
# since the expanded prompt (roster, guardrails) uses more tokens
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
full_prompt = (
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
else:
full_prompt = base_prompt
except Exception as exc:
logger.warning("Failed to load memory context: %s", exc)
full_prompt = base_prompt
model_kwargs = {}
if settings.ollama_num_ctx > 0:
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
agent = Agent(
name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
db=SqliteDb(db_file=db_file),
description=full_prompt,
add_history_to_context=True,
num_history_runs=20,
markdown=False,
tools=tools_list if tools_list else None,
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)
_warmup_model(model_name)
return agent

View File

@@ -232,29 +232,58 @@ class ThinkingEngine:
return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]:
"""Assemble the context needed for a thinking cycle.
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
content, seed_type = await self._generate_thought(prompt)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._finalize_thought(thought)
return thought
async def _generate_thought(self, prompt: str | None = None) -> tuple[str | None, str]:
"""Generate novel thought content via the dedup retry loop.
Gathers context, builds the LLM prompt, calls the agent, and
retries with a fresh seed if the result is too similar to recent
thoughts.
Returns:
(memory_context, system_context, recent_thoughts)
A (content, seed_type) tuple. *content* is ``None`` when the
cycle should be skipped (agent failure, empty response, or
all retries exhausted).
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought(
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list["Thought"],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
content: str | None = None
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
@@ -287,7 +316,7 @@ class ThinkingEngine:
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought
break # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
@@ -295,6 +324,7 @@ class ThinkingEngine:
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
content = None # Will retry
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
@@ -302,10 +332,10 @@ class ThinkingEngine:
)
return None, seed_type
return None, seed_type
return content, seed_type
async def _process_thinking_result(self, thought: "Thought") -> None:
"""Run all post-hooks after a thought is stored."""
async def _finalize_thought(self, thought: Thought) -> None:
"""Run post-hooks, log, journal, and broadcast a stored thought."""
self._maybe_check_memory()
await self._maybe_distill()
await self._maybe_file_issues()
@@ -316,54 +346,12 @@ class ThinkingEngine:
self._write_journal(thought)
await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
logger.info(
"Thought [%s] (%s): %s",
thought.id[:8],
seed_type,
thought.seed_type,
thought.content[:80],
)
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""

View File

@@ -454,127 +454,3 @@ def test_no_hardcoded_fallback_constants_in_agent():
assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
"Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
)
# ── _build_tools_list helper ─────────────────────────────────────────────────
def test_build_tools_list_returns_empty_when_no_tools():
"""When use_tools=False, _build_tools_list returns an empty list."""
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=False, skip_mcp=False)
assert result == []
def test_build_tools_list_includes_toolkit():
"""When use_tools=True, _build_tools_list includes the toolkit."""
mock_toolkit = MagicMock()
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=True)
assert mock_toolkit in result
def test_build_tools_list_adds_mcp_when_not_skipped():
"""When skip_mcp=False, _build_tools_list attempts MCP tools."""
mock_toolkit = MagicMock()
mock_gitea = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=mock_gitea),
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None),
):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=False)
assert mock_toolkit in result
assert mock_gitea in result
# ── _build_prompt helper ─────────────────────────────────────────────────────
def test_build_prompt_returns_base_when_no_memory():
"""_build_prompt returns base prompt when memory context is empty."""
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = ""
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "Timmy" in result
def test_build_prompt_appends_memory_context():
"""_build_prompt appends memory context when available."""
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = "User likes pizza"
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "User likes pizza" in result
assert "GROUNDED CONTEXT" in result
def test_build_prompt_truncates_long_memory_for_small_models():
"""_build_prompt truncates memory for small models (use_tools=False)."""
long_context = "x" * 5000
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = long_context
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
# Max context is 2000 for small models + truncation marker
assert "[truncated]" in result
# ── _create_ollama_agent helper ──────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
"""_create_ollama_agent passes the expected kwargs to Agent()."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
model_name="test-model",
db_file="test.db",
tools_list=[MagicMock()],
full_prompt="Test prompt",
use_tools=True,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["description"] == "Test prompt"
assert kwargs["tools"] is not None
def test_create_ollama_agent_none_tools_when_empty():
"""_create_ollama_agent passes tools=None when tools_list is empty."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
model_name="test-model",
db_file="test.db",
tools_list=[],
full_prompt="Test prompt",
use_tools=False,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] is None

View File

@@ -250,6 +250,99 @@ def test_continuity_includes_recent(tmp_path):
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# _generate_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_generate_thought_returns_content_and_seed_type(tmp_path):
"""_generate_thought should return (content, seed_type) on success."""
from timmy.thinking import SEED_TYPES
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A novel idea."):
content, seed_type = await engine._generate_thought()
assert content == "A novel idea."
assert seed_type in SEED_TYPES
@pytest.mark.asyncio
async def test_generate_thought_with_prompt(tmp_path):
"""_generate_thought(prompt=...) should use 'prompted' seed type."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A prompted idea."):
content, seed_type = await engine._generate_thought(prompt="Reflect on joy")
assert content == "A prompted idea."
assert seed_type == "prompted"
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_agent_failure(tmp_path):
"""_generate_thought should return (None, ...) when the agent fails."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
content, seed_type = await engine._generate_thought()
assert content is None
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_empty(tmp_path):
"""_generate_thought should return (None, ...) when agent returns empty."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value=" "):
content, seed_type = await engine._generate_thought()
assert content is None
# ---------------------------------------------------------------------------
# _finalize_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_finalize_thought_calls_all_hooks(tmp_path):
"""_finalize_thought should call all post-hooks, log, journal, and broadcast."""
engine = _make_engine(tmp_path)
thought = engine._store_thought("Test finalize.", "freeform")
with (
patch.object(engine, "_maybe_check_memory") as m_mem,
patch.object(engine, "_maybe_distill", new_callable=AsyncMock) as m_distill,
patch.object(engine, "_maybe_file_issues", new_callable=AsyncMock) as m_issues,
patch.object(engine, "_check_workspace", new_callable=AsyncMock) as m_ws,
patch.object(engine, "_maybe_check_memory_status") as m_status,
patch.object(engine, "_update_memory") as m_update,
patch.object(engine, "_log_event") as m_log,
patch.object(engine, "_write_journal") as m_journal,
patch.object(engine, "_broadcast", new_callable=AsyncMock) as m_broadcast,
):
await engine._finalize_thought(thought)
m_mem.assert_called_once()
m_distill.assert_awaited_once()
m_issues.assert_awaited_once()
m_ws.assert_awaited_once()
m_status.assert_called_once()
m_update.assert_called_once_with(thought)
m_log.assert_called_once_with(thought)
m_journal.assert_called_once_with(thought)
m_broadcast.assert_awaited_once_with(thought)
# ---------------------------------------------------------------------------
# think_once (async)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_think_once_stores_thought(tmp_path):
"""think_once should store a thought in the DB."""