fix(streaming): stale stream detector race causing spurious RemoteProtocolError
The stale stream detector (90s timeout) was killing healthy connections
during the model's thinking phase, producing self-inflicted
RemoteProtocolError ("peer closed connection without sending complete
message body"). Three issues:
1. last_chunk_time was never reset between inner stream retries, so
subsequent attempts inherited the previous attempt's stale budget
2. The non-streaming fallback path didn't reset the timer either
3. 90s base timeout was too aggressive for large-context Opus sessions
where thinking time before first token routinely exceeds 90s
Fix: reset last_chunk_time at the start of each streaming attempt and
before the non-streaming fallback. Increase the base timeout to 180s,
scaling to 240s for >50K-token contexts and 300s for >100K-token contexts.
Made-with: Cursor
This commit is contained in:
run_agent.py (23 lines changed)
@@ -3774,6 +3774,9 @@ class AIAgent:
|
||||
request_client_holder["client"] = self._create_request_openai_client(
|
||||
reason="chat_completion_stream_request"
|
||||
)
|
||||
# Reset stale-stream timer so the detector measures from this
|
||||
# attempt's start, not a previous attempt's last chunk.
|
||||
last_chunk_time["t"] = time.time()
|
||||
stream = request_client_holder["client"].chat.completions.create(**stream_kwargs)
|
||||
|
||||
content_parts: list = []
|
||||
@@ -3904,6 +3907,8 @@ class AIAgent:
|
||||
"""
|
||||
has_tool_use = False
|
||||
|
||||
# Reset stale-stream timer for this attempt
|
||||
last_chunk_time["t"] = time.time()
|
||||
# Use the Anthropic SDK's streaming context manager
|
||||
with self._anthropic_client.messages.stream(**api_kwargs) as stream:
|
||||
for event in stream:
|
||||
@@ -4016,6 +4021,10 @@ class AIAgent:
|
||||
)
|
||||
|
||||
try:
|
||||
# Reset stale timer — the non-streaming fallback
|
||||
# uses its own client; prevent the stale detector
|
||||
# from firing on stale timestamps from failed streams.
|
||||
last_chunk_time["t"] = time.time()
|
||||
result["response"] = self._interruptible_api_call(api_kwargs)
|
||||
except Exception as fallback_err:
|
||||
result["error"] = fallback_err
|
||||
@@ -4025,7 +4034,19 @@ class AIAgent:
|
||||
if request_client is not None:
|
||||
self._close_request_openai_client(request_client, reason="stream_request_complete")
|
||||
|
||||
_stream_stale_timeout = float(os.getenv("HERMES_STREAM_STALE_TIMEOUT", 90.0))
|
||||
_stream_stale_timeout_base = float(os.getenv("HERMES_STREAM_STALE_TIMEOUT", 180.0))
|
||||
# Scale the stale timeout for large contexts: slow models (like Opus)
|
||||
# can legitimately think for minutes before producing the first token
|
||||
# when the context is large. Without this, the stale detector kills
|
||||
# healthy connections during the model's thinking phase, producing
|
||||
# spurious RemoteProtocolError ("peer closed connection").
|
||||
_est_tokens = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4
|
||||
if _est_tokens > 100_000:
|
||||
_stream_stale_timeout = max(_stream_stale_timeout_base, 300.0)
|
||||
elif _est_tokens > 50_000:
|
||||
_stream_stale_timeout = max(_stream_stale_timeout_base, 240.0)
|
||||
else:
|
||||
_stream_stale_timeout = _stream_stale_timeout_base
|
||||
|
||||
t = threading.Thread(target=_call, daemon=True)
|
||||
t.start()
|
||||
|
||||
Reference in New Issue
Block a user