fix: eliminate 'Event loop is closed' / 'Press ENTER to continue' during idle (#3398)
The OpenAI SDK's AsyncHttpxClientWrapper.__del__ schedules aclose() via
asyncio.get_running_loop().create_task(). When an AsyncOpenAI client is
garbage-collected while prompt_toolkit's event loop is running (the common
CLI idle state), the aclose() task runs on prompt_toolkit's loop but the
underlying TCP transport is bound to a different (dead) worker loop.
The transport's self._loop.call_soon() then raises RuntimeError('Event
loop is closed'), which prompt_toolkit surfaces as the disruptive
'Unhandled exception in event loop ... Press ENTER to continue...' error.
Three-layer fix:
1. neuter_async_httpx_del(): Monkey-patches __del__ to a no-op at CLI
startup before any AsyncOpenAI clients are created. Safe because
cached clients are explicitly cleaned via _force_close_async_httpx,
and uncached clients' TCP connections are cleaned by the OS on exit.
2. Custom asyncio exception handler: Installed on prompt_toolkit's event
loop to silently suppress 'Event loop is closed' RuntimeError.
Defense-in-depth for SDK upgrades that might change the class name.
3. cleanup_stale_async_clients(): Called after each agent turn (when the
agent thread joins) to proactively evict cache entries whose event
loop is closed, preventing stale clients from accumulating.
This commit is contained in:
@@ -1216,6 +1216,39 @@ _client_cache: Dict[tuple, tuple] = {}
|
||||
_client_cache_lock = threading.Lock()
|
||||
|
||||
|
||||
def neuter_async_httpx_del() -> None:
    """Monkey-patch ``AsyncHttpxClientWrapper.__del__`` to be a no-op.

    The OpenAI SDK's ``AsyncHttpxClientWrapper.__del__`` schedules
    ``self.aclose()`` via ``asyncio.get_running_loop().create_task()``.
    When an ``AsyncOpenAI`` client is garbage-collected while
    prompt_toolkit's event loop is running (the common CLI idle state),
    the ``aclose()`` task runs on prompt_toolkit's loop but the
    underlying TCP transport is bound to a *different* loop (the worker
    thread's loop that the client was originally created on). If that
    loop is closed or its thread is dead, the transport's
    ``self._loop.call_soon()`` raises ``RuntimeError("Event loop is
    closed")``, which prompt_toolkit surfaces as "Unhandled exception
    in event loop ... Press ENTER to continue...".

    Neutering ``__del__`` is safe because:
    - Cached clients are explicitly cleaned via ``_force_close_async_httpx``
      on stale-loop detection and ``shutdown_cached_clients`` on exit.
    - Uncached clients' TCP connections are cleaned up by the OS when the
      process exits.
    - The OpenAI SDK itself marks this as a TODO (``# TODO(someday):
      support non asyncio runtimes here``).

    Call this once at CLI startup, before any ``AsyncOpenAI`` clients are
    created.  Idempotent and safe to call when the SDK is absent.
    """
    # Keep the try body minimal: only the import can legitimately fail
    # (ImportError if the SDK is missing, AttributeError if a future SDK
    # release renames the private class).
    try:
        from openai._base_client import AsyncHttpxClientWrapper
    except (ImportError, AttributeError):
        return  # Graceful degradation if the SDK changes its internals

    def _noop_del(self) -> None:
        """No-op replacement for the SDK's loop-unsafe ``__del__``."""

    # A named function (rather than an assigned lambda, PEP 8 E731) keeps
    # tracebacks and introspection readable.
    AsyncHttpxClientWrapper.__del__ = _noop_del  # type: ignore[assignment]
|
||||
|
||||
|
||||
def _force_close_async_httpx(client: Any) -> None:
|
||||
"""Mark the httpx AsyncClient inside an AsyncOpenAI client as closed.
|
||||
|
||||
@@ -1263,6 +1296,25 @@ def shutdown_cached_clients() -> None:
|
||||
_client_cache.clear()
|
||||
|
||||
|
||||
def cleanup_stale_async_clients() -> None:
    """Force-close cached async clients whose event loop is closed.

    Invoked after each agent turn so that stale clients are evicted
    before garbage collection can trigger
    ``AsyncHttpxClientWrapper.__del__`` on them.  This is
    defense-in-depth — the primary fix is ``neuter_async_httpx_del``
    which disables ``__del__`` entirely.
    """
    with _client_cache_lock:
        # Phase 1: collect every entry bound to a closed event loop.
        dead = {
            key: client
            for key, (client, _default, loop) in _client_cache.items()
            if loop is not None and loop.is_closed()
        }
        # Phase 2: mark each stale client as closed, then evict it.
        for client in dead.values():
            _force_close_async_httpx(client)
        for key in dead:
            del _client_cache[key]
|
||||
|
||||
|
||||
def _get_cached_client(
|
||||
provider: str,
|
||||
model: str = None,
|
||||
|
||||
40
cli.py
40
cli.py
@@ -449,6 +449,17 @@ try:
|
||||
except Exception:
|
||||
pass # Skin engine is optional — default skin used if unavailable
|
||||
|
||||
# Neuter AsyncHttpxClientWrapper.__del__ before any AsyncOpenAI clients exist.
# During CLI idle time the SDK's __del__ schedules aclose() on whatever loop
# asyncio.get_running_loop() returns — prompt_toolkit's — while the TCP
# transports being closed are bound to dead worker loops, which produces
# "Event loop is closed" / "Press ENTER to continue..." errors.
try:
    from agent.auxiliary_client import neuter_async_httpx_del

    neuter_async_httpx_del()
except Exception:
    # Best-effort: startup must never fail because of this patch.
    pass
|
||||
|
||||
from rich import box as rich_box
|
||||
from rich.console import Console
|
||||
from rich.markup import escape as _escape
|
||||
@@ -5678,6 +5689,16 @@ class HermesCLI:
|
||||
|
||||
agent_thread.join() # Ensure agent thread completes
|
||||
|
||||
# Proactively clean up async clients whose event loop is dead.
|
||||
# The agent thread may have created AsyncOpenAI clients bound
|
||||
# to a per-thread event loop; if that loop is now closed, those
|
||||
# clients' __del__ would crash prompt_toolkit's loop on GC.
|
||||
try:
|
||||
from agent.auxiliary_client import cleanup_stale_async_clients
|
||||
cleanup_stale_async_clients()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Flush any remaining streamed text and close the box
|
||||
self._flush_stream()
|
||||
|
||||
@@ -7241,9 +7262,28 @@ class HermesCLI:
|
||||
# Register atexit cleanup so resources are freed even on unexpected exit
|
||||
atexit.register(_run_cleanup)
|
||||
|
||||
# Install a custom asyncio exception handler that suppresses the
|
||||
# "Event loop is closed" RuntimeError from httpx transport cleanup.
|
||||
# This is defense-in-depth — the primary fix is neuter_async_httpx_del
|
||||
# which disables __del__ entirely, but older clients or SDK upgrades
|
||||
# could bypass it.
|
||||
def _suppress_closed_loop_errors(loop, context):
|
||||
exc = context.get("exception")
|
||||
if isinstance(exc, RuntimeError) and "Event loop is closed" in str(exc):
|
||||
return # silently suppress
|
||||
# Fall back to default handler for everything else
|
||||
loop.default_exception_handler(context)
|
||||
|
||||
# Run the application with patch_stdout for proper output handling
|
||||
try:
|
||||
with patch_stdout():
|
||||
# Set the custom handler on prompt_toolkit's event loop
|
||||
try:
|
||||
import asyncio as _aio
|
||||
_loop = _aio.get_event_loop()
|
||||
_loop.set_exception_handler(_suppress_closed_loop_errors)
|
||||
except Exception:
|
||||
pass
|
||||
app.run()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
pass
|
||||
|
||||
162
tests/test_async_httpx_del_neuter.py
Normal file
162
tests/test_async_httpx_del_neuter.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""Tests for the AsyncHttpxClientWrapper.__del__ neuter fix.
|
||||
|
||||
The OpenAI SDK's ``AsyncHttpxClientWrapper.__del__`` schedules
|
||||
``aclose()`` via ``asyncio.get_running_loop().create_task()``. When GC
|
||||
fires during CLI idle time, prompt_toolkit's event loop picks up the task
|
||||
and crashes with "Event loop is closed" because the underlying TCP
|
||||
transport is bound to a dead worker loop.
|
||||
|
||||
The three-layer defence:
|
||||
1. ``neuter_async_httpx_del()`` replaces ``__del__`` with a no-op.
|
||||
2. A custom asyncio exception handler silences residual errors.
|
||||
3. ``cleanup_stale_async_clients()`` evicts stale cache entries.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Layer 1: neuter_async_httpx_del
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNeuterAsyncHttpxDel:
    """Verify neuter_async_httpx_del replaces __del__ on the SDK class."""

    def test_del_becomes_noop(self):
        """After neuter, __del__ should do nothing (no RuntimeError)."""
        from agent.auxiliary_client import neuter_async_httpx_del

        try:
            from openai._base_client import AsyncHttpxClientWrapper
        except ImportError:
            pytest.skip("openai SDK not installed")

        # Remember the SDK's real __del__ so it can be restored afterwards.
        saved_del = AsyncHttpxClientWrapper.__del__
        try:
            neuter_async_httpx_del()
            patched_del = AsyncHttpxClientWrapper.__del__
            # __del__ must have been swapped out for the no-op.
            assert patched_del is not saved_del
            # Even without a running loop, invoking it must stay silent.
            fake_wrapper = MagicMock(spec=AsyncHttpxClientWrapper)
            patched_del(fake_wrapper)
        finally:
            # Restore original to avoid leaking into other tests.
            AsyncHttpxClientWrapper.__del__ = saved_del

    def test_neuter_idempotent(self):
        """Calling neuter twice doesn't break anything."""
        from agent.auxiliary_client import neuter_async_httpx_del

        try:
            from openai._base_client import AsyncHttpxClientWrapper
        except ImportError:
            pytest.skip("openai SDK not installed")

        saved_del = AsyncHttpxClientWrapper.__del__
        try:
            neuter_async_httpx_del()
            after_first = AsyncHttpxClientWrapper.__del__
            neuter_async_httpx_del()
            after_second = AsyncHttpxClientWrapper.__del__
            # Both invocations must leave a no-op in place of the original.
            assert after_first is not saved_del
            assert after_second is not saved_del
        finally:
            AsyncHttpxClientWrapper.__del__ = saved_del

    def test_neuter_graceful_without_sdk(self):
        """neuter_async_httpx_del doesn't raise if the openai SDK isn't installed."""
        from agent.auxiliary_client import neuter_async_httpx_del

        # Simulate a missing SDK module; the call must degrade silently.
        with patch.dict("sys.modules", {"openai._base_client": None}):
            neuter_async_httpx_del()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Layer 3: cleanup_stale_async_clients
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCleanupStaleAsyncClients:
    """Verify stale cache entries are evicted and force-closed."""

    def test_removes_stale_entries(self):
        """Entries with a closed loop should be evicted."""
        from agent.auxiliary_client import (
            _client_cache,
            _client_cache_lock,
            cleanup_stale_async_clients,
        )

        # A freshly created loop that is immediately closed counts as stale.
        dead_loop = asyncio.new_event_loop()
        dead_loop.close()

        fake_client = MagicMock()
        # _force_close_async_httpx reaches into ._client, so provide one.
        fake_client._client = MagicMock()
        fake_client._client.is_closed = False

        cache_key = ("test_stale", True, "", "", id(dead_loop))
        with _client_cache_lock:
            _client_cache[cache_key] = (fake_client, "test-model", dead_loop)

        try:
            cleanup_stale_async_clients()
            with _client_cache_lock:
                assert cache_key not in _client_cache, "Stale entry should be removed"
        finally:
            # Clean up in case the test fails midway.
            with _client_cache_lock:
                _client_cache.pop(cache_key, None)

    def test_keeps_live_entries(self):
        """Entries with an open loop should be preserved."""
        from agent.auxiliary_client import (
            _client_cache,
            _client_cache_lock,
            cleanup_stale_async_clients,
        )

        live_loop = asyncio.new_event_loop()  # deliberately left open

        fake_client = MagicMock()
        cache_key = ("test_live", True, "", "", id(live_loop))
        with _client_cache_lock:
            _client_cache[cache_key] = (fake_client, "test-model", live_loop)

        try:
            cleanup_stale_async_clients()
            with _client_cache_lock:
                assert cache_key in _client_cache, "Live entry should be preserved"
        finally:
            live_loop.close()
            with _client_cache_lock:
                _client_cache.pop(cache_key, None)

    def test_keeps_entries_without_loop(self):
        """Sync entries (cached_loop=None) should be preserved."""
        from agent.auxiliary_client import (
            _client_cache,
            _client_cache_lock,
            cleanup_stale_async_clients,
        )

        fake_client = MagicMock()
        cache_key = ("test_sync", False, "", "", 0)
        with _client_cache_lock:
            _client_cache[cache_key] = (fake_client, "test-model", None)

        try:
            cleanup_stale_async_clients()
            with _client_cache_lock:
                assert cache_key in _client_cache, "Sync entry should be preserved"
        finally:
            with _client_cache_lock:
                _client_cache.pop(cache_key, None)
|
||||
Reference in New Issue
Block a user