From 281100e2dfbda46170361d4451d9ffe603c198ea Mon Sep 17 00:00:00 2001 From: ctlst <37115785+ctlst@users.noreply.github.com> Date: Wed, 25 Mar 2026 17:31:56 -0700 Subject: [PATCH] fix(agent): prevent AsyncOpenAI/httpx cross-loop deadlock in gateway mode (#2701) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In gateway mode, async tools (vision_analyze, web_extract, session_search) deadlock because _run_async() spawns a thread with asyncio.run(), creating a new event loop, but _get_cached_client() returns an AsyncOpenAI client bound to a different loop. httpx.AsyncClient cannot work across event loop boundaries, causing await client.chat.completions.create() to hang forever. Fix: include the event loop identity in the async client cache key so each loop gets its own AsyncOpenAI instance. Also fix session_search_tool.py which had its own broken asyncio.run()-in-thread pattern — now uses the centralized _run_async() bridge. --- agent/auxiliary_client.py | 34 +++-- tests/test_crossloop_client_cache.py | 186 +++++++++++++++++++++++++++ tools/session_search_tool.py | 14 +- 3 files changed, 218 insertions(+), 16 deletions(-) create mode 100644 tests/test_crossloop_client_cache.py diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 42639700..81cf44b4 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -1258,13 +1258,33 @@ def _get_cached_client( base_url: str = None, api_key: str = None, ) -> Tuple[Optional[Any], Optional[str]]: - """Get or create a cached client for the given provider.""" - cache_key = (provider, async_mode, base_url or "", api_key or "") + """Get or create a cached client for the given provider. + + Async clients (AsyncOpenAI) use httpx.AsyncClient internally, which + binds to the event loop that was current when the client was created. + Using such a client on a *different* loop causes deadlocks or + RuntimeError. To prevent cross-loop issues (especially in gateway + mode where _run_async() may spawn fresh loops in worker threads), the + cache key for async clients includes the current event loop's identity + so each loop gets its own client instance. + """ + # Include loop identity for async clients to prevent cross-loop reuse. + # httpx.AsyncClient (inside AsyncOpenAI) is bound to the loop where it + # was created — reusing it on a different loop causes deadlocks (#2681). + loop_id = 0 + current_loop = None + if async_mode: + try: + import asyncio as _aio + current_loop = _aio.get_event_loop() + loop_id = id(current_loop) + except RuntimeError: + pass + cache_key = (provider, async_mode, base_url or "", api_key or "", loop_id) with _client_cache_lock: if cache_key in _client_cache: cached_client, cached_default, cached_loop = _client_cache[cache_key] if async_mode: - # Async clients are bound to the event loop that created them. # A cached async client whose loop has been closed will raise # "Event loop is closed" when httpx tries to clean up its # transport. Discard the stale client and create a fresh one. @@ -1286,13 +1306,7 @@ def _get_cached_client( if client is not None: # For async clients, remember which loop they were created on so we # can detect stale entries later. - bound_loop = None - if async_mode: - try: - import asyncio as _aio - bound_loop = _aio.get_event_loop() - except RuntimeError: - pass + bound_loop = current_loop with _client_cache_lock: if cache_key not in _client_cache: _client_cache[cache_key] = (client, default_model, bound_loop) diff --git a/tests/test_crossloop_client_cache.py b/tests/test_crossloop_client_cache.py new file mode 100644 index 00000000..be8d51ce --- /dev/null +++ b/tests/test_crossloop_client_cache.py @@ -0,0 +1,186 @@ +"""Tests for cross-loop client cache isolation fix (#2681). + +Verifies that _get_cached_client() returns different AsyncOpenAI clients +when called from different event loops, preventing the httpx deadlock +that occurs when a cached async client bound to loop A is reused on loop B. + +This test file is self-contained and does not import the full tool chain, +so it can run without optional dependencies like firecrawl. +""" + +import asyncio +import threading +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import patch, MagicMock +from types import SimpleNamespace + +import pytest + + +# --------------------------------------------------------------------------- +# Minimal stubs so we can import _get_cached_client without the full tree +# --------------------------------------------------------------------------- + +def _stub_resolve_provider_client(provider, model, async_mode, **kw): + """Return a unique mock client each time, simulating AsyncOpenAI creation.""" + client = MagicMock(name=f"client-{provider}-async={async_mode}") + client.api_key = "test" + client.base_url = kw.get("explicit_base_url", "http://localhost:8081/v1") + return client, model or "test-model" + + +@pytest.fixture(autouse=True) +def _clean_client_cache(): + """Clear the client cache before each test.""" + import importlib + # We need to patch before importing + with patch.dict("sys.modules", {}): + pass + # Import and clear + import agent.auxiliary_client as ac + ac._client_cache.clear() + yield + ac._client_cache.clear() + + +class TestCrossLoopCacheIsolation: + """Verify async clients are cached per-event-loop, not globally.""" + + def test_same_loop_reuses_client(self): + """Within a single event loop, the same client should be returned.""" + from agent.auxiliary_client import _get_cached_client, _client_cache + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + client1, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + client2, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + + assert client1 is client2, ( + "Same loop should return the same cached client" + ) + loop.close() + + def test_different_loops_get_different_clients(self): + """Different event loops must get separate client instances.""" + from agent.auxiliary_client import _get_cached_client + + results = {} + + def _get_client_on_new_loop(name): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + client, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + results[name] = (id(client), id(loop)) + # Don't close loop — simulates real usage where loops persist + + t1 = threading.Thread(target=_get_client_on_new_loop, args=("a",)) + t2 = threading.Thread(target=_get_client_on_new_loop, args=("b",)) + t1.start(); t1.join() + t2.start(); t2.join() + + client_id_a, loop_id_a = results["a"] + client_id_b, loop_id_b = results["b"] + + assert loop_id_a != loop_id_b, "Test setup error: same loop on both threads" + assert client_id_a != client_id_b, ( + "Different event loops got the SAME cached client — this causes " + "httpx cross-loop deadlocks in gateway mode (#2681)" + ) + + def test_sync_clients_not_affected(self): + """Sync clients (async_mode=False) should still be cached globally, + since httpx.Client (sync) doesn't bind to an event loop.""" + from agent.auxiliary_client import _get_cached_client + + results = {} + + def _get_sync_client(name): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + client, _ = _get_cached_client("custom", "m1", async_mode=False, + base_url="http://localhost:8081/v1") + results[name] = id(client) + + t1 = threading.Thread(target=_get_sync_client, args=("a",)) + t2 = threading.Thread(target=_get_sync_client, args=("b",)) + t1.start(); t1.join() + t2.start(); t2.join() + + assert results["a"] == results["b"], ( + "Sync clients should be shared across threads (no loop binding)" + ) + + def test_gateway_simulation_no_deadlock(self): + """Simulate gateway mode: _run_async spawns a thread with asyncio.run(), + which creates a new loop. The cached client must be created on THAT loop, + not reused from a different one.""" + from agent.auxiliary_client import _get_cached_client + + # Simulate: first call on "gateway loop" + gateway_loop = asyncio.new_event_loop() + asyncio.set_event_loop(gateway_loop) + + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + gateway_client, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + + # Simulate: _run_async spawns a thread with asyncio.run() + worker_client_id = [None] + def _worker(): + async def _inner(): + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + client, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + worker_client_id[0] = id(client) + asyncio.run(_inner()) + + t = threading.Thread(target=_worker) + t.start() + t.join() + + assert worker_client_id[0] != id(gateway_client), ( + "Worker thread (asyncio.run) got the gateway's cached client — " + "this is the exact cross-loop scenario that causes httpx deadlocks. " + "The cache key must include the event loop identity (#2681)" + ) + gateway_loop.close() + + def test_closed_loop_client_discarded(self): + """A cached client whose loop has closed should be replaced.""" + from agent.auxiliary_client import _get_cached_client + + loop1 = asyncio.new_event_loop() + asyncio.set_event_loop(loop1) + + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + client1, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + + loop1.close() + + # New loop on same thread + loop2 = asyncio.new_event_loop() + asyncio.set_event_loop(loop2) + + with patch("agent.auxiliary_client.resolve_provider_client", + side_effect=_stub_resolve_provider_client): + client2, _ = _get_cached_client("custom", "m1", async_mode=True, + base_url="http://localhost:8081/v1") + + assert client1 is not client2, ( + "Client from closed loop should not be reused" + ) + loop2.close() diff --git a/tools/session_search_tool.py b/tools/session_search_tool.py index 138925bf..d7b69318 100644 --- a/tools/session_search_tool.py +++ b/tools/session_search_tool.py @@ -358,12 +358,14 @@ def session_search( return await asyncio.gather(*coros, return_exceptions=True) try: - asyncio.get_running_loop() - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - results = pool.submit(lambda: asyncio.run(_summarize_all())).result(timeout=60) - except RuntimeError: - # No event loop running, create a new one - results = asyncio.run(_summarize_all()) + # Use _run_async() which properly manages event loops across + # CLI, gateway, and worker-thread contexts. The previous + # pattern (asyncio.run() in a ThreadPoolExecutor) created a + # disposable event loop that conflicted with cached + # AsyncOpenAI/httpx clients bound to a different loop, + # causing deadlocks in gateway mode (#2681). + from model_tools import _run_async + results = _run_async(_summarize_all()) except concurrent.futures.TimeoutError: logging.warning( "Session summarization timed out after 60 seconds",