Files
Timmy-time-dashboard/tests/conftest.py
Claude (Opus 4.6) 7f875398fc
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
[claude] Add sovereignty metrics tracking + dashboard panel (#981) (#1083)
2026-03-23 14:09:03 +00:00

254 lines
7.8 KiB
Python

"""Pytest configuration and fixtures for the test suite."""
import os
import sqlite3
import sys
from unittest.mock import MagicMock
import pytest
# Import pytest marker configuration
try:
from . import conftest_markers # noqa: F401
except ImportError:
import conftest_markers # noqa: F401
# ── Stub heavy optional dependencies so unit tests run without them ────────────
# Only stub truly optional packages that may not be installed.
# agno is a core dependency (always installed) — do NOT stub it, or its
# internal import chains break under xdist parallel workers.
# setdefault() leaves any module that is already present in sys.modules alone.
for _mod in [
    "mcp",
    "mcp.client",
    "mcp.client.stdio",
    "mcp.registry",
    "telegram",
    "telegram.ext",
    "discord",
    "discord.ext",
    "discord.ext.commands",
    "pyzbar",
    "pyzbar.pyzbar",
    "pyttsx3",
    "sentence_transformers",
    "swarm",
    "swarm.event_log",
]:
    sys.modules.setdefault(_mod, MagicMock())

# agno.tools.mcp requires the real mcp package; stub the sub-module so
# patch("agno.tools.mcp.MCPTools") resolves without installing mcp.
if "agno.tools.mcp" not in sys.modules:
    _agno_mcp_stub = MagicMock()
    sys.modules["agno.tools.mcp"] = _agno_mcp_stub

# mcp.registry needs a tool_registry with get_handler (used by timmy.agents.base)
_mcp_reg = sys.modules.get("mcp.registry")
if isinstance(_mcp_reg, MagicMock):
    # hasattr() is always True on a MagicMock because attributes are created
    # on first access, so a hasattr() guard would never fire for the stub
    # installed above.  Configure the auto-created child in place so that
    # get_handler() returns None rather than a truthy mock.
    _mcp_reg.tool_registry.get_handler.return_value = None
elif _mcp_reg is not None and not hasattr(_mcp_reg, "tool_registry"):
    # A real (non-mock) module without a registry: attach a mock one.
    _mock_tool_reg = MagicMock()
    _mock_tool_reg.get_handler.return_value = None
    _mcp_reg.tool_registry = _mock_tool_reg
# ── Test mode setup ──────────────────────────────────────────────────────────
# Set at import time so the flags are already in the environment before any
# test module is imported.
os.environ.update(
    {
        "TIMMY_TEST_MODE": "1",
        "TIMMY_DISABLE_CSRF": "1",
        "TIMMY_SKIP_EMBEDDINGS": "1",
    }
)
@pytest.fixture(autouse=True)
def reset_message_log(tmp_path):
    """Point the chat store at a per-test SQLite file and wipe it around each test.

    Setup swaps ``dashboard.store.DB_PATH`` and the ``message_log`` singleton's
    private ``_db_path`` over to ``tmp_path / "chat.db"``, resets its cached
    ``_conn`` and clears the log.  Teardown clears and closes the log again and
    puts the original path back.
    """
    import dashboard.store as store

    saved_path = store.DB_PATH
    scratch_db = tmp_path / "chat.db"

    # Setup: close the singleton's existing connection, then re-target it at
    # the scratch DB and start from an empty log.
    store.DB_PATH = scratch_db
    log = store.message_log
    log.close()
    log._db_path = scratch_db
    log._conn = None
    log.clear()

    yield

    # Teardown: look the singleton up again rather than reusing the reference
    # taken during setup, then undo the redirection.
    log = store.message_log
    log.clear()
    log.close()
    store.DB_PATH = saved_path
    log._db_path = saved_path
    log._conn = None
@pytest.fixture(autouse=True)
def clean_database(tmp_path):
    """Redirect module-level database paths into ``tmp_path`` for one test.

    Each listed module has its path attribute replaced with a file under the
    per-test temp directory; the previous value is remembered and put back
    after the test.  A module that cannot be imported, or that lacks the
    attribute, is skipped silently.
    """
    swarm_db = tmp_path / "swarm.db"
    memory_db = tmp_path / "memory.db"
    spark_db = tmp_path / "spark.db"
    tasks_db = tmp_path / "tasks.db"
    work_orders_db = tmp_path / "work_orders.db"
    sovereignty_db = tmp_path / "sovereignty_metrics.db"

    # (module name, attribute to patch, replacement path), applied in order.
    redirects = [
        ("infrastructure.models.registry", "DB_PATH", swarm_db),
        ("timmy.memory_system", "DB_PATH", memory_db),  # Canonical location
        ("timmy.memory.unified", "DB_PATH", memory_db),  # Backward compat
        # Note: semantic_memory now re-exports from memory_system,
        # so DB_PATH is already patched via the memory entries above.
        ("spark.memory", "DB_PATH", spark_db),
        ("spark.eidos", "DB_PATH", spark_db),
        # Self-coding modules would be patched via DEFAULT_DB_PATH (pointing
        # at tmp_path / "self_coding.db"); none are currently registered.
        #
        # Redirect task queue and work orders DBs to temp dir.
        # IMPORTANT: swarm.task_queue.models also has a DB_PATH that writes to
        # tasks.db — it MUST be patched too, or error_capture.capture_error()
        # will write test data to the production database.
        ("dashboard.routes.tasks", "DB_PATH", tasks_db),
        ("dashboard.routes.work_orders", "DB_PATH", work_orders_db),
        ("swarm.task_queue.models", "DB_PATH", tasks_db),
        ("infrastructure.sovereignty_metrics", "DB_PATH", sovereignty_db),
    ]

    originals = {}
    for module_name, attr_name, replacement in redirects:
        try:
            module = __import__(module_name, fromlist=[attr_name])
            originals[(module_name, attr_name)] = getattr(module, attr_name)
            setattr(module, attr_name, replacement)
        except Exception:
            # Best effort: the module may be absent in this environment.
            pass

    yield

    # Restore only what was actually patched.
    for (module_name, attr_name), saved_value in originals.items():
        try:
            module = __import__(module_name, fromlist=[attr_name])
            setattr(module, attr_name, saved_value)
        except Exception:
            pass
@pytest.fixture(autouse=True)
def cleanup_event_loops():
    """Close a leftover asyncio event loop once the test has finished.

    Nothing is done while a loop is still running.  Otherwise the loop that
    the current event-loop policy hands back is closed if it is still open.
    ``RuntimeError`` from the lookup is ignored.
    """
    import asyncio
    import warnings

    yield

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running, so it is safe to look for a stale one.
        pass
    else:
        return

    try:
        with warnings.catch_warnings():
            # Silence DeprecationWarning raised by the policy lookup.
            warnings.simplefilter("ignore", DeprecationWarning)
            leftover = asyncio.get_event_loop_policy().get_event_loop()
            if leftover and not leftover.is_closed():
                leftover.close()
    except RuntimeError:
        pass
@pytest.fixture
def client():
    """Yield a FastAPI ``TestClient`` wrapping ``dashboard.app.app``.

    The client is used as a context manager, so it is entered before the
    test body runs and exited afterwards.
    """
    from fastapi.testclient import TestClient

    from dashboard.app import app

    with TestClient(app) as test_client:
        yield test_client
@pytest.fixture
def db_connection():
    """Yield an in-memory SQLite connection preloaded with the test schema.

    Rows come back as ``sqlite3.Row``.  The ``agents`` and ``tasks`` tables
    are created before the connection is handed to the test, and the
    connection is closed afterwards.
    """
    # The DDL is kept flush-left so the string literal itself is unchanged.
    schema_sql = """
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'idle',
capabilities TEXT DEFAULT '',
registered_at TEXT NOT NULL,
last_seen TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
description TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
assigned_agent TEXT,
result TEXT,
created_at TEXT NOT NULL,
completed_at TEXT
);
"""
    connection = sqlite3.connect(":memory:")
    connection.row_factory = sqlite3.Row
    connection.executescript(schema_sql)
    connection.commit()

    yield connection

    connection.close()
@pytest.fixture
def mock_ollama_client():
    """Return a ``MagicMock`` standing in for an Ollama client.

    ``generate``, ``chat`` and ``list`` are each replaced with a mock that
    returns a fixed payload.
    """
    canned_replies = {
        "generate": {"response": "Test response"},
        "chat": {"message": {"content": "Test chat response"}},
        "list": {"models": [{"name": "llama3.2"}]},
    }

    ollama = MagicMock()
    for method_name, payload in canned_replies.items():
        setattr(ollama, method_name, MagicMock(return_value=payload))
    return ollama
@pytest.fixture
def mock_timmy_agent():
    """Return a ``MagicMock`` agent named "Timmy" with canned replies.

    ``run`` and ``chat`` are mocks that each return a fixed string.
    """
    timmy = MagicMock()
    timmy.name = "Timmy"

    canned_replies = (
        ("run", "Test response from Timmy"),
        ("chat", "Test chat response"),
    )
    for method_name, reply in canned_replies:
        setattr(timmy, method_name, MagicMock(return_value=reply))
    return timmy