refactor: break up create_timmy() into testable helpers (#520)
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit was merged in pull request #520.
This commit is contained in:
@@ -197,6 +197,90 @@ def _resolve_backend(requested: str | None) -> str:
|
||||
return "ollama"
|
||||
|
||||
|
||||
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
|
||||
"""Assemble the tools list based on model capability and MCP flags.
|
||||
|
||||
Returns a list of Toolkit / MCPTools objects, or an empty list.
|
||||
"""
|
||||
if not use_tools:
|
||||
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
|
||||
return []
|
||||
|
||||
tools_list: list = [create_full_toolkit()]
|
||||
|
||||
# Add MCP tool servers (lazy-connected on first arun()).
|
||||
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
||||
# scopes that conflict with asyncio background task cancellation (#72).
|
||||
if not skip_mcp:
|
||||
try:
|
||||
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
||||
|
||||
gitea_mcp = create_gitea_mcp_tools()
|
||||
if gitea_mcp:
|
||||
tools_list.append(gitea_mcp)
|
||||
|
||||
fs_mcp = create_filesystem_mcp_tools()
|
||||
if fs_mcp:
|
||||
tools_list.append(fs_mcp)
|
||||
except Exception as exc:
|
||||
logger.debug("MCP tools unavailable: %s", exc)
|
||||
|
||||
return tools_list
|
||||
|
||||
|
||||
def _build_prompt(use_tools: bool, session_id: str) -> str:
|
||||
"""Build the full system prompt with optional memory context."""
|
||||
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
||||
|
||||
try:
|
||||
from timmy.memory_system import memory_system
|
||||
|
||||
memory_context = memory_system.get_system_context()
|
||||
if memory_context:
|
||||
# Smaller budget for small models — expanded prompt uses more tokens
|
||||
max_context = 2000 if not use_tools else 8000
|
||||
if len(memory_context) > max_context:
|
||||
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
||||
return (
|
||||
f"{base_prompt}\n\n"
|
||||
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
||||
f"{memory_context}"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load memory context: %s", exc)
|
||||
|
||||
return base_prompt
|
||||
|
||||
|
||||
def _create_ollama_agent(
|
||||
*,
|
||||
db_file: str,
|
||||
model_name: str,
|
||||
tools_list: list,
|
||||
full_prompt: str,
|
||||
use_tools: bool,
|
||||
) -> Agent:
|
||||
"""Construct the Agno Agent with Ollama backend and warm up the model."""
|
||||
model_kwargs = {}
|
||||
if settings.ollama_num_ctx > 0:
|
||||
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
||||
|
||||
agent = Agent(
|
||||
name="Agent",
|
||||
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
||||
db=SqliteDb(db_file=db_file),
|
||||
description=full_prompt,
|
||||
add_history_to_context=True,
|
||||
num_history_runs=20,
|
||||
markdown=False,
|
||||
tools=tools_list if tools_list else None,
|
||||
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
||||
telemetry=settings.telemetry_enabled,
|
||||
)
|
||||
_warmup_model(model_name)
|
||||
return agent
|
||||
|
||||
|
||||
def create_timmy(
|
||||
db_file: str = "timmy.db",
|
||||
backend: str | None = None,
|
||||
@@ -238,16 +322,12 @@ def create_timmy(
|
||||
return TimmyAirLLMAgent(model_size=size)
|
||||
|
||||
# Default: Ollama via Agno.
|
||||
# Resolve model with automatic pulling and fallback
|
||||
model_name, is_fallback = _resolve_model_with_fallback(
|
||||
requested_model=None,
|
||||
require_vision=False,
|
||||
auto_pull=True,
|
||||
)
|
||||
|
||||
# If Ollama is completely unreachable, fail loudly.
|
||||
# Sovereignty: never silently send data to a cloud API.
|
||||
# Use --backend claude explicitly if you want cloud inference.
|
||||
if not _check_model_available(model_name):
|
||||
logger.error(
|
||||
"Ollama unreachable and no local models available. "
|
||||
@@ -258,76 +338,16 @@ def create_timmy(
|
||||
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
||||
|
||||
use_tools = _model_supports_tools(model_name)
|
||||
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
|
||||
full_prompt = _build_prompt(use_tools, session_id)
|
||||
|
||||
# Conditionally include tools — small models get none
|
||||
toolkit = create_full_toolkit() if use_tools else None
|
||||
if not use_tools:
|
||||
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
|
||||
|
||||
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
|
||||
tools_list: list = []
|
||||
if toolkit:
|
||||
tools_list.append(toolkit)
|
||||
|
||||
# Add MCP tool servers (lazy-connected on first arun()).
|
||||
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
||||
# scopes that conflict with asyncio background task cancellation (#72).
|
||||
if use_tools and not skip_mcp:
|
||||
try:
|
||||
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
||||
|
||||
gitea_mcp = create_gitea_mcp_tools()
|
||||
if gitea_mcp:
|
||||
tools_list.append(gitea_mcp)
|
||||
|
||||
fs_mcp = create_filesystem_mcp_tools()
|
||||
if fs_mcp:
|
||||
tools_list.append(fs_mcp)
|
||||
except Exception as exc:
|
||||
logger.debug("MCP tools unavailable: %s", exc)
|
||||
|
||||
# Select prompt tier based on tool capability
|
||||
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
||||
|
||||
# Try to load memory context
|
||||
try:
|
||||
from timmy.memory_system import memory_system
|
||||
|
||||
memory_context = memory_system.get_system_context()
|
||||
if memory_context:
|
||||
# Truncate if too long — smaller budget for small models
|
||||
# since the expanded prompt (roster, guardrails) uses more tokens
|
||||
max_context = 2000 if not use_tools else 8000
|
||||
if len(memory_context) > max_context:
|
||||
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
||||
full_prompt = (
|
||||
f"{base_prompt}\n\n"
|
||||
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
||||
f"{memory_context}"
|
||||
)
|
||||
else:
|
||||
full_prompt = base_prompt
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load memory context: %s", exc)
|
||||
full_prompt = base_prompt
|
||||
|
||||
model_kwargs = {}
|
||||
if settings.ollama_num_ctx > 0:
|
||||
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
||||
agent = Agent(
|
||||
name="Agent",
|
||||
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
||||
db=SqliteDb(db_file=db_file),
|
||||
description=full_prompt,
|
||||
add_history_to_context=True,
|
||||
num_history_runs=20,
|
||||
markdown=False,
|
||||
tools=tools_list if tools_list else None,
|
||||
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
||||
telemetry=settings.telemetry_enabled,
|
||||
return _create_ollama_agent(
|
||||
db_file=db_file,
|
||||
model_name=model_name,
|
||||
tools_list=tools_list,
|
||||
full_prompt=full_prompt,
|
||||
use_tools=use_tools,
|
||||
)
|
||||
_warmup_model(model_name)
|
||||
return agent
|
||||
|
||||
|
||||
class TimmyWithMemory:
|
||||
|
||||
@@ -444,6 +444,150 @@ def test_get_effective_ollama_model_walks_fallback_chain():
|
||||
assert result == "fb-2"
|
||||
|
||||
|
||||
# ── _build_tools_list ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_build_tools_list_empty_when_tools_disabled():
|
||||
"""Small models get an empty tools list."""
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_build_tools_list_includes_toolkit_when_enabled():
|
||||
"""Tool-capable models get the full toolkit."""
|
||||
mock_toolkit = MagicMock()
|
||||
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
|
||||
assert mock_toolkit in result
|
||||
|
||||
|
||||
def test_build_tools_list_skips_mcp_when_flagged():
|
||||
"""skip_mcp=True must not call MCP factories."""
|
||||
mock_toolkit = MagicMock()
|
||||
with (
|
||||
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
|
||||
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
|
||||
):
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
_build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
|
||||
mock_gitea.assert_not_called()
|
||||
mock_fs.assert_not_called()
|
||||
|
||||
|
||||
def test_build_tools_list_includes_mcp_when_not_skipped():
|
||||
"""skip_mcp=False should attempt MCP tool creation."""
|
||||
mock_toolkit = MagicMock()
|
||||
with (
|
||||
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
|
||||
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
|
||||
):
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
_build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
|
||||
mock_gitea.assert_called_once()
|
||||
mock_fs.assert_called_once()
|
||||
|
||||
|
||||
# ── _build_prompt ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_build_prompt_includes_base_prompt():
|
||||
"""Prompt should always contain the base system prompt."""
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=False, session_id="test")
|
||||
assert "Timmy" in result
|
||||
|
||||
|
||||
def test_build_prompt_appends_memory_context():
|
||||
"""Memory context should be appended when available."""
|
||||
mock_memory = MagicMock()
|
||||
mock_memory.get_system_context.return_value = "User prefers dark mode."
|
||||
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=True, session_id="test")
|
||||
assert "GROUNDED CONTEXT" in result
|
||||
assert "dark mode" in result
|
||||
|
||||
|
||||
def test_build_prompt_truncates_long_memory():
|
||||
"""Long memory context should be truncated."""
|
||||
mock_memory = MagicMock()
|
||||
mock_memory.get_system_context.return_value = "x" * 10000
|
||||
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=False, session_id="test")
|
||||
assert "[truncated]" in result
|
||||
|
||||
|
||||
def test_build_prompt_survives_memory_failure():
|
||||
"""Prompt should fall back to base when memory fails."""
|
||||
mock_memory = MagicMock()
|
||||
mock_memory.get_system_context.side_effect = RuntimeError("db locked")
|
||||
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=True, session_id="test")
|
||||
assert "Timmy" in result
|
||||
# Memory context should NOT be appended (the db locked error was caught)
|
||||
assert "db locked" not in result
|
||||
|
||||
|
||||
# ── _create_ollama_agent ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_create_ollama_agent_passes_correct_kwargs():
|
||||
"""_create_ollama_agent must pass the expected kwargs to Agent."""
|
||||
with (
|
||||
patch("timmy.agent.Agent") as MockAgent,
|
||||
patch("timmy.agent.Ollama"),
|
||||
patch("timmy.agent.SqliteDb"),
|
||||
patch("timmy.agent._warmup_model", return_value=True),
|
||||
):
|
||||
from timmy.agent import _create_ollama_agent
|
||||
|
||||
_create_ollama_agent(
|
||||
db_file="test.db",
|
||||
model_name="llama3.1",
|
||||
tools_list=[MagicMock()],
|
||||
full_prompt="test prompt",
|
||||
use_tools=True,
|
||||
)
|
||||
kwargs = MockAgent.call_args.kwargs
|
||||
assert kwargs["description"] == "test prompt"
|
||||
assert kwargs["markdown"] is False
|
||||
|
||||
|
||||
def test_create_ollama_agent_none_tools_when_empty():
|
||||
"""Empty tools_list should pass tools=None to Agent."""
|
||||
with (
|
||||
patch("timmy.agent.Agent") as MockAgent,
|
||||
patch("timmy.agent.Ollama"),
|
||||
patch("timmy.agent.SqliteDb"),
|
||||
patch("timmy.agent._warmup_model", return_value=True),
|
||||
):
|
||||
from timmy.agent import _create_ollama_agent
|
||||
|
||||
_create_ollama_agent(
|
||||
db_file="test.db",
|
||||
model_name="llama3.2",
|
||||
tools_list=[],
|
||||
full_prompt="test prompt",
|
||||
use_tools=False,
|
||||
)
|
||||
kwargs = MockAgent.call_args.kwargs
|
||||
assert kwargs["tools"] is None
|
||||
|
||||
|
||||
def test_no_hardcoded_fallback_constants_in_agent():
|
||||
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
|
||||
import timmy.agent as agent_mod
|
||||
|
||||
Reference in New Issue
Block a user