Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
426 lines
15 KiB
Python
426 lines
15 KiB
Python
"""Tests for the MCP tools module (factories + issue bridge)."""
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from timmy.mcp_tools import (
|
|
_bridge_to_work_order,
|
|
_generate_avatar_image,
|
|
_parse_command,
|
|
close_mcp_sessions,
|
|
create_filesystem_mcp_tools,
|
|
create_gitea_issue_via_mcp,
|
|
create_gitea_mcp_tools,
|
|
update_gitea_avatar,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_command
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_parse_command_splits_correctly():
    """A bare command name comes back as the ``which``-resolved path plus its args."""
    which_patch = patch(
        "timmy.mcp_tools.shutil.which", return_value="/usr/local/bin/gitea-mcp"
    )
    with which_patch:
        executable, arguments = _parse_command("gitea-mcp -t stdio")

    assert executable == "/usr/local/bin/gitea-mcp"
    assert arguments == ["-t", "stdio"]
|
|
|
|
|
|
def test_parse_command_expands_tilde():
    """A leading ``~/`` in the executable is expanded when ``which`` finds nothing."""
    which_patch = patch("timmy.mcp_tools.shutil.which", return_value=None)
    with which_patch:
        executable, arguments = _parse_command("~/go/bin/gitea-mcp -t stdio")

    # The tilde must be gone while the rest of the path survives.
    assert "~" not in executable
    assert "/go/bin/gitea-mcp" in executable
    assert arguments == ["-t", "stdio"]
|
|
|
|
|
|
def test_parse_command_preserves_absolute_path():
    """An absolute executable path is returned unchanged, followed by its args.

    NOTE(review): ``shutil.which`` is not patched here, so this test only
    checks the returned values, not whether ``which`` was consulted.
    """
    parsed = _parse_command("/usr/local/bin/gitea-mcp -t stdio")
    executable, arguments = parsed

    assert executable == "/usr/local/bin/gitea-mcp"
    assert arguments == ["-t", "stdio"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_gitea_mcp_tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_gitea_mcp_returns_none_when_disabled():
    """The Gitea factory yields ``None`` when ``gitea_enabled`` is False, token or not."""
    with patch("timmy.mcp_tools.settings") as fake_settings:
        fake_settings.configure_mock(gitea_enabled=False, gitea_token="some-token")
        tools = create_gitea_mcp_tools()

    assert tools is None
|
|
|
|
|
|
def test_gitea_mcp_returns_none_when_no_token():
    """The Gitea factory yields ``None`` when enabled but ``gitea_token`` is empty."""
    with patch("timmy.mcp_tools.settings") as fake_settings:
        fake_settings.configure_mock(gitea_enabled=True, gitea_token="")
        tools = create_gitea_mcp_tools()

    assert tools is None
|
|
|
|
|
|
def test_gitea_mcp_returns_tools_when_configured():
    """With Gitea enabled and a token set, the factory builds MCPTools from server_params."""
    fake_tools = MagicMock()
    fake_params = MagicMock()

    settings_patch = patch("timmy.mcp_tools.settings")
    tools_patch = patch("agno.tools.mcp.MCPTools", return_value=fake_tools)
    params_patch = patch("timmy.mcp_tools._gitea_server_params", return_value=fake_params)

    with settings_patch as fake_settings, tools_patch as tools_cls, params_patch:
        fake_settings.gitea_enabled = True
        fake_settings.gitea_token = "tok123"
        fake_settings.mcp_timeout = 15
        result = create_gitea_mcp_tools()

    assert result is fake_tools
    tools_cls.assert_called_once()

    ctor_kwargs = tools_cls.call_args.kwargs
    # The pre-built server params are passed through; no raw command string.
    assert ctor_kwargs["server_params"] is fake_params
    assert "command" not in ctor_kwargs

    included = ctor_kwargs["include_tools"]
    for tool_name in ("issue_write", "pull_request_write"):
        assert tool_name in included
|
|
|
|
|
|
def test_gitea_mcp_graceful_on_import_error():
    """The Gitea factory yields ``None`` when ``agno.tools.mcp`` cannot be imported."""
    # A ``None`` entry in sys.modules makes importing that module fail.
    hidden_module = patch.dict("sys.modules", {"agno.tools.mcp": None})

    with patch("timmy.mcp_tools.settings") as fake_settings, hidden_module:
        fake_settings.configure_mock(gitea_enabled=True, gitea_token="tok123")
        # Expected to return None instead of propagating the import failure.
        result = create_gitea_mcp_tools()

    assert result is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_filesystem_mcp_tools
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_filesystem_mcp_returns_tools():
    """The filesystem factory builds MCPTools from server_params rooted at repo_root."""
    fake_tools = MagicMock()
    params_cls = MagicMock()
    repo_root = "/home/user/project"

    with (
        patch("timmy.mcp_tools.settings") as fake_settings,
        patch("agno.tools.mcp.MCPTools", return_value=fake_tools) as tools_cls,
        patch("mcp.client.stdio.StdioServerParameters", params_cls),
        patch("timmy.mcp_tools.shutil.which", return_value="/usr/local/bin/npx"),
    ):
        fake_settings.configure_mock(
            mcp_filesystem_command="npx -y @modelcontextprotocol/server-filesystem",
            repo_root=repo_root,
            mcp_timeout=15,
        )
        result = create_filesystem_mcp_tools()

    assert result is fake_tools

    tools_kwargs = tools_cls.call_args.kwargs
    assert "server_params" in tools_kwargs
    assert "read_file" in tools_kwargs["include_tools"]

    # The repo root must be among the args handed to StdioServerParameters.
    assert repo_root in params_cls.call_args.kwargs["args"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# create_gitea_issue_via_mcp
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_issue_via_mcp_returns_not_configured():
    """With Gitea disabled and no token, issue creation reports "not configured"."""
    with patch("timmy.mcp_tools.settings") as fake_settings:
        fake_settings.configure_mock(gitea_enabled=False, gitea_token="")
        message = await create_gitea_issue_via_mcp("Test", "Body")

    assert "not configured" in message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_issue_via_mcp_calls_tool():
    """Issue creation connects once and calls ``session.call_tool`` with correct arguments.

    The module-level ``_issue_session`` cache is cleared before the call and
    again in a ``finally`` clause, so a failing assertion cannot leave the
    mocked session cached for tests that run afterwards.
    """
    import timmy.mcp_tools as mcp_mod

    # Mock the inner MCP session (tools.session.call_tool)
    mock_inner_session = MagicMock()
    mock_inner_session.call_tool = AsyncMock(return_value="Issue #42 created")

    mock_tools = MagicMock()
    mock_tools.connect = AsyncMock()
    mock_tools.session = mock_inner_session
    mock_tools._connected = False

    mock_params = MagicMock()

    # Reset module-level cache
    mcp_mod._issue_session = None
    try:
        with (
            patch("timmy.mcp_tools.settings") as mock_settings,
            patch("agno.tools.mcp.MCPTools", return_value=mock_tools),
            patch("timmy.mcp_tools._gitea_server_params", return_value=mock_params),
        ):
            mock_settings.gitea_enabled = True
            mock_settings.gitea_token = "tok123"
            mock_settings.gitea_repo = "owner/repo"
            mock_settings.gitea_url = "http://localhost:3000"
            mock_settings.mcp_timeout = 15
            mock_settings.repo_root = "/tmp/test"

            result = await create_gitea_issue_via_mcp("Bug title", "Bug body", "bug")

        assert "Bug title" in result
        mock_tools.connect.assert_awaited_once()

        # Verify it calls session.call_tool (not tools.call_tool)
        mock_inner_session.call_tool.assert_awaited_once()
        call_args = mock_inner_session.call_tool.call_args
        assert call_args[0][0] == "issue_write"

        tool_arguments = call_args[1]["arguments"]
        assert tool_arguments["method"] == "create"
        # "owner/repo" from settings is split into separate owner and repo fields.
        assert tool_arguments["owner"] == "owner"
        assert tool_arguments["repo"] == "repo"
    finally:
        # Clean up even when an assertion above fails.
        mcp_mod._issue_session = None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_issue_via_mcp_graceful_failure():
    """Issue creation returns an error string when the MCP connection fails.

    ``connect`` is mocked to raise ``ConnectionError``; the call is expected
    to return a message containing "Failed". The module-level
    ``_issue_session`` cache is reset in a ``finally`` clause so a failing
    assertion cannot leak state into later tests.
    """
    import timmy.mcp_tools as mcp_mod

    mock_tools = MagicMock()
    mock_tools.connect = AsyncMock(side_effect=ConnectionError("no process"))
    mock_tools._connected = False

    mock_params = MagicMock()

    # Start from an empty module-level cache.
    mcp_mod._issue_session = None
    try:
        with (
            patch("timmy.mcp_tools.settings") as mock_settings,
            patch("agno.tools.mcp.MCPTools", return_value=mock_tools),
            patch("timmy.mcp_tools._gitea_server_params", return_value=mock_params),
        ):
            mock_settings.gitea_enabled = True
            mock_settings.gitea_token = "tok123"
            mock_settings.gitea_repo = "owner/repo"
            mock_settings.gitea_url = "http://localhost:3000"
            mock_settings.mcp_timeout = 15
            mock_settings.repo_root = "/tmp/test"

            result = await create_gitea_issue_via_mcp("Test", "Body")

        assert "Failed" in result
    finally:
        # Clean up even when the assertion above fails.
        mcp_mod._issue_session = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _bridge_to_work_order
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_bridge_to_work_order(tmp_path):
    """Work order bridge creates a record in SQLite.

    Args:
        tmp_path: pytest temporary directory, used as ``settings.repo_root`` so
            the database is expected at ``<tmp_path>/data/work_orders.db``.
    """
    import sqlite3

    with patch("timmy.mcp_tools.settings") as mock_settings:
        mock_settings.repo_root = str(tmp_path)
        _bridge_to_work_order("Test title", "Test body", "bug")

    db_path = tmp_path / "data" / "work_orders.db"
    assert db_path.exists()

    conn = sqlite3.connect(str(db_path))
    try:
        rows = conn.execute("SELECT title, category FROM work_orders").fetchall()
    finally:
        # Close even if the query raises, so the connection is never left open.
        conn.close()

    assert len(rows) == 1
    assert rows[0][0] == "Test title"
    assert rows[0][1] == "bug"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# close_mcp_sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_close_mcp_sessions():
    """close_mcp_sessions closes the cached session and clears the cache.

    A mock session is installed in the module-level ``_issue_session`` slot.
    The slot is reset in a ``finally`` clause so the mock cannot leak into
    later tests if ``close_mcp_sessions`` raises or an assertion fails.
    """
    import timmy.mcp_tools as mcp_mod

    mock_session = MagicMock()
    mock_session.close = AsyncMock()
    mcp_mod._issue_session = mock_session

    try:
        await close_mcp_sessions()

        mock_session.close.assert_awaited_once()
        assert mcp_mod._issue_session is None
    finally:
        # Never leave the mock cached, whatever happened above.
        mcp_mod._issue_session = None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_close_mcp_sessions_noop_when_none():
    """With no cached session, ``close_mcp_sessions`` completes and the cache stays empty."""
    import timmy.mcp_tools as mcp_tools_module

    mcp_tools_module._issue_session = None

    # Must complete without raising.
    await close_mcp_sessions()

    assert mcp_tools_module._issue_session is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool safety integration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_mcp_tools_classified_in_safety():
    """MCP tool names land in the expected ``tool_safety`` sets and confirmation rules."""
    from timmy.tool_safety import DANGEROUS_TOOLS, SAFE_TOOLS, requires_confirmation

    # Gitea MCP tools should be safe
    gitea_tools = ("issue_write", "list_issues", "pull_request_write")
    # Filesystem read-only MCP tools should be safe
    readonly_fs_tools = ("list_directory", "search_files")
    for tool_name in gitea_tools + readonly_fs_tools:
        assert tool_name in SAFE_TOOLS

    # write_file is dangerous (filesystem MCP)
    assert "write_file" in DANGEROUS_TOOLS

    # Verify requires_confirmation logic
    for tool_name in ("issue_write", "list_directory"):
        assert not requires_confirmation(tool_name)
    assert requires_confirmation("write_file")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# update_gitea_avatar
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_generate_avatar_image_returns_png():
    """``_generate_avatar_image`` yields non-empty bytes that start with the PNG magic."""
    # Skipped when PIL cannot be imported.
    pytest.importorskip("PIL")

    png_bytes = _generate_avatar_image()

    assert isinstance(png_bytes, bytes)
    assert png_bytes
    # PNG magic bytes
    assert png_bytes.startswith(b"\x89PNG")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_avatar_not_configured():
    """With Gitea disabled and no token, the avatar update reports "not configured"."""
    with patch("timmy.mcp_tools.settings") as fake_settings:
        fake_settings.configure_mock(gitea_enabled=False, gitea_token="")
        message = await update_gitea_avatar()

    assert "not configured" in message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_avatar_success():
    """A 204 response makes ``update_gitea_avatar`` report success after one POST."""
    import sys

    import timmy.mcp_tools as mcp_tools_module

    response = MagicMock(status_code=204, text="")

    # HTTP client double that is also usable as an ``async with`` target.
    http_client = AsyncMock()
    http_client.post = AsyncMock(return_value=response)
    http_client.__aenter__ = AsyncMock(return_value=http_client)
    http_client.__aexit__ = AsyncMock(return_value=False)

    # Ensure PIL import check passes even if Pillow isn't installed
    fake_pil = MagicMock()
    stubbed_modules = {"PIL": fake_pil, "PIL.Image": fake_pil}

    with (
        patch("timmy.mcp_tools.settings") as fake_settings,
        patch.object(mcp_tools_module.httpx, "AsyncClient", return_value=http_client),
        patch("timmy.mcp_tools._generate_avatar_image", return_value=b"\x89PNG fake"),
        patch.dict(sys.modules, stubbed_modules),
    ):
        fake_settings.configure_mock(
            gitea_enabled=True,
            gitea_token="tok123",
            gitea_url="http://localhost:3000",
        )
        message = await update_gitea_avatar()

    assert "successfully" in message
    http_client.post.assert_awaited_once()

    # First positional argument of the POST is the request URL.
    posted_url = http_client.post.call_args.args[0]
    assert "/api/v1/user/avatar" in posted_url
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_avatar_api_failure():
    """A 400 response yields a failure message that includes the status code."""
    import sys

    import timmy.mcp_tools as mcp_tools_module

    response = MagicMock(status_code=400, text="bad request")

    # HTTP client double that is also usable as an ``async with`` target.
    http_client = AsyncMock()
    http_client.post = AsyncMock(return_value=response)
    http_client.__aenter__ = AsyncMock(return_value=http_client)
    http_client.__aexit__ = AsyncMock(return_value=False)

    # Stand-in for PIL, registered under both module names.
    fake_pil = MagicMock()
    stubbed_modules = {"PIL": fake_pil, "PIL.Image": fake_pil}

    with (
        patch("timmy.mcp_tools.settings") as fake_settings,
        patch.object(mcp_tools_module.httpx, "AsyncClient", return_value=http_client),
        patch("timmy.mcp_tools._generate_avatar_image", return_value=b"\x89PNG fake"),
        patch.dict(sys.modules, stubbed_modules),
    ):
        fake_settings.configure_mock(
            gitea_enabled=True,
            gitea_token="tok123",
            gitea_url="http://localhost:3000",
        )
        message = await update_gitea_avatar()

    assert "failed" in message.lower()
    assert "400" in message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_avatar_connection_error():
    """An ``httpx.ConnectError`` from the POST is reported as a "connect" message."""
    import sys

    import timmy.mcp_tools as mcp_tools_module

    connect_error = mcp_tools_module.httpx.ConnectError("refused")

    # HTTP client double whose POST raises; also usable as an ``async with`` target.
    http_client = AsyncMock()
    http_client.post = AsyncMock(side_effect=connect_error)
    http_client.__aenter__ = AsyncMock(return_value=http_client)
    http_client.__aexit__ = AsyncMock(return_value=False)

    # Stand-in for PIL, registered under both module names.
    fake_pil = MagicMock()
    stubbed_modules = {"PIL": fake_pil, "PIL.Image": fake_pil}

    with (
        patch("timmy.mcp_tools.settings") as fake_settings,
        patch.object(mcp_tools_module.httpx, "AsyncClient", return_value=http_client),
        patch("timmy.mcp_tools._generate_avatar_image", return_value=b"\x89PNG fake"),
        patch.dict(sys.modules, stubbed_modules),
    ):
        fake_settings.configure_mock(
            gitea_enabled=True,
            gitea_token="tok123",
            gitea_url="http://localhost:3000",
        )
        message = await update_gitea_avatar()

    assert "connect" in message.lower()
|