diff --git a/run_agent.py b/run_agent.py index dd3028c92..501a1a463 100644 --- a/run_agent.py +++ b/run_agent.py @@ -819,6 +819,25 @@ class AIAgent: } self._client_kwargs = client_kwargs # stored for rebuilding after interrupt + + # Enable fine-grained tool streaming for Claude on OpenRouter. + # Without this, Anthropic buffers the entire tool call and goes + # silent for minutes while thinking — OpenRouter's upstream proxy + # times out during the silence. The beta header makes Anthropic + # stream tool call arguments token-by-token, keeping the + # connection alive. + _effective_base = str(client_kwargs.get("base_url", "")).lower() + if "openrouter" in _effective_base and "claude" in (self.model or "").lower(): + headers = client_kwargs.get("default_headers") or {} + existing_beta = headers.get("x-anthropic-beta", "") + _FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14" + if _FINE_GRAINED not in existing_beta: + if existing_beta: + headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}" + else: + headers["x-anthropic-beta"] = _FINE_GRAINED + client_kwargs["default_headers"] = headers + self.api_key = client_kwargs.get("api_key", "") try: self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True) @@ -4041,7 +4060,37 @@ class AIAgent: e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) ) - if _is_timeout or _is_conn_err: + # SSE error events from proxies (e.g. OpenRouter sends + # {"error":{"message":"Network connection lost."}}) are + # raised as APIError by the OpenAI SDK. These are + # semantically identical to httpx connection drops — + # the upstream stream died — and should be retried with + # a fresh connection. Distinguish from HTTP errors: + # APIError from SSE has no status_code, while + # APIStatusError (4xx/5xx) always has one. + _is_sse_conn_err = False + if not _is_timeout and not _is_conn_err: + from openai import APIError as _APIError + if isinstance(e, _APIError) and not getattr(e, "status_code", None): + _err_lower_sse = str(e).lower() + _SSE_CONN_PHRASES = ( + "connection lost", + "connection reset", + "connection closed", + "connection terminated", + "network error", + "network connection", + "terminated", + "peer closed", + "broken pipe", + "upstream connect error", + ) + _is_sse_conn_err = any( + phrase in _err_lower_sse + for phrase in _SSE_CONN_PHRASES + ) + + if _is_timeout or _is_conn_err or _is_sse_conn_err: # Transient network / timeout error. Retry the # streaming request with a fresh connection first. if _stream_attempt < _max_stream_retries: @@ -4554,6 +4603,20 @@ class AIAgent: if self.max_tokens is not None: api_kwargs.update(self._max_tokens_param(self.max_tokens)) + elif self._is_openrouter_url() and "claude" in (self.model or "").lower(): + # OpenRouter translates requests to Anthropic's Messages API, + # which requires max_tokens as a mandatory field. When we omit + # it, OpenRouter picks a default that can be too low — the model + # spends its output budget on thinking and has almost nothing + # left for the actual response (especially large tool calls like + # write_file). Sending the model's real output limit ensures + # full capacity. Other providers handle the default fine. + try: + from agent.anthropic_adapter import _get_anthropic_max_output + _model_output_limit = _get_anthropic_max_output(self.model) + api_kwargs["max_tokens"] = _model_output_limit + except Exception: + pass # fail open — let OpenRouter pick its default extra_body = {} @@ -6978,6 +7041,36 @@ class AIAgent: _final_summary = self._summarize_api_error(api_error) self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True) self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True) + + # Detect SSE stream-drop pattern (e.g. "Network + # connection lost") and surface actionable guidance. + # This typically happens when the model generates a + # very large tool call (write_file with huge content) + # and the proxy/CDN drops the stream mid-response. + _is_stream_drop = ( + not getattr(api_error, "status_code", None) + and any(p in error_msg for p in ( + "connection lost", "connection reset", + "connection closed", "network connection", + "network error", "terminated", + )) + ) + if _is_stream_drop: + self._vprint( + f"{self.log_prefix} 💡 The provider's stream " + f"connection keeps dropping. This often happens " + f"when the model tries to write a very large " + f"file in a single tool call.", + force=True, + ) + self._vprint( + f"{self.log_prefix} Try asking the model " + f"to use execute_code with Python's open() for " + f"large files, or to write the file in smaller " + f"sections.", + force=True, + ) + logging.error( "%sAPI call failed after %s retries. %s | provider=%s model=%s msgs=%s tokens=~%s", self.log_prefix, max_retries, _final_summary, @@ -6987,8 +7080,18 @@ class AIAgent: api_kwargs, reason="max_retries_exhausted", error=api_error, ) self._persist_session(messages, conversation_history) + _final_response = f"API call failed after {max_retries} retries: {_final_summary}" + if _is_stream_drop: + _final_response += ( + "\n\nThe provider's stream connection keeps " + "dropping — this often happens when generating " + "very large tool call responses (e.g. write_file " + "with long content). Try asking me to use " + "execute_code with Python's open() for large " + "files, or to write in smaller sections." + ) return { - "final_response": f"API call failed after {max_retries} retries: {_final_summary}", + "final_response": _final_response, "messages": messages, "api_calls": api_call_count, "completed": False, diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 88e3aa9e8..f50510f0a 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -532,6 +532,121 @@ class TestStreamingFallback: mock_non_stream.assert_called_once() assert mock_close.call_count >= 1 + @patch("run_agent.AIAgent._interruptible_api_call") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create, mock_non_stream): + """SSE 'Network connection lost' (APIError w/ no status_code) retries like httpx errors. + + OpenRouter sends {"error":{"message":"Network connection lost."}} as an SSE + event when the upstream stream drops. The OpenAI SDK raises APIError from + this. It should be retried at the streaming level, same as httpx connection + errors, before falling back to non-streaming. + """ + from run_agent import AIAgent + import httpx + + # Create an APIError that mimics what the OpenAI SDK raises from SSE error events. + # Key: no status_code attribute (unlike APIStatusError which has one). + from openai import APIError as OAIAPIError + sse_error = OAIAPIError( + message="Network connection lost.", + request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"), + body={"message": "Network connection lost."}, + ) + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = sse_error + mock_create.return_value = mock_client + + fallback_response = SimpleNamespace( + id="fallback", + model="test", + choices=[SimpleNamespace( + index=0, + message=SimpleNamespace( + role="assistant", + content="fallback after SSE retries", + tool_calls=None, + reasoning_content=None, + ), + finish_reason="stop", + )], + usage=None, + ) + mock_non_stream.return_value = fallback_response + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "fallback after SSE retries" + # Should retry 3 times (default HERMES_STREAM_RETRIES=2 → 3 attempts) + # before falling back to non-streaming + assert mock_client.chat.completions.create.call_count == 3 + mock_non_stream.assert_called_once() + # Connection cleanup should happen for each failed retry + assert mock_close.call_count >= 2 + + @patch("run_agent.AIAgent._interruptible_api_call") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_sse_non_connection_error_falls_back_immediately(self, mock_close, mock_create, mock_non_stream): + """SSE errors that aren't connection-related still fall back immediately (no stream retry).""" + from run_agent import AIAgent + import httpx + + from openai import APIError as OAIAPIError + sse_error = OAIAPIError( + message="Invalid model configuration.", + request=httpx.Request("POST", "https://openrouter.ai/api/v1/chat/completions"), + body={"message": "Invalid model configuration."}, + ) + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = sse_error + mock_create.return_value = mock_client + + fallback_response = SimpleNamespace( + id="fallback", + model="test", + choices=[SimpleNamespace( + index=0, + message=SimpleNamespace( + role="assistant", + content="fallback no retry", + tool_calls=None, + reasoning_content=None, + ), + finish_reason="stop", + )], + usage=None, + ) + mock_non_stream.return_value = fallback_response + + agent = AIAgent( + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + response = agent._interruptible_streaming_api_call({}) + + assert response.choices[0].message.content == "fallback no retry" + # Should NOT retry — goes straight to non-streaming fallback + assert mock_client.chat.completions.create.call_count == 1 + mock_non_stream.assert_called_once() + # ── Test: Reasoning Streaming ────────────────────────────────────────────