fix(streaming): detect and kill stale SSE connections
Adds a wall-clock stale-stream detector (HERMES_STREAM_STALE_TIMEOUT, default 90s) that force-closes the httpx client when no real chunks arrive within the timeout, even if SSE keep-alive pings keep the socket alive. This works with the existing streaming retry loop, which then recovers by opening a fresh connection.
Made-with: Cursor
This commit is contained in:
27
run_agent.py
27
run_agent.py
@@ -3601,6 +3601,10 @@ class AIAgent:
|
|||||||
request_client_holder = {"client": None}
|
request_client_holder = {"client": None}
|
||||||
first_delta_fired = {"done": False}
|
first_delta_fired = {"done": False}
|
||||||
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
|
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
|
||||||
|
# Wall-clock timestamp of the last real streaming chunk. The outer
|
||||||
|
# poll loop uses this to detect stale connections that keep receiving
|
||||||
|
# SSE keep-alive pings but no actual data.
|
||||||
|
last_chunk_time = {"t": time.time()}
|
||||||
|
|
||||||
def _fire_first_delta():
|
def _fire_first_delta():
|
||||||
if not first_delta_fired["done"] and on_first_delta:
|
if not first_delta_fired["done"] and on_first_delta:
|
||||||
@@ -3641,6 +3645,8 @@ class AIAgent:
|
|||||||
usage_obj = None
|
usage_obj = None
|
||||||
|
|
||||||
for chunk in stream:
|
for chunk in stream:
|
||||||
|
last_chunk_time["t"] = time.time()
|
||||||
|
|
||||||
if self._interrupt_requested:
|
if self._interrupt_requested:
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -3878,10 +3884,31 @@ class AIAgent:
|
|||||||
if request_client is not None:
|
if request_client is not None:
|
||||||
self._close_request_openai_client(request_client, reason="stream_request_complete")
|
self._close_request_openai_client(request_client, reason="stream_request_complete")
|
||||||
|
|
||||||
|
_stream_stale_timeout = float(os.getenv("HERMES_STREAM_STALE_TIMEOUT", 90.0))
|
||||||
|
|
||||||
t = threading.Thread(target=_call, daemon=True)
|
t = threading.Thread(target=_call, daemon=True)
|
||||||
t.start()
|
t.start()
|
||||||
while t.is_alive():
|
while t.is_alive():
|
||||||
t.join(timeout=0.3)
|
t.join(timeout=0.3)
|
||||||
|
|
||||||
|
# Detect stale streams: connections kept alive by SSE pings
|
||||||
|
# but delivering no real chunks. Kill the client so the
|
||||||
|
# inner retry loop can start a fresh connection.
|
||||||
|
if time.time() - last_chunk_time["t"] > _stream_stale_timeout:
|
||||||
|
logger.warning(
|
||||||
|
"Stream stale for %.0fs — no chunks received. Killing connection.",
|
||||||
|
_stream_stale_timeout,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
rc = request_client_holder.get("client")
|
||||||
|
if rc is not None:
|
||||||
|
self._close_request_openai_client(rc, reason="stale_stream_kill")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# Reset the timer so we don't kill repeatedly while
|
||||||
|
# the inner thread processes the closure.
|
||||||
|
last_chunk_time["t"] = time.time()
|
||||||
|
|
||||||
if self._interrupt_requested:
|
if self._interrupt_requested:
|
||||||
try:
|
try:
|
||||||
if self.api_mode == "anthropic_messages":
|
if self.api_mode == "anthropic_messages":
|
||||||
|
|||||||
Reference in New Issue
Block a user