fix: broadcast Timmy state changes via WebSocket relay
- Add on_change async callback to WorkshopHeartbeat - Add /api/world/ws WebSocket endpoint for Workshop clients - broadcast_world_state() pushes timmy_state messages on change - Wire heartbeat to broadcaster at startup in app.py Fixes #375
This commit is contained in:
@@ -425,10 +425,11 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as exc:
|
||||
logger.debug("Vault size check skipped: %s", exc)
|
||||
|
||||
# Start Workshop presence heartbeat
|
||||
# Start Workshop presence heartbeat with WS relay
|
||||
from dashboard.routes.world import broadcast_world_state
|
||||
from timmy.workshop_state import WorkshopHeartbeat
|
||||
|
||||
workshop_heartbeat = WorkshopHeartbeat()
|
||||
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
|
||||
await workshop_heartbeat.start()
|
||||
|
||||
# Start chat integrations in background
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
"""Workshop world state API.
|
||||
"""Workshop world state API and WebSocket relay.
|
||||
|
||||
Serves Timmy's current presence state to the Workshop 3D renderer.
|
||||
The primary consumer is the browser on first load — before any
|
||||
WebSocket events arrive, the client needs a full state snapshot.
|
||||
|
||||
The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever
|
||||
the heartbeat detects a state change.
|
||||
|
||||
Source of truth: ``~/.timmy/presence.json`` written by
|
||||
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
|
||||
Falls back to a live ``get_state_dict()`` call if the file is stale
|
||||
@@ -15,7 +18,7 @@ import logging
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi import APIRouter, WebSocket
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from timmy.workshop_state import PRESENCE_FILE
|
||||
@@ -23,6 +26,13 @@ from timmy.workshop_state import PRESENCE_FILE
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/world", tags=["world"])
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket relay for live state changes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ws_clients: list[WebSocket] = []
|
||||
|
||||
_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild
|
||||
|
||||
|
||||
@@ -90,3 +100,44 @@ async def get_world_state() -> JSONResponse:
|
||||
content=_build_world_state(presence),
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket endpoint — streams timmy_state changes to Workshop clients
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.websocket("/ws")
|
||||
async def world_ws(websocket: WebSocket) -> None:
|
||||
"""Accept a Workshop client and keep it alive for state broadcasts."""
|
||||
await websocket.accept()
|
||||
_ws_clients.append(websocket)
|
||||
logger.info("World WS connected — %d clients", len(_ws_clients))
|
||||
try:
|
||||
while True:
|
||||
await websocket.receive_text() # keep-alive
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
if websocket in _ws_clients:
|
||||
_ws_clients.remove(websocket)
|
||||
logger.info("World WS disconnected — %d clients", len(_ws_clients))
|
||||
|
||||
|
||||
async def broadcast_world_state(presence: dict) -> None:
|
||||
"""Broadcast a ``timmy_state`` message to all connected Workshop clients.
|
||||
|
||||
Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its
|
||||
``on_change`` callback.
|
||||
"""
|
||||
state = _build_world_state(presence)
|
||||
message = json.dumps({"type": "timmy_state", **state["timmyState"]})
|
||||
dead: list[WebSocket] = []
|
||||
for ws in _ws_clients:
|
||||
try:
|
||||
await ws.send_text(message)
|
||||
except Exception:
|
||||
dead.append(ws)
|
||||
for ws in dead:
|
||||
if ws in _ws_clients:
|
||||
_ws_clients.remove(ws)
|
||||
|
||||
@@ -11,6 +11,7 @@ import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
@@ -91,12 +92,14 @@ class WorkshopHeartbeat:
|
||||
self,
|
||||
interval: int = HEARTBEAT_INTERVAL,
|
||||
path: Path | None = None,
|
||||
on_change: Callable[[dict], Awaitable[None]] | None = None,
|
||||
) -> None:
|
||||
self._interval = interval
|
||||
self._path = path or PRESENCE_FILE
|
||||
self._last_hash: str | None = None
|
||||
self._task: asyncio.Task | None = None
|
||||
self._trigger = asyncio.Event()
|
||||
self._on_change = on_change
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the heartbeat background loop."""
|
||||
@@ -129,13 +132,13 @@ class WorkshopHeartbeat:
|
||||
except TimeoutError:
|
||||
pass # Normal periodic tick
|
||||
|
||||
self._write_if_changed()
|
||||
await self._write_if_changed()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.error("Workshop heartbeat error: %s", exc)
|
||||
|
||||
def _write_if_changed(self) -> None:
|
||||
async def _write_if_changed(self) -> None:
|
||||
"""Build state, compare hash, write only if changed."""
|
||||
state_dict = get_state_dict()
|
||||
current_hash = _state_hash(state_dict)
|
||||
@@ -143,6 +146,11 @@ class WorkshopHeartbeat:
|
||||
return
|
||||
self._last_hash = current_hash
|
||||
write_state(state_dict, self._path)
|
||||
if self._on_change:
|
||||
try:
|
||||
await self._on_change(state_dict)
|
||||
except Exception as exc:
|
||||
logger.warning("on_change callback failed: %s", exc)
|
||||
|
||||
def _subscribe_to_events(self) -> None:
|
||||
"""Subscribe to cognitive state change events on the sensory bus."""
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
"""Tests for GET /api/world/state endpoint."""
|
||||
"""Tests for GET /api/world/state endpoint and /api/world/ws relay."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -10,6 +10,7 @@ from dashboard.routes.world import (
|
||||
_STALE_THRESHOLD,
|
||||
_build_world_state,
|
||||
_read_presence_file,
|
||||
broadcast_world_state,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -166,3 +167,55 @@ def test_world_state_endpoint_full_fallback(client, tmp_path):
|
||||
data = resp.json()
|
||||
assert data["timmyState"]["mood"] == "idle"
|
||||
assert data["version"] == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# broadcast_world_state
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_world_state_sends_timmy_state():
|
||||
"""broadcast_world_state sends timmy_state JSON to connected clients."""
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
ws = AsyncMock()
|
||||
_ws_clients.append(ws)
|
||||
try:
|
||||
presence = {
|
||||
"version": 1,
|
||||
"mood": "exploring",
|
||||
"current_focus": "testing",
|
||||
"energy": 0.8,
|
||||
"confidence": 0.9,
|
||||
}
|
||||
await broadcast_world_state(presence)
|
||||
|
||||
ws.send_text.assert_called_once()
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["type"] == "timmy_state"
|
||||
assert msg["mood"] == "exploring"
|
||||
assert msg["activity"] == "testing"
|
||||
finally:
|
||||
_ws_clients.clear()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_world_state_removes_dead_clients():
|
||||
"""Dead WebSocket connections are cleaned up on broadcast."""
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
dead_ws = AsyncMock()
|
||||
dead_ws.send_text.side_effect = ConnectionError("gone")
|
||||
_ws_clients.append(dead_ws)
|
||||
try:
|
||||
await broadcast_world_state({"mood": "idle"})
|
||||
assert dead_ws not in _ws_clients
|
||||
finally:
|
||||
_ws_clients.clear()
|
||||
|
||||
|
||||
def test_world_ws_endpoint_accepts_connection(client):
|
||||
"""WebSocket endpoint at /api/world/ws accepts connections."""
|
||||
with client.websocket_connect("/api/world/ws"):
|
||||
pass # Connection accepted — just close it
|
||||
|
||||
@@ -98,7 +98,8 @@ def test_state_hash_detects_mood_change():
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_write_if_changed_writes_on_first_call(tmp_path):
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_if_changed_writes_on_first_call(tmp_path):
|
||||
target = tmp_path / "presence.json"
|
||||
hb = WorkshopHeartbeat(interval=60, path=target)
|
||||
|
||||
@@ -109,7 +110,7 @@ def test_write_if_changed_writes_on_first_call(tmp_path):
|
||||
"current_focus": "testing",
|
||||
"mood": "focused",
|
||||
}
|
||||
hb._write_if_changed()
|
||||
await hb._write_if_changed()
|
||||
|
||||
assert target.exists()
|
||||
data = json.loads(target.read_text())
|
||||
@@ -117,22 +118,24 @@ def test_write_if_changed_writes_on_first_call(tmp_path):
|
||||
assert data["current_focus"] == "testing"
|
||||
|
||||
|
||||
def test_write_if_changed_skips_when_unchanged(tmp_path):
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_if_changed_skips_when_unchanged(tmp_path):
|
||||
target = tmp_path / "presence.json"
|
||||
hb = WorkshopHeartbeat(interval=60, path=target)
|
||||
|
||||
fixed_state = {"version": 1, "liveness": "t1", "mood": "idle"}
|
||||
|
||||
with patch("timmy.workshop_state.get_state_dict", return_value=fixed_state):
|
||||
hb._write_if_changed() # First write
|
||||
await hb._write_if_changed() # First write
|
||||
target.write_text("") # Clear to detect if second write happens
|
||||
hb._write_if_changed() # Should skip — state unchanged
|
||||
await hb._write_if_changed() # Should skip — state unchanged
|
||||
|
||||
# File should still be empty (second write was skipped)
|
||||
assert target.read_text() == ""
|
||||
|
||||
|
||||
def test_write_if_changed_writes_on_state_change(tmp_path):
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_if_changed_writes_on_state_change(tmp_path):
|
||||
target = tmp_path / "presence.json"
|
||||
hb = WorkshopHeartbeat(interval=60, path=target)
|
||||
|
||||
@@ -140,15 +143,54 @@ def test_write_if_changed_writes_on_state_change(tmp_path):
|
||||
state_b = {"version": 1, "liveness": "t2", "mood": "focused"}
|
||||
|
||||
with patch("timmy.workshop_state.get_state_dict", return_value=state_a):
|
||||
hb._write_if_changed()
|
||||
await hb._write_if_changed()
|
||||
|
||||
with patch("timmy.workshop_state.get_state_dict", return_value=state_b):
|
||||
hb._write_if_changed()
|
||||
await hb._write_if_changed()
|
||||
|
||||
data = json.loads(target.read_text())
|
||||
assert data["mood"] == "focused"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_if_changed_calls_on_change(tmp_path):
|
||||
"""on_change callback is invoked with state dict when state changes."""
|
||||
target = tmp_path / "presence.json"
|
||||
received = []
|
||||
|
||||
async def capture(state_dict):
|
||||
received.append(state_dict)
|
||||
|
||||
hb = WorkshopHeartbeat(interval=60, path=target, on_change=capture)
|
||||
|
||||
state = {"version": 1, "liveness": "t1", "mood": "focused"}
|
||||
with patch("timmy.workshop_state.get_state_dict", return_value=state):
|
||||
await hb._write_if_changed()
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0]["mood"] == "focused"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_if_changed_skips_on_change_when_unchanged(tmp_path):
|
||||
"""on_change is NOT called when state hash is unchanged."""
|
||||
target = tmp_path / "presence.json"
|
||||
call_count = 0
|
||||
|
||||
async def counter(_):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
|
||||
hb = WorkshopHeartbeat(interval=60, path=target, on_change=counter)
|
||||
|
||||
state = {"version": 1, "liveness": "t1", "mood": "idle"}
|
||||
with patch("timmy.workshop_state.get_state_dict", return_value=state):
|
||||
await hb._write_if_changed()
|
||||
await hb._write_if_changed()
|
||||
|
||||
assert call_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WorkshopHeartbeat — notify
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user