From 80a899a8e2907638531b4f70648898460d547d42 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 28 Mar 2026 08:01:37 -0700 Subject: [PATCH] fix: enable fine-grained tool streaming for Claude/OpenRouter + retry SSE errors (#3497) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: Anthropic buffers entire tool call arguments and goes silent for minutes while thinking (verified: 167s gap with zero SSE events on direct API). OpenRouter's upstream proxy times out after ~125s of inactivity and drops the connection with 'Network connection lost'. Fix: Send the x-anthropic-beta: fine-grained-tool-streaming-2025-05-14 header for Claude models on OpenRouter. This makes Anthropic stream tool call arguments token-by-token instead of buffering them, keeping the connection alive through OpenRouter's proxy. Live-tested: the exact prompt that consistently failed at ~128s now completes successfully — 2,972 lines written, 49K tokens, 8 minutes. Additional improvements: 1. Send explicit max_tokens for Claude through OpenRouter. Without it, OpenRouter defaults to 65,536 (confirmed via echo_upstream_body) — only half of Opus 4.6's 128K limit. 2. Classify SSE 'Network connection lost' as retryable in the streaming inner retry loop. The OpenAI SDK raises APIError from SSE error events, which was bypassing our transient error retry logic. 3. Actionable diagnostic guidance when stream-drop retries exhaust. --- run_agent.py | 107 ++++++++++++++++++++++++++++++++++++- tests/test_streaming.py | 115 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+), 2 deletions(-) diff --git a/run_agent.py b/run_agent.py index dd3028c9..501a1a46 100644 --- a/run_agent.py +++ b/run_agent.py @@ -819,6 +819,25 @@ class AIAgent: } self._client_kwargs = client_kwargs # stored for rebuilding after interrupt + + # Enable fine-grained tool streaming for Claude on OpenRouter. + # Without this, Anthropic buffers the entire tool call and goes + # silent for minutes while thinking — OpenRouter's upstream proxy + # times out during the silence. The beta header makes Anthropic + # stream tool call arguments token-by-token, keeping the + # connection alive. + _effective_base = str(client_kwargs.get("base_url", "")).lower() + if "openrouter" in _effective_base and "claude" in (self.model or "").lower(): + headers = client_kwargs.get("default_headers") or {} + existing_beta = headers.get("x-anthropic-beta", "") + _FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14" + if _FINE_GRAINED not in existing_beta: + if existing_beta: + headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}" + else: + headers["x-anthropic-beta"] = _FINE_GRAINED + client_kwargs["default_headers"] = headers + self.api_key = client_kwargs.get("api_key", "") try: self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True) @@ -4041,7 +4060,37 @@ class AIAgent: e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) ) - if _is_timeout or _is_conn_err: + # SSE error events from proxies (e.g. OpenRouter sends + # {"error":{"message":"Network connection lost."}}) are + # raised as APIError by the OpenAI SDK. These are + # semantically identical to httpx connection drops — + # the upstream stream died — and should be retried with + # a fresh connection. Distinguish from HTTP errors: + # APIError from SSE has no status_code, while + # APIStatusError (4xx/5xx) always has one. + _is_sse_conn_err = False + if not _is_timeout and not _is_conn_err: + from openai import APIError as _APIError + if isinstance(e, _APIError) and not getattr(e, "status_code", None): + _err_lower_sse = str(e).lower() + _SSE_CONN_PHRASES = ( + "connection lost", + "connection reset", + "connection closed", + "connection terminated", + "network error", + "network connection", + "terminated", + "peer closed", + "broken pipe", + "upstream connect error", + ) + _is_sse_conn_err = any( + phrase in _err_lower_sse + for phrase in _SSE_CONN_PHRASES + ) + + if _is_timeout or _is_conn_err or _is_sse_conn_err: # Transient network / timeout error. Retry the # streaming request with a fresh connection first. if _stream_attempt < _max_stream_retries: @@ -4554,6 +4603,20 @@ class AIAgent: if self.max_tokens is not None: api_kwargs.update(self._max_tokens_param(self.max_tokens)) + elif self._is_openrouter_url() and "claude" in (self.model or "").lower(): + # OpenRouter translates requests to Anthropic's Messages API, + # which requires max_tokens as a mandatory field. When we omit + # it, OpenRouter picks a default that can be too low — the model + # spends its output budget on thinking and has almost nothing + # left for the actual response (especially large tool calls like + # write_file). Sending the model's real output limit ensures + # full capacity. Other providers handle the default fine. + try: + from agent.anthropic_adapter import _get_anthropic_max_output + _model_output_limit = _get_anthropic_max_output(self.model) + api_kwargs["max_tokens"] = _model_output_limit + except Exception: + pass # fail open — let OpenRouter pick its default extra_body = {} @@ -6978,6 +7041,36 @@ class AIAgent: _final_summary = self._summarize_api_error(api_error) self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True) self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True) + + # Detect SSE stream-drop pattern (e.g. "Network + # connection lost") and surface actionable guidance. + # This typically happens when the model generates a + # very large tool call (write_file with huge content) + # and the proxy/CDN drops the stream mid-response. + _is_stream_drop = ( + not getattr(api_error, "status_code", None) + and any(p in error_msg for p in ( + "connection lost", "connection reset", + "connection closed", "network connection", + "network error", "terminated", + )) + ) + if _is_stream_drop: + self._vprint( + f"{self.log_prefix} 💡 The provider's stream " + f"connection keeps dropping. This often happens " + f"when the model tries to write a very large " + f"file in a single tool call.", + force=True, + ) + self._vprint( + f"{self.log_prefix} Try asking the model " + f"to use execute_code with Python's open() for " + f"large files, or to write the file in smaller " + f"sections.", + force=True, + ) + logging.error( "%sAPI call failed after %s retries. %s | provider=%s model=%s msgs=%s tokens=~%s", self.log_prefix, max_retries, _final_summary, @@ -6987,8 +7080,18 @@ class AIAgent: api_kwargs, reason="max_retries_exhausted", error=api_error, ) self._persist_session(messages, conversation_history) + _final_response = f"API call failed after {max_retries} retries: {_final_summary}" + if _is_stream_drop: + _final_response += ( + "\n\nThe provider's stream connection keeps " + "dropping — this often happens when generating " + "very large tool call responses (e.g. write_file " + "with long content). Try asking me to use " + "execute_code with Python's open() for large " + "files, or to write in smaller sections." + ) return { - "final_response": f"API call failed after {max_retries} retries: {_final_summary}", + "final_response": _final_response, "messages": messages, "api_calls": api_call_count, "completed": False, diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 88e3aa9e..f50510f0 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -532,6 +532,121 @@ class TestStreamingFallback: mock_non_stream.assert_called_once() assert mock_close.call_count >= 1 + @patch("run_agent.AIAgent._interruptible_api_call") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create, mock_non_stream): + """SSE 'Network connection lost' (APIError w/ no status_code) retries like httpx errors. + + OpenRouter sends {"error":{"message":"Network connection lost."}} as an SSE + event when the upstream stream drops. The OpenAI SDK raises APIError from + this. It should be retried at the streaming level, same as httpx connection + errors, before falling back to non-streaming. + """ + from run_agent import AIAgent + import httpx + + # Create an APIError that mimics what the OpenAI SDK raises from SSE error events. + # Key: no status_code attribute (unlike APIStatusError which has one). + from openai import APIError as OAIAPIError + sse_error = OAIAPIError( + message="Network connection lost.", + request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"), + body={"message": "Network connection lost."}, + ) + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = sse_error + mock_create.return_value = mock_client + + fallback_response = SimpleNamespace( + id="fallback", + model="test", + choices=[SimpleNamespace( + index=0, + message=SimpleNamespace( + role="assistant", + content="fallback after SSE retries", + tool_calls=None, + reasoning_content=None, + ), + finish_reason="stop", + )], + usage=None, + ) + mock_non_stream.return_value = fallback_response + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "fallback after SSE retries" + # Should retry 3 times (default HERMES_STREAM_RETRIES=2 → 3 attempts) + # before falling back to non-streaming + assert mock_client.chat.completions.create.call_count == 3 + mock_non_stream.assert_called_once() + # Connection cleanup should happen for each failed retry + assert mock_close.call_count >= 2 + + @patch("run_agent.AIAgent._interruptible_api_call") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_sse_non_connection_error_falls_back_immediately(self, mock_close, mock_create, mock_non_stream): + """SSE errors that aren't connection-related still fall back immediately (no stream retry).""" + from run_agent import AIAgent + import httpx + + from openai import APIError as OAIAPIError + sse_error = OAIAPIError( + message="Invalid model configuration.", + request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"), + body={"message": "Invalid model configuration."}, + ) + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = sse_error + mock_create.return_value = mock_client + + fallback_response = SimpleNamespace( + id="fallback", + model="test", + choices=[SimpleNamespace( + index=0, + message=SimpleNamespace( + role="assistant", + content="fallback no retry", + tool_calls=None, + reasoning_content=None, + ), + finish_reason="stop", + )], + usage=None, + ) + mock_non_stream.return_value = fallback_response + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "fallback no retry" + # Should NOT retry — goes straight to non-streaming fallback + assert mock_client.chat.completions.create.call_count == 1 + mock_non_stream.assert_called_once() + # ── Test: Reasoning Streaming ────────────────────────────────────────────