Files
Timmy-time-dashboard/tests/conftest.py
Trip T bd1aa55904
All checks were successful
Tests / lint (pull_request) Successful in 23s
Tests / test (pull_request) Successful in 32s
fix: use StdioServerParameters to bypass Agno executable whitelist
Agno's MCPTools has an undocumented executable whitelist that blocks
gitea-mcp (Go binary). Switch to server_params=StdioServerParameters()
which bypasses this restriction. Also fixes:

- Use tools.session.call_tool() for standalone invocation (MCPTools
  doesn't expose call_tool() directly)
- Use close() instead of disconnect() for cleanup
- Resolve gitea-mcp path via ~/go/bin fallback when not on PATH
- Stub mcp.client.stdio in test conftest

Smoke-tested end-to-end against real Gitea: connect, list_issues,
create issue, close issue, create_gitea_issue_via_mcp — all pass.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 22:03:45 -04:00

242 lines
7.3 KiB
Python

"""Pytest configuration and fixtures for the test suite."""
import os
import sqlite3
import sys
from unittest.mock import MagicMock
import pytest
# Import pytest marker configuration.
# The module is imported only for its import-time side effects (hence the
# F401 suppressions). The relative form is tried first; when it raises
# ImportError the module is imported by its bare top-level name instead.
# NOTE(review): presumably the fallback covers conftest.py being loaded
# without a parent package — confirm against the pytest rootdir setup.
try:
    from . import conftest_markers  # noqa: F401
except ImportError:
    import conftest_markers  # noqa: F401
# ── Stub heavy optional dependencies so unit tests run without them ────────────
# Only stub truly optional packages that may not be installed.
# agno is a core dependency (always installed) — do NOT stub it, or its
# internal import chains break under xdist parallel workers.
# Install a MagicMock placeholder for every optional module that has not
# already been imported. setdefault() leaves an existing sys.modules entry
# (for example the real package) untouched.
for _mod in [
    "airllm",
    "mcp",
    "mcp.client",
    "mcp.client.stdio",
    "mcp.registry",
    "telegram",
    "telegram.ext",
    "discord",
    "discord.ext",
    "discord.ext.commands",
    "pyzbar",
    "pyzbar.pyzbar",
    "pyttsx3",
    "sentence_transformers",
]:
    sys.modules.setdefault(_mod, MagicMock())

# agno.tools.mcp requires the real mcp package; stub the sub-module so
# patch("agno.tools.mcp.MCPTools") resolves without installing mcp.
if "agno.tools.mcp" not in sys.modules:
    _agno_mcp_stub = MagicMock()
    sys.modules["agno.tools.mcp"] = _agno_mcp_stub

# mcp.registry needs a tool_registry whose get_handler() returns None
# (used by timmy.agents.base).
#
# A MagicMock fabricates any attribute on access, so hasattr() is always True
# for the stub installed above. Checking hasattr() alone therefore never
# configured the stub, and get_handler() returned a truthy MagicMock instead
# of None. A stubbed module is now always given an explicit registry; a real
# module is only patched when it lacks the attribute.
_mcp_reg = sys.modules.get("mcp.registry")
if _mcp_reg is not None and (
    isinstance(_mcp_reg, MagicMock) or not hasattr(_mcp_reg, "tool_registry")
):
    _mock_tool_reg = MagicMock()
    _mock_tool_reg.get_handler.return_value = None
    _mcp_reg.tool_registry = _mock_tool_reg
# ── Test mode setup ──────────────────────────────────────────────────────────
# Flags are set at import time so they are in the environment before any
# test module is imported.
os.environ.update(
    {
        "TIMMY_TEST_MODE": "1",
        "TIMMY_DISABLE_CSRF": "1",
        "TIMMY_SKIP_EMBEDDINGS": "1",
    }
)
@pytest.fixture(autouse=True)
def reset_message_log():
    """Empty ``dashboard.store.message_log`` on both sides of every test.

    The log is cleared once before the test body runs and once more after it
    finishes, so no test observes entries left by another.
    """
    # Imported lazily so the module is only loaded once a test actually runs.
    from dashboard.store import message_log as chat_log

    chat_log.clear()
    yield
    chat_log.clear()
@pytest.fixture(autouse=True)
def clean_database(tmp_path):
    """Redirect module-level database paths into ``tmp_path`` for one test.

    For each listed module the named path attribute is remembered and then
    replaced with a file under the per-test temporary directory. After the
    test, every attribute that was successfully replaced is set back to the
    value it had before.

    Redirection is best effort: if a module cannot be imported, or reading or
    setting the attribute fails, that module is silently skipped.

    Args:
        tmp_path: pytest's built-in per-test temporary directory.
    """
    memory_db = tmp_path / "memory.db"
    tasks_db = tmp_path / "tasks.db"

    # (attribute name, replacement path, modules carrying that attribute),
    # listed in the order the redirects are applied.
    redirect_groups = (
        ("DB_PATH", tmp_path / "swarm.db", ("infrastructure.models.registry",)),
        ("DB_PATH", memory_db, ("timmy.memory.unified",)),
        # Semantic memory uses SEMANTIC_DB_PATH, not DB_PATH.
        ("SEMANTIC_DB_PATH", memory_db, ("timmy.semantic_memory",)),
        ("DB_PATH", tmp_path / "spark.db", ("spark.memory", "spark.eidos")),
        # No self-coding modules are registered at the moment.
        ("DEFAULT_DB_PATH", tmp_path / "self_coding.db", ()),
        # IMPORTANT: swarm.task_queue.models also has a DB_PATH that writes to
        # tasks.db — it MUST be patched too, or error_capture.capture_error()
        # will write test data to the production database.
        ("DB_PATH", tasks_db, ("dashboard.routes.tasks",)),
        ("DB_PATH", tmp_path / "work_orders.db", ("dashboard.routes.work_orders",)),
        ("DB_PATH", tasks_db, ("swarm.task_queue.models",)),
    )

    # (module name, attribute) -> value to put back after the test.
    saved = {}
    for attr, temp_db, module_names in redirect_groups:
        for module_name in module_names:
            try:
                module = __import__(module_name, fromlist=[attr])
                saved[(module_name, attr)] = getattr(module, attr)
                setattr(module, attr, temp_db)
            except Exception:
                # Best effort: any failure just leaves this module alone.
                pass

    yield

    for (module_name, attr), previous in saved.items():
        try:
            setattr(__import__(module_name, fromlist=[attr]), attr, previous)
        except Exception:
            pass
@pytest.fixture(autouse=True)
def cleanup_event_loops():
    """Close the policy's current event loop once each test has finished.

    Nothing happens before the test. Afterwards, if a loop is running in this
    thread the fixture returns without touching it. Otherwise it asks the
    event-loop policy for its loop and closes it when it is still open. Any
    ``RuntimeError`` raised while obtaining or closing that loop is ignored.
    """
    import asyncio
    import warnings

    yield

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running here, so it is safe to inspect the policy's loop.
        pass
    else:
        return

    try:
        with warnings.catch_warnings():
            # Asking the policy for a loop can emit a DeprecationWarning.
            warnings.simplefilter("ignore", DeprecationWarning)
            leftover = asyncio.get_event_loop_policy().get_event_loop()
        if leftover and not leftover.is_closed():
            leftover.close()
    except RuntimeError:
        pass
@pytest.fixture
def client():
    """Yield a FastAPI ``TestClient`` wrapping ``dashboard.app.app``.

    The client is entered as a context manager, so it is exited once the
    test using it has finished.
    """
    from fastapi.testclient import TestClient
    from dashboard.app import app as dashboard_app

    with TestClient(dashboard_app) as test_client:
        yield test_client
@pytest.fixture
def db_connection():
    """Yield an in-memory SQLite connection with ``agents`` and ``tasks`` tables.

    Rows are returned as ``sqlite3.Row`` objects. The connection is closed
    after the test that requested it.
    """
    # Schema text is kept flush-left so the SQL handed to SQLite is unchanged.
    schema = """
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'idle',
capabilities TEXT DEFAULT '',
registered_at TEXT NOT NULL,
last_seen TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
description TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
assigned_agent TEXT,
result TEXT,
created_at TEXT NOT NULL,
completed_at TEXT
);
"""
    connection = sqlite3.connect(":memory:")
    connection.row_factory = sqlite3.Row
    connection.executescript(schema)
    connection.commit()
    yield connection
    connection.close()
@pytest.fixture
def mock_ollama_client():
    """Return a ``MagicMock`` standing in for an Ollama client.

    ``generate``, ``chat`` and ``list`` are themselves mocks that return a
    fixed canned payload.
    """
    canned_replies = {
        "generate": {"response": "Test response"},
        "chat": {"message": {"content": "Test chat response"}},
        "list": {"models": [{"name": "llama3.2"}]},
    }
    ollama = MagicMock()
    for method_name, reply in canned_replies.items():
        setattr(ollama, method_name, MagicMock(return_value=reply))
    return ollama
@pytest.fixture
def mock_timmy_agent():
    """Return a ``MagicMock`` agent named "Timmy" with canned replies.

    ``run`` and ``chat`` are mocks that each return a fixed string.
    """
    canned_replies = (
        ("run", "Test response from Timmy"),
        ("chat", "Test chat response"),
    )
    timmy = MagicMock()
    timmy.name = "Timmy"
    for method_name, reply in canned_replies:
        setattr(timmy, method_name, MagicMock(return_value=reply))
    return timmy