From b2a6b012fe164ce97862849bcd346ad9ff278e4c Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Wed, 25 Mar 2026 09:56:20 -0700 Subject: [PATCH] fix(api_server): streaming breaks when agent makes tool calls (#2985) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(run_agent): ensure _fire_first_delta() is called for tool generation events Added calls to _fire_first_delta() in the AIAgent class to improve the handling of tool generation events, ensuring timely notifications during the processing of function calls and tool usage. * fix(run_agent): improve timeout handling for chat completions Enhanced the timeout configuration for chat completions in the AIAgent class by introducing customizable connection, read, and write timeouts using environment variables. This ensures more robust handling of API requests during streaming operations. * fix(run_agent): reduce default stream read timeout for chat completions Updated the default stream read timeout from 120 seconds to 60 seconds in the AIAgent class, enhancing the timeout configuration for chat completions. This change aims to improve responsiveness during streaming operations. * fix(run_agent): enhance streaming error handling and retry logic Improved the error handling and retry mechanism for streaming requests in the AIAgent class. Introduced a configurable maximum number of stream retries and refined the handling of transient network errors, allowing for retries with fresh connections. Non-transient errors now trigger a fallback to non-streaming only when appropriate, ensuring better resilience during API interactions. * fix(api_server): streaming breaks when agent makes tool calls The agent fires stream_delta_callback(None) to signal the CLI display to close its response box before tool execution begins. The API server's _on_delta callback was forwarding this None directly into the SSE queue, where the SSE writer treats it as end-of-stream and terminates the HTTP response prematurely. After tool calls complete, the agent streams the final answer through the same callback, but the SSE response was already closed — so Open WebUI (and similar frontends) never received the actual answer. Fix: filter out None in _on_delta so the SSE stream stays open. The SSE loop already detects completion via agent_task.done(), which handles stream termination correctly without needing the None sentinel. Reported by Rohit Paul on X. --- gateway/platforms/api_server.py | 10 ++++++- tests/gateway/test_api_server.py | 48 ++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 68bbb4b04..425613cb5 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -478,7 +478,15 @@ class APIServerAdapter(BasePlatformAdapter): _stream_q: _q.Queue = _q.Queue() def _on_delta(delta): - _stream_q.put(delta) + # Filter out None — the agent fires stream_delta_callback(None) + # to signal the CLI display to close its response box before + # tool execution, but the SSE writer uses None as end-of-stream + # sentinel. Forwarding it would prematurely close the HTTP + # response, causing Open WebUI (and similar frontends) to miss + # the final answer after tool calls. The SSE loop detects + # completion via agent_task.done() instead. + if delta is not None: + _stream_q.put(delta) # Start agent in background agent_task = asyncio.ensure_future(self._run_agent( diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 96160b5a5..2ee928484 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -355,6 +355,54 @@ class TestChatCompletionsEndpoint: assert "[DONE]" in body assert "Hello!" in body + @pytest.mark.asyncio + async def test_stream_survives_tool_call_none_sentinel(self, adapter): + """stream_delta_callback(None) mid-stream (tool calls) must NOT kill the SSE stream. + + The agent fires stream_delta_callback(None) to tell the CLI display to + close its response box before executing tool calls. The API server's + _on_delta must filter this out so the SSE response stays open and the + final answer (streamed after tool execution) reaches the client. + """ + import asyncio + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + # Simulate: agent streams partial text, then fires None + # (tool call box-close signal), then streams the final answer + cb("Thinking") + cb(None) # mid-stream None from tool calls + await asyncio.sleep(0.05) # simulate tool execution delay + cb(" about it...") + cb(None) # another None (possible second tool round) + await asyncio.sleep(0.05) + cb(" The answer is 42.") + return ( + {"final_response": "Thinking about it... The answer is 42.", "messages": [], "api_calls": 3}, + {"input_tokens": 20, "output_tokens": 15, "total_tokens": 35}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "test", + "messages": [{"role": "user", "content": "What is the answer?"}], + "stream": True, + }, + ) + assert resp.status == 200 + body = await resp.text() + assert "[DONE]" in body + # The final answer text must appear in the SSE stream + assert "The answer is 42." in body + # All partial text must be present too + assert "Thinking" in body + assert " about it..." in body + @pytest.mark.asyncio async def test_no_user_message_returns_400(self, adapter): app = _create_app(adapter)