fix: enable fine-grained tool streaming for Claude/OpenRouter + retry SSE errors (#3497)
Root cause: Anthropic buffers entire tool call arguments and goes silent for minutes while thinking (verified: 167s gap with zero SSE events on direct API). OpenRouter's upstream proxy times out after ~125s of inactivity and drops the connection with 'Network connection lost'. Fix: Send the x-anthropic-beta: fine-grained-tool-streaming-2025-05-14 header for Claude models on OpenRouter. This makes Anthropic stream tool call arguments token-by-token instead of buffering them, keeping the connection alive through OpenRouter's proxy. Live-tested: the exact prompt that consistently failed at ~128s now completes successfully — 2,972 lines written, 49K tokens, 8 minutes. Additional improvements: 1. Send explicit max_tokens for Claude through OpenRouter. Without it, OpenRouter defaults to 65,536 (confirmed via echo_upstream_body) — only half of Opus 4.6's 128K limit. 2. Classify SSE 'Network connection lost' as retryable in the streaming inner retry loop. The OpenAI SDK raises APIError from SSE error events, which was bypassing our transient error retry logic. 3. Actionable diagnostic guidance when stream-drop retries exhaust.
This commit is contained in:
107
run_agent.py
107
run_agent.py
@@ -819,6 +819,25 @@ class AIAgent:
|
||||
}
|
||||
|
||||
self._client_kwargs = client_kwargs # stored for rebuilding after interrupt
|
||||
|
||||
# Enable fine-grained tool streaming for Claude on OpenRouter.
|
||||
# Without this, Anthropic buffers the entire tool call and goes
|
||||
# silent for minutes while thinking — OpenRouter's upstream proxy
|
||||
# times out during the silence. The beta header makes Anthropic
|
||||
# stream tool call arguments token-by-token, keeping the
|
||||
# connection alive.
|
||||
_effective_base = str(client_kwargs.get("base_url", "")).lower()
|
||||
if "openrouter" in _effective_base and "claude" in (self.model or "").lower():
|
||||
headers = client_kwargs.get("default_headers") or {}
|
||||
existing_beta = headers.get("x-anthropic-beta", "")
|
||||
_FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14"
|
||||
if _FINE_GRAINED not in existing_beta:
|
||||
if existing_beta:
|
||||
headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}"
|
||||
else:
|
||||
headers["x-anthropic-beta"] = _FINE_GRAINED
|
||||
client_kwargs["default_headers"] = headers
|
||||
|
||||
self.api_key = client_kwargs.get("api_key", "")
|
||||
try:
|
||||
self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True)
|
||||
@@ -4041,7 +4060,37 @@ class AIAgent:
|
||||
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
|
||||
)
|
||||
|
||||
if _is_timeout or _is_conn_err:
|
||||
# SSE error events from proxies (e.g. OpenRouter sends
|
||||
# {"error":{"message":"Network connection lost."}}) are
|
||||
# raised as APIError by the OpenAI SDK. These are
|
||||
# semantically identical to httpx connection drops —
|
||||
# the upstream stream died — and should be retried with
|
||||
# a fresh connection. Distinguish from HTTP errors:
|
||||
# APIError from SSE has no status_code, while
|
||||
# APIStatusError (4xx/5xx) always has one.
|
||||
_is_sse_conn_err = False
|
||||
if not _is_timeout and not _is_conn_err:
|
||||
from openai import APIError as _APIError
|
||||
if isinstance(e, _APIError) and not getattr(e, "status_code", None):
|
||||
_err_lower_sse = str(e).lower()
|
||||
_SSE_CONN_PHRASES = (
|
||||
"connection lost",
|
||||
"connection reset",
|
||||
"connection closed",
|
||||
"connection terminated",
|
||||
"network error",
|
||||
"network connection",
|
||||
"terminated",
|
||||
"peer closed",
|
||||
"broken pipe",
|
||||
"upstream connect error",
|
||||
)
|
||||
_is_sse_conn_err = any(
|
||||
phrase in _err_lower_sse
|
||||
for phrase in _SSE_CONN_PHRASES
|
||||
)
|
||||
|
||||
if _is_timeout or _is_conn_err or _is_sse_conn_err:
|
||||
# Transient network / timeout error. Retry the
|
||||
# streaming request with a fresh connection first.
|
||||
if _stream_attempt < _max_stream_retries:
|
||||
@@ -4554,6 +4603,20 @@ class AIAgent:
|
||||
|
||||
if self.max_tokens is not None:
|
||||
api_kwargs.update(self._max_tokens_param(self.max_tokens))
|
||||
elif self._is_openrouter_url() and "claude" in (self.model or "").lower():
|
||||
# OpenRouter translates requests to Anthropic's Messages API,
|
||||
# which requires max_tokens as a mandatory field. When we omit
|
||||
# it, OpenRouter picks a default that can be too low — the model
|
||||
# spends its output budget on thinking and has almost nothing
|
||||
# left for the actual response (especially large tool calls like
|
||||
# write_file). Sending the model's real output limit ensures
|
||||
# full capacity. Other providers handle the default fine.
|
||||
try:
|
||||
from agent.anthropic_adapter import _get_anthropic_max_output
|
||||
_model_output_limit = _get_anthropic_max_output(self.model)
|
||||
api_kwargs["max_tokens"] = _model_output_limit
|
||||
except Exception:
|
||||
pass # fail open — let OpenRouter pick its default
|
||||
|
||||
extra_body = {}
|
||||
|
||||
@@ -6978,6 +7041,36 @@ class AIAgent:
|
||||
_final_summary = self._summarize_api_error(api_error)
|
||||
self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True)
|
||||
self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True)
|
||||
|
||||
# Detect SSE stream-drop pattern (e.g. "Network
|
||||
# connection lost") and surface actionable guidance.
|
||||
# This typically happens when the model generates a
|
||||
# very large tool call (write_file with huge content)
|
||||
# and the proxy/CDN drops the stream mid-response.
|
||||
_is_stream_drop = (
|
||||
not getattr(api_error, "status_code", None)
|
||||
and any(p in error_msg for p in (
|
||||
"connection lost", "connection reset",
|
||||
"connection closed", "network connection",
|
||||
"network error", "terminated",
|
||||
))
|
||||
)
|
||||
if _is_stream_drop:
|
||||
self._vprint(
|
||||
f"{self.log_prefix} 💡 The provider's stream "
|
||||
f"connection keeps dropping. This often happens "
|
||||
f"when the model tries to write a very large "
|
||||
f"file in a single tool call.",
|
||||
force=True,
|
||||
)
|
||||
self._vprint(
|
||||
f"{self.log_prefix} Try asking the model "
|
||||
f"to use execute_code with Python's open() for "
|
||||
f"large files, or to write the file in smaller "
|
||||
f"sections.",
|
||||
force=True,
|
||||
)
|
||||
|
||||
logging.error(
|
||||
"%sAPI call failed after %s retries. %s | provider=%s model=%s msgs=%s tokens=~%s",
|
||||
self.log_prefix, max_retries, _final_summary,
|
||||
@@ -6987,8 +7080,18 @@ class AIAgent:
|
||||
api_kwargs, reason="max_retries_exhausted", error=api_error,
|
||||
)
|
||||
self._persist_session(messages, conversation_history)
|
||||
_final_response = f"API call failed after {max_retries} retries: {_final_summary}"
|
||||
if _is_stream_drop:
|
||||
_final_response += (
|
||||
"\n\nThe provider's stream connection keeps "
|
||||
"dropping — this often happens when generating "
|
||||
"very large tool call responses (e.g. write_file "
|
||||
"with long content). Try asking me to use "
|
||||
"execute_code with Python's open() for large "
|
||||
"files, or to write in smaller sections."
|
||||
)
|
||||
return {
|
||||
"final_response": f"API call failed after {max_retries} retries: {_final_summary}",
|
||||
"final_response": _final_response,
|
||||
"messages": messages,
|
||||
"api_calls": api_call_count,
|
||||
"completed": False,
|
||||
|
||||
Reference in New Issue
Block a user