diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py
index cbdb6f48..1c1b9869 100644
--- a/agent/auxiliary_client.py
+++ b/agent/auxiliary_client.py
@@ -1216,6 +1216,39 @@ _client_cache: Dict[tuple, tuple] = {}
 _client_cache_lock = threading.Lock()
 
 
+def neuter_async_httpx_del() -> None:
+    """Replace ``AsyncHttpxClientWrapper.__del__`` with a do-nothing stub.
+
+    The SDK finalizer schedules ``self.aclose()`` with
+    ``asyncio.get_running_loop().create_task()``.  When an ``AsyncOpenAI``
+    client is collected while prompt_toolkit's loop is running (the usual
+    idle state of the CLI), that task lands on prompt_toolkit's loop although
+    the TCP transport is bound to the worker-thread loop the client was
+    created on.  If that loop is closed or its thread is dead, the
+    transport's ``self._loop.call_soon()`` raises ``RuntimeError("Event loop
+    is closed")``, which prompt_toolkit shows as "Unhandled exception in
+    event loop ... Press ENTER to continue...".
+
+    Dropping the finalizer is safe because:
+    - cached clients are closed explicitly: ``_force_close_async_httpx`` on
+      stale-loop detection, ``shutdown_cached_clients`` on exit;
+    - uncached clients' TCP connections are reclaimed by the OS at exit;
+    - the SDK marks this path as unfinished (``# TODO(someday): support non
+      asyncio runtimes here``).
+
+    Call once at CLI startup, before any ``AsyncOpenAI`` client exists.  If
+    the SDK is absent or has moved the class, nothing is patched.
+    """
+    def _noop_del(self) -> None:
+        return None
+
+    try:
+        from openai._base_client import AsyncHttpxClientWrapper
+        AsyncHttpxClientWrapper.__del__ = _noop_del  # type: ignore[assignment]
+    except (ImportError, AttributeError):
+        return  # SDK absent or restructured: leave everything untouched
+
+
 def _force_close_async_httpx(client: Any) -> None:
     """Mark the httpx AsyncClient inside an AsyncOpenAI client as closed.
 
@@ -1263,6 +1296,25 @@ def shutdown_cached_clients() -> None:
         _client_cache.clear()
 
 
+def cleanup_stale_async_clients() -> None:
+    """Evict and force-close cached async clients whose loop is closed.
+
+    Meant to run after each agent turn, before GC can fire
+    ``AsyncHttpxClientWrapper.__del__`` on them.  Defense-in-depth only:
+    ``neuter_async_httpx_del`` (which disables ``__del__``) is the main fix.
+    """
+    with _client_cache_lock:
+        stale = {
+            key: client
+            for key, (client, _default, loop) in _client_cache.items()
+            if loop is not None and loop.is_closed()
+        }
+        for client in stale.values():
+            _force_close_async_httpx(client)
+        for key in stale:
+            del _client_cache[key]
+
+
 def _get_cached_client(
     provider: str,
     model: str = None,
diff --git a/cli.py b/cli.py
index bb5c94db..bd7c26c4 100644
--- a/cli.py
+++ b/cli.py
@@ -449,6 +449,17 @@ try:
 except Exception:
     pass  # Skin engine is optional — default skin used if unavailable
 
+# Neuter AsyncHttpxClientWrapper.__del__ before any AsyncOpenAI clients are
+# created.  The SDK's __del__ schedules aclose() on asyncio.get_running_loop()
+# which, during CLI idle time, finds prompt_toolkit's event loop and tries to
+# close TCP transports bound to dead worker loops — producing
+# "Event loop is closed" / "Press ENTER to continue..." errors.
+try:
+    from agent.auxiliary_client import neuter_async_httpx_del
+    neuter_async_httpx_del()
+except Exception:
+    pass
+
 from rich import box as rich_box
 from rich.console import Console
 from rich.markup import escape as _escape
@@ -5678,6 +5689,16 @@ class HermesCLI:
 
             agent_thread.join()  # Ensure agent thread completes
 
+            # Proactively clean up async clients whose event loop is dead.
+            # The agent thread may have created AsyncOpenAI clients bound
+            # to a per-thread event loop; if that loop is now closed, those
+            # clients' __del__ would crash prompt_toolkit's loop on GC.
+            try:
+                from agent.auxiliary_client import cleanup_stale_async_clients
+                cleanup_stale_async_clients()
+            except Exception:
+                pass
+
             # Flush any remaining streamed text and close the box
             self._flush_stream()
 
@@ -7241,9 +7262,40 @@ class HermesCLI:
         # Register atexit cleanup so resources are freed even on unexpected exit
         atexit.register(_run_cleanup)
 
+        # Install a custom asyncio exception handler that suppresses the
+        # "Event loop is closed" RuntimeError from httpx transport cleanup.
+        # This is defense-in-depth — the primary fix is neuter_async_httpx_del
+        # which disables __del__ entirely, but older clients or SDK upgrades
+        # could bypass it.
+        #
+        # The handler is installed from a pre_run hook rather than before
+        # app.run(): Application.run() puts prompt_toolkit's own handler (the
+        # source of "Press ENTER to continue...") on the loop for the duration
+        # of the run, so a handler set beforehand never sees these errors.
+        def _install_closed_loop_filter():
+            try:
+                import asyncio as _aio
+                _loop = _aio.get_running_loop()
+                _wrapped = _loop.get_exception_handler()
+            except Exception:
+                return
+
+            def _suppress_closed_loop_errors(loop, context):
+                exc = context.get("exception")
+                if isinstance(exc, RuntimeError) and "Event loop is closed" in str(exc):
+                    return  # silently suppress
+                # Everything else goes to the handler that was active before
+                # (normally prompt_toolkit's), or asyncio's default if none
+                if _wrapped is not None:
+                    _wrapped(loop, context)
+                else:
+                    loop.default_exception_handler(context)
+
+            _loop.set_exception_handler(_suppress_closed_loop_errors)
+
         # Run the application with patch_stdout for proper output handling
         try:
             with patch_stdout():
-                app.run()
+                app.run(pre_run=_install_closed_loop_filter)
         except (EOFError, KeyboardInterrupt):
             pass
diff --git a/tests/test_async_httpx_del_neuter.py b/tests/test_async_httpx_del_neuter.py
new file mode 100644
index 00000000..ce8e20e7
--- /dev/null
+++ b/tests/test_async_httpx_del_neuter.py
@@ -0,0 +1,162 @@
+"""Tests for the AsyncHttpxClientWrapper.__del__ neuter fix.
+
+The OpenAI SDK's ``AsyncHttpxClientWrapper.__del__`` schedules
+``aclose()`` via ``asyncio.get_running_loop().create_task()``. When GC
+fires during CLI idle time, prompt_toolkit's event loop picks up the task
+and crashes with "Event loop is closed" because the underlying TCP
+transport is bound to a dead worker loop.
+
+The three-layer defence:
+1. ``neuter_async_httpx_del()`` replaces ``__del__`` with a no-op.
+2. A custom asyncio exception handler silences residual errors.
+3. ``cleanup_stale_async_clients()`` evicts stale cache entries.
+"""
+
+import asyncio
+import threading
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+
+# ---------------------------------------------------------------------------
+# Layer 1: neuter_async_httpx_del
+# ---------------------------------------------------------------------------
+
+class TestNeuterAsyncHttpxDel:
+    """Verify neuter_async_httpx_del replaces __del__ on the SDK class."""
+
+    def test_del_becomes_noop(self):
+        """After neuter, __del__ should do nothing (no RuntimeError)."""
+        from agent.auxiliary_client import neuter_async_httpx_del
+
+        try:
+            from openai._base_client import AsyncHttpxClientWrapper
+        except ImportError:
+            pytest.skip("openai SDK not installed")
+
+        # Save original so we can restore
+        original_del = AsyncHttpxClientWrapper.__del__
+        try:
+            neuter_async_httpx_del()
+            # The patched __del__ should be a no-op lambda
+            assert AsyncHttpxClientWrapper.__del__ is not original_del
+            # Calling it should not raise, even without a running loop
+            wrapper = MagicMock(spec=AsyncHttpxClientWrapper)
+            AsyncHttpxClientWrapper.__del__(wrapper)  # Should be silent
+        finally:
+            # Restore original to avoid leaking into other tests
+            AsyncHttpxClientWrapper.__del__ = original_del
+
+    def test_neuter_idempotent(self):
+        """Calling neuter twice doesn't break anything."""
+        from agent.auxiliary_client import neuter_async_httpx_del
+
+        try:
+            from openai._base_client import AsyncHttpxClientWrapper
+        except ImportError:
+            pytest.skip("openai SDK not installed")
+
+        original_del = AsyncHttpxClientWrapper.__del__
+        try:
+            neuter_async_httpx_del()
+            first_del = AsyncHttpxClientWrapper.__del__
+            neuter_async_httpx_del()
+            second_del = AsyncHttpxClientWrapper.__del__
+            # Both calls should succeed; the class should have a no-op
+            assert first_del is not original_del
+            assert second_del is not original_del
+        finally:
+            AsyncHttpxClientWrapper.__del__ = original_del
+
+    def test_neuter_graceful_without_sdk(self):
+        """neuter_async_httpx_del doesn't raise if the openai SDK isn't installed."""
+        from agent.auxiliary_client import neuter_async_httpx_del
+
+        with patch.dict("sys.modules", {"openai._base_client": None}):
+            # Should not raise
+            neuter_async_httpx_del()
+
+
+# ---------------------------------------------------------------------------
+# Layer 3: cleanup_stale_async_clients
+# ---------------------------------------------------------------------------
+
+class TestCleanupStaleAsyncClients:
+    """Verify stale cache entries are evicted and force-closed."""
+
+    def test_removes_stale_entries(self):
+        """Entries with a closed loop should be evicted."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            cleanup_stale_async_clients,
+        )
+
+        # Create a loop, close it, make a cache entry
+        loop = asyncio.new_event_loop()
+        loop.close()
+
+        mock_client = MagicMock()
+        # Give it _client attribute for _force_close_async_httpx
+        mock_client._client = MagicMock()
+        mock_client._client.is_closed = False
+
+        key = ("test_stale", True, "", "", id(loop))
+        with _client_cache_lock:
+            _client_cache[key] = (mock_client, "test-model", loop)
+
+        try:
+            cleanup_stale_async_clients()
+            with _client_cache_lock:
+                assert key not in _client_cache, "Stale entry should be removed"
+        finally:
+            # Clean up in case test fails
+            with _client_cache_lock:
+                _client_cache.pop(key, None)
+
+    def test_keeps_live_entries(self):
+        """Entries with an open loop should be preserved."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            cleanup_stale_async_clients,
+        )
+
+        loop = asyncio.new_event_loop()  # NOT closed
+
+        mock_client = MagicMock()
+        key = ("test_live", True, "", "", id(loop))
+        with _client_cache_lock:
+            _client_cache[key] = (mock_client, "test-model", loop)
+
+        try:
+            cleanup_stale_async_clients()
+            with _client_cache_lock:
+                assert key in _client_cache, "Live entry should be preserved"
+        finally:
+            loop.close()
+            with _client_cache_lock:
+                _client_cache.pop(key, None)
+
+    def test_keeps_entries_without_loop(self):
+        """Sync entries (cached_loop=None) should be preserved."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            cleanup_stale_async_clients,
+        )
+
+        mock_client = MagicMock()
+        key = ("test_sync", False, "", "", 0)
+        with _client_cache_lock:
+            _client_cache[key] = (mock_client, "test-model", None)
+
+        try:
+            cleanup_stale_async_clients()
+            with _client_cache_lock:
+                assert key in _client_cache, "Sync entry should be preserved"
+        finally:
+            with _client_cache_lock:
+                _client_cache.pop(key, None)