fix(agent): prevent AsyncOpenAI/httpx cross-loop deadlock in gateway mode (#2701)
In gateway mode, async tools (vision_analyze, web_extract, session_search) deadlock because _run_async() spawns a thread with asyncio.run(), creating a new event loop, but _get_cached_client() returns an AsyncOpenAI client bound to a different loop. httpx.AsyncClient cannot work across event loop boundaries, causing await client.chat.completions.create() to hang forever. Fix: include the event loop identity in the async client cache key so each loop gets its own AsyncOpenAI instance. Also fix session_search_tool.py which had its own broken asyncio.run()-in-thread pattern — now uses the centralized _run_async() bridge.
This commit is contained in:
@@ -1258,13 +1258,33 @@ def _get_cached_client(
|
|||||||
base_url: str = None,
|
base_url: str = None,
|
||||||
api_key: str = None,
|
api_key: str = None,
|
||||||
) -> Tuple[Optional[Any], Optional[str]]:
|
) -> Tuple[Optional[Any], Optional[str]]:
|
||||||
"""Get or create a cached client for the given provider."""
|
"""Get or create a cached client for the given provider.
|
||||||
cache_key = (provider, async_mode, base_url or "", api_key or "")
|
|
||||||
|
Async clients (AsyncOpenAI) use httpx.AsyncClient internally, which
|
||||||
|
binds to the event loop that was current when the client was created.
|
||||||
|
Using such a client on a *different* loop causes deadlocks or
|
||||||
|
RuntimeError. To prevent cross-loop issues (especially in gateway
|
||||||
|
mode where _run_async() may spawn fresh loops in worker threads), the
|
||||||
|
cache key for async clients includes the current event loop's identity
|
||||||
|
so each loop gets its own client instance.
|
||||||
|
"""
|
||||||
|
# Include loop identity for async clients to prevent cross-loop reuse.
|
||||||
|
# httpx.AsyncClient (inside AsyncOpenAI) is bound to the loop where it
|
||||||
|
# was created — reusing it on a different loop causes deadlocks (#2681).
|
||||||
|
loop_id = 0
|
||||||
|
current_loop = None
|
||||||
|
if async_mode:
|
||||||
|
try:
|
||||||
|
import asyncio as _aio
|
||||||
|
current_loop = _aio.get_event_loop()
|
||||||
|
loop_id = id(current_loop)
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
cache_key = (provider, async_mode, base_url or "", api_key or "", loop_id)
|
||||||
with _client_cache_lock:
|
with _client_cache_lock:
|
||||||
if cache_key in _client_cache:
|
if cache_key in _client_cache:
|
||||||
cached_client, cached_default, cached_loop = _client_cache[cache_key]
|
cached_client, cached_default, cached_loop = _client_cache[cache_key]
|
||||||
if async_mode:
|
if async_mode:
|
||||||
# Async clients are bound to the event loop that created them.
|
|
||||||
# A cached async client whose loop has been closed will raise
|
# A cached async client whose loop has been closed will raise
|
||||||
# "Event loop is closed" when httpx tries to clean up its
|
# "Event loop is closed" when httpx tries to clean up its
|
||||||
# transport. Discard the stale client and create a fresh one.
|
# transport. Discard the stale client and create a fresh one.
|
||||||
@@ -1286,13 +1306,7 @@ def _get_cached_client(
|
|||||||
if client is not None:
|
if client is not None:
|
||||||
# For async clients, remember which loop they were created on so we
|
# For async clients, remember which loop they were created on so we
|
||||||
# can detect stale entries later.
|
# can detect stale entries later.
|
||||||
bound_loop = None
|
bound_loop = current_loop
|
||||||
if async_mode:
|
|
||||||
try:
|
|
||||||
import asyncio as _aio
|
|
||||||
bound_loop = _aio.get_event_loop()
|
|
||||||
except RuntimeError:
|
|
||||||
pass
|
|
||||||
with _client_cache_lock:
|
with _client_cache_lock:
|
||||||
if cache_key not in _client_cache:
|
if cache_key not in _client_cache:
|
||||||
_client_cache[cache_key] = (client, default_model, bound_loop)
|
_client_cache[cache_key] = (client, default_model, bound_loop)
|
||||||
|
|||||||
186
tests/test_crossloop_client_cache.py
Normal file
186
tests/test_crossloop_client_cache.py
Normal file
@@ -0,0 +1,186 @@
|
|||||||
|
"""Tests for cross-loop client cache isolation fix (#2681).
|
||||||
|
|
||||||
|
Verifies that _get_cached_client() returns different AsyncOpenAI clients
|
||||||
|
when called from different event loops, preventing the httpx deadlock
|
||||||
|
that occurs when a cached async client bound to loop A is reused on loop B.
|
||||||
|
|
||||||
|
This test file is self-contained and does not import the full tool chain,
|
||||||
|
so it can run without optional dependencies like firecrawl.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Minimal stubs so we can import _get_cached_client without the full tree
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _stub_resolve_provider_client(provider, model, async_mode, **kw):
|
||||||
|
"""Return a unique mock client each time, simulating AsyncOpenAI creation."""
|
||||||
|
client = MagicMock(name=f"client-{provider}-async={async_mode}")
|
||||||
|
client.api_key = "test"
|
||||||
|
client.base_url = kw.get("explicit_base_url", "http://localhost:8081/v1")
|
||||||
|
return client, model or "test-model"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _clean_client_cache():
|
||||||
|
"""Clear the client cache before each test."""
|
||||||
|
import importlib
|
||||||
|
# We need to patch before importing
|
||||||
|
with patch.dict("sys.modules", {}):
|
||||||
|
pass
|
||||||
|
# Import and clear
|
||||||
|
import agent.auxiliary_client as ac
|
||||||
|
ac._client_cache.clear()
|
||||||
|
yield
|
||||||
|
ac._client_cache.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestCrossLoopCacheIsolation:
|
||||||
|
"""Verify async clients are cached per-event-loop, not globally."""
|
||||||
|
|
||||||
|
def test_same_loop_reuses_client(self):
|
||||||
|
"""Within a single event loop, the same client should be returned."""
|
||||||
|
from agent.auxiliary_client import _get_cached_client, _client_cache
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
client1, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
client2, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
|
||||||
|
assert client1 is client2, (
|
||||||
|
"Same loop should return the same cached client"
|
||||||
|
)
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
def test_different_loops_get_different_clients(self):
|
||||||
|
"""Different event loops must get separate client instances."""
|
||||||
|
from agent.auxiliary_client import _get_cached_client
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
def _get_client_on_new_loop(name):
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
client, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
results[name] = (id(client), id(loop))
|
||||||
|
# Don't close loop — simulates real usage where loops persist
|
||||||
|
|
||||||
|
t1 = threading.Thread(target=_get_client_on_new_loop, args=("a",))
|
||||||
|
t2 = threading.Thread(target=_get_client_on_new_loop, args=("b",))
|
||||||
|
t1.start(); t1.join()
|
||||||
|
t2.start(); t2.join()
|
||||||
|
|
||||||
|
client_id_a, loop_id_a = results["a"]
|
||||||
|
client_id_b, loop_id_b = results["b"]
|
||||||
|
|
||||||
|
assert loop_id_a != loop_id_b, "Test setup error: same loop on both threads"
|
||||||
|
assert client_id_a != client_id_b, (
|
||||||
|
"Different event loops got the SAME cached client — this causes "
|
||||||
|
"httpx cross-loop deadlocks in gateway mode (#2681)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_sync_clients_not_affected(self):
|
||||||
|
"""Sync clients (async_mode=False) should still be cached globally,
|
||||||
|
since httpx.Client (sync) doesn't bind to an event loop."""
|
||||||
|
from agent.auxiliary_client import _get_cached_client
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
def _get_sync_client(name):
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
client, _ = _get_cached_client("custom", "m1", async_mode=False,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
results[name] = id(client)
|
||||||
|
|
||||||
|
t1 = threading.Thread(target=_get_sync_client, args=("a",))
|
||||||
|
t2 = threading.Thread(target=_get_sync_client, args=("b",))
|
||||||
|
t1.start(); t1.join()
|
||||||
|
t2.start(); t2.join()
|
||||||
|
|
||||||
|
assert results["a"] == results["b"], (
|
||||||
|
"Sync clients should be shared across threads (no loop binding)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_gateway_simulation_no_deadlock(self):
|
||||||
|
"""Simulate gateway mode: _run_async spawns a thread with asyncio.run(),
|
||||||
|
which creates a new loop. The cached client must be created on THAT loop,
|
||||||
|
not reused from a different one."""
|
||||||
|
from agent.auxiliary_client import _get_cached_client
|
||||||
|
|
||||||
|
# Simulate: first call on "gateway loop"
|
||||||
|
gateway_loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(gateway_loop)
|
||||||
|
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
gateway_client, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
|
||||||
|
# Simulate: _run_async spawns a thread with asyncio.run()
|
||||||
|
worker_client_id = [None]
|
||||||
|
def _worker():
|
||||||
|
async def _inner():
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
client, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
worker_client_id[0] = id(client)
|
||||||
|
asyncio.run(_inner())
|
||||||
|
|
||||||
|
t = threading.Thread(target=_worker)
|
||||||
|
t.start()
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
assert worker_client_id[0] != id(gateway_client), (
|
||||||
|
"Worker thread (asyncio.run) got the gateway's cached client — "
|
||||||
|
"this is the exact cross-loop scenario that causes httpx deadlocks. "
|
||||||
|
"The cache key must include the event loop identity (#2681)"
|
||||||
|
)
|
||||||
|
gateway_loop.close()
|
||||||
|
|
||||||
|
def test_closed_loop_client_discarded(self):
|
||||||
|
"""A cached client whose loop has closed should be replaced."""
|
||||||
|
from agent.auxiliary_client import _get_cached_client
|
||||||
|
|
||||||
|
loop1 = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop1)
|
||||||
|
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
client1, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
|
||||||
|
loop1.close()
|
||||||
|
|
||||||
|
# New loop on same thread
|
||||||
|
loop2 = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop2)
|
||||||
|
|
||||||
|
with patch("agent.auxiliary_client.resolve_provider_client",
|
||||||
|
side_effect=_stub_resolve_provider_client):
|
||||||
|
client2, _ = _get_cached_client("custom", "m1", async_mode=True,
|
||||||
|
base_url="http://localhost:8081/v1")
|
||||||
|
|
||||||
|
assert client1 is not client2, (
|
||||||
|
"Client from closed loop should not be reused"
|
||||||
|
)
|
||||||
|
loop2.close()
|
||||||
@@ -358,12 +358,14 @@ def session_search(
|
|||||||
return await asyncio.gather(*coros, return_exceptions=True)
|
return await asyncio.gather(*coros, return_exceptions=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
asyncio.get_running_loop()
|
# Use _run_async() which properly manages event loops across
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
# CLI, gateway, and worker-thread contexts. The previous
|
||||||
results = pool.submit(lambda: asyncio.run(_summarize_all())).result(timeout=60)
|
# pattern (asyncio.run() in a ThreadPoolExecutor) created a
|
||||||
except RuntimeError:
|
# disposable event loop that conflicted with cached
|
||||||
# No event loop running, create a new one
|
# AsyncOpenAI/httpx clients bound to a different loop,
|
||||||
results = asyncio.run(_summarize_all())
|
# causing deadlocks in gateway mode (#2681).
|
||||||
|
from model_tools import _run_async
|
||||||
|
results = _run_async(_summarize_all())
|
||||||
except concurrent.futures.TimeoutError:
|
except concurrent.futures.TimeoutError:
|
||||||
logging.warning(
|
logging.warning(
|
||||||
"Session summarization timed out after 60 seconds",
|
"Session summarization timed out after 60 seconds",
|
||||||
|
|||||||
Reference in New Issue
Block a user