Compare commits

..

1 Commit

Author SHA1 Message Date
kimi
735bfc7820 refactor: break up create_timmy() into testable helpers
Extract three focused helpers from the 131-line create_timmy():
- _build_tools_list(): assembles toolkit + optional MCP servers
- _build_prompt(): builds system prompt with memory context
- _create_ollama_agent(): constructs the Agno Agent

create_timmy() now delegates to these helpers, making each
concern independently testable. Added 8 unit tests for the
new helpers.

Fixes #512

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:53:53 -04:00
4 changed files with 213 additions and 333 deletions

View File

@@ -100,25 +100,48 @@ def _get_git_context() -> dict:
return {"branch": "unknown", "commit": "unknown"}
def _extract_origin(exc: Exception) -> tuple[str, int]:
"""Walk the traceback to find the deepest file and line number."""
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
# Format the stack trace
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
# Extract file/line from traceback
tb_obj = exc.__traceback__
affected_file = "unknown"
affected_line = 0
while tb_obj and tb_obj.tb_next:
tb_obj = tb_obj.tb_next
if tb_obj:
return tb_obj.tb_frame.f_code.co_filename, tb_obj.tb_lineno
return "unknown", 0
affected_file = tb_obj.tb_frame.f_code.co_filename
affected_line = tb_obj.tb_lineno
git_ctx = _get_git_context()
def _log_error_event(
exc: Exception,
source: str,
error_hash: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> None:
"""Log the error to the event log (best-effort)."""
# 1. Log to event_log
try:
from swarm.event_log import EventType, log_event
@@ -138,18 +161,8 @@ def _log_error_event(
except Exception as log_exc:
logger.debug("Failed to log error event: %s", log_exc)
def _create_bug_report(
exc: Exception,
source: str,
error_hash: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
tb_str: str,
context: dict | None,
) -> str | None:
"""Create a bug report task and return its ID (best-effort)."""
# 2. Create bug report task
task_id = None
try:
from swarm.task_queue.models import create_task
@@ -180,30 +193,29 @@ def _create_bug_report(
auto_approve=True,
task_type="bug_report",
)
task_id = task.id
# Log the creation event
try:
from swarm.event_log import EventType, log_event
log_event(
EventType.BUG_REPORT_CREATED,
source=source,
task_id=task.id,
task_id=task_id,
data={
"error_hash": error_hash,
"title": title[:100],
},
)
except Exception as log_exc:
logger.warning("Bug report log error: %s", log_exc)
except Exception as exc:
logger.warning("Bug report screenshot error: %s", exc)
pass
return task.id
except Exception as task_exc:
logger.debug("Failed to create bug report task: %s", task_exc)
return None
def _send_error_notification(exc: Exception, source: str) -> None:
"""Push a notification about the captured error (best-effort)."""
# 3. Send notification
try:
from infrastructure.notifications.push import notifier
@@ -212,12 +224,11 @@ def _send_error_notification(exc: Exception, source: str) -> None:
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
category="system",
)
except Exception as notify_exc:
logger.warning("Bug report notification error: %s", notify_exc)
except Exception as exc:
logger.warning("Bug report notification error: %s", exc)
pass
def _record_to_session(exc: Exception, source: str) -> None:
"""Forward the error to the registered session recorder (best-effort)."""
# 4. Record in session logger (via registered callback)
if _error_recorder is not None:
try:
_error_recorder(
@@ -227,44 +238,4 @@ def _record_to_session(exc: Exception, source: str) -> None:
except Exception as log_exc:
logger.warning("Bug report session logging error: %s", log_exc)
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
affected_file, affected_line = _extract_origin(exc)
git_ctx = _get_git_context()
_log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx)
task_id = _create_bug_report(
exc, source, error_hash, affected_file, affected_line, git_ctx, tb_str, context
)
_send_error_notification(exc, source)
_record_to_session(exc, source)
return task_id

View File

@@ -197,16 +197,22 @@ def _resolve_backend(requested: str | None) -> str:
return "ollama"
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
"""Assemble the tools list based on model capability and MCP flags.
def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list:
"""Build the Agno tools list (toolkit + optional MCP servers).
Returns a list of Toolkit / MCPTools objects, or an empty list.
Args:
use_tools: Whether the model supports tool calling.
skip_mcp: If True, omit MCP tool servers.
Returns:
List of Toolkit / MCPTools, possibly empty.
"""
if not use_tools:
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
logger.info("Tools disabled (model too small for reliable tool calling)")
return []
tools_list: list = [create_full_toolkit()]
toolkit = create_full_toolkit()
tools_list: list = [toolkit]
# Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
@@ -229,7 +235,15 @@ def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
def _build_prompt(use_tools: bool, session_id: str) -> str:
"""Build the full system prompt with optional memory context."""
"""Build the full system prompt with optional memory context.
Args:
use_tools: Whether tools are enabled (affects prompt tier and context budget).
session_id: Session identifier for the prompt.
Returns:
Complete system prompt string.
"""
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
try:
@@ -237,7 +251,8 @@ def _build_prompt(use_tools: bool, session_id: str) -> str:
memory_context = memory_system.get_system_context()
if memory_context:
# Smaller budget for small models — expanded prompt uses more tokens
# Truncate if too long — smaller budget for small models
# since the expanded prompt (roster, guardrails) uses more tokens
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
@@ -253,19 +268,29 @@ def _build_prompt(use_tools: bool, session_id: str) -> str:
def _create_ollama_agent(
*,
db_file: str,
model_name: str,
db_file: str,
tools_list: list,
full_prompt: str,
use_tools: bool,
) -> Agent:
"""Construct the Agno Agent with Ollama backend and warm up the model."""
"""Construct the Agno Agent with an Ollama model.
Args:
model_name: Resolved Ollama model name.
db_file: SQLite file for conversation memory.
tools_list: Pre-built tools list (may be empty).
full_prompt: Complete system prompt.
use_tools: Whether tools are enabled.
Returns:
Configured Agno Agent.
"""
model_kwargs = {}
if settings.ollama_num_ctx > 0:
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
agent = Agent(
return Agent(
name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
db=SqliteDb(db_file=db_file),
@@ -277,8 +302,6 @@ def _create_ollama_agent(
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)
_warmup_model(model_name)
return agent
def create_timmy(
@@ -338,16 +361,11 @@ def create_timmy(
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
tools_list = _build_tools_list(use_tools, skip_mcp)
full_prompt = _build_prompt(use_tools, session_id)
return _create_ollama_agent(
db_file=db_file,
model_name=model_name,
tools_list=tools_list,
full_prompt=full_prompt,
use_tools=use_tools,
)
agent = _create_ollama_agent(model_name, db_file, tools_list, full_prompt, use_tools)
_warmup_model(model_name)
return agent
class TimmyWithMemory:

View File

@@ -5,14 +5,9 @@ from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from infrastructure.error_capture import (
_create_bug_report,
_dedup_cache,
_extract_origin,
_get_git_context,
_is_duplicate,
_log_error_event,
_record_to_session,
_send_error_notification,
_stack_hash,
capture_error,
)
@@ -198,87 +193,3 @@ class TestCaptureError:
def teardown_method(self):
_dedup_cache.clear()
class TestExtractOrigin:
"""Test _extract_origin helper."""
def test_returns_file_and_line(self):
try:
_make_exception()
except ValueError as e:
filename, lineno = _extract_origin(e)
assert filename.endswith("test_error_capture.py")
assert lineno > 0
def test_no_traceback_returns_defaults(self):
exc = ValueError("no tb")
exc.__traceback__ = None
assert _extract_origin(exc) == ("unknown", 0)
class TestLogErrorEvent:
"""Test _log_error_event helper."""
def test_does_not_crash_when_event_log_missing(self):
try:
raise RuntimeError("log test")
except RuntimeError as e:
_log_error_event(e, "test", "abc123", "file.py", 42, {})
class TestCreateBugReport:
"""Test _create_bug_report helper."""
def test_returns_none_on_import_failure(self):
try:
raise RuntimeError("report test")
except RuntimeError as e:
with patch("infrastructure.error_capture.logger"):
result = _create_bug_report(e, "test", "abc", "f.py", 1, {}, "tb", None)
# Returns a task id or None depending on whether swarm is available
assert result is None or isinstance(result, str)
class TestSendErrorNotification:
"""Test _send_error_notification helper."""
def test_does_not_crash_on_notifier_failure(self):
try:
raise RuntimeError("notify test")
except RuntimeError as e:
_send_error_notification(e, "test")
class TestRecordToSession:
"""Test _record_to_session helper."""
def test_noop_when_no_recorder(self):
import infrastructure.error_capture as ec
original = ec._error_recorder
try:
ec._error_recorder = None
try:
raise RuntimeError("session test")
except RuntimeError as e:
_record_to_session(e, "test") # should not crash
finally:
ec._error_recorder = original
def test_calls_registered_recorder(self):
import infrastructure.error_capture as ec
original = ec._error_recorder
calls = []
try:
ec._error_recorder = lambda **kwargs: calls.append(kwargs)
try:
raise RuntimeError("recorded")
except RuntimeError as e:
_record_to_session(e, "src")
assert len(calls) == 1
assert "RuntimeError: recorded" in calls[0]["error"]
assert calls[0]["context"] == "src"
finally:
ec._error_recorder = original

View File

@@ -444,150 +444,6 @@ def test_get_effective_ollama_model_walks_fallback_chain():
assert result == "fb-2"
# ── _build_tools_list ─────────────────────────────────────────────────────
def test_build_tools_list_empty_when_tools_disabled():
"""Small models get an empty tools list."""
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
assert result == []
def test_build_tools_list_includes_toolkit_when_enabled():
"""Tool-capable models get the full toolkit."""
mock_toolkit = MagicMock()
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
assert mock_toolkit in result
def test_build_tools_list_skips_mcp_when_flagged():
"""skip_mcp=True must not call MCP factories."""
mock_toolkit = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
):
from timmy.agent import _build_tools_list
_build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
mock_gitea.assert_not_called()
mock_fs.assert_not_called()
def test_build_tools_list_includes_mcp_when_not_skipped():
"""skip_mcp=False should attempt MCP tool creation."""
mock_toolkit = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
):
from timmy.agent import _build_tools_list
_build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
mock_gitea.assert_called_once()
mock_fs.assert_called_once()
# ── _build_prompt ─────────────────────────────────────────────────────────
def test_build_prompt_includes_base_prompt():
"""Prompt should always contain the base system prompt."""
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
assert "Timmy" in result
def test_build_prompt_appends_memory_context():
"""Memory context should be appended when available."""
mock_memory = MagicMock()
mock_memory.get_system_context.return_value = "User prefers dark mode."
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "GROUNDED CONTEXT" in result
assert "dark mode" in result
def test_build_prompt_truncates_long_memory():
"""Long memory context should be truncated."""
mock_memory = MagicMock()
mock_memory.get_system_context.return_value = "x" * 10000
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
assert "[truncated]" in result
def test_build_prompt_survives_memory_failure():
"""Prompt should fall back to base when memory fails."""
mock_memory = MagicMock()
mock_memory.get_system_context.side_effect = RuntimeError("db locked")
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "Timmy" in result
# Memory context should NOT be appended (the db locked error was caught)
assert "db locked" not in result
# ── _create_ollama_agent ──────────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
"""_create_ollama_agent must pass the expected kwargs to Agent."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.agent._warmup_model", return_value=True),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
db_file="test.db",
model_name="llama3.1",
tools_list=[MagicMock()],
full_prompt="test prompt",
use_tools=True,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["description"] == "test prompt"
assert kwargs["markdown"] is False
def test_create_ollama_agent_none_tools_when_empty():
"""Empty tools_list should pass tools=None to Agent."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.agent._warmup_model", return_value=True),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
db_file="test.db",
model_name="llama3.2",
tools_list=[],
full_prompt="test prompt",
use_tools=False,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] is None
def test_no_hardcoded_fallback_constants_in_agent():
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
import timmy.agent as agent_mod
@@ -598,3 +454,127 @@ def test_no_hardcoded_fallback_constants_in_agent():
assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
"Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
)
# ── _build_tools_list helper ─────────────────────────────────────────────────
def test_build_tools_list_returns_empty_when_no_tools():
"""When use_tools=False, _build_tools_list returns an empty list."""
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=False, skip_mcp=False)
assert result == []
def test_build_tools_list_includes_toolkit():
"""When use_tools=True, _build_tools_list includes the toolkit."""
mock_toolkit = MagicMock()
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=True)
assert mock_toolkit in result
def test_build_tools_list_adds_mcp_when_not_skipped():
"""When skip_mcp=False, _build_tools_list attempts MCP tools."""
mock_toolkit = MagicMock()
mock_gitea = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=mock_gitea),
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None),
):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=False)
assert mock_toolkit in result
assert mock_gitea in result
# ── _build_prompt helper ─────────────────────────────────────────────────────
def test_build_prompt_returns_base_when_no_memory():
"""_build_prompt returns base prompt when memory context is empty."""
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = ""
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "Timmy" in result
def test_build_prompt_appends_memory_context():
"""_build_prompt appends memory context when available."""
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = "User likes pizza"
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "User likes pizza" in result
assert "GROUNDED CONTEXT" in result
def test_build_prompt_truncates_long_memory_for_small_models():
"""_build_prompt truncates memory for small models (use_tools=False)."""
long_context = "x" * 5000
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = long_context
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
# Max context is 2000 for small models + truncation marker
assert "[truncated]" in result
# ── _create_ollama_agent helper ──────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
"""_create_ollama_agent passes the expected kwargs to Agent()."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
model_name="test-model",
db_file="test.db",
tools_list=[MagicMock()],
full_prompt="Test prompt",
use_tools=True,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["description"] == "Test prompt"
assert kwargs["tools"] is not None
def test_create_ollama_agent_none_tools_when_empty():
"""_create_ollama_agent passes tools=None when tools_list is empty."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
model_name="test-model",
db_file="test.db",
tools_list=[],
full_prompt="Test prompt",
use_tools=False,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] is None