#!/usr/bin/env python3 """ Test script to verify model_tools.py optimizations: 1. Thread pool singleton - should not create multiple thread pools 2. Lazy tool loading - tools should only be imported when needed """ import sys import time import concurrent.futures def test_thread_pool_singleton(): """Test that _run_async uses a singleton thread pool, not creating one per call.""" print("=" * 60) print("TEST 1: Thread Pool Singleton Pattern") print("=" * 60) # Import after clearing any previous state from model_tools import _get_async_bridge_executor, _run_async # Get the executor reference executor1 = _get_async_bridge_executor() executor2 = _get_async_bridge_executor() # Should be the same object assert executor1 is executor2, "ThreadPoolExecutor should be a singleton!" print(f"✅ Singleton check passed: {executor1 is executor2}") print(f" Executor ID: {id(executor1)}") print(f" Thread name prefix: {executor1._thread_name_prefix}") print(f" Max workers: {executor1._max_workers}") # Verify it's a ThreadPoolExecutor assert isinstance(executor1, concurrent.futures.ThreadPoolExecutor) print("✅ Executor is ThreadPoolExecutor type") print() return True def test_lazy_tool_loading(): """Test that tools are lazy-loaded only when needed.""" print("=" * 60) print("TEST 2: Lazy Tool Loading") print("=" * 60) # Must reimport to get fresh state import importlib import model_tools importlib.reload(model_tools) # Check that tools are NOT discovered at import time assert not model_tools._tools_discovered, "Tools should NOT be discovered at import time!" print("✅ Tools are NOT discovered at import time (lazy loading enabled)") # Now call a function that should trigger discovery start_time = time.time() tool_names = model_tools.get_all_tool_names() elapsed = time.time() - start_time # Tools should now be discovered assert model_tools._tools_discovered, "Tools should be discovered after get_all_tool_names()" print(f"✅ Tools discovered after first function call ({elapsed:.3f}s)") print(f" Discovered {len(tool_names)} tools") # Second call should be instant (already discovered) start_time = time.time() tool_names_2 = model_tools.get_all_tool_names() elapsed_2 = time.time() - start_time print(f"✅ Second call is fast ({elapsed_2:.4f}s) - tools already loaded") print() return True def test_get_tool_definitions_lazy(): """Test the new get_tool_definitions_lazy function.""" print("=" * 60) print("TEST 3: get_tool_definitions_lazy() function") print("=" * 60) import importlib import model_tools importlib.reload(model_tools) # Check lazy loading state assert not model_tools._tools_discovered, "Tools should NOT be discovered initially" print("✅ Tools not discovered before calling get_tool_definitions_lazy()") # Call the lazy version definitions = model_tools.get_tool_definitions_lazy(quiet_mode=True) assert model_tools._tools_discovered, "Tools should be discovered after get_tool_definitions_lazy()" print(f"✅ Tools discovered on first call, got {len(definitions)} definitions") # Verify we got valid tool definitions if definitions: sample = definitions[0] assert "type" in sample, "Definition should have 'type' key" assert "function" in sample, "Definition should have 'function' key" print(f"✅ Tool definitions are valid OpenAI format") print() return True def test_backward_compat(): """Test that existing API still works.""" print("=" * 60) print("TEST 4: Backward Compatibility") print("=" * 60) import importlib import model_tools importlib.reload(model_tools) # Test all the existing public API print("Testing existing API functions...") # get_tool_definitions (eager version) defs = model_tools.get_tool_definitions(quiet_mode=True) print(f"✅ get_tool_definitions() works ({len(defs)} tools)") # get_all_tool_names names = model_tools.get_all_tool_names() print(f"✅ get_all_tool_names() works ({len(names)} tools)") # get_toolset_for_tool if names: toolset = model_tools.get_toolset_for_tool(names[0]) print(f"✅ get_toolset_for_tool() works (tool '{names[0]}' -> toolset '{toolset}')") # TOOL_TO_TOOLSET_MAP (lazy proxy) tool_map = model_tools.TOOL_TO_TOOLSET_MAP # Access it to trigger loading _ = len(tool_map) print(f"✅ TOOL_TO_TOOLSET_MAP lazy proxy works") # TOOLSET_REQUIREMENTS (lazy proxy) req_map = model_tools.TOOLSET_REQUIREMENTS _ = len(req_map) print(f"✅ TOOLSET_REQUIREMENTS lazy proxy works") # get_available_toolsets available = model_tools.get_available_toolsets() print(f"✅ get_available_toolsets() works ({len(available)} toolsets)") # check_toolset_requirements reqs = model_tools.check_toolset_requirements() print(f"✅ check_toolset_requirements() works ({len(reqs)} toolsets)") # check_tool_availability available, unavailable = model_tools.check_tool_availability(quiet=True) print(f"✅ check_tool_availability() works ({len(available)} available, {len(unavailable)} unavailable)") print() return True def test_lru_cache(): """Test that _get_discovered_tools is properly cached.""" print("=" * 60) print("TEST 5: LRU Cache for Tool Discovery") print("=" * 60) import importlib import model_tools importlib.reload(model_tools) # Clear cache and check model_tools._get_discovered_tools.cache_clear() # First call result1 = model_tools._get_discovered_tools() info1 = model_tools._get_discovered_tools.cache_info() print(f"✅ First call: cache_info = {info1}") # Second call - should hit cache result2 = model_tools._get_discovered_tools() info2 = model_tools._get_discovered_tools.cache_info() print(f"✅ Second call: cache_info = {info2}") assert info2.hits > info1.hits, "Cache should have been hit on second call!" assert result1 is result2, "Should return same cached object!" print("✅ LRU cache is working correctly") print() return True def main(): print("\n" + "=" * 60) print("MODEL_TOOLS.PY OPTIMIZATION TESTS") print("=" * 60 + "\n") all_passed = True try: all_passed &= test_thread_pool_singleton() except Exception as e: print(f"❌ TEST 1 FAILED: {e}\n") all_passed = False try: all_passed &= test_lazy_tool_loading() except Exception as e: print(f"❌ TEST 2 FAILED: {e}\n") all_passed = False try: all_passed &= test_get_tool_definitions_lazy() except Exception as e: print(f"❌ TEST 3 FAILED: {e}\n") all_passed = False try: all_passed &= test_backward_compat() except Exception as e: print(f"❌ TEST 4 FAILED: {e}\n") all_passed = False try: all_passed &= test_lru_cache() except Exception as e: print(f"❌ TEST 5 FAILED: {e}\n") all_passed = False print("=" * 60) if all_passed: print("✅ ALL TESTS PASSED!") else: print("❌ SOME TESTS FAILED!") sys.exit(1) print("=" * 60) if __name__ == "__main__": main()