diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 03c99c77..4e247022 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -425,10 +425,11 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.debug("Vault size check skipped: %s", exc) - # Start Workshop presence heartbeat + # Start Workshop presence heartbeat with WS relay + from dashboard.routes.world import broadcast_world_state from timmy.workshop_state import WorkshopHeartbeat - workshop_heartbeat = WorkshopHeartbeat() + workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state) await workshop_heartbeat.start() # Start chat integrations in background diff --git a/src/dashboard/routes/world.py b/src/dashboard/routes/world.py index 91af8b0d..51daea0d 100644 --- a/src/dashboard/routes/world.py +++ b/src/dashboard/routes/world.py @@ -1,9 +1,12 @@ -"""Workshop world state API. +"""Workshop world state API and WebSocket relay. Serves Timmy's current presence state to the Workshop 3D renderer. The primary consumer is the browser on first load — before any WebSocket events arrive, the client needs a full state snapshot. +The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever +the heartbeat detects a state change. + Source of truth: ``~/.timmy/presence.json`` written by :class:`~timmy.workshop_state.WorkshopHeartbeat`. Falls back to a live ``get_state_dict()`` call if the file is stale @@ -15,7 +18,7 @@ import logging import time from datetime import UTC, datetime -from fastapi import APIRouter +from fastapi import APIRouter, WebSocket from fastapi.responses import JSONResponse from timmy.workshop_state import PRESENCE_FILE @@ -23,6 +26,13 @@ from timmy.workshop_state import PRESENCE_FILE logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/world", tags=["world"]) + +# --------------------------------------------------------------------------- +# WebSocket relay for live state changes +# --------------------------------------------------------------------------- + +_ws_clients: list[WebSocket] = [] + _STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild @@ -90,3 +100,44 @@ async def get_world_state() -> JSONResponse: content=_build_world_state(presence), headers={"Cache-Control": "no-cache, no-store"}, ) + + +# --------------------------------------------------------------------------- +# WebSocket endpoint — streams timmy_state changes to Workshop clients +# --------------------------------------------------------------------------- + + +@router.websocket("/ws") +async def world_ws(websocket: WebSocket) -> None: + """Accept a Workshop client and keep it alive for state broadcasts.""" + await websocket.accept() + _ws_clients.append(websocket) + logger.info("World WS connected — %d clients", len(_ws_clients)) + try: + while True: + await websocket.receive_text() # keep-alive + except Exception: + pass + finally: + if websocket in _ws_clients: + _ws_clients.remove(websocket) + logger.info("World WS disconnected — %d clients", len(_ws_clients)) + + +async def broadcast_world_state(presence: dict) -> None: + """Broadcast a ``timmy_state`` message to all connected Workshop clients. + + Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its + ``on_change`` callback. + """ + state = _build_world_state(presence) + message = json.dumps({"type": "timmy_state", **state["timmyState"]}) + dead: list[WebSocket] = [] + for ws in _ws_clients: + try: + await ws.send_text(message) + except Exception: + dead.append(ws) + for ws in dead: + if ws in _ws_clients: + _ws_clients.remove(ws) diff --git a/src/timmy/workshop_state.py b/src/timmy/workshop_state.py index f81a0ef3..f148f911 100644 --- a/src/timmy/workshop_state.py +++ b/src/timmy/workshop_state.py @@ -11,6 +11,7 @@ import asyncio import hashlib import json import logging +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from pathlib import Path @@ -91,12 +92,14 @@ class WorkshopHeartbeat: self, interval: int = HEARTBEAT_INTERVAL, path: Path | None = None, + on_change: Callable[[dict], Awaitable[None]] | None = None, ) -> None: self._interval = interval self._path = path or PRESENCE_FILE self._last_hash: str | None = None self._task: asyncio.Task | None = None self._trigger = asyncio.Event() + self._on_change = on_change async def start(self) -> None: """Start the heartbeat background loop.""" @@ -129,13 +132,13 @@ class WorkshopHeartbeat: except TimeoutError: pass # Normal periodic tick - self._write_if_changed() + await self._write_if_changed() except asyncio.CancelledError: raise except Exception as exc: logger.error("Workshop heartbeat error: %s", exc) - def _write_if_changed(self) -> None: + async def _write_if_changed(self) -> None: """Build state, compare hash, write only if changed.""" state_dict = get_state_dict() current_hash = _state_hash(state_dict) @@ -143,6 +146,11 @@ class WorkshopHeartbeat: return self._last_hash = current_hash write_state(state_dict, self._path) + if self._on_change: + try: + await self._on_change(state_dict) + except Exception as exc: + logger.warning("on_change callback failed: %s", exc) def _subscribe_to_events(self) -> None: """Subscribe to cognitive state change events on the sensory bus.""" diff --git a/tests/dashboard/test_world_api.py b/tests/dashboard/test_world_api.py index d18d6377..b7224c3e 100644 --- a/tests/dashboard/test_world_api.py +++ b/tests/dashboard/test_world_api.py @@ -1,8 +1,8 @@ -"""Tests for GET /api/world/state endpoint.""" +"""Tests for GET /api/world/state endpoint and /api/world/ws relay.""" import json import time -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest @@ -10,6 +10,7 @@ from dashboard.routes.world import ( _STALE_THRESHOLD, _build_world_state, _read_presence_file, + broadcast_world_state, ) # --------------------------------------------------------------------------- @@ -166,3 +167,55 @@ def test_world_state_endpoint_full_fallback(client, tmp_path): data = resp.json() assert data["timmyState"]["mood"] == "idle" assert data["version"] == 1 + + +# --------------------------------------------------------------------------- +# broadcast_world_state +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_broadcast_world_state_sends_timmy_state(): + """broadcast_world_state sends timmy_state JSON to connected clients.""" + from dashboard.routes.world import _ws_clients + + ws = AsyncMock() + _ws_clients.append(ws) + try: + presence = { + "version": 1, + "mood": "exploring", + "current_focus": "testing", + "energy": 0.8, + "confidence": 0.9, + } + await broadcast_world_state(presence) + + ws.send_text.assert_called_once() + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["type"] == "timmy_state" + assert msg["mood"] == "exploring" + assert msg["activity"] == "testing" + finally: + _ws_clients.clear() + + +@pytest.mark.asyncio +async def test_broadcast_world_state_removes_dead_clients(): + """Dead WebSocket connections are cleaned up on broadcast.""" + from dashboard.routes.world import _ws_clients + + dead_ws = AsyncMock() + dead_ws.send_text.side_effect = ConnectionError("gone") + _ws_clients.append(dead_ws) + try: + await broadcast_world_state({"mood": "idle"}) + assert dead_ws not in _ws_clients + finally: + _ws_clients.clear() + + +def test_world_ws_endpoint_accepts_connection(client): + """WebSocket endpoint at /api/world/ws accepts connections.""" + with client.websocket_connect("/api/world/ws"): + pass # Connection accepted — just close it diff --git a/tests/timmy/test_workshop_state.py b/tests/timmy/test_workshop_state.py index f55c07eb..0f5aa170 100644 --- a/tests/timmy/test_workshop_state.py +++ b/tests/timmy/test_workshop_state.py @@ -98,7 +98,8 @@ def test_state_hash_detects_mood_change(): # --------------------------------------------------------------------------- -def test_write_if_changed_writes_on_first_call(tmp_path): +@pytest.mark.asyncio +async def test_write_if_changed_writes_on_first_call(tmp_path): target = tmp_path / "presence.json" hb = WorkshopHeartbeat(interval=60, path=target) @@ -109,7 +110,7 @@ def test_write_if_changed_writes_on_first_call(tmp_path): "current_focus": "testing", "mood": "focused", } - hb._write_if_changed() + await hb._write_if_changed() assert target.exists() data = json.loads(target.read_text()) @@ -117,22 +118,24 @@ def test_write_if_changed_writes_on_first_call(tmp_path): assert data["current_focus"] == "testing" -def test_write_if_changed_skips_when_unchanged(tmp_path): +@pytest.mark.asyncio +async def test_write_if_changed_skips_when_unchanged(tmp_path): target = tmp_path / "presence.json" hb = WorkshopHeartbeat(interval=60, path=target) fixed_state = {"version": 1, "liveness": "t1", "mood": "idle"} with patch("timmy.workshop_state.get_state_dict", return_value=fixed_state): - hb._write_if_changed() # First write + await hb._write_if_changed() # First write target.write_text("") # Clear to detect if second write happens - hb._write_if_changed() # Should skip — state unchanged + await hb._write_if_changed() # Should skip — state unchanged # File should still be empty (second write was skipped) assert target.read_text() == "" -def test_write_if_changed_writes_on_state_change(tmp_path): +@pytest.mark.asyncio +async def test_write_if_changed_writes_on_state_change(tmp_path): target = tmp_path / "presence.json" hb = WorkshopHeartbeat(interval=60, path=target) @@ -140,15 +143,54 @@ def test_write_if_changed_writes_on_state_change(tmp_path): state_b = {"version": 1, "liveness": "t2", "mood": "focused"} with patch("timmy.workshop_state.get_state_dict", return_value=state_a): - hb._write_if_changed() + await hb._write_if_changed() with patch("timmy.workshop_state.get_state_dict", return_value=state_b): - hb._write_if_changed() + await hb._write_if_changed() data = json.loads(target.read_text()) assert data["mood"] == "focused" +@pytest.mark.asyncio +async def test_write_if_changed_calls_on_change(tmp_path): + """on_change callback is invoked with state dict when state changes.""" + target = tmp_path / "presence.json" + received = [] + + async def capture(state_dict): + received.append(state_dict) + + hb = WorkshopHeartbeat(interval=60, path=target, on_change=capture) + + state = {"version": 1, "liveness": "t1", "mood": "focused"} + with patch("timmy.workshop_state.get_state_dict", return_value=state): + await hb._write_if_changed() + + assert len(received) == 1 + assert received[0]["mood"] == "focused" + + +@pytest.mark.asyncio +async def test_write_if_changed_skips_on_change_when_unchanged(tmp_path): + """on_change is NOT called when state hash is unchanged.""" + target = tmp_path / "presence.json" + call_count = 0 + + async def counter(_): + nonlocal call_count + call_count += 1 + + hb = WorkshopHeartbeat(interval=60, path=target, on_change=counter) + + state = {"version": 1, "liveness": "t1", "mood": "idle"} + with patch("timmy.workshop_state.get_state_dict", return_value=state): + await hb._write_if_changed() + await hb._write_if_changed() + + assert call_count == 1 + + # --------------------------------------------------------------------------- # WorkshopHeartbeat — notify # ---------------------------------------------------------------------------