diff --git a/model_tools.py b/model_tools.py
index 515c5868..a380d0e9 100644
--- a/model_tools.py
+++ b/model_tools.py
@@ -39,6 +39,7 @@ logger = logging.getLogger(__name__)
 
 _tool_loop = None  # persistent loop for the main (CLI) thread
 _tool_loop_lock = threading.Lock()
+_worker_thread_local = threading.local()  # per-worker-thread persistent loops
 
 
 def _get_tool_loop():
@@ -56,6 +57,28 @@
     return _tool_loop
 
 
+def _get_worker_loop():
+    """Return a persistent event loop for the current worker thread.
+
+    Each worker thread (e.g., delegate_task's ThreadPoolExecutor threads)
+    gets its own long-lived loop stored in thread-local storage. This
+    prevents the "Event loop is closed" errors that occurred when
+    asyncio.run() was used per-call: asyncio.run() creates a loop, runs
+    the coroutine, then *closes* the loop — but cached httpx/AsyncOpenAI
+    clients remain bound to that now-dead loop and raise RuntimeError
+    during garbage collection or subsequent use.
+
+    By keeping the loop alive for the thread's lifetime, cached clients
+    stay valid and their cleanup runs on a live loop.
+    """
+    loop = getattr(_worker_thread_local, 'loop', None)
+    if loop is None or loop.is_closed():
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        _worker_thread_local.loop = loop
+    return loop
+
+
 def _run_async(coro):
     """Run an async coroutine from a sync context.
 
@@ -68,9 +91,10 @@
     loop so that cached async clients (httpx / AsyncOpenAI) remain bound
     to a live loop and don't trigger "Event loop is closed" on GC.
 
-    When called from a worker thread (parallel tool execution), we detect
-    that we're NOT on the main thread and use asyncio.run() with a fresh
-    loop to avoid contention on the shared persistent loop.
+    When called from a worker thread (parallel tool execution), we use a
+    per-thread persistent loop to avoid both contention with the main
+    thread's shared loop AND the "Event loop is closed" errors caused by
+    asyncio.run()'s create-and-destroy lifecycle.
 
     This is the single source of truth for sync->async bridging in tool
     handlers. The RL paths (agent_loop.py, tool_context.py) also provide
@@ -89,11 +113,14 @@
             future = pool.submit(asyncio.run, coro)
             return future.result(timeout=300)
 
-    # If we're on a worker thread (e.g., parallel tool execution),
-    # use asyncio.run() with its own loop to avoid contending with the
-    # shared persistent loop from another parallel worker.
+    # If we're on a worker thread (e.g., parallel tool execution in
+    # delegate_task), use a per-thread persistent loop. This avoids
+    # contention with the main thread's shared loop while keeping cached
+    # httpx/AsyncOpenAI clients bound to a live loop for the thread's
+    # lifetime — preventing "Event loop is closed" on GC cleanup.
     if threading.current_thread() is not threading.main_thread():
-        return asyncio.run(coro)
+        worker_loop = _get_worker_loop()
+        return worker_loop.run_until_complete(coro)
 
     tool_loop = _get_tool_loop()
     return tool_loop.run_until_complete(coro)
diff --git a/tests/test_model_tools_async_bridge.py b/tests/test_model_tools_async_bridge.py
index 6597ca87..d7acb46a 100644
--- a/tests/test_model_tools_async_bridge.py
+++ b/tests/test_model_tools_async_bridge.py
@@ -84,6 +84,101 @@
         assert not loop.is_closed(), "Loop closed before second call"
 
 
+class TestRunAsyncWorkerThread:
+    """Verify worker threads get persistent per-thread loops (delegate_task fix)."""
+
+    def test_worker_thread_loop_not_closed(self):
+        """A worker thread's loop must stay open after _run_async returns,
+        so cached httpx/AsyncOpenAI clients don't crash on GC."""
+        from concurrent.futures import ThreadPoolExecutor
+        from model_tools import _run_async
+
+        def _run_on_worker():
+            loop = _run_async(_get_current_loop())
+            still_open = not loop.is_closed()
+            return loop, still_open
+
+        with ThreadPoolExecutor(max_workers=1) as pool:
+            loop, still_open = pool.submit(_run_on_worker).result()
+
+        assert still_open, (
+            "Worker thread's event loop was closed after _run_async — "
+            "cached async clients will crash with 'Event loop is closed'"
+        )
+
+    def test_worker_thread_reuses_loop_across_calls(self):
+        """Multiple _run_async calls on the same worker thread should
+        reuse the same persistent loop (not create-and-destroy each time)."""
+        from concurrent.futures import ThreadPoolExecutor
+        from model_tools import _run_async
+
+        def _run_twice_on_worker():
+            loop1 = _run_async(_get_current_loop())
+            loop2 = _run_async(_get_current_loop())
+            return loop1, loop2
+
+        with ThreadPoolExecutor(max_workers=1) as pool:
+            loop1, loop2 = pool.submit(_run_twice_on_worker).result()
+
+        assert loop1 is loop2, (
+            "Worker thread created different loops for consecutive calls — "
+            "cached clients from the first call would be orphaned"
+        )
+        assert not loop1.is_closed()
+
+    def test_parallel_workers_get_separate_loops(self):
+        """Different worker threads must get their own loops to avoid
+        contention (the original reason for the worker-thread branch)."""
+        from concurrent.futures import ThreadPoolExecutor, as_completed
+        from model_tools import _run_async
+
+        barrier = threading.Barrier(3, timeout=5)
+
+        def _get_loop_id():
+            # Use a barrier to force all 3 threads to be alive simultaneously,
+            # ensuring the ThreadPoolExecutor actually uses 3 distinct threads.
+            loop = _run_async(_get_current_loop())
+            barrier.wait()
+            return id(loop), not loop.is_closed(), threading.current_thread().ident
+
+        with ThreadPoolExecutor(max_workers=3) as pool:
+            futures = [pool.submit(_get_loop_id) for _ in range(3)]
+            results = [f.result() for f in as_completed(futures)]
+
+        loop_ids = {r[0] for r in results}
+        thread_ids = {r[2] for r in results}
+        all_open = all(r[1] for r in results)
+
+        assert all_open, "At least one worker thread's loop was closed"
+        # The barrier guarantees 3 distinct threads were used
+        assert len(thread_ids) == 3, f"Expected 3 threads, got {len(thread_ids)}"
+        # Each thread should have its own loop
+        assert len(loop_ids) == 3, (
+            "Expected 3 distinct loops for 3 parallel workers, "
+            f"got {len(loop_ids)} — workers may be contending on a shared loop"
+        )
+
+    def test_worker_loop_separate_from_main_loop(self):
+        """Worker thread loops must be different from the main thread's
+        persistent loop to avoid cross-thread contention."""
+        from concurrent.futures import ThreadPoolExecutor
+        from model_tools import _run_async, _get_tool_loop
+
+        main_loop = _get_tool_loop()
+
+        def _get_worker_loop_id():
+            loop = _run_async(_get_current_loop())
+            return id(loop)
+
+        with ThreadPoolExecutor(max_workers=1) as pool:
+            worker_loop_id = pool.submit(_get_worker_loop_id).result()
+
+        assert worker_loop_id != id(main_loop), (
+            "Worker thread used the main thread's loop — this would cause "
+            "cross-thread contention on the event loop"
+        )
+
+
 class TestRunAsyncWithRunningLoop:
     """When a loop is already running, _run_async falls back to a thread."""
 