diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 68bbb4b04..425613cb5 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -478,7 +478,15 @@ class APIServerAdapter(BasePlatformAdapter): _stream_q: _q.Queue = _q.Queue() def _on_delta(delta): - _stream_q.put(delta) + # Filter out None — the agent fires stream_delta_callback(None) + # to signal the CLI display to close its response box before + # tool execution, but the SSE writer uses None as end-of-stream + # sentinel. Forwarding it would prematurely close the HTTP + # response, causing Open WebUI (and similar frontends) to miss + # the final answer after tool calls. The SSE loop detects + # completion via agent_task.done() instead. + if delta is not None: + _stream_q.put(delta) # Start agent in background agent_task = asyncio.ensure_future(self._run_agent( diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 96160b5a5..2ee928484 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -355,6 +355,54 @@ class TestChatCompletionsEndpoint: assert "[DONE]" in body assert "Hello!" in body + @pytest.mark.asyncio + async def test_stream_survives_tool_call_none_sentinel(self, adapter): + """stream_delta_callback(None) mid-stream (tool calls) must NOT kill the SSE stream. + + The agent fires stream_delta_callback(None) to tell the CLI display to + close its response box before executing tool calls. The API server's + _on_delta must filter this out so the SSE response stays open and the + final answer (streamed after tool execution) reaches the client. + """ + import asyncio + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + # Simulate: agent streams partial text, then fires None + # (tool call box-close signal), then streams the final answer + cb("Thinking") + cb(None) # mid-stream None from tool calls + await asyncio.sleep(0.05) # simulate tool execution delay + cb(" about it...") + cb(None) # another None (possible second tool round) + await asyncio.sleep(0.05) + cb(" The answer is 42.") + return ( + {"final_response": "Thinking about it... The answer is 42.", "messages": [], "api_calls": 3}, + {"input_tokens": 20, "output_tokens": 15, "total_tokens": 35}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/chat/completions", + json={ + "model": "test", + "messages": [{"role": "user", "content": "What is the answer?"}], + "stream": True, + }, + ) + assert resp.status == 200 + body = await resp.text() + assert "[DONE]" in body + # The final answer text must appear in the SSE stream + assert "The answer is 42." in body + # All partial text must be present too + assert "Thinking" in body + assert " about it..." in body + @pytest.mark.asyncio async def test_no_user_message_returns_400(self, adapter): app = _create_app(adapter)