diff --git a/MULTIMODAL_BACKLOG.md b/MULTIMODAL_BACKLOG.md new file mode 100644 index 0000000..5bc50a3 --- /dev/null +++ b/MULTIMODAL_BACKLOG.md @@ -0,0 +1,29 @@ +# Gemma 4 Multimodal Backlog + +## Epic 1: Visual QA for Nexus World +- **Goal:** Use Gemma 4's vision to audit screenshots of the Three.js Nexus world for layout inconsistencies and UI bugs. +- **Tasks:** + - [x] Capture automated screenshots of all primary Nexus zones. + - [ ] Analyze images for clipping, overlapping UI elements, and lighting glitches. + - [ ] Generate a structured bug report with coordinates and suggested fixes. + +## Epic 2: The Testament Visual Consistency Audit +- **Goal:** Ensure the generated image assets for The Testament align with the narrative mood and visual manifest. +- **Tasks:** + - [ ] Compare generated assets against `visual_manifest.json` descriptions. + - [ ] Flag images that diverge from the "Cinematic Noir, 35mm, high contrast" aesthetic. + - [ ] Refine prompts for divergent beats and trigger re-renders. + +## Epic 3: Sovereign Heart Emotive Stillness +- **Goal:** Develop a system for selecting the most emotive static image based on the sentiment of generated TTS. +- **Tasks:** + - [ ] Analyze TTS output for emotional valence and arousal. + - [ ] Map sentiment kernels to the visual asset library. + - [ ] Implement a "breathing" transition logic between assets for an expressive presence. + +## Epic 4: Multimodal Architecture Synthesis +- **Goal:** Extract and synthesize architectural patterns from visual research papers. +- **Tasks:** + - [ ] Ingest PDF research papers on agentic workflows. + - [ ] Analyze diagrams and charts to extract structural logic. + - [ ] Synthesize findings into `Sovereign_Knowledge_Graph.md`. 
diff --git a/scripts/capture_nexus_screenshots.py b/scripts/capture_nexus_screenshots.py new file mode 100644 index 0000000..dda0350 --- /dev/null +++ b/scripts/capture_nexus_screenshots.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python3 +"""Capture automated screenshots of all primary Nexus zones. + +Part of Epic 1: Visual QA for Nexus World. +Uses Selenium + Chrome headless to navigate each dashboard zone and +save full-page screenshots for visual audit. + +Usage: + # Start the dashboard first (in another terminal): + PYTHONPATH=src python3 -m uvicorn dashboard.app:app --host 127.0.0.1 --port 8000 + + # Then run this script: + python3 scripts/capture_nexus_screenshots.py [--base-url http://127.0.0.1:8000] [--output-dir data/nexus_screenshots] + +Requirements: + pip install selenium Pillow + Chrome/Chromium browser installed +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.common.by import By +from selenium.common.exceptions import ( + TimeoutException, + WebDriverException, +) + +# ── Primary Nexus Zones ────────────────────────────────────────────────────── +# These are the main HTML page routes of the Timmy dashboard. +# API endpoints, HTMX partials, and WebSocket routes are excluded. 
+ +PRIMARY_ZONES: list[dict] = [ + {"path": "/", "name": "landing", "description": "Public landing page"}, + {"path": "/dashboard", "name": "dashboard", "description": "Main mission control dashboard"}, + {"path": "/nexus", "name": "nexus", "description": "Nexus conversational awareness space"}, + {"path": "/agents", "name": "agents", "description": "Agent management panel"}, + {"path": "/briefing", "name": "briefing", "description": "Daily briefing view"}, + {"path": "/calm", "name": "calm", "description": "Calm ritual space"}, + {"path": "/thinking", "name": "thinking", "description": "Thinking engine visualization"}, + {"path": "/memory", "name": "memory", "description": "Memory system explorer"}, + {"path": "/tasks", "name": "tasks", "description": "Task management"}, + {"path": "/experiments", "name": "experiments", "description": "Experiments dashboard"}, + {"path": "/monitoring", "name": "monitoring", "description": "System monitoring"}, + {"path": "/tower", "name": "tower", "description": "Tower world view"}, + {"path": "/tools", "name": "tools", "description": "Tools overview"}, + {"path": "/voice/settings", "name": "voice-settings", "description": "Voice/TTS settings"}, + {"path": "/scorecards", "name": "scorecards", "description": "Agent scorecards"}, + {"path": "/quests", "name": "quests", "description": "Quest tracking"}, + {"path": "/spark", "name": "spark", "description": "Spark intelligence UI"}, + {"path": "/self-correction/ui", "name": "self-correction", "description": "Self-correction interface"}, + {"path": "/energy/report", "name": "energy", "description": "Energy management report"}, + {"path": "/creative/ui", "name": "creative", "description": "Creative generation UI"}, + {"path": "/mobile", "name": "mobile", "description": "Mobile companion view"}, + {"path": "/db-explorer", "name": "db-explorer", "description": "Database explorer"}, + {"path": "/bugs", "name": "bugs", "description": "Bug tracker"}, + {"path": "/self-coding", "name": 
"self-coding", "description": "Self-coding interface"}, +] + +# ── Defaults ───────────────────────────────────────────────────────────────── + +DEFAULT_BASE_URL = "http://127.0.0.1:8000" +DEFAULT_OUTPUT_DIR = "data/nexus_screenshots" +DEFAULT_WIDTH = 1920 +DEFAULT_HEIGHT = 1080 +PAGE_LOAD_TIMEOUT = 15 # seconds + + +def create_driver(width: int, height: int) -> webdriver.Chrome: + """Create a headless Chrome driver with the given viewport size.""" + options = Options() + options.add_argument("--headless=new") + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-gpu") + options.add_argument(f"--window-size={width},{height}") + options.add_argument("--hide-scrollbars") + options.add_argument("--force-device-scale-factor=1") + + # Try common Chrome paths + chrome_paths = [ + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", + "/usr/bin/google-chrome", + "/usr/bin/chromium", + "/usr/bin/chromium-browser", + ] + + for path in chrome_paths: + if os.path.exists(path): + options.binary_location = path + break + + driver = webdriver.Chrome(options=options) + driver.set_window_size(width, height) + return driver + + +def capture_zone( + driver: webdriver.Chrome, + base_url: str, + zone: dict, + output_dir: Path, + timeout: int = PAGE_LOAD_TIMEOUT, +) -> dict: + """Capture a screenshot of a single Nexus zone. + + Returns a result dict with status, file path, and metadata. 
+ """ + url = base_url.rstrip("/") + zone["path"] + name = zone["name"] + screenshot_path = output_dir / f"{name}.png" + result = { + "zone": name, + "path": zone["path"], + "url": url, + "description": zone["description"], + "screenshot": str(screenshot_path), + "status": "pending", + "error": None, + "timestamp": None, + } + + try: + print(f" Capturing {zone['path']:30s} → {name}...", end=" ", flush=True) + driver.get(url) + + # Wait for body to be present (basic page load) + try: + WebDriverWait(driver, timeout).until( + EC.presence_of_element_located((By.TAG_NAME, "body")) + ) + except TimeoutException: + result["status"] = "timeout" + result["error"] = f"Page load timed out after {timeout}s" + print(f"TIMEOUT ({timeout}s)") + return result + + # Additional wait for JS frameworks to render + time.sleep(2) + + # Capture full-page screenshot (scroll to capture all content) + total_height = driver.execute_script("return document.body.scrollHeight") + driver.set_window_size(DEFAULT_WIDTH, max(DEFAULT_HEIGHT, total_height)) + time.sleep(0.5) + + # Save screenshot + output_dir.mkdir(parents=True, exist_ok=True) + driver.save_screenshot(str(screenshot_path)) + + # Capture page title for metadata + title = driver.title or "(no title)" + + result["status"] = "ok" + result["timestamp"] = datetime.now(timezone.utc).isoformat() + result["page_title"] = title + result["file_size"] = screenshot_path.stat().st_size if screenshot_path.exists() else 0 + print(f"OK — {title} ({result['file_size']:,} bytes)") + + except WebDriverException as exc: + result["status"] = "error" + result["error"] = str(exc)[:200] + print(f"ERROR — {str(exc)[:100]}") + + return result + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Capture screenshots of all primary Nexus zones." 
+ ) + parser.add_argument( + "--base-url", + default=DEFAULT_BASE_URL, + help=f"Dashboard base URL (default: {DEFAULT_BASE_URL})", + ) + parser.add_argument( + "--output-dir", + default=DEFAULT_OUTPUT_DIR, + help=f"Output directory for screenshots (default: {DEFAULT_OUTPUT_DIR})", + ) + parser.add_argument( + "--width", + type=int, + default=DEFAULT_WIDTH, + help=f"Viewport width (default: {DEFAULT_WIDTH})", + ) + parser.add_argument( + "--height", + type=int, + default=DEFAULT_HEIGHT, + help=f"Viewport height (default: {DEFAULT_HEIGHT})", + ) + parser.add_argument( + "--timeout", + type=int, + default=PAGE_LOAD_TIMEOUT, + help=f"Page load timeout in seconds (default: {PAGE_LOAD_TIMEOUT})", + ) + parser.add_argument( + "--zones", + nargs="*", + help="Specific zone names to capture (default: all)", + ) + + args = parser.parse_args() + output_dir = Path(args.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # Filter zones if specific ones requested + zones = PRIMARY_ZONES + if args.zones: + zones = [z for z in PRIMARY_ZONES if z["name"] in args.zones] + if not zones: + print(f"Error: No matching zones found for: {args.zones}") + print(f"Available: {[z['name'] for z in PRIMARY_ZONES]}") + return 1 + + print(f"Nexus Screenshot Capture") + print(f" Base URL: {args.base_url}") + print(f" Output dir: {output_dir}") + print(f" Viewport: {args.width}x{args.height}") + print(f" Zones: {len(zones)}") + print() + + # Create driver + try: + driver = create_driver(args.width, args.height) + except WebDriverException as exc: + print(f"Failed to create Chrome driver: {exc}") + return 1 + + results = [] + try: + for zone in zones: + result = capture_zone( + driver, args.base_url, zone, output_dir, timeout=args.timeout + ) + results.append(result) + finally: + driver.quit() + + # Write manifest + manifest = { + "captured_at": datetime.now(timezone.utc).isoformat(), + "base_url": args.base_url, + "viewport": {"width": args.width, "height": args.height}, + "total_zones": 
len(zones), + "ok": sum(1 for r in results if r["status"] == "ok"), + "errors": sum(1 for r in results if r["status"] != "ok"), + "zones": results, + } + + manifest_path = output_dir / "manifest.json" + with open(manifest_path, "w") as f: + json.dump(manifest, f, indent=2) + + print() + print(f"Done! {manifest['ok']}/{manifest['total_zones']} zones captured successfully.") + print(f"Manifest: {manifest_path}") + + if manifest["errors"] > 0: + print(f"\nFailed zones:") + for r in results: + if r["status"] != "ok": + print(f" {r['zone']:20s} — {r['status']}: {r['error']}") + + return 0 if manifest["errors"] == 0 else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/infrastructure/test_multimodal.py b/tests/infrastructure/test_multimodal.py index ac8bfd7..4ad1678 100644 --- a/tests/infrastructure/test_multimodal.py +++ b/tests/infrastructure/test_multimodal.py @@ -3,6 +3,8 @@ import json from unittest.mock import MagicMock, patch +import pytest + from infrastructure.models.multimodal import ( DEFAULT_FALLBACK_CHAINS, KNOWN_MODEL_CAPABILITIES, @@ -10,11 +12,14 @@ from infrastructure.models.multimodal import ( ModelInfo, MultiModalManager, get_model_for_capability, + get_multimodal_manager, model_supports_tools, model_supports_vision, pull_model_with_fallback, ) +pytestmark = pytest.mark.unit + # --------------------------------------------------------------------------- # ModelCapability enum # --------------------------------------------------------------------------- @@ -507,3 +512,41 @@ class TestModelInfoPopulation: assert info.is_pulled is True assert info.size_mb == 4 * 1024 # 4 GiB in MiB assert info.description == "test" + + +# --------------------------------------------------------------------------- +# _pull_model — non-200 status branch (lines 480-481) +# --------------------------------------------------------------------------- + + +class TestPullModelNon200: + def test_pull_non_200_returns_false(self): + mgr = _make_manager([]) + 
class TestGetMultimodalManager:
    """Exercise the module-level accessor ``get_multimodal_manager``."""

    def test_creates_singleton(self):
        # Start from "no manager cached" and make every urlopen call fail, so
        # the test does not depend on a reachable Ollama server.
        no_cached_manager = patch("infrastructure.models.multimodal._multimodal_manager", None)
        unreachable_ollama = patch(
            "urllib.request.urlopen", side_effect=ConnectionError("no ollama")
        )
        with no_cached_manager, unreachable_ollama:
            created = get_multimodal_manager()
            assert isinstance(created, MultiModalManager)

    def test_returns_existing_singleton(self):
        # A manager already stored in the module global must be handed back
        # unchanged.
        existing = _make_manager(None)
        with patch("infrastructure.models.multimodal._multimodal_manager", existing):
            returned = get_multimodal_manager()
            assert returned is existing
@pytest.fixture()
def mock_toolkit():
    """Return a MagicMock toolkit whose ``register`` records what it is given.

    Every ``register(func, name=None)`` call stores ``func`` in
    ``registered_tools`` under ``name``, falling back to ``func.__name__``
    when no (truthy) name is passed.  ``register`` is itself a MagicMock, so
    tests can also inspect ``call_count`` or override ``side_effect``.
    """
    toolkit = MagicMock()
    toolkit.name = "test"
    toolkit.registered_tools = {}

    def _record(func, name=None):
        key = name or func.__name__
        toolkit.registered_tools[key] = func

    toolkit.register = MagicMock(side_effect=_record)
    return toolkit
class TestRegisterMemoryTools:
    """``_register_memory_tools`` registers the four memory tools."""

    def test_registers_four_tools(self, mock_toolkit):
        # Stand-in for timmy.memory_system so the helper's import resolves
        # without the real module.
        fake_memory_system = MagicMock()
        with patch.dict("sys.modules", {"timmy.memory_system": fake_memory_system}):
            _register_memory_tools(mock_toolkit)

        expected = {"memory_search", "memory_write", "memory_read", "memory_forget"}
        assert mock_toolkit.register.call_count == 4
        assert expected == set(mock_toolkit.registered_tools)
class TestRegisterDelegationTools:
    """``_register_delegation_tools`` registers the three delegation tools."""

    def test_registers_three_tools(self, mock_toolkit):
        fake_delegation = MagicMock()
        with patch.dict("sys.modules", {"timmy.tools_delegation": fake_delegation}):
            _register_delegation_tools(mock_toolkit)

        expected = {"delegate_task", "delegate_to_kimi", "list_swarm_agents"}
        assert mock_toolkit.register.call_count == 3
        assert expected == set(mock_toolkit.registered_tools)

    def test_raises_on_failure(self, mock_toolkit):
        # A None entry in sys.modules makes importing that module fail.
        blocked_import = patch.dict("sys.modules", {"timmy.tools_delegation": None})
        with blocked_import, pytest.raises((ImportError, AttributeError)):
            _register_delegation_tools(mock_toolkit)
TestRegisterThinkingTools: + def test_registers_thought_search(self, mock_toolkit): + mock_mod = MagicMock() + with patch.dict("sys.modules", {"timmy.thinking": mock_mod}): + _register_thinking_tools(mock_toolkit) + assert "thought_search" in mock_toolkit.registered_tools + + def test_raises_on_import_error(self, mock_toolkit): + with patch.dict("sys.modules", {"timmy.thinking": None}): + with pytest.raises((ImportError, AttributeError)): + _register_thinking_tools(mock_toolkit) + + +# --------------------------------------------------------------------------- +# Toolkit factories +# --------------------------------------------------------------------------- + + +class TestCreateFullToolkit: + @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False) + def test_returns_none_without_agno(self): + result = create_full_toolkit() + assert result is None + + @patch("timmy.tools._registry._register_thinking_tools") + @patch("timmy.tools._registry._register_artifact_tools") + @patch("timmy.tools._registry._register_gematria_tool") + @patch("timmy.tools._registry._register_delegation_tools") + @patch("timmy.tools._registry._register_introspection_tools") + @patch("timmy.tools._registry._register_agentic_loop_tool") + @patch("timmy.tools._registry._register_memory_tools") + @patch("timmy.tools._registry._register_grok_tool") + @patch("timmy.tools._registry._register_search_tools") + @patch("timmy.tools._registry._register_web_fetch_tool") + @patch("timmy.tools._registry._register_core_tools") + @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True) + def test_calls_all_register_helpers( + self, + mock_core, + mock_web, + mock_search, + mock_grok, + mock_memory, + mock_agentic, + mock_intro, + mock_deleg, + mock_gematria, + mock_artifact, + mock_thinking, + ): + mock_settings = MagicMock(repo_root="/tmp/test") + with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}): + with patch("timmy.tools._registry.Toolkit") as MockTK: + mock_tk_inst = 
class TestCreateExperimentTools:
    """Factory behaviour of ``create_experiment_tools``."""

    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", False)
    def test_raises_without_agno(self):
        with pytest.raises(ImportError, match="Agno tools not available"):
            create_experiment_tools()

    @patch("timmy.tools._registry._AGNO_TOOLS_AVAILABLE", True)
    def test_creates_experiment_toolkit(self):
        fake_settings = MagicMock(
            repo_root="/tmp/test",
            autoresearch_workspace="workspace",
            autoresearch_time_budget=300,
            autoresearch_metric="loss",
        )
        # Both stand-in modules are installed with a single sys.modules patch.
        fake_modules = {
            "config": MagicMock(settings=fake_settings),
            "timmy.autoresearch": MagicMock(),
        }
        with (
            patch.dict("sys.modules", fake_modules),
            patch("timmy.tools._registry.Toolkit") as toolkit_cls,
            patch("timmy.tools._registry.ShellTools"),
            patch("timmy.tools._registry.FileTools"),
            patch("timmy.tools._registry._make_smart_read_file", return_value=lambda: None),
        ):
            toolkit = MagicMock()
            toolkit_cls.return_value = toolkit
            built = create_experiment_tools()

        assert built is toolkit
        # prepare_experiment, run_experiment, evaluate_result, shell,
        # read_file, write_file, list_files = 7
        assert toolkit.register.call_count == 7
class TestGetToolsForAgent:
    """Lookup behaviour of ``get_tools_for_agent``."""

    def test_unknown_agent_returns_none(self):
        result = get_tools_for_agent("nonexistent_agent_xyz")
        assert result is None

    # One test case per agent: a failure names the agent and does not hide
    # the outcome for the remaining ones, as a loop inside one test would.
    @pytest.mark.parametrize("agent_id", ["pixel", "lyra", "reel"])
    def test_stub_agents_return_toolkit(self, agent_id):
        """Pixel, lyra, reel use stub toolkits."""
        result = get_tools_for_agent(agent_id)
        # May be None if agno not available, or a Toolkit stub.
        # Just verify no exception is raised and the shape is plausible.
        assert result is None or hasattr(result, "name"), (
            f"{agent_id}: expected None or an object with a 'name', got {result!r}"
        )
isinstance(info["available_in"], list) + + def test_analysis_catalog(self): + cat = _analysis_tool_catalog() + assert {"calculator", "web_fetch", "web_search", "scrape_url"} == set(cat.keys()) + + def test_ai_catalog(self): + cat = _ai_tool_catalog() + assert "consult_grok" in cat + assert "aider" in cat + + def test_introspection_catalog(self): + cat = _introspection_tool_catalog() + expected = { + "get_system_info", + "check_ollama_health", + "get_memory_status", + "session_history", + "thought_search", + "self_reflect", + "update_gitea_avatar", + } + assert expected == set(cat.keys()) + + def test_experiment_catalog(self): + cat = _experiment_tool_catalog() + assert {"prepare_experiment", "run_experiment", "evaluate_result"} == set(cat.keys()) + + def test_all_catalogs_have_consistent_schema(self): + """Every catalog entry must have name, description, available_in.""" + for fn in ( + _core_tool_catalog, + _analysis_tool_catalog, + _ai_tool_catalog, + _introspection_tool_catalog, + _experiment_tool_catalog, + ): + cat = fn() + for tool_id, info in cat.items(): + assert isinstance(info.get("name"), str), f"{tool_id} missing 'name'" + assert isinstance(info.get("description"), str), f"{tool_id} missing 'description'" + assert isinstance(info.get("available_in"), list), f"{tool_id} missing 'available_in'" + + +class TestMergeCatalog: + def test_merges_catalog_entries(self): + catalog = {} + mock_mod = MagicMock() + mock_mod.TEST_CATALOG = { + "tool_a": {"name": "Tool A", "description": "Does A"}, + "tool_b": {"name": "Tool B", "description": "Does B"}, + } + with patch("importlib.import_module", return_value=mock_mod): + _merge_catalog(catalog, "fake.module", "TEST_CATALOG", ["pixel", "orchestrator"]) + assert "tool_a" in catalog + assert catalog["tool_a"]["available_in"] == ["pixel", "orchestrator"] + assert catalog["tool_b"]["name"] == "Tool B" + + def test_handles_import_error_gracefully(self): + catalog = {} + with patch("importlib.import_module", 
class TestGetAllAvailableTools:
    """Checks on the merged catalog returned by ``get_all_available_tools``."""

    def test_returns_merged_catalog(self):
        catalog = get_all_available_tools()
        assert isinstance(catalog, dict)
        # Must contain core tools at minimum
        assert "shell" in catalog
        assert "calculator" in catalog
        assert "web_search" in catalog
        assert "consult_grok" in catalog
        assert "get_system_info" in catalog
        assert "prepare_experiment" in catalog

    def test_no_duplicate_keys(self):
        """Each sub-catalog shouldn't override another's keys."""
        catalog = get_all_available_tools()

        # Collect the keys of the individual catalogs, failing on the first
        # key that two catalogs both define.
        seen = set()
        for fn in (
            _core_tool_catalog,
            _analysis_tool_catalog,
            _ai_tool_catalog,
            _introspection_tool_catalog,
            _experiment_tool_catalog,
        ):
            for key in fn():
                assert key not in seen, f"Duplicate key '{key}' across catalogs"
                seen.add(key)

        # NOTE(review): assumes the merged catalog keeps every sub-catalog
        # key — confirm against get_all_available_tools.
        missing = seen - set(catalog)
        assert not missing, f"Keys missing from merged catalog: {sorted(missing)}"