diff --git a/run_agent.py b/run_agent.py index ef61658d9..17f1c0189 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3601,6 +3601,10 @@ class AIAgent: request_client_holder = {"client": None} first_delta_fired = {"done": False} deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) + # Wall-clock timestamp of the last real streaming chunk. The outer + # poll loop uses this to detect stale connections that keep receiving + # SSE keep-alive pings but no actual data. + last_chunk_time = {"t": time.time()} def _fire_first_delta(): if not first_delta_fired["done"] and on_first_delta: @@ -3641,6 +3645,8 @@ class AIAgent: usage_obj = None for chunk in stream: + last_chunk_time["t"] = time.time() + if self._interrupt_requested: break @@ -3878,10 +3884,31 @@ class AIAgent: if request_client is not None: self._close_request_openai_client(request_client, reason="stream_request_complete") + _stream_stale_timeout = float(os.getenv("HERMES_STREAM_STALE_TIMEOUT", 90.0)) + t = threading.Thread(target=_call, daemon=True) t.start() while t.is_alive(): t.join(timeout=0.3) + + # Detect stale streams: connections kept alive by SSE pings + # but delivering no real chunks. Kill the client so the + # inner retry loop can start a fresh connection. + if time.time() - last_chunk_time["t"] > _stream_stale_timeout: + logger.warning( + "Stream stale for %.0fs — no chunks received. Killing connection.", + _stream_stale_timeout, + ) + try: + rc = request_client_holder.get("client") + if rc is not None: + self._close_request_openai_client(rc, reason="stale_stream_kill") + except Exception: + pass + # Reset the timer so we don't kill repeatedly while + # the inner thread processes the closure. + last_chunk_time["t"] = time.time() + if self._interrupt_requested: try: if self.api_mode == "anthropic_messages":