test: add HA integration tests with fake in-process server
Fake HA server (aiohttp.web) simulates full API surface over real TCP: - WebSocket auth handshake + event push - REST endpoints (states, services, notifications) 14 integration tests verify end-to-end flows without mocks: - WS connect/auth/subscribe/event-forwarding/disconnect - REST list/get/call-service against fake server - send() notification delivery and auth failure - 401/500 error handling
This commit is contained in:
0
tests/fakes/__init__.py
Normal file
0
tests/fakes/__init__.py
Normal file
288
tests/fakes/fake_ha_server.py
Normal file
288
tests/fakes/fake_ha_server.py
Normal file
@@ -0,0 +1,288 @@
|
||||
"""Fake Home Assistant server for integration testing.
|
||||
|
||||
Provides a real HTTP + WebSocket server (via aiohttp.web) that mimics the
|
||||
Home Assistant API surface used by hermes-agent:
|
||||
|
||||
- ``/api/websocket`` -- WebSocket auth handshake + event push
|
||||
- ``/api/states`` -- GET all entity states
|
||||
- ``/api/states/{entity_id}`` -- GET single entity state
|
||||
- ``/api/services/{domain}/{service}`` -- POST service call
|
||||
- ``/api/services/persistent_notification/create`` -- POST notification
|
||||
|
||||
Usage::
|
||||
|
||||
async with FakeHAServer(token="test-token") as server:
|
||||
url = server.url # e.g. "http://127.0.0.1:54321"
|
||||
await server.push_event(event_data)
|
||||
assert server.received_notifications # verify what arrived
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
from aiohttp.test_utils import TestServer
|
||||
|
||||
|
||||
# -- Sample entity data -------------------------------------------------------
|
||||
|
||||
ENTITY_STATES: List[Dict[str, Any]] = [
|
||||
{
|
||||
"entity_id": "light.bedroom",
|
||||
"state": "on",
|
||||
"attributes": {"friendly_name": "Bedroom Light", "brightness": 200},
|
||||
"last_changed": "2025-01-15T10:30:00+00:00",
|
||||
"last_updated": "2025-01-15T10:30:00+00:00",
|
||||
},
|
||||
{
|
||||
"entity_id": "light.kitchen",
|
||||
"state": "off",
|
||||
"attributes": {"friendly_name": "Kitchen Light"},
|
||||
"last_changed": "2025-01-15T09:00:00+00:00",
|
||||
"last_updated": "2025-01-15T09:00:00+00:00",
|
||||
},
|
||||
{
|
||||
"entity_id": "sensor.temperature",
|
||||
"state": "22.5",
|
||||
"attributes": {
|
||||
"friendly_name": "Kitchen Temperature",
|
||||
"unit_of_measurement": "C",
|
||||
},
|
||||
"last_changed": "2025-01-15T10:00:00+00:00",
|
||||
"last_updated": "2025-01-15T10:00:00+00:00",
|
||||
},
|
||||
{
|
||||
"entity_id": "switch.fan",
|
||||
"state": "on",
|
||||
"attributes": {"friendly_name": "Living Room Fan"},
|
||||
"last_changed": "2025-01-15T08:00:00+00:00",
|
||||
"last_updated": "2025-01-15T08:00:00+00:00",
|
||||
},
|
||||
{
|
||||
"entity_id": "climate.thermostat",
|
||||
"state": "heat",
|
||||
"attributes": {
|
||||
"friendly_name": "Main Thermostat",
|
||||
"current_temperature": 21,
|
||||
"temperature": 23,
|
||||
},
|
||||
"last_changed": "2025-01-15T07:00:00+00:00",
|
||||
"last_updated": "2025-01-15T07:00:00+00:00",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class FakeHAServer:
|
||||
"""In-process fake Home Assistant for integration tests.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
token : str
|
||||
The expected Bearer token for authentication.
|
||||
"""
|
||||
|
||||
def __init__(self, token: str = "test-token-123"):
|
||||
self.token = token
|
||||
|
||||
# Observability -- tests inspect these after exercising the adapter.
|
||||
self.received_service_calls: List[Dict[str, Any]] = []
|
||||
self.received_notifications: List[Dict[str, Any]] = []
|
||||
|
||||
# Control -- tests push events, server forwards them over WS.
|
||||
self._event_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
|
||||
|
||||
# Flag to simulate auth rejection.
|
||||
self.reject_auth = False
|
||||
|
||||
# Flag to simulate server errors.
|
||||
self.force_500 = False
|
||||
|
||||
# Internal bookkeeping.
|
||||
self._app: Optional[web.Application] = None
|
||||
self._server: Optional[TestServer] = None
|
||||
self._ws_connections: List[web.WebSocketResponse] = []
|
||||
|
||||
# -- Public helpers --------------------------------------------------------
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
"""Base URL of the running server, e.g. ``http://127.0.0.1:12345``."""
|
||||
assert self._server is not None, "Server not started"
|
||||
host = self._server.host
|
||||
port = self._server.port
|
||||
return f"http://{host}:{port}"
|
||||
|
||||
async def push_event(self, event_data: Dict[str, Any]) -> None:
|
||||
"""Enqueue a state_changed event for delivery over WebSocket."""
|
||||
await self._event_queue.put(event_data)
|
||||
|
||||
# -- Lifecycle -------------------------------------------------------------
|
||||
|
||||
async def start(self) -> None:
|
||||
self._app = self._build_app()
|
||||
self._server = TestServer(self._app)
|
||||
await self._server.start_server()
|
||||
|
||||
async def stop(self) -> None:
|
||||
# Close any remaining WS connections.
|
||||
for ws in self._ws_connections:
|
||||
if not ws.closed:
|
||||
await ws.close()
|
||||
self._ws_connections.clear()
|
||||
if self._server is not None:
|
||||
await self._server.close()
|
||||
|
||||
async def __aenter__(self) -> "FakeHAServer":
|
||||
await self.start()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *exc) -> None:
|
||||
await self.stop()
|
||||
|
||||
# -- Application construction ----------------------------------------------
|
||||
|
||||
def _build_app(self) -> web.Application:
|
||||
app = web.Application()
|
||||
app.router.add_get("/api/websocket", self._handle_ws)
|
||||
app.router.add_get("/api/states", self._handle_get_states)
|
||||
app.router.add_get("/api/states/{entity_id}", self._handle_get_state)
|
||||
# Notification endpoint must be registered before the generic service
|
||||
# route so that it takes priority.
|
||||
app.router.add_post(
|
||||
"/api/services/persistent_notification/create",
|
||||
self._handle_notification,
|
||||
)
|
||||
app.router.add_post(
|
||||
"/api/services/{domain}/{service}",
|
||||
self._handle_call_service,
|
||||
)
|
||||
return app
|
||||
|
||||
# -- Auth helper -----------------------------------------------------------
|
||||
|
||||
def _check_rest_auth(self, request: web.Request) -> Optional[web.Response]:
|
||||
"""Return a 401 response if the Bearer token is wrong, else None."""
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if auth != f"Bearer {self.token}":
|
||||
return web.Response(status=401, text="Unauthorized")
|
||||
if self.force_500:
|
||||
return web.Response(status=500, text="Internal Server Error")
|
||||
return None
|
||||
|
||||
# -- WebSocket handler -----------------------------------------------------
|
||||
|
||||
async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
|
||||
ws = web.WebSocketResponse()
|
||||
await ws.prepare(request)
|
||||
self._ws_connections.append(ws)
|
||||
|
||||
# Step 1: auth_required
|
||||
await ws.send_json({"type": "auth_required", "ha_version": "2025.1.0"})
|
||||
|
||||
# Step 2: receive auth
|
||||
msg = await ws.receive()
|
||||
if msg.type != aiohttp.WSMsgType.TEXT:
|
||||
await ws.close()
|
||||
return ws
|
||||
auth_msg = json.loads(msg.data)
|
||||
|
||||
# Step 3: validate
|
||||
if self.reject_auth or auth_msg.get("access_token") != self.token:
|
||||
await ws.send_json({"type": "auth_invalid", "message": "Invalid token"})
|
||||
await ws.close()
|
||||
return ws
|
||||
|
||||
await ws.send_json({"type": "auth_ok", "ha_version": "2025.1.0"})
|
||||
|
||||
# Step 4: subscribe_events
|
||||
msg = await ws.receive()
|
||||
if msg.type != aiohttp.WSMsgType.TEXT:
|
||||
await ws.close()
|
||||
return ws
|
||||
sub_msg = json.loads(msg.data)
|
||||
sub_id = sub_msg.get("id", 1)
|
||||
|
||||
# Step 5: ACK
|
||||
await ws.send_json({
|
||||
"id": sub_id,
|
||||
"type": "result",
|
||||
"success": True,
|
||||
"result": None,
|
||||
})
|
||||
|
||||
# Step 6: push events from queue until closed
|
||||
try:
|
||||
while not ws.closed:
|
||||
try:
|
||||
event_data = await asyncio.wait_for(
|
||||
self._event_queue.get(), timeout=0.1,
|
||||
)
|
||||
await ws.send_json({
|
||||
"id": sub_id,
|
||||
"type": "event",
|
||||
"event": event_data,
|
||||
})
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except (ConnectionResetError, asyncio.CancelledError):
|
||||
pass
|
||||
|
||||
return ws
|
||||
|
||||
# -- REST handlers ---------------------------------------------------------
|
||||
|
||||
async def _handle_get_states(self, request: web.Request) -> web.Response:
|
||||
err = self._check_rest_auth(request)
|
||||
if err:
|
||||
return err
|
||||
return web.json_response(ENTITY_STATES)
|
||||
|
||||
async def _handle_get_state(self, request: web.Request) -> web.Response:
|
||||
err = self._check_rest_auth(request)
|
||||
if err:
|
||||
return err
|
||||
entity_id = request.match_info["entity_id"]
|
||||
for s in ENTITY_STATES:
|
||||
if s["entity_id"] == entity_id:
|
||||
return web.json_response(s)
|
||||
return web.Response(status=404, text=f"Entity {entity_id} not found")
|
||||
|
||||
async def _handle_notification(self, request: web.Request) -> web.Response:
|
||||
err = self._check_rest_auth(request)
|
||||
if err:
|
||||
return err
|
||||
body = await request.json()
|
||||
self.received_notifications.append(body)
|
||||
return web.json_response([])
|
||||
|
||||
async def _handle_call_service(self, request: web.Request) -> web.Response:
|
||||
err = self._check_rest_auth(request)
|
||||
if err:
|
||||
return err
|
||||
domain = request.match_info["domain"]
|
||||
service = request.match_info["service"]
|
||||
body = await request.json()
|
||||
|
||||
self.received_service_calls.append({
|
||||
"domain": domain,
|
||||
"service": service,
|
||||
"data": body,
|
||||
})
|
||||
|
||||
# Return affected entities (mimics real HA behaviour for light/switch).
|
||||
affected = []
|
||||
entity_id = body.get("entity_id")
|
||||
if entity_id:
|
||||
new_state = "on" if service == "turn_on" else "off"
|
||||
for s in ENTITY_STATES:
|
||||
if s["entity_id"] == entity_id:
|
||||
affected.append({
|
||||
"entity_id": entity_id,
|
||||
"state": new_state,
|
||||
"attributes": s.get("attributes", {}),
|
||||
})
|
||||
break
|
||||
|
||||
return web.json_response(affected)
|
||||
341
tests/integration/test_ha_integration.py
Normal file
341
tests/integration/test_ha_integration.py
Normal file
@@ -0,0 +1,341 @@
|
||||
"""Integration tests for Home Assistant (tool + gateway).
|
||||
|
||||
Spins up a real in-process fake HA server (HTTP + WebSocket) and exercises
|
||||
the full adapter and tool handler paths over real TCP connections.
|
||||
No mocks -- only real async I/O against a fake server.
|
||||
|
||||
Run with: uv run pytest tests/integration/test_ha_integration.py -v
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.homeassistant import HomeAssistantAdapter
|
||||
from tests.fakes.fake_ha_server import FakeHAServer, ENTITY_STATES
|
||||
from tools.homeassistant_tool import (
|
||||
_async_call_service,
|
||||
_async_get_state,
|
||||
_async_list_entities,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _adapter_for(server: FakeHAServer, **extra) -> HomeAssistantAdapter:
|
||||
"""Create an adapter pointed at the fake server."""
|
||||
config = PlatformConfig(
|
||||
enabled=True,
|
||||
token=server.token,
|
||||
extra={"url": server.url, **extra},
|
||||
)
|
||||
return HomeAssistantAdapter(config)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. Gateway -- WebSocket lifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGatewayWebSocket:
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_auth_subscribe(self):
|
||||
"""Full WS handshake succeeds: auth_required -> auth -> auth_ok -> subscribe -> ACK."""
|
||||
async with FakeHAServer() as server:
|
||||
adapter = _adapter_for(server)
|
||||
connected = await adapter.connect()
|
||||
assert connected is True
|
||||
assert adapter._running is True
|
||||
assert adapter._ws is not None
|
||||
assert not adapter._ws.closed
|
||||
await adapter.disconnect()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_auth_rejected(self):
|
||||
"""connect() returns False when the server rejects auth."""
|
||||
async with FakeHAServer() as server:
|
||||
server.reject_auth = True
|
||||
adapter = _adapter_for(server)
|
||||
connected = await adapter.connect()
|
||||
assert connected is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_received_and_forwarded(self):
|
||||
"""Server pushes event -> adapter calls handle_message with correct MessageEvent."""
|
||||
async with FakeHAServer() as server:
|
||||
adapter = _adapter_for(server)
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
await adapter.connect()
|
||||
|
||||
# Push a state_changed event
|
||||
await server.push_event({
|
||||
"data": {
|
||||
"entity_id": "light.bedroom",
|
||||
"old_state": {"state": "off", "attributes": {}},
|
||||
"new_state": {
|
||||
"state": "on",
|
||||
"attributes": {"friendly_name": "Bedroom Light"},
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
# Wait for the adapter to process it
|
||||
for _ in range(50):
|
||||
if adapter.handle_message.call_count > 0:
|
||||
break
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
assert adapter.handle_message.call_count == 1
|
||||
msg_event = adapter.handle_message.call_args[0][0]
|
||||
assert "Bedroom Light" in msg_event.text
|
||||
assert "turned on" in msg_event.text
|
||||
assert msg_event.source.platform == Platform.HOMEASSISTANT
|
||||
|
||||
await adapter.disconnect()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_filtering_ignores_unwatched(self):
|
||||
"""Events outside watch_domains are silently dropped."""
|
||||
async with FakeHAServer() as server:
|
||||
adapter = _adapter_for(server, watch_domains=["climate"])
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
await adapter.connect()
|
||||
|
||||
# Push a light event (not in watch_domains)
|
||||
await server.push_event({
|
||||
"data": {
|
||||
"entity_id": "light.bedroom",
|
||||
"old_state": {"state": "off", "attributes": {}},
|
||||
"new_state": {
|
||||
"state": "on",
|
||||
"attributes": {"friendly_name": "Bedroom Light"},
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
await asyncio.sleep(0.5)
|
||||
assert adapter.handle_message.call_count == 0
|
||||
|
||||
await adapter.disconnect()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnect_closes_cleanly(self):
|
||||
"""disconnect() cancels listener and closes WebSocket."""
|
||||
async with FakeHAServer() as server:
|
||||
adapter = _adapter_for(server)
|
||||
await adapter.connect()
|
||||
ws_ref = adapter._ws
|
||||
|
||||
await adapter.disconnect()
|
||||
|
||||
assert adapter._running is False
|
||||
assert adapter._listen_task is None
|
||||
assert adapter._ws is None
|
||||
# The original WS reference should be closed
|
||||
assert ws_ref.closed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. REST tool handlers (real HTTP against fake server)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolRest:
|
||||
"""Call the async tool functions directly against the fake server.
|
||||
|
||||
Note: we call ``_async_*`` instead of the sync ``_handle_*`` wrappers
|
||||
because the sync wrappers use ``_run_async`` which blocks the event
|
||||
loop, deadlocking with the in-process fake server. The async functions
|
||||
are the real logic; the sync wrappers are trivial bridge code already
|
||||
covered by unit tests.
|
||||
"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_entities_returns_all(self, monkeypatch):
|
||||
"""_async_list_entities returns all entities from the fake server."""
|
||||
async with FakeHAServer() as server:
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", server.token,
|
||||
)
|
||||
|
||||
result = await _async_list_entities()
|
||||
|
||||
assert result["count"] == len(ENTITY_STATES)
|
||||
ids = {e["entity_id"] for e in result["entities"]}
|
||||
assert "light.bedroom" in ids
|
||||
assert "climate.thermostat" in ids
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_entities_domain_filter(self, monkeypatch):
|
||||
"""Domain filter is applied after fetching from server."""
|
||||
async with FakeHAServer() as server:
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", server.token,
|
||||
)
|
||||
|
||||
result = await _async_list_entities(domain="light")
|
||||
|
||||
assert result["count"] == 2
|
||||
for e in result["entities"]:
|
||||
assert e["entity_id"].startswith("light.")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_state_single_entity(self, monkeypatch):
|
||||
"""_async_get_state returns full entity details."""
|
||||
async with FakeHAServer() as server:
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", server.token,
|
||||
)
|
||||
|
||||
result = await _async_get_state("light.bedroom")
|
||||
|
||||
assert result["entity_id"] == "light.bedroom"
|
||||
assert result["state"] == "on"
|
||||
assert result["attributes"]["brightness"] == 200
|
||||
assert result["last_changed"] is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_state_not_found(self, monkeypatch):
|
||||
"""Non-existent entity raises an aiohttp error (404)."""
|
||||
import aiohttp as _aiohttp
|
||||
|
||||
async with FakeHAServer() as server:
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", server.token,
|
||||
)
|
||||
|
||||
with pytest.raises(_aiohttp.ClientResponseError) as exc_info:
|
||||
await _async_get_state("light.nonexistent")
|
||||
assert exc_info.value.status == 404
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_service_turn_on(self, monkeypatch):
|
||||
"""_async_call_service sends correct payload and server records it."""
|
||||
async with FakeHAServer() as server:
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", server.token,
|
||||
)
|
||||
|
||||
result = await _async_call_service(
|
||||
domain="light",
|
||||
service="turn_on",
|
||||
entity_id="light.bedroom",
|
||||
data={"brightness": 255},
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["service"] == "light.turn_on"
|
||||
assert len(result["affected_entities"]) == 1
|
||||
assert result["affected_entities"][0]["state"] == "on"
|
||||
|
||||
# Verify fake server recorded the call
|
||||
assert len(server.received_service_calls) == 1
|
||||
call = server.received_service_calls[0]
|
||||
assert call["domain"] == "light"
|
||||
assert call["service"] == "turn_on"
|
||||
assert call["data"]["entity_id"] == "light.bedroom"
|
||||
assert call["data"]["brightness"] == 255
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. send() -- REST notification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSendNotification:
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_notification_delivered(self):
|
||||
"""Adapter send() delivers notification to fake server REST endpoint."""
|
||||
async with FakeHAServer() as server:
|
||||
adapter = _adapter_for(server)
|
||||
|
||||
result = await adapter.send("ha_events", "Test notification from agent")
|
||||
|
||||
assert result.success is True
|
||||
assert len(server.received_notifications) == 1
|
||||
notif = server.received_notifications[0]
|
||||
assert notif["title"] == "Hermes Agent"
|
||||
assert notif["message"] == "Test notification from agent"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_auth_failure(self):
|
||||
"""send() returns failure when token is wrong."""
|
||||
async with FakeHAServer() as server:
|
||||
config = PlatformConfig(
|
||||
enabled=True,
|
||||
token="wrong-token",
|
||||
extra={"url": server.url},
|
||||
)
|
||||
adapter = HomeAssistantAdapter(config)
|
||||
|
||||
result = await adapter.send("ha_events", "Should fail")
|
||||
|
||||
assert result.success is False
|
||||
assert "401" in result.error
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. Auth and error cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAuthAndErrors:
|
||||
@pytest.mark.asyncio
|
||||
async def test_rest_unauthorized(self, monkeypatch):
|
||||
"""Async function raises on 401 when token is wrong."""
|
||||
import aiohttp as _aiohttp
|
||||
|
||||
async with FakeHAServer() as server:
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", "bad-token",
|
||||
)
|
||||
|
||||
with pytest.raises(_aiohttp.ClientResponseError) as exc_info:
|
||||
await _async_list_entities()
|
||||
assert exc_info.value.status == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rest_server_error(self, monkeypatch):
|
||||
"""Async function raises on 500 response."""
|
||||
import aiohttp as _aiohttp
|
||||
|
||||
async with FakeHAServer() as server:
|
||||
server.force_500 = True
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_URL", server.url,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"tools.homeassistant_tool._HASS_TOKEN", server.token,
|
||||
)
|
||||
|
||||
with pytest.raises(_aiohttp.ClientResponseError) as exc_info:
|
||||
await _async_list_entities()
|
||||
assert exc_info.value.status == 500
|
||||
Reference in New Issue
Block a user