239 lines
7.4 KiB
Python
239 lines
7.4 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Test script to verify model_tools.py optimizations:
|
||
|
|
1. Thread pool singleton - should not create multiple thread pools
|
||
|
|
2. Lazy tool loading - tools should only be imported when needed
|
||
|
|
"""
|
||
|
|
|
||
|
|
import sys
|
||
|
|
import time
|
||
|
|
import concurrent.futures
|
||
|
|
|
||
|
|
|
||
|
|
def test_thread_pool_singleton():
    """Check that the async bridge hands out one shared ThreadPoolExecutor.

    Calls ``_get_async_bridge_executor()`` twice, asserts both calls return
    the very same object, prints a few diagnostics about it, and asserts it
    is a ``concurrent.futures.ThreadPoolExecutor``.

    Returns:
        True when every assertion holds; a failed assertion raises
        ``AssertionError`` instead of returning.
    """
    banner = "=" * 60
    print(banner)
    print("TEST 1: Thread Pool Singleton Pattern")
    print(banner)

    # Imported inside the test body so an import problem surfaces as a
    # failure of this test. `_run_async` is not called here; importing it
    # only confirms the name can be imported from model_tools.
    from model_tools import _get_async_bridge_executor, _run_async

    # Two independent lookups of the executor.
    first = _get_async_bridge_executor()
    second = _get_async_bridge_executor()

    # Identity (not equality) is what makes it a singleton.
    same_object = first is second
    assert same_object, "ThreadPoolExecutor should be a singleton!"
    print(f"✅ Singleton check passed: {same_object}")
    print(f" Executor ID: {id(first)}")
    # Private executor attributes, read for diagnostic output only.
    print(f" Thread name prefix: {first._thread_name_prefix}")
    print(f" Max workers: {first._max_workers}")

    # The shared object must be a real thread pool.
    assert isinstance(first, concurrent.futures.ThreadPoolExecutor)
    print("✅ Executor is ThreadPoolExecutor type")

    print()
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def test_lazy_tool_loading():
    """Check that tool discovery is deferred until a tool API is first used.

    Reloads ``model_tools``, asserts ``_tools_discovered`` is falsy right
    after the reload, calls ``get_all_tool_names()`` and asserts the flag has
    become truthy. Both the first and a second call are timed and the
    durations printed; the timings are informational and are not asserted.

    Returns:
        True when every assertion holds; a failed assertion raises
        ``AssertionError`` instead of returning.
    """
    print("=" * 60)
    print("TEST 2: Lazy Tool Loading")
    print("=" * 60)

    # Reload so the module-level discovery flag is re-initialised even if an
    # earlier test already triggered discovery.
    import importlib
    import model_tools
    importlib.reload(model_tools)

    # Nothing should have been discovered merely by (re)importing the module.
    assert not model_tools._tools_discovered, "Tools should NOT be discovered at import time!"
    print("✅ Tools are NOT discovered at import time (lazy loading enabled)")

    # First call: expected to trigger discovery.
    # perf_counter() is monotonic and high-resolution, so the measured
    # interval cannot go negative or collapse to zero because of wall-clock
    # adjustments or a coarse system clock, unlike time.time().
    start_time = time.perf_counter()
    tool_names = model_tools.get_all_tool_names()
    elapsed = time.perf_counter() - start_time

    assert model_tools._tools_discovered, "Tools should be discovered after get_all_tool_names()"
    print(f"✅ Tools discovered after first function call ({elapsed:.3f}s)")
    print(f" Discovered {len(tool_names)} tools")

    # Second call: only its duration is reported, so the result is not kept.
    start_time = time.perf_counter()
    model_tools.get_all_tool_names()
    elapsed_2 = time.perf_counter() - start_time
    print(f"✅ Second call is fast ({elapsed_2:.4f}s) - tools already loaded")

    print()
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_tool_definitions_lazy():
    """Exercise ``model_tools.get_tool_definitions_lazy``.

    After reloading ``model_tools`` the discovery flag must be falsy; after
    calling ``get_tool_definitions_lazy(quiet_mode=True)`` it must be truthy.
    When at least one definition comes back, the first one is checked for
    the ``"type"`` and ``"function"`` keys.

    Returns:
        True when every assertion holds; a failed assertion raises
        ``AssertionError`` instead of returning.
    """
    rule = "=" * 60
    print(rule, "TEST 3: get_tool_definitions_lazy() function", rule, sep="\n")

    import importlib
    import model_tools

    # Reload to start from a freshly initialised module.
    importlib.reload(model_tools)

    # Precondition: nothing discovered yet.
    assert not model_tools._tools_discovered, "Tools should NOT be discovered initially"
    print("✅ Tools not discovered before calling get_tool_definitions_lazy()")

    # The lazy accessor is expected to trigger discovery itself.
    definitions = model_tools.get_tool_definitions_lazy(quiet_mode=True)

    assert model_tools._tools_discovered, "Tools should be discovered after get_tool_definitions_lazy()"
    print(f"✅ Tools discovered on first call, got {len(definitions)} definitions")

    # Spot-check the shape of the first definition only; an empty result
    # skips the check rather than failing.
    if definitions:
        first_definition = definitions[0]
        for required_key in ("type", "function"):
            assert required_key in first_definition, f"Definition should have '{required_key}' key"
        print("✅ Tool definitions are valid OpenAI format")

    print()
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def test_backward_compat():
    """Smoke-test the pre-existing ``model_tools`` API after a reload.

    Each public entry point is called (or, for the two module-level maps,
    measured with ``len``) and a one-line confirmation with the result size
    is printed. There are no assertions: the test passes as long as none of
    the calls raises.

    Returns:
        True when every call completes without raising.
    """
    separator = "=" * 60
    print(separator)
    print("TEST 4: Backward Compatibility")
    print(separator)

    import importlib
    import model_tools

    # Start from a freshly initialised module.
    importlib.reload(model_tools)

    print("Testing existing API functions...")

    # Eager definitions accessor.
    tool_defs = model_tools.get_tool_definitions(quiet_mode=True)
    print(f"✅ get_tool_definitions() works ({len(tool_defs)} tools)")

    # Flat list of tool names.
    names = model_tools.get_all_tool_names()
    print(f"✅ get_all_tool_names() works ({len(names)} tools)")

    # Reverse lookup, only attempted when there is at least one tool.
    if names:
        first_name = names[0]
        toolset = model_tools.get_toolset_for_tool(first_name)
        print(f"✅ get_toolset_for_tool() works (tool '{first_name}' -> toolset '{toolset}')")

    # Module-level maps: taking len() is enough to force them to be usable.
    len(model_tools.TOOL_TO_TOOLSET_MAP)
    print("✅ TOOL_TO_TOOLSET_MAP lazy proxy works")

    len(model_tools.TOOLSET_REQUIREMENTS)
    print("✅ TOOLSET_REQUIREMENTS lazy proxy works")

    # Toolset-level queries.
    toolsets = model_tools.get_available_toolsets()
    print(f"✅ get_available_toolsets() works ({len(toolsets)} toolsets)")

    requirements = model_tools.check_toolset_requirements()
    print(f"✅ check_toolset_requirements() works ({len(requirements)} toolsets)")

    # Returns a pair: (available, unavailable).
    usable, unusable = model_tools.check_tool_availability(quiet=True)
    print(f"✅ check_tool_availability() works ({len(usable)} available, {len(unusable)} unavailable)")

    print()
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def test_lru_cache():
    """Check that ``model_tools._get_discovered_tools`` serves repeat calls from its cache.

    Clears the function's cache, calls it twice, and compares the
    ``cache_info()`` snapshots taken after each call: the hit counter must
    have increased, and both calls must return the identical object.

    Returns:
        True when every assertion holds; a failed assertion raises
        ``AssertionError`` instead of returning.
    """
    line = "=" * 60
    print(line, "TEST 5: LRU Cache for Tool Discovery", line, sep="\n")

    import importlib
    import model_tools

    importlib.reload(model_tools)

    discover = model_tools._get_discovered_tools

    # Begin with an empty cache so the first call below is the one that
    # populates it.
    discover.cache_clear()

    # First call, then snapshot the statistics.
    first_result = discover()
    stats_after_first = discover.cache_info()
    print(f"✅ First call: cache_info = {stats_after_first}")

    # Second call should be answered from the cache.
    second_result = discover()
    stats_after_second = discover.cache_info()
    print(f"✅ Second call: cache_info = {stats_after_second}")

    assert stats_after_second.hits > stats_after_first.hits, "Cache should have been hit on second call!"
    assert first_result is second_result, "Should return same cached object!"
    print("✅ LRU cache is working correctly")

    print()
    return True
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """Run the five optimization tests in order and print an overall verdict.

    Every test runs inside its own ``try``/``except`` so a failure in one
    does not prevent the others from running; a failing test is reported
    with its 1-based number and the exception message. If any test failed,
    the process exits with status 1.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("MODEL_TOOLS.PY OPTIMIZATION TESTS")
    print(rule + "\n")

    # Order matters: the position in this tuple is the number shown in the
    # failure message.
    suite = (
        test_thread_pool_singleton,
        test_lazy_tool_loading,
        test_get_tool_definitions_lazy,
        test_backward_compat,
        test_lru_cache,
    )

    all_passed = True
    for number, run_test in enumerate(suite, start=1):
        try:
            all_passed &= run_test()
        except Exception as exc:
            # Failed assertions and unexpected errors are reported alike.
            print(f"❌ TEST {number} FAILED: {exc}\n")
            all_passed = False

    print(rule)
    if not all_passed:
        print("❌ SOME TESTS FAILED!")
        sys.exit(1)
    print("✅ ALL TESTS PASSED!")
    print(rule)
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: run the suite only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|