"""Tests for GET /api/world/state endpoint and /api/world/ws relay.""" import json import logging import time from unittest.mock import AsyncMock, MagicMock, patch import pytest from dashboard.routes.world import ( _STALE_THRESHOLD, _bark_and_broadcast, _broadcast, _build_world_state, _conversation, _generate_bark, _handle_client_message, _log_bark_failure, _read_presence_file, broadcast_world_state, ) # --------------------------------------------------------------------------- # _build_world_state # --------------------------------------------------------------------------- def test_build_world_state_maps_fields(): presence = { "version": 1, "liveness": "2026-03-19T02:00:00Z", "mood": "exploring", "current_focus": "reviewing PR", "energy": 0.8, "confidence": 0.9, "active_threads": [{"type": "thinking", "ref": "test", "status": "active"}], "recent_events": [], "concerns": [], } result = _build_world_state(presence) assert result["timmyState"]["mood"] == "exploring" assert result["timmyState"]["activity"] == "reviewing PR" assert result["timmyState"]["energy"] == 0.8 assert result["timmyState"]["confidence"] == 0.9 assert result["updatedAt"] == "2026-03-19T02:00:00Z" assert result["version"] == 1 assert result["visitorPresent"] is False assert len(result["activeThreads"]) == 1 def test_build_world_state_defaults(): """Missing fields get safe defaults.""" result = _build_world_state({}) assert result["timmyState"]["mood"] == "focused" assert result["timmyState"]["energy"] == 0.5 assert result["version"] == 1 # --------------------------------------------------------------------------- # _read_presence_file # --------------------------------------------------------------------------- def test_read_presence_file_missing(tmp_path): with patch("dashboard.routes.world.PRESENCE_FILE", tmp_path / "nope.json"): assert _read_presence_file() is None def test_read_presence_file_stale(tmp_path): f = tmp_path / "presence.json" f.write_text(json.dumps({"version": 1})) # Backdate the file stale_time = time.time() - _STALE_THRESHOLD - 10 import os os.utime(f, (stale_time, stale_time)) with patch("dashboard.routes.world.PRESENCE_FILE", f): assert _read_presence_file() is None def test_read_presence_file_fresh(tmp_path): f = tmp_path / "presence.json" f.write_text(json.dumps({"version": 1, "mood": "focused"})) with patch("dashboard.routes.world.PRESENCE_FILE", f): result = _read_presence_file() assert result is not None assert result["version"] == 1 def test_read_presence_file_bad_json(tmp_path): f = tmp_path / "presence.json" f.write_text("not json {{{") with patch("dashboard.routes.world.PRESENCE_FILE", f): assert _read_presence_file() is None # --------------------------------------------------------------------------- # Full endpoint via TestClient # --------------------------------------------------------------------------- @pytest.fixture def client(): from fastapi import FastAPI from fastapi.testclient import TestClient app = FastAPI() from dashboard.routes.world import router app.include_router(router) return TestClient(app) def test_world_state_endpoint_with_file(client, tmp_path): """Endpoint returns data from presence file when fresh.""" f = tmp_path / "presence.json" f.write_text( json.dumps( { "version": 1, "liveness": "2026-03-19T02:00:00Z", "mood": "exploring", "current_focus": "testing", "active_threads": [], "recent_events": [], "concerns": [], } ) ) with patch("dashboard.routes.world.PRESENCE_FILE", f): resp = client.get("/api/world/state") assert resp.status_code == 200 data = resp.json() assert data["timmyState"]["mood"] == "exploring" assert data["timmyState"]["activity"] == "testing" assert resp.headers["cache-control"] == "no-cache, no-store" def test_world_state_endpoint_fallback(client, tmp_path): """Endpoint falls back to live state when file missing.""" with ( patch("dashboard.routes.world.PRESENCE_FILE", tmp_path / "nope.json"), patch("timmy.workshop_state.get_state_dict") as mock_get, ): mock_get.return_value = { "version": 1, "liveness": "2026-03-19T02:00:00Z", "mood": "idle", "current_focus": "", "active_threads": [], "recent_events": [], "concerns": [], } resp = client.get("/api/world/state") assert resp.status_code == 200 assert resp.json()["timmyState"]["mood"] == "idle" def test_world_state_endpoint_full_fallback(client, tmp_path): """Endpoint returns safe defaults when everything fails.""" with ( patch("dashboard.routes.world.PRESENCE_FILE", tmp_path / "nope.json"), patch( "timmy.workshop_state.get_state_dict", side_effect=RuntimeError("boom"), ), ): resp = client.get("/api/world/state") assert resp.status_code == 200 data = resp.json() assert data["timmyState"]["mood"] == "idle" assert data["version"] == 1 # --------------------------------------------------------------------------- # broadcast_world_state # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_broadcast_world_state_sends_timmy_state(): """broadcast_world_state sends timmy_state JSON to connected clients.""" from dashboard.routes.world import _ws_clients ws = AsyncMock() _ws_clients.append(ws) try: presence = { "version": 1, "mood": "exploring", "current_focus": "testing", "energy": 0.8, "confidence": 0.9, } await broadcast_world_state(presence) ws.send_text.assert_called_once() msg = json.loads(ws.send_text.call_args[0][0]) assert msg["type"] == "timmy_state" assert msg["mood"] == "exploring" assert msg["activity"] == "testing" finally: _ws_clients.clear() @pytest.mark.asyncio async def test_broadcast_world_state_removes_dead_clients(): """Dead WebSocket connections are cleaned up on broadcast.""" from dashboard.routes.world import _ws_clients dead_ws = AsyncMock() dead_ws.send_text.side_effect = ConnectionError("gone") _ws_clients.append(dead_ws) try: await broadcast_world_state({"mood": "idle"}) assert dead_ws not in _ws_clients finally: _ws_clients.clear() def test_world_ws_endpoint_accepts_connection(client): """WebSocket endpoint at /api/world/ws accepts connections.""" with client.websocket_connect("/api/world/ws"): pass # Connection accepted — just close it def test_world_ws_sends_snapshot_on_connect(client, tmp_path): """WebSocket sends a world_state snapshot immediately on connect.""" f = tmp_path / "presence.json" f.write_text( json.dumps( { "version": 1, "liveness": "2026-03-19T02:00:00Z", "mood": "exploring", "current_focus": "testing", "active_threads": [], "recent_events": [], "concerns": [], } ) ) with patch("dashboard.routes.world.PRESENCE_FILE", f): with client.websocket_connect("/api/world/ws") as ws: msg = json.loads(ws.receive_text()) assert msg["type"] == "world_state" assert msg["timmyState"]["mood"] == "exploring" assert msg["timmyState"]["activity"] == "testing" assert "updatedAt" in msg # --------------------------------------------------------------------------- # Visitor chat — bark engine # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_handle_client_message_ignores_non_json(): """Non-JSON messages are silently ignored.""" await _handle_client_message("not json") # should not raise @pytest.mark.asyncio async def test_handle_client_message_ignores_unknown_type(): """Unknown message types are ignored.""" await _handle_client_message(json.dumps({"type": "unknown"})) @pytest.mark.asyncio async def test_handle_client_message_ignores_empty_text(): """Empty visitor_message text is ignored.""" await _handle_client_message(json.dumps({"type": "visitor_message", "text": " "})) @pytest.mark.asyncio async def test_generate_bark_returns_response(): """_generate_bark returns the chat response.""" with patch("timmy.session.chat", new_callable=AsyncMock) as mock_chat: mock_chat.return_value = "Woof! Good to see you." result = await _generate_bark("Hey Timmy!") assert result == "Woof! Good to see you." mock_chat.assert_called_once_with("Hey Timmy!", session_id="workshop") @pytest.mark.asyncio async def test_generate_bark_fallback_on_error(): """_generate_bark returns canned response when chat fails.""" with patch( "timmy.session.chat", new_callable=AsyncMock, side_effect=RuntimeError("no model"), ): result = await _generate_bark("Hello?") assert "tangled" in result @pytest.mark.asyncio async def test_bark_and_broadcast_sends_thinking_then_speech(): """_bark_and_broadcast sends thinking indicator then speech.""" from dashboard.routes.world import _ws_clients ws = AsyncMock() _ws_clients.append(ws) _conversation.clear() try: with patch( "timmy.session.chat", new_callable=AsyncMock, return_value="All good here!", ): await _bark_and_broadcast("How are you?") # Should have sent two messages: thinking + speech assert ws.send_text.call_count == 2 thinking = json.loads(ws.send_text.call_args_list[0][0][0]) speech = json.loads(ws.send_text.call_args_list[1][0][0]) assert thinking["type"] == "timmy_thinking" assert speech["type"] == "timmy_speech" assert speech["text"] == "All good here!" assert len(speech["recentExchanges"]) == 1 assert speech["recentExchanges"][0]["visitor"] == "How are you?" finally: _ws_clients.clear() _conversation.clear() @pytest.mark.asyncio async def test_broadcast_removes_dead_clients(): """Dead clients are cleaned up during broadcast.""" from dashboard.routes.world import _ws_clients dead = AsyncMock() dead.send_text.side_effect = ConnectionError("gone") _ws_clients.append(dead) try: await _broadcast(json.dumps({"type": "timmy_speech", "text": "test"})) assert dead not in _ws_clients finally: _ws_clients.clear() @pytest.mark.asyncio async def test_conversation_buffer_caps_at_max(): """Conversation buffer only keeps the last _MAX_EXCHANGES entries.""" from dashboard.routes.world import _MAX_EXCHANGES, _ws_clients ws = AsyncMock() _ws_clients.append(ws) _conversation.clear() try: with patch( "timmy.session.chat", new_callable=AsyncMock, return_value="reply", ): for i in range(_MAX_EXCHANGES + 2): await _bark_and_broadcast(f"msg {i}") assert len(_conversation) == _MAX_EXCHANGES # Oldest messages should have been evicted assert _conversation[0]["visitor"] == f"msg {_MAX_EXCHANGES + 2 - _MAX_EXCHANGES}" finally: _ws_clients.clear() _conversation.clear() def test_log_bark_failure_logs_exception(caplog): """_log_bark_failure logs errors from failed bark tasks.""" import asyncio loop = asyncio.new_event_loop() async def _fail(): raise RuntimeError("bark boom") task = loop.create_task(_fail()) loop.run_until_complete(asyncio.sleep(0.01)) loop.close() with caplog.at_level(logging.ERROR): _log_bark_failure(task) assert "bark boom" in caplog.text def test_log_bark_failure_ignores_cancelled(): """_log_bark_failure silently ignores cancelled tasks.""" import asyncio task = MagicMock(spec=asyncio.Task) task.cancelled.return_value = True _log_bark_failure(task) # should not raise