From b32c642af3cfd8c2fee700e1c05fad10fca07a0e Mon Sep 17 00:00:00 2001 From: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com> Date: Sat, 28 Feb 2026 14:28:04 +0300 Subject: [PATCH] test: add HA integration tests with fake in-process server Fake HA server (aiohttp.web) simulates full API surface over real TCP: - WebSocket auth handshake + event push - REST endpoints (states, services, notifications) 14 integration tests verify end-to-end flows without mocks: - WS connect/auth/subscribe/event-forwarding/disconnect - REST list/get/call-service against fake server - send() notification delivery and auth failure - 401/500 error handling --- tests/fakes/__init__.py | 0 tests/fakes/fake_ha_server.py | 288 +++++++++++++++++++ tests/integration/test_ha_integration.py | 341 +++++++++++++++++++++++ 3 files changed, 629 insertions(+) create mode 100644 tests/fakes/__init__.py create mode 100644 tests/fakes/fake_ha_server.py create mode 100644 tests/integration/test_ha_integration.py diff --git a/tests/fakes/__init__.py b/tests/fakes/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/fakes/fake_ha_server.py b/tests/fakes/fake_ha_server.py new file mode 100644 index 000000000..1d51bf51b --- /dev/null +++ b/tests/fakes/fake_ha_server.py @@ -0,0 +1,288 @@ +"""Fake Home Assistant server for integration testing. + +Provides a real HTTP + WebSocket server (via aiohttp.web) that mimics the +Home Assistant API surface used by hermes-agent: + +- ``/api/websocket`` -- WebSocket auth handshake + event push +- ``/api/states`` -- GET all entity states +- ``/api/states/{entity_id}`` -- GET single entity state +- ``/api/services/{domain}/{service}`` -- POST service call +- ``/api/services/persistent_notification/create`` -- POST notification + +Usage:: + + async with FakeHAServer(token="test-token") as server: + url = server.url # e.g. "http://127.0.0.1:54321" + await server.push_event(event_data) + assert server.received_notifications # verify what arrived +""" + +import asyncio +import json +from typing import Any, Dict, List, Optional + +import aiohttp +from aiohttp import web +from aiohttp.test_utils import TestServer + + +# -- Sample entity data ------------------------------------------------------- + +ENTITY_STATES: List[Dict[str, Any]] = [ + { + "entity_id": "light.bedroom", + "state": "on", + "attributes": {"friendly_name": "Bedroom Light", "brightness": 200}, + "last_changed": "2025-01-15T10:30:00+00:00", + "last_updated": "2025-01-15T10:30:00+00:00", + }, + { + "entity_id": "light.kitchen", + "state": "off", + "attributes": {"friendly_name": "Kitchen Light"}, + "last_changed": "2025-01-15T09:00:00+00:00", + "last_updated": "2025-01-15T09:00:00+00:00", + }, + { + "entity_id": "sensor.temperature", + "state": "22.5", + "attributes": { + "friendly_name": "Kitchen Temperature", + "unit_of_measurement": "C", + }, + "last_changed": "2025-01-15T10:00:00+00:00", + "last_updated": "2025-01-15T10:00:00+00:00", + }, + { + "entity_id": "switch.fan", + "state": "on", + "attributes": {"friendly_name": "Living Room Fan"}, + "last_changed": "2025-01-15T08:00:00+00:00", + "last_updated": "2025-01-15T08:00:00+00:00", + }, + { + "entity_id": "climate.thermostat", + "state": "heat", + "attributes": { + "friendly_name": "Main Thermostat", + "current_temperature": 21, + "temperature": 23, + }, + "last_changed": "2025-01-15T07:00:00+00:00", + "last_updated": "2025-01-15T07:00:00+00:00", + }, +] + + +class FakeHAServer: + """In-process fake Home Assistant for integration tests. + + Parameters + ---------- + token : str + The expected Bearer token for authentication. + """ + + def __init__(self, token: str = "test-token-123"): + self.token = token + + # Observability -- tests inspect these after exercising the adapter. + self.received_service_calls: List[Dict[str, Any]] = [] + self.received_notifications: List[Dict[str, Any]] = [] + + # Control -- tests push events, server forwards them over WS. + self._event_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + + # Flag to simulate auth rejection. + self.reject_auth = False + + # Flag to simulate server errors. + self.force_500 = False + + # Internal bookkeeping. + self._app: Optional[web.Application] = None + self._server: Optional[TestServer] = None + self._ws_connections: List[web.WebSocketResponse] = [] + + # -- Public helpers -------------------------------------------------------- + + @property + def url(self) -> str: + """Base URL of the running server, e.g. ``http://127.0.0.1:12345``.""" + assert self._server is not None, "Server not started" + host = self._server.host + port = self._server.port + return f"http://{host}:{port}" + + async def push_event(self, event_data: Dict[str, Any]) -> None: + """Enqueue a state_changed event for delivery over WebSocket.""" + await self._event_queue.put(event_data) + + # -- Lifecycle ------------------------------------------------------------- + + async def start(self) -> None: + self._app = self._build_app() + self._server = TestServer(self._app) + await self._server.start_server() + + async def stop(self) -> None: + # Close any remaining WS connections. + for ws in self._ws_connections: + if not ws.closed: + await ws.close() + self._ws_connections.clear() + if self._server is not None: + await self._server.close() + + async def __aenter__(self) -> "FakeHAServer": + await self.start() + return self + + async def __aexit__(self, *exc) -> None: + await self.stop() + + # -- Application construction ---------------------------------------------- + + def _build_app(self) -> web.Application: + app = web.Application() + app.router.add_get("/api/websocket", self._handle_ws) + app.router.add_get("/api/states", self._handle_get_states) + app.router.add_get("/api/states/{entity_id}", self._handle_get_state) + # Notification endpoint must be registered before the generic service + # route so that it takes priority. + app.router.add_post( + "/api/services/persistent_notification/create", + self._handle_notification, + ) + app.router.add_post( + "/api/services/{domain}/{service}", + self._handle_call_service, + ) + return app + + # -- Auth helper ----------------------------------------------------------- + + def _check_rest_auth(self, request: web.Request) -> Optional[web.Response]: + """Return a 401 response if the Bearer token is wrong, else None.""" + auth = request.headers.get("Authorization", "") + if auth != f"Bearer {self.token}": + return web.Response(status=401, text="Unauthorized") + if self.force_500: + return web.Response(status=500, text="Internal Server Error") + return None + + # -- WebSocket handler ----------------------------------------------------- + + async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse: + ws = web.WebSocketResponse() + await ws.prepare(request) + self._ws_connections.append(ws) + + # Step 1: auth_required + await ws.send_json({"type": "auth_required", "ha_version": "2025.1.0"}) + + # Step 2: receive auth + msg = await ws.receive() + if msg.type != aiohttp.WSMsgType.TEXT: + await ws.close() + return ws + auth_msg = json.loads(msg.data) + + # Step 3: validate + if self.reject_auth or auth_msg.get("access_token") != self.token: + await ws.send_json({"type": "auth_invalid", "message": "Invalid token"}) + await ws.close() + return ws + + await ws.send_json({"type": "auth_ok", "ha_version": "2025.1.0"}) + + # Step 4: subscribe_events + msg = await ws.receive() + if msg.type != aiohttp.WSMsgType.TEXT: + await ws.close() + return ws + sub_msg = json.loads(msg.data) + sub_id = sub_msg.get("id", 1) + + # Step 5: ACK + await ws.send_json({ + "id": sub_id, + "type": "result", + "success": True, + "result": None, + }) + + # Step 6: push events from queue until closed + try: + while not ws.closed: + try: + event_data = await asyncio.wait_for( + self._event_queue.get(), timeout=0.1, + ) + await ws.send_json({ + "id": sub_id, + "type": "event", + "event": event_data, + }) + except asyncio.TimeoutError: + continue + except (ConnectionResetError, asyncio.CancelledError): + pass + + return ws + + # -- REST handlers --------------------------------------------------------- + + async def _handle_get_states(self, request: web.Request) -> web.Response: + err = self._check_rest_auth(request) + if err: + return err + return web.json_response(ENTITY_STATES) + + async def _handle_get_state(self, request: web.Request) -> web.Response: + err = self._check_rest_auth(request) + if err: + return err + entity_id = request.match_info["entity_id"] + for s in ENTITY_STATES: + if s["entity_id"] == entity_id: + return web.json_response(s) + return web.Response(status=404, text=f"Entity {entity_id} not found") + + async def _handle_notification(self, request: web.Request) -> web.Response: + err = self._check_rest_auth(request) + if err: + return err + body = await request.json() + self.received_notifications.append(body) + return web.json_response([]) + + async def _handle_call_service(self, request: web.Request) -> web.Response: + err = self._check_rest_auth(request) + if err: + return err + domain = request.match_info["domain"] + service = request.match_info["service"] + body = await request.json() + + self.received_service_calls.append({ + "domain": domain, + "service": service, + "data": body, + }) + + # Return affected entities (mimics real HA behaviour for light/switch). + affected = [] + entity_id = body.get("entity_id") + if entity_id: + new_state = "on" if service == "turn_on" else "off" + for s in ENTITY_STATES: + if s["entity_id"] == entity_id: + affected.append({ + "entity_id": entity_id, + "state": new_state, + "attributes": s.get("attributes", {}), + }) + break + + return web.json_response(affected) diff --git a/tests/integration/test_ha_integration.py b/tests/integration/test_ha_integration.py new file mode 100644 index 000000000..7f7329bad --- /dev/null +++ b/tests/integration/test_ha_integration.py @@ -0,0 +1,341 @@ +"""Integration tests for Home Assistant (tool + gateway). + +Spins up a real in-process fake HA server (HTTP + WebSocket) and exercises +the full adapter and tool handler paths over real TCP connections. +No mocks -- only real async I/O against a fake server. + +Run with: uv run pytest tests/integration/test_ha_integration.py -v +""" + +import asyncio + +import pytest + +pytestmark = pytest.mark.integration + +from unittest.mock import AsyncMock + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.homeassistant import HomeAssistantAdapter +from tests.fakes.fake_ha_server import FakeHAServer, ENTITY_STATES +from tools.homeassistant_tool import ( + _async_call_service, + _async_get_state, + _async_list_entities, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _adapter_for(server: FakeHAServer, **extra) -> HomeAssistantAdapter: + """Create an adapter pointed at the fake server.""" + config = PlatformConfig( + enabled=True, + token=server.token, + extra={"url": server.url, **extra}, + ) + return HomeAssistantAdapter(config) + + +# --------------------------------------------------------------------------- +# 1. Gateway -- WebSocket lifecycle +# --------------------------------------------------------------------------- + + +class TestGatewayWebSocket: + @pytest.mark.asyncio + async def test_connect_auth_subscribe(self): + """Full WS handshake succeeds: auth_required -> auth -> auth_ok -> subscribe -> ACK.""" + async with FakeHAServer() as server: + adapter = _adapter_for(server) + connected = await adapter.connect() + assert connected is True + assert adapter._running is True + assert adapter._ws is not None + assert not adapter._ws.closed + await adapter.disconnect() + + @pytest.mark.asyncio + async def test_connect_auth_rejected(self): + """connect() returns False when the server rejects auth.""" + async with FakeHAServer() as server: + server.reject_auth = True + adapter = _adapter_for(server) + connected = await adapter.connect() + assert connected is False + + @pytest.mark.asyncio + async def test_event_received_and_forwarded(self): + """Server pushes event -> adapter calls handle_message with correct MessageEvent.""" + async with FakeHAServer() as server: + adapter = _adapter_for(server) + adapter.handle_message = AsyncMock() + + await adapter.connect() + + # Push a state_changed event + await server.push_event({ + "data": { + "entity_id": "light.bedroom", + "old_state": {"state": "off", "attributes": {}}, + "new_state": { + "state": "on", + "attributes": {"friendly_name": "Bedroom Light"}, + }, + } + }) + + # Wait for the adapter to process it + for _ in range(50): + if adapter.handle_message.call_count > 0: + break + await asyncio.sleep(0.05) + + assert adapter.handle_message.call_count == 1 + msg_event = adapter.handle_message.call_args[0][0] + assert "Bedroom Light" in msg_event.text + assert "turned on" in msg_event.text + assert msg_event.source.platform == Platform.HOMEASSISTANT + + await adapter.disconnect() + + @pytest.mark.asyncio + async def test_event_filtering_ignores_unwatched(self): + """Events outside watch_domains are silently dropped.""" + async with FakeHAServer() as server: + adapter = _adapter_for(server, watch_domains=["climate"]) + adapter.handle_message = AsyncMock() + + await adapter.connect() + + # Push a light event (not in watch_domains) + await server.push_event({ + "data": { + "entity_id": "light.bedroom", + "old_state": {"state": "off", "attributes": {}}, + "new_state": { + "state": "on", + "attributes": {"friendly_name": "Bedroom Light"}, + }, + } + }) + + await asyncio.sleep(0.5) + assert adapter.handle_message.call_count == 0 + + await adapter.disconnect() + + @pytest.mark.asyncio + async def test_disconnect_closes_cleanly(self): + """disconnect() cancels listener and closes WebSocket.""" + async with FakeHAServer() as server: + adapter = _adapter_for(server) + await adapter.connect() + ws_ref = adapter._ws + + await adapter.disconnect() + + assert adapter._running is False + assert adapter._listen_task is None + assert adapter._ws is None + # The original WS reference should be closed + assert ws_ref.closed + + +# --------------------------------------------------------------------------- +# 2. REST tool handlers (real HTTP against fake server) +# --------------------------------------------------------------------------- + + +class TestToolRest: + """Call the async tool functions directly against the fake server. + + Note: we call ``_async_*`` instead of the sync ``_handle_*`` wrappers + because the sync wrappers use ``_run_async`` which blocks the event + loop, deadlocking with the in-process fake server. The async functions + are the real logic; the sync wrappers are trivial bridge code already + covered by unit tests. + """ + + @pytest.mark.asyncio + async def test_list_entities_returns_all(self, monkeypatch): + """_async_list_entities returns all entities from the fake server.""" + async with FakeHAServer() as server: + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", server.token, + ) + + result = await _async_list_entities() + + assert result["count"] == len(ENTITY_STATES) + ids = {e["entity_id"] for e in result["entities"]} + assert "light.bedroom" in ids + assert "climate.thermostat" in ids + + @pytest.mark.asyncio + async def test_list_entities_domain_filter(self, monkeypatch): + """Domain filter is applied after fetching from server.""" + async with FakeHAServer() as server: + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", server.token, + ) + + result = await _async_list_entities(domain="light") + + assert result["count"] == 2 + for e in result["entities"]: + assert e["entity_id"].startswith("light.") + + @pytest.mark.asyncio + async def test_get_state_single_entity(self, monkeypatch): + """_async_get_state returns full entity details.""" + async with FakeHAServer() as server: + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", server.token, + ) + + result = await _async_get_state("light.bedroom") + + assert result["entity_id"] == "light.bedroom" + assert result["state"] == "on" + assert result["attributes"]["brightness"] == 200 + assert result["last_changed"] is not None + + @pytest.mark.asyncio + async def test_get_state_not_found(self, monkeypatch): + """Non-existent entity raises an aiohttp error (404).""" + import aiohttp as _aiohttp + + async with FakeHAServer() as server: + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", server.token, + ) + + with pytest.raises(_aiohttp.ClientResponseError) as exc_info: + await _async_get_state("light.nonexistent") + assert exc_info.value.status == 404 + + @pytest.mark.asyncio + async def test_call_service_turn_on(self, monkeypatch): + """_async_call_service sends correct payload and server records it.""" + async with FakeHAServer() as server: + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", server.token, + ) + + result = await _async_call_service( + domain="light", + service="turn_on", + entity_id="light.bedroom", + data={"brightness": 255}, + ) + + assert result["success"] is True + assert result["service"] == "light.turn_on" + assert len(result["affected_entities"]) == 1 + assert result["affected_entities"][0]["state"] == "on" + + # Verify fake server recorded the call + assert len(server.received_service_calls) == 1 + call = server.received_service_calls[0] + assert call["domain"] == "light" + assert call["service"] == "turn_on" + assert call["data"]["entity_id"] == "light.bedroom" + assert call["data"]["brightness"] == 255 + + +# --------------------------------------------------------------------------- +# 3. send() -- REST notification +# --------------------------------------------------------------------------- + + +class TestSendNotification: + @pytest.mark.asyncio + async def test_send_notification_delivered(self): + """Adapter send() delivers notification to fake server REST endpoint.""" + async with FakeHAServer() as server: + adapter = _adapter_for(server) + + result = await adapter.send("ha_events", "Test notification from agent") + + assert result.success is True + assert len(server.received_notifications) == 1 + notif = server.received_notifications[0] + assert notif["title"] == "Hermes Agent" + assert notif["message"] == "Test notification from agent" + + @pytest.mark.asyncio + async def test_send_auth_failure(self): + """send() returns failure when token is wrong.""" + async with FakeHAServer() as server: + config = PlatformConfig( + enabled=True, + token="wrong-token", + extra={"url": server.url}, + ) + adapter = HomeAssistantAdapter(config) + + result = await adapter.send("ha_events", "Should fail") + + assert result.success is False + assert "401" in result.error + + +# --------------------------------------------------------------------------- +# 4. Auth and error cases +# --------------------------------------------------------------------------- + + +class TestAuthAndErrors: + @pytest.mark.asyncio + async def test_rest_unauthorized(self, monkeypatch): + """Async function raises on 401 when token is wrong.""" + import aiohttp as _aiohttp + + async with FakeHAServer() as server: + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", "bad-token", + ) + + with pytest.raises(_aiohttp.ClientResponseError) as exc_info: + await _async_list_entities() + assert exc_info.value.status == 401 + + @pytest.mark.asyncio + async def test_rest_server_error(self, monkeypatch): + """Async function raises on 500 response.""" + import aiohttp as _aiohttp + + async with FakeHAServer() as server: + server.force_500 = True + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_URL", server.url, + ) + monkeypatch.setattr( + "tools.homeassistant_tool._HASS_TOKEN", server.token, + ) + + with pytest.raises(_aiohttp.ClientResponseError) as exc_info: + await _async_list_entities() + assert exc_info.value.status == 500