From 735bfc7820852bfb6499ab552233afe2fdd42ded Mon Sep 17 00:00:00 2001 From: kimi Date: Thu, 19 Mar 2026 19:53:53 -0400 Subject: [PATCH] refactor: break up create_timmy() into testable helpers Extract three focused helpers from the 131-line create_timmy(): - _build_tools_list(): assembles toolkit + optional MCP servers - _build_prompt(): builds system prompt with memory context - _create_ollama_agent(): constructs the Agno Agent create_timmy() now delegates to these helpers, making each concern independently testable. Added 8 unit tests for the new helpers. Fixes #512 Co-Authored-By: Claude Opus 4.6 --- src/timmy/agent.py | 182 +++++++++++++++++++++++--------------- tests/timmy/test_agent.py | 124 ++++++++++++++++++++++++++ 2 files changed, 234 insertions(+), 72 deletions(-) diff --git a/src/timmy/agent.py b/src/timmy/agent.py index bcbdee2..255709c 100644 --- a/src/timmy/agent.py +++ b/src/timmy/agent.py @@ -197,6 +197,113 @@ def _resolve_backend(requested: str | None) -> str: return "ollama" +def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list: + """Build the Agno tools list (toolkit + optional MCP servers). + + Args: + use_tools: Whether the model supports tool calling. + skip_mcp: If True, omit MCP tool servers. + + Returns: + List of Toolkit / MCPTools, possibly empty. + """ + if not use_tools: + logger.info("Tools disabled (model too small for reliable tool calling)") + return [] + + toolkit = create_full_toolkit() + tools_list: list = [toolkit] + + # Add MCP tool servers (lazy-connected on first arun()). + # Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel + # scopes that conflict with asyncio background task cancellation (#72). + if not skip_mcp: + try: + from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools + + gitea_mcp = create_gitea_mcp_tools() + if gitea_mcp: + tools_list.append(gitea_mcp) + + fs_mcp = create_filesystem_mcp_tools() + if fs_mcp: + tools_list.append(fs_mcp) + except Exception as exc: + logger.debug("MCP tools unavailable: %s", exc) + + return tools_list + + +def _build_prompt(use_tools: bool, session_id: str) -> str: + """Build the full system prompt with optional memory context. + + Args: + use_tools: Whether tools are enabled (affects prompt tier and context budget). + session_id: Session identifier for the prompt. + + Returns: + Complete system prompt string. + """ + base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id) + + try: + from timmy.memory_system import memory_system + + memory_context = memory_system.get_system_context() + if memory_context: + # Truncate if too long — smaller budget for small models + # since the expanded prompt (roster, guardrails) uses more tokens + max_context = 2000 if not use_tools else 8000 + if len(memory_context) > max_context: + memory_context = memory_context[:max_context] + "\n... [truncated]" + return ( + f"{base_prompt}\n\n" + f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n" + f"{memory_context}" + ) + except Exception as exc: + logger.warning("Failed to load memory context: %s", exc) + + return base_prompt + + +def _create_ollama_agent( + model_name: str, + db_file: str, + tools_list: list, + full_prompt: str, + use_tools: bool, +) -> Agent: + """Construct the Agno Agent with an Ollama model. + + Args: + model_name: Resolved Ollama model name. + db_file: SQLite file for conversation memory. + tools_list: Pre-built tools list (may be empty). + full_prompt: Complete system prompt. + use_tools: Whether tools are enabled. + + Returns: + Configured Agno Agent. + """ + model_kwargs = {} + if settings.ollama_num_ctx > 0: + model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx} + + return Agent( + name="Agent", + model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs), + db=SqliteDb(db_file=db_file), + description=full_prompt, + add_history_to_context=True, + num_history_runs=20, + markdown=False, + tools=tools_list if tools_list else None, + tool_call_limit=settings.max_agent_steps if use_tools else None, + telemetry=settings.telemetry_enabled, + ) + + def create_timmy( db_file: str = "timmy.db", backend: str | None = None, @@ -238,16 +345,12 @@ def create_timmy( return TimmyAirLLMAgent(model_size=size) # Default: Ollama via Agno. - # Resolve model with automatic pulling and fallback model_name, is_fallback = _resolve_model_with_fallback( requested_model=None, require_vision=False, auto_pull=True, ) - # If Ollama is completely unreachable, fail loudly. - # Sovereignty: never silently send data to a cloud API. - # Use --backend claude explicitly if you want cloud inference. if not _check_model_available(model_name): logger.error( "Ollama unreachable and no local models available. " @@ -258,74 +361,9 @@ def create_timmy( logger.info("Using fallback model %s (requested was unavailable)", model_name) use_tools = _model_supports_tools(model_name) - - # Conditionally include tools — small models get none - toolkit = create_full_toolkit() if use_tools else None - if not use_tools: - logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name) - - # Build the tools list — Agno accepts a list of Toolkit / MCPTools - tools_list: list = [] - if toolkit: - tools_list.append(toolkit) - - # Add MCP tool servers (lazy-connected on first arun()). - # Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel - # scopes that conflict with asyncio background task cancellation (#72). - if use_tools and not skip_mcp: - try: - from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools - - gitea_mcp = create_gitea_mcp_tools() - if gitea_mcp: - tools_list.append(gitea_mcp) - - fs_mcp = create_filesystem_mcp_tools() - if fs_mcp: - tools_list.append(fs_mcp) - except Exception as exc: - logger.debug("MCP tools unavailable: %s", exc) - - # Select prompt tier based on tool capability - base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id) - - # Try to load memory context - try: - from timmy.memory_system import memory_system - - memory_context = memory_system.get_system_context() - if memory_context: - # Truncate if too long — smaller budget for small models - # since the expanded prompt (roster, guardrails) uses more tokens - max_context = 2000 if not use_tools else 8000 - if len(memory_context) > max_context: - memory_context = memory_context[:max_context] + "\n... [truncated]" - full_prompt = ( - f"{base_prompt}\n\n" - f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n" - f"{memory_context}" - ) - else: - full_prompt = base_prompt - except Exception as exc: - logger.warning("Failed to load memory context: %s", exc) - full_prompt = base_prompt - - model_kwargs = {} - if settings.ollama_num_ctx > 0: - model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx} - agent = Agent( - name="Agent", - model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs), - db=SqliteDb(db_file=db_file), - description=full_prompt, - add_history_to_context=True, - num_history_runs=20, - markdown=False, - tools=tools_list if tools_list else None, - tool_call_limit=settings.max_agent_steps if use_tools else None, - telemetry=settings.telemetry_enabled, - ) + tools_list = _build_tools_list(use_tools, skip_mcp) + full_prompt = _build_prompt(use_tools, session_id) + agent = _create_ollama_agent(model_name, db_file, tools_list, full_prompt, use_tools) _warmup_model(model_name) return agent diff --git a/tests/timmy/test_agent.py b/tests/timmy/test_agent.py index b975d48..2f07ca4 100644 --- a/tests/timmy/test_agent.py +++ b/tests/timmy/test_agent.py @@ -454,3 +454,127 @@ def test_no_hardcoded_fallback_constants_in_agent(): assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), ( "Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models" ) + + +# ── _build_tools_list helper ───────────────────────────────────────────────── + + +def test_build_tools_list_returns_empty_when_no_tools(): + """When use_tools=False, _build_tools_list returns an empty list.""" + from timmy.agent import _build_tools_list + + result = _build_tools_list(use_tools=False, skip_mcp=False) + assert result == [] + + +def test_build_tools_list_includes_toolkit(): + """When use_tools=True, _build_tools_list includes the toolkit.""" + mock_toolkit = MagicMock() + with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit): + from timmy.agent import _build_tools_list + + result = _build_tools_list(use_tools=True, skip_mcp=True) + + assert mock_toolkit in result + + +def test_build_tools_list_adds_mcp_when_not_skipped(): + """When skip_mcp=False, _build_tools_list attempts MCP tools.""" + mock_toolkit = MagicMock() + mock_gitea = MagicMock() + with ( + patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit), + patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=mock_gitea), + patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None), + ): + from timmy.agent import _build_tools_list + + result = _build_tools_list(use_tools=True, skip_mcp=False) + + assert mock_toolkit in result + assert mock_gitea in result + + +# ── _build_prompt helper ───────────────────────────────────────────────────── + + +def test_build_prompt_returns_base_when_no_memory(): + """_build_prompt returns base prompt when memory context is empty.""" + with patch("timmy.memory_system.memory_system") as mock_mem: + mock_mem.get_system_context.return_value = "" + from timmy.agent import _build_prompt + + result = _build_prompt(use_tools=True, session_id="test") + + assert "Timmy" in result + + +def test_build_prompt_appends_memory_context(): + """_build_prompt appends memory context when available.""" + with patch("timmy.memory_system.memory_system") as mock_mem: + mock_mem.get_system_context.return_value = "User likes pizza" + from timmy.agent import _build_prompt + + result = _build_prompt(use_tools=True, session_id="test") + + assert "User likes pizza" in result + assert "GROUNDED CONTEXT" in result + + +def test_build_prompt_truncates_long_memory_for_small_models(): + """_build_prompt truncates memory for small models (use_tools=False).""" + long_context = "x" * 5000 + with patch("timmy.memory_system.memory_system") as mock_mem: + mock_mem.get_system_context.return_value = long_context + from timmy.agent import _build_prompt + + result = _build_prompt(use_tools=False, session_id="test") + + # Max context is 2000 for small models + truncation marker + assert "[truncated]" in result + + +# ── _create_ollama_agent helper ────────────────────────────────────────────── + + +def test_create_ollama_agent_passes_correct_kwargs(): + """_create_ollama_agent passes the expected kwargs to Agent().""" + with ( + patch("timmy.agent.Agent") as MockAgent, + patch("timmy.agent.Ollama"), + patch("timmy.agent.SqliteDb"), + ): + from timmy.agent import _create_ollama_agent + + _create_ollama_agent( + model_name="test-model", + db_file="test.db", + tools_list=[MagicMock()], + full_prompt="Test prompt", + use_tools=True, + ) + + kwargs = MockAgent.call_args.kwargs + assert kwargs["description"] == "Test prompt" + assert kwargs["tools"] is not None + + +def test_create_ollama_agent_none_tools_when_empty(): + """_create_ollama_agent passes tools=None when tools_list is empty.""" + with ( + patch("timmy.agent.Agent") as MockAgent, + patch("timmy.agent.Ollama"), + patch("timmy.agent.SqliteDb"), + ): + from timmy.agent import _create_ollama_agent + + _create_ollama_agent( + model_name="test-model", + db_file="test.db", + tools_list=[], + full_prompt="Test prompt", + use_tools=False, + ) + + kwargs = MockAgent.call_args.kwargs + assert kwargs["tools"] is None