""" Tests for the OpenAI-compatible API server gateway adapter. Tests cover: - Chat Completions endpoint (request parsing, response format) - Responses API endpoint (request parsing, response format) - previous_response_id chaining (store/retrieve) - Auth (valid key, invalid key, no key configured) - /v1/models endpoint - /health endpoint - System prompt extraction - Error handling (invalid JSON, missing fields) """ import json import time import uuid from unittest.mock import AsyncMock, MagicMock, patch import pytest from aiohttp import web from aiohttp.test_utils import AioHTTPTestCase, TestClient, TestServer from gateway.config import GatewayConfig, Platform, PlatformConfig from gateway.platforms.api_server import ( APIServerAdapter, ResponseStore, _CORS_HEADERS, check_api_server_requirements, cors_middleware, security_headers_middleware, ) # --------------------------------------------------------------------------- # check_api_server_requirements # --------------------------------------------------------------------------- class TestCheckRequirements: def test_returns_true_when_aiohttp_available(self): assert check_api_server_requirements() is True @patch("gateway.platforms.api_server.AIOHTTP_AVAILABLE", False) def test_returns_false_without_aiohttp(self): assert check_api_server_requirements() is False # --------------------------------------------------------------------------- # ResponseStore # --------------------------------------------------------------------------- class TestResponseStore: def test_put_and_get(self): store = ResponseStore(max_size=10) store.put("resp_1", {"output": "hello"}) assert store.get("resp_1") == {"output": "hello"} def test_get_missing_returns_none(self): store = ResponseStore(max_size=10) assert store.get("resp_missing") is None def test_lru_eviction(self): store = ResponseStore(max_size=3) store.put("resp_1", {"output": "one"}) store.put("resp_2", {"output": "two"}) store.put("resp_3", {"output": "three"}) # Adding a 4th should evict resp_1 store.put("resp_4", {"output": "four"}) assert store.get("resp_1") is None assert store.get("resp_2") is not None assert len(store) == 3 def test_access_refreshes_lru(self): store = ResponseStore(max_size=3) store.put("resp_1", {"output": "one"}) store.put("resp_2", {"output": "two"}) store.put("resp_3", {"output": "three"}) # Access resp_1 to move it to end store.get("resp_1") # Now resp_2 is the oldest — adding a 4th should evict resp_2 store.put("resp_4", {"output": "four"}) assert store.get("resp_2") is None assert store.get("resp_1") is not None def test_update_existing_key(self): store = ResponseStore(max_size=10) store.put("resp_1", {"output": "v1"}) store.put("resp_1", {"output": "v2"}) assert store.get("resp_1") == {"output": "v2"} assert len(store) == 1 def test_delete_existing(self): store = ResponseStore(max_size=10) store.put("resp_1", {"output": "hello"}) assert store.delete("resp_1") is True assert store.get("resp_1") is None assert len(store) == 0 def test_delete_missing(self): store = ResponseStore(max_size=10) assert store.delete("resp_missing") is False # --------------------------------------------------------------------------- # Adapter initialization # --------------------------------------------------------------------------- class TestAdapterInit: def test_default_config(self): config = PlatformConfig(enabled=True) adapter = APIServerAdapter(config) assert adapter._host == "127.0.0.1" assert adapter._port == 8642 assert adapter._api_key == "" assert adapter.platform == Platform.API_SERVER def test_custom_config_from_extra(self): config = PlatformConfig( enabled=True, extra={ "host": "0.0.0.0", "port": 9999, "key": "sk-test", "cors_origins": ["http://localhost:3000"], }, ) adapter = APIServerAdapter(config) assert adapter._host == "0.0.0.0" assert adapter._port == 9999 assert adapter._api_key == "sk-test" assert adapter._cors_origins == ("http://localhost:3000",) def test_config_from_env(self, monkeypatch): monkeypatch.setenv("API_SERVER_HOST", "10.0.0.1") monkeypatch.setenv("API_SERVER_PORT", "7777") monkeypatch.setenv("API_SERVER_KEY", "sk-env") monkeypatch.setenv("API_SERVER_CORS_ORIGINS", "http://localhost:3000, http://127.0.0.1:3000") config = PlatformConfig(enabled=True) adapter = APIServerAdapter(config) assert adapter._host == "10.0.0.1" assert adapter._port == 7777 assert adapter._api_key == "sk-env" assert adapter._cors_origins == ( "http://localhost:3000", "http://127.0.0.1:3000", ) # --------------------------------------------------------------------------- # Auth checking # --------------------------------------------------------------------------- class TestAuth: def test_no_key_configured_allows_all(self): config = PlatformConfig(enabled=True) adapter = APIServerAdapter(config) mock_request = MagicMock() mock_request.headers = {} assert adapter._check_auth(mock_request) is None def test_valid_key_passes(self): config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) adapter = APIServerAdapter(config) mock_request = MagicMock() mock_request.headers = {"Authorization": "Bearer sk-test123"} assert adapter._check_auth(mock_request) is None def test_invalid_key_returns_401(self): config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) adapter = APIServerAdapter(config) mock_request = MagicMock() mock_request.headers = {"Authorization": "Bearer wrong-key"} result = adapter._check_auth(mock_request) assert result is not None assert result.status == 401 def test_missing_auth_header_returns_401(self): config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) adapter = APIServerAdapter(config) mock_request = MagicMock() mock_request.headers = {} result = adapter._check_auth(mock_request) assert result is not None assert result.status == 401 def test_malformed_auth_header_returns_401(self): config = PlatformConfig(enabled=True, extra={"key": "sk-test123"}) adapter = APIServerAdapter(config) mock_request = MagicMock() mock_request.headers = {"Authorization": "Basic dXNlcjpwYXNz"} result = adapter._check_auth(mock_request) assert result is not None assert result.status == 401 # --------------------------------------------------------------------------- # Helpers for HTTP tests # --------------------------------------------------------------------------- def _make_adapter(api_key: str = "", cors_origins=None) -> APIServerAdapter: """Create an adapter with optional API key.""" extra = {} if api_key: extra["key"] = api_key if cors_origins is not None: extra["cors_origins"] = cors_origins config = PlatformConfig(enabled=True, extra=extra) return APIServerAdapter(config) def _create_app(adapter: APIServerAdapter) -> web.Application: """Create the aiohttp app from the adapter (without starting the full server).""" mws = [mw for mw in (cors_middleware, security_headers_middleware) if mw is not None] app = web.Application(middlewares=mws) app["api_server_adapter"] = adapter app.router.add_get("/health", adapter._handle_health) app.router.add_get("/v1/health", adapter._handle_health) app.router.add_get("/v1/models", adapter._handle_models) app.router.add_post("/v1/chat/completions", adapter._handle_chat_completions) app.router.add_post("/v1/responses", adapter._handle_responses) app.router.add_get("/v1/responses/{response_id}", adapter._handle_get_response) app.router.add_delete("/v1/responses/{response_id}", adapter._handle_delete_response) return app @pytest.fixture def adapter(): return _make_adapter() @pytest.fixture def auth_adapter(): return _make_adapter(api_key="sk-secret") # --------------------------------------------------------------------------- # /health endpoint # --------------------------------------------------------------------------- class TestHealthEndpoint: @pytest.mark.asyncio async def test_security_headers_present(self, adapter): """Responses should include basic security headers.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health") assert resp.status == 200 assert resp.headers.get("X-Content-Type-Options") == "nosniff" assert resp.headers.get("Referrer-Policy") == "no-referrer" @pytest.mark.asyncio async def test_health_returns_ok(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health") assert resp.status == 200 data = await resp.json() assert data["status"] == "ok" assert data["platform"] == "hermes-agent" @pytest.mark.asyncio async def test_v1_health_alias_returns_ok(self, adapter): """GET /v1/health should return the same response as /health.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/v1/health") assert resp.status == 200 data = await resp.json() assert data["status"] == "ok" assert data["platform"] == "hermes-agent" # --------------------------------------------------------------------------- # /v1/models endpoint # --------------------------------------------------------------------------- class TestModelsEndpoint: @pytest.mark.asyncio async def test_models_returns_hermes_agent(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/v1/models") assert resp.status == 200 data = await resp.json() assert data["object"] == "list" assert len(data["data"]) == 1 assert data["data"][0]["id"] == "hermes-agent" assert data["data"][0]["owned_by"] == "hermes" @pytest.mark.asyncio async def test_models_requires_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/v1/models") assert resp.status == 401 @pytest.mark.asyncio async def test_models_with_valid_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get( "/v1/models", headers={"Authorization": "Bearer sk-secret"}, ) assert resp.status == 200 # --------------------------------------------------------------------------- # /v1/chat/completions endpoint # --------------------------------------------------------------------------- class TestChatCompletionsEndpoint: @pytest.mark.asyncio async def test_invalid_json_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/chat/completions", data="not json", headers={"Content-Type": "application/json"}, ) assert resp.status == 400 data = await resp.json() assert "Invalid JSON" in data["error"]["message"] @pytest.mark.asyncio async def test_missing_messages_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post("/v1/chat/completions", json={"model": "test"}) assert resp.status == 400 data = await resp.json() assert "messages" in data["error"]["message"] @pytest.mark.asyncio async def test_empty_messages_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post("/v1/chat/completions", json={"model": "test", "messages": []}) assert resp.status == 400 @pytest.mark.asyncio async def test_stream_true_returns_sse(self, adapter): """stream=true returns SSE format with the full response.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: async def _mock_run_agent(**kwargs): # Simulate streaming: invoke stream_delta_callback with tokens cb = kwargs.get("stream_delta_callback") if cb: cb("Hello!") cb(None) # End signal return ( {"final_response": "Hello!", "messages": [], "api_calls": 1}, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, ) with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent) as mock_run: resp = await cli.post( "/v1/chat/completions", json={ "model": "test", "messages": [{"role": "user", "content": "hi"}], "stream": True, }, ) assert resp.status == 200 assert "text/event-stream" in resp.headers.get("Content-Type", "") body = await resp.text() assert "data: " in body assert "[DONE]" in body assert "Hello!" in body @pytest.mark.asyncio async def test_stream_survives_tool_call_none_sentinel(self, adapter): """stream_delta_callback(None) mid-stream (tool calls) must NOT kill the SSE stream. The agent fires stream_delta_callback(None) to tell the CLI display to close its response box before executing tool calls. The API server's _on_delta must filter this out so the SSE response stays open and the final answer (streamed after tool execution) reaches the client. """ import asyncio app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: async def _mock_run_agent(**kwargs): cb = kwargs.get("stream_delta_callback") if cb: # Simulate: agent streams partial text, then fires None # (tool call box-close signal), then streams the final answer cb("Thinking") cb(None) # mid-stream None from tool calls await asyncio.sleep(0.05) # simulate tool execution delay cb(" about it...") cb(None) # another None (possible second tool round) await asyncio.sleep(0.05) cb(" The answer is 42.") return ( {"final_response": "Thinking about it... The answer is 42.", "messages": [], "api_calls": 3}, {"input_tokens": 20, "output_tokens": 15, "total_tokens": 35}, ) with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): resp = await cli.post( "/v1/chat/completions", json={ "model": "test", "messages": [{"role": "user", "content": "What is the answer?"}], "stream": True, }, ) assert resp.status == 200 body = await resp.text() assert "[DONE]" in body # The final answer text must appear in the SSE stream assert "The answer is 42." in body # All partial text must be present too assert "Thinking" in body assert " about it..." in body @pytest.mark.asyncio async def test_no_user_message_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/chat/completions", json={ "model": "test", "messages": [{"role": "system", "content": "You are helpful."}], }, ) assert resp.status == 400 @pytest.mark.asyncio async def test_successful_completion(self, adapter): """Test a successful chat completion with mocked agent.""" mock_result = { "final_response": "Hello! How can I help you today?", "messages": [], "api_calls": 1, } app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/chat/completions", json={ "model": "hermes-agent", "messages": [{"role": "user", "content": "Hello"}], }, ) assert resp.status == 200 data = await resp.json() assert data["object"] == "chat.completion" assert data["id"].startswith("chatcmpl-") assert data["model"] == "hermes-agent" assert len(data["choices"]) == 1 assert data["choices"][0]["message"]["role"] == "assistant" assert data["choices"][0]["message"]["content"] == "Hello! How can I help you today?" assert data["choices"][0]["finish_reason"] == "stop" assert "usage" in data @pytest.mark.asyncio async def test_system_prompt_extracted(self, adapter): """System messages from the client are passed as ephemeral_system_prompt.""" mock_result = { "final_response": "I am a pirate! Arrr!", "messages": [], "api_calls": 1, } app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/chat/completions", json={ "model": "hermes-agent", "messages": [ {"role": "system", "content": "You are a pirate."}, {"role": "user", "content": "Hello"}, ], }, ) assert resp.status == 200 # Check that _run_agent was called with the system prompt call_kwargs = mock_run.call_args assert call_kwargs.kwargs.get("ephemeral_system_prompt") == "You are a pirate." assert call_kwargs.kwargs.get("user_message") == "Hello" @pytest.mark.asyncio async def test_conversation_history_passed(self, adapter): """Previous user/assistant messages become conversation_history.""" mock_result = {"final_response": "3", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/chat/completions", json={ "model": "hermes-agent", "messages": [ {"role": "user", "content": "1+1=?"}, {"role": "assistant", "content": "2"}, {"role": "user", "content": "Now add 1 more"}, ], }, ) assert resp.status == 200 call_kwargs = mock_run.call_args.kwargs assert call_kwargs["user_message"] == "Now add 1 more" assert len(call_kwargs["conversation_history"]) == 2 assert call_kwargs["conversation_history"][0] == {"role": "user", "content": "1+1=?"} assert call_kwargs["conversation_history"][1] == {"role": "assistant", "content": "2"} @pytest.mark.asyncio async def test_agent_error_returns_500(self, adapter): """Agent exception returns 500.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.side_effect = RuntimeError("Provider failed") resp = await cli.post( "/v1/chat/completions", json={ "model": "hermes-agent", "messages": [{"role": "user", "content": "Hello"}], }, ) assert resp.status == 500 data = await resp.json() assert "Provider failed" in data["error"]["message"] # --------------------------------------------------------------------------- # /v1/responses endpoint # --------------------------------------------------------------------------- class TestResponsesEndpoint: @pytest.mark.asyncio async def test_missing_input_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post("/v1/responses", json={"model": "test"}) assert resp.status == 400 data = await resp.json() assert "input" in data["error"]["message"] @pytest.mark.asyncio async def test_invalid_json_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/responses", data="not json", headers={"Content-Type": "application/json"}, ) assert resp.status == 400 @pytest.mark.asyncio async def test_successful_response_with_string_input(self, adapter): """String input is wrapped in a user message.""" mock_result = { "final_response": "Paris is the capital of France.", "messages": [], "api_calls": 1, } app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "What is the capital of France?", }, ) assert resp.status == 200 data = await resp.json() assert data["object"] == "response" assert data["id"].startswith("resp_") assert data["status"] == "completed" assert len(data["output"]) == 1 assert data["output"][0]["type"] == "message" assert data["output"][0]["content"][0]["type"] == "output_text" assert data["output"][0]["content"][0]["text"] == "Paris is the capital of France." @pytest.mark.asyncio async def test_successful_response_with_array_input(self, adapter): """Array input with role/content objects.""" mock_result = {"final_response": "Done", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": [ {"role": "user", "content": "Hello"}, {"role": "user", "content": "What is 2+2?"}, ], }, ) assert resp.status == 200 call_kwargs = mock_run.call_args.kwargs # Last message is user_message, rest are history assert call_kwargs["user_message"] == "What is 2+2?" assert len(call_kwargs["conversation_history"]) == 1 @pytest.mark.asyncio async def test_instructions_as_ephemeral_prompt(self, adapter): """The instructions field maps to ephemeral_system_prompt.""" mock_result = {"final_response": "Ahoy!", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "Hello", "instructions": "Talk like a pirate.", }, ) assert resp.status == 200 call_kwargs = mock_run.call_args.kwargs assert call_kwargs["ephemeral_system_prompt"] == "Talk like a pirate." @pytest.mark.asyncio async def test_previous_response_id_chaining(self, adapter): """Test that responses can be chained via previous_response_id.""" mock_result_1 = { "final_response": "2", "messages": [{"role": "assistant", "content": "2"}], "api_calls": 1, } app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: # First request with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result_1, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp1 = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "What is 1+1?"}, ) assert resp1.status == 200 data1 = await resp1.json() response_id = data1["id"] # Second request chaining from the first mock_result_2 = { "final_response": "3", "messages": [{"role": "assistant", "content": "3"}], "api_calls": 1, } with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result_2, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp2 = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "Now add 1 more", "previous_response_id": response_id, }, ) assert resp2.status == 200 # The conversation_history should contain the full history from the first response call_kwargs = mock_run.call_args.kwargs assert len(call_kwargs["conversation_history"]) > 0 assert call_kwargs["user_message"] == "Now add 1 more" @pytest.mark.asyncio async def test_invalid_previous_response_id_returns_404(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "follow up", "previous_response_id": "resp_nonexistent", }, ) assert resp.status == 404 @pytest.mark.asyncio async def test_store_false_does_not_store(self, adapter): """When store=false, the response is NOT stored.""" mock_result = {"final_response": "OK", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "Hello", "store": False, }, ) assert resp.status == 200 data = await resp.json() # The response has an ID but it shouldn't be retrievable assert adapter._response_store.get(data["id"]) is None @pytest.mark.asyncio async def test_instructions_inherited_from_previous(self, adapter): """If no instructions provided, carry forward from previous response.""" mock_result = {"final_response": "Ahoy!", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: # First request with instructions with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp1 = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "Hello", "instructions": "Be a pirate", }, ) data1 = await resp1.json() resp_id = data1["id"] # Second request without instructions with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp2 = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "Tell me more", "previous_response_id": resp_id, }, ) assert resp2.status == 200 call_kwargs = mock_run.call_args.kwargs assert call_kwargs["ephemeral_system_prompt"] == "Be a pirate" @pytest.mark.asyncio async def test_agent_error_returns_500(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.side_effect = RuntimeError("Boom") resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "Hello"}, ) assert resp.status == 500 @pytest.mark.asyncio async def test_invalid_input_type_returns_400(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": 42}, ) assert resp.status == 400 # --------------------------------------------------------------------------- # Auth on endpoints # --------------------------------------------------------------------------- class TestEndpointAuth: @pytest.mark.asyncio async def test_chat_completions_requires_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/chat/completions", json={"model": "test", "messages": [{"role": "user", "content": "hi"}]}, ) assert resp.status == 401 @pytest.mark.asyncio async def test_responses_requires_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post( "/v1/responses", json={"model": "test", "input": "hi"}, ) assert resp.status == 401 @pytest.mark.asyncio async def test_models_requires_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/v1/models") assert resp.status == 401 @pytest.mark.asyncio async def test_health_does_not_require_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health") assert resp.status == 200 # --------------------------------------------------------------------------- # Config integration # --------------------------------------------------------------------------- class TestConfigIntegration: def test_platform_enum_has_api_server(self): assert Platform.API_SERVER.value == "api_server" def test_env_override_enables_api_server(self, monkeypatch): monkeypatch.setenv("API_SERVER_ENABLED", "true") from gateway.config import load_gateway_config config = load_gateway_config() assert Platform.API_SERVER in config.platforms assert config.platforms[Platform.API_SERVER].enabled is True def test_env_override_with_key(self, monkeypatch): monkeypatch.setenv("API_SERVER_KEY", "sk-mykey") from gateway.config import load_gateway_config config = load_gateway_config() assert Platform.API_SERVER in config.platforms assert config.platforms[Platform.API_SERVER].extra.get("key") == "sk-mykey" def test_env_override_port_and_host(self, monkeypatch): monkeypatch.setenv("API_SERVER_ENABLED", "true") monkeypatch.setenv("API_SERVER_PORT", "9999") monkeypatch.setenv("API_SERVER_HOST", "0.0.0.0") from gateway.config import load_gateway_config config = load_gateway_config() assert config.platforms[Platform.API_SERVER].extra.get("port") == 9999 assert config.platforms[Platform.API_SERVER].extra.get("host") == "0.0.0.0" def test_env_override_cors_origins(self, monkeypatch): monkeypatch.setenv("API_SERVER_ENABLED", "true") monkeypatch.setenv( "API_SERVER_CORS_ORIGINS", "http://localhost:3000, http://127.0.0.1:3000", ) from gateway.config import load_gateway_config config = load_gateway_config() assert config.platforms[Platform.API_SERVER].extra.get("cors_origins") == [ "http://localhost:3000", "http://127.0.0.1:3000", ] def test_api_server_in_connected_platforms(self): config = GatewayConfig() config.platforms[Platform.API_SERVER] = PlatformConfig(enabled=True) connected = config.get_connected_platforms() assert Platform.API_SERVER in connected def test_api_server_not_in_connected_when_disabled(self): config = GatewayConfig() config.platforms[Platform.API_SERVER] = PlatformConfig(enabled=False) connected = config.get_connected_platforms() assert Platform.API_SERVER not in connected # --------------------------------------------------------------------------- # Multiple system messages # --------------------------------------------------------------------------- class TestMultipleSystemMessages: @pytest.mark.asyncio async def test_multiple_system_messages_concatenated(self, adapter): mock_result = {"final_response": "OK", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/chat/completions", json={ "model": "hermes-agent", "messages": [ {"role": "system", "content": "You are helpful."}, {"role": "system", "content": "Be concise."}, {"role": "user", "content": "Hello"}, ], }, ) assert resp.status == 200 call_kwargs = mock_run.call_args.kwargs prompt = call_kwargs["ephemeral_system_prompt"] assert "You are helpful." in prompt assert "Be concise." in prompt # --------------------------------------------------------------------------- # send() method (not used but required by base) # --------------------------------------------------------------------------- class TestSendMethod: @pytest.mark.asyncio async def test_send_returns_not_supported(self): config = PlatformConfig(enabled=True) adapter = APIServerAdapter(config) result = await adapter.send("chat1", "hello") assert result.success is False assert "HTTP request/response" in result.error # --------------------------------------------------------------------------- # GET /v1/responses/{response_id} # --------------------------------------------------------------------------- class TestGetResponse: @pytest.mark.asyncio async def test_get_stored_response(self, adapter): """GET returns a previously stored response.""" mock_result = {"final_response": "Hello!", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: # Create a response first with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}) resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "Hi"}, ) assert resp.status == 200 data = await resp.json() response_id = data["id"] # Now GET it resp2 = await cli.get(f"/v1/responses/{response_id}") assert resp2.status == 200 data2 = await resp2.json() assert data2["id"] == response_id assert data2["object"] == "response" assert data2["status"] == "completed" @pytest.mark.asyncio async def test_get_not_found(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/v1/responses/resp_nonexistent") assert resp.status == 404 @pytest.mark.asyncio async def test_get_requires_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/v1/responses/resp_any") assert resp.status == 401 # --------------------------------------------------------------------------- # DELETE /v1/responses/{response_id} # --------------------------------------------------------------------------- class TestDeleteResponse: @pytest.mark.asyncio async def test_delete_stored_response(self, adapter): """DELETE removes a stored response and returns confirmation.""" mock_result = {"final_response": "Hello!", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "Hi"}, ) data = await resp.json() response_id = data["id"] # Delete it resp2 = await cli.delete(f"/v1/responses/{response_id}") assert resp2.status == 200 data2 = await resp2.json() assert data2["id"] == response_id assert data2["object"] == "response" assert data2["deleted"] is True # Verify it's gone resp3 = await cli.get(f"/v1/responses/{response_id}") assert resp3.status == 404 @pytest.mark.asyncio async def test_delete_not_found(self, adapter): app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.delete("/v1/responses/resp_nonexistent") assert resp.status == 404 @pytest.mark.asyncio async def test_delete_requires_auth(self, auth_adapter): app = _create_app(auth_adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.delete("/v1/responses/resp_any") assert resp.status == 401 # --------------------------------------------------------------------------- # Tool calls in output # --------------------------------------------------------------------------- class TestToolCallsInOutput: @pytest.mark.asyncio async def test_tool_calls_in_output(self, adapter): """When agent returns tool calls, they appear as function_call items.""" mock_result = { "final_response": "The result is 42.", "messages": [ { "role": "assistant", "content": None, "tool_calls": [ { "id": "call_abc123", "function": { "name": "calculator", "arguments": '{"expression": "6*7"}', }, } ], }, { "role": "tool", "tool_call_id": "call_abc123", "content": "42", }, { "role": "assistant", "content": "The result is 42.", }, ], "api_calls": 2, } app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "What is 6*7?"}, ) assert resp.status == 200 data = await resp.json() output = data["output"] # Should have: function_call, function_call_output, message assert len(output) == 3 assert output[0]["type"] == "function_call" assert output[0]["name"] == "calculator" assert output[0]["arguments"] == '{"expression": "6*7"}' assert output[0]["call_id"] == "call_abc123" assert output[1]["type"] == "function_call_output" assert output[1]["call_id"] == "call_abc123" assert output[1]["output"] == "42" assert output[2]["type"] == "message" assert output[2]["content"][0]["text"] == "The result is 42." @pytest.mark.asyncio async def test_no_tool_calls_still_works(self, adapter): """Without tool calls, output is just a message.""" mock_result = {"final_response": "Hello!", "messages": [], "api_calls": 1} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "Hello"}, ) assert resp.status == 200 data = await resp.json() assert len(data["output"]) == 1 assert data["output"][0]["type"] == "message" # --------------------------------------------------------------------------- # Usage / token counting # --------------------------------------------------------------------------- class TestUsageCounting: @pytest.mark.asyncio async def test_responses_usage(self, adapter): """Responses API returns real token counts.""" mock_result = {"final_response": "Done", "messages": [], "api_calls": 1} usage = {"input_tokens": 100, "output_tokens": 50, "total_tokens": 150} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, usage) resp = await cli.post( "/v1/responses", json={"model": "hermes-agent", "input": "Hi"}, ) assert resp.status == 200 data = await resp.json() assert data["usage"]["input_tokens"] == 100 assert data["usage"]["output_tokens"] == 50 assert data["usage"]["total_tokens"] == 150 @pytest.mark.asyncio async def test_chat_completions_usage(self, adapter): """Chat completions returns real token counts.""" mock_result = {"final_response": "Done", "messages": [], "api_calls": 1} usage = {"input_tokens": 200, "output_tokens": 80, "total_tokens": 280} app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, usage) resp = await cli.post( "/v1/chat/completions", json={ "model": "hermes-agent", "messages": [{"role": "user", "content": "Hi"}], }, ) assert resp.status == 200 data = await resp.json() assert data["usage"]["prompt_tokens"] == 200 assert data["usage"]["completion_tokens"] == 80 assert data["usage"]["total_tokens"] == 280 # --------------------------------------------------------------------------- # Truncation # --------------------------------------------------------------------------- class TestTruncation: @pytest.mark.asyncio async def test_truncation_auto_limits_history(self, adapter): """With truncation=auto, history over 100 messages is trimmed.""" mock_result = {"final_response": "OK", "messages": [], "api_calls": 1} # Pre-seed a stored response with a long history long_history = [{"role": "user", "content": f"msg {i}"} for i in range(150)] adapter._response_store.put("resp_prev", { "response": {"id": "resp_prev", "object": "response"}, "conversation_history": long_history, "instructions": None, }) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "follow up", "previous_response_id": "resp_prev", "truncation": "auto", }, ) assert resp.status == 200 call_kwargs = mock_run.call_args.kwargs # History should be truncated to 100 assert len(call_kwargs["conversation_history"]) <= 100 @pytest.mark.asyncio async def test_no_truncation_keeps_full_history(self, adapter): """Without truncation=auto, long history is passed as-is.""" mock_result = {"final_response": "OK", "messages": [], "api_calls": 1} long_history = [{"role": "user", "content": f"msg {i}"} for i in range(150)] adapter._response_store.put("resp_prev2", { "response": {"id": "resp_prev2", "object": "response"}, "conversation_history": long_history, "instructions": None, }) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = (mock_result, {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}) resp = await cli.post( "/v1/responses", json={ "model": "hermes-agent", "input": "follow up", "previous_response_id": "resp_prev2", }, ) assert resp.status == 200 call_kwargs = mock_run.call_args.kwargs assert len(call_kwargs["conversation_history"]) == 150 # --------------------------------------------------------------------------- # CORS # --------------------------------------------------------------------------- class TestCORS: def test_origin_allowed_for_non_browser_client(self, adapter): assert adapter._origin_allowed("") is True def test_origin_rejected_by_default(self, adapter): assert adapter._origin_allowed("http://evil.example") is False def test_origin_allowed_for_allowlist_match(self): adapter = _make_adapter(cors_origins=["http://localhost:3000"]) assert adapter._origin_allowed("http://localhost:3000") is True def test_cors_headers_for_origin_disabled_by_default(self, adapter): assert adapter._cors_headers_for_origin("http://localhost:3000") is None def test_cors_headers_for_origin_matches_allowlist(self): adapter = _make_adapter(cors_origins=["http://localhost:3000"]) headers = adapter._cors_headers_for_origin("http://localhost:3000") assert headers is not None assert headers["Access-Control-Allow-Origin"] == "http://localhost:3000" assert "POST" in headers["Access-Control-Allow-Methods"] def test_cors_headers_for_origin_rejects_unknown_origin(self): adapter = _make_adapter(cors_origins=["http://localhost:3000"]) assert adapter._cors_headers_for_origin("http://evil.example") is None @pytest.mark.asyncio async def test_cors_headers_not_present_by_default(self, adapter): """CORS is disabled unless explicitly configured.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health") assert resp.status == 200 assert resp.headers.get("Access-Control-Allow-Origin") is None @pytest.mark.asyncio async def test_browser_origin_rejected_by_default(self, adapter): """Browser-originated requests are rejected unless explicitly allowed.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health", headers={"Origin": "http://evil.example"}) assert resp.status == 403 assert resp.headers.get("Access-Control-Allow-Origin") is None @pytest.mark.asyncio async def test_cors_options_preflight_rejected_by_default(self, adapter): """Browser preflight is rejected unless CORS is explicitly configured.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.options( "/v1/chat/completions", headers={ "Origin": "http://evil.example", "Access-Control-Request-Method": "POST", }, ) assert resp.status == 403 assert resp.headers.get("Access-Control-Allow-Origin") is None @pytest.mark.asyncio async def test_cors_headers_present_for_allowed_origin(self): """Allowed origins receive explicit CORS headers.""" adapter = _make_adapter(cors_origins=["http://localhost:3000"]) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health", headers={"Origin": "http://localhost:3000"}) assert resp.status == 200 assert resp.headers.get("Access-Control-Allow-Origin") == "http://localhost:3000" assert "POST" in resp.headers.get("Access-Control-Allow-Methods", "") assert "DELETE" in resp.headers.get("Access-Control-Allow-Methods", "") @pytest.mark.asyncio async def test_cors_allows_idempotency_key_header(self): adapter = _make_adapter(cors_origins=["http://localhost:3000"]) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.options( "/v1/chat/completions", headers={ "Origin": "http://localhost:3000", "Access-Control-Request-Method": "POST", "Access-Control-Request-Headers": "Idempotency-Key", }, ) assert resp.status == 200 assert "Idempotency-Key" in resp.headers.get("Access-Control-Allow-Headers", "") @pytest.mark.asyncio async def test_cors_sets_vary_origin_header(self): adapter = _make_adapter(cors_origins=["http://localhost:3000"]) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.get("/health", headers={"Origin": "http://localhost:3000"}) assert resp.status == 200 assert resp.headers.get("Vary") == "Origin" @pytest.mark.asyncio async def test_cors_options_preflight_allowed_for_configured_origin(self): """Configured origins can complete browser preflight.""" adapter = _make_adapter(cors_origins=["http://localhost:3000"]) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.options( "/v1/chat/completions", headers={ "Origin": "http://localhost:3000", "Access-Control-Request-Method": "POST", "Access-Control-Request-Headers": "Authorization, Content-Type", }, ) assert resp.status == 200 assert resp.headers.get("Access-Control-Allow-Origin") == "http://localhost:3000" assert "Authorization" in resp.headers.get("Access-Control-Allow-Headers", "") @pytest.mark.asyncio async def test_cors_preflight_sets_max_age(self): adapter = _make_adapter(cors_origins=["http://localhost:3000"]) app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.options( "/v1/chat/completions", headers={ "Origin": "http://localhost:3000", "Access-Control-Request-Method": "POST", "Access-Control-Request-Headers": "Authorization, Content-Type", }, ) assert resp.status == 200 assert resp.headers.get("Access-Control-Max-Age") == "600" # --------------------------------------------------------------------------- # Conversation parameter # --------------------------------------------------------------------------- class TestConversationParameter: @pytest.mark.asyncio async def test_conversation_creates_new(self, adapter): """First request with a conversation name works (new conversation).""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = ( {"final_response": "Hello!", "messages": [], "api_calls": 1}, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, ) resp = await cli.post("/v1/responses", json={ "input": "hi", "conversation": "my-chat", }) assert resp.status == 200 data = await resp.json() assert data["status"] == "completed" # Conversation mapping should be set assert adapter._response_store.get_conversation("my-chat") is not None @pytest.mark.asyncio async def test_conversation_chains_automatically(self, adapter): """Second request with same conversation name chains to first.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = ( {"final_response": "First response", "messages": [], "api_calls": 1}, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, ) # First request resp1 = await cli.post("/v1/responses", json={ "input": "hello", "conversation": "test-conv", }) assert resp1.status == 200 data1 = await resp1.json() resp1_id = data1["id"] # Second request — should chain mock_run.return_value = ( {"final_response": "Second response", "messages": [], "api_calls": 1}, {"input_tokens": 20, "output_tokens": 10, "total_tokens": 30}, ) resp2 = await cli.post("/v1/responses", json={ "input": "follow up", "conversation": "test-conv", }) assert resp2.status == 200 # The second call should have received conversation history from the first assert mock_run.call_count == 2 second_call_kwargs = mock_run.call_args_list[1] history = second_call_kwargs.kwargs.get("conversation_history", second_call_kwargs[1].get("conversation_history", []) if len(second_call_kwargs) > 1 else []) # History should be non-empty (contains messages from first response) assert len(history) > 0 @pytest.mark.asyncio async def test_conversation_and_previous_response_id_conflict(self, adapter): """Cannot use both conversation and previous_response_id.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: resp = await cli.post("/v1/responses", json={ "input": "hi", "conversation": "my-chat", "previous_response_id": "resp_abc123", }) assert resp.status == 400 data = await resp.json() assert "Cannot use both" in data["error"]["message"] @pytest.mark.asyncio async def test_separate_conversations_are_isolated(self, adapter): """Different conversation names have independent histories.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = ( {"final_response": "Response A", "messages": [], "api_calls": 1}, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, ) # Conversation A await cli.post("/v1/responses", json={"input": "conv-a msg", "conversation": "conv-a"}) # Conversation B mock_run.return_value = ( {"final_response": "Response B", "messages": [], "api_calls": 1}, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, ) await cli.post("/v1/responses", json={"input": "conv-b msg", "conversation": "conv-b"}) # They should have different response IDs in the mapping assert adapter._response_store.get_conversation("conv-a") != adapter._response_store.get_conversation("conv-b") @pytest.mark.asyncio async def test_conversation_store_false_no_mapping(self, adapter): """If store=false, conversation mapping is not updated.""" app = _create_app(adapter) async with TestClient(TestServer(app)) as cli: with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: mock_run.return_value = ( {"final_response": "Ephemeral", "messages": [], "api_calls": 1}, {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, ) resp = await cli.post("/v1/responses", json={ "input": "hi", "conversation": "ephemeral-chat", "store": False, }) assert resp.status == 200 # Conversation mapping should NOT be set since store=false assert adapter._response_store.get_conversation("ephemeral-chat") is None