feat: broadcast Timmy state changes via WS relay (#380)

Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit is contained in:
2026-03-19 00:25:11 -04:00
committed by hermes
parent aa4f1de138
commit da43421d4e
5 changed files with 171 additions and 16 deletions

View File

@@ -425,10 +425,11 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
# Start Workshop presence heartbeat
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat()
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Start chat integrations in background

View File

@@ -1,9 +1,12 @@
"""Workshop world state API.
"""Workshop world state API and WebSocket relay.
Serves Timmy's current presence state to the Workshop 3D renderer.
The primary consumer is the browser on first load — before any
WebSocket events arrive, the client needs a full state snapshot.
The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever
the heartbeat detects a state change.
Source of truth: ``~/.timmy/presence.json`` written by
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
Falls back to a live ``get_state_dict()`` call if the file is stale
@@ -15,7 +18,7 @@ import logging
import time
from datetime import UTC, datetime
from fastapi import APIRouter
from fastapi import APIRouter, WebSocket
from fastapi.responses import JSONResponse
from timmy.workshop_state import PRESENCE_FILE
@@ -23,6 +26,13 @@ from timmy.workshop_state import PRESENCE_FILE
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/world", tags=["world"])
# ---------------------------------------------------------------------------
# WebSocket relay for live state changes
# ---------------------------------------------------------------------------
_ws_clients: list[WebSocket] = []
_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild
@@ -90,3 +100,44 @@ async def get_world_state() -> JSONResponse:
content=_build_world_state(presence),
headers={"Cache-Control": "no-cache, no-store"},
)
# ---------------------------------------------------------------------------
# WebSocket endpoint — streams timmy_state changes to Workshop clients
# ---------------------------------------------------------------------------
@router.websocket("/ws")
async def world_ws(websocket: WebSocket) -> None:
"""Accept a Workshop client and keep it alive for state broadcasts."""
await websocket.accept()
_ws_clients.append(websocket)
logger.info("World WS connected — %d clients", len(_ws_clients))
try:
while True:
await websocket.receive_text() # keep-alive
except Exception:
pass
finally:
if websocket in _ws_clients:
_ws_clients.remove(websocket)
logger.info("World WS disconnected — %d clients", len(_ws_clients))
async def broadcast_world_state(presence: dict) -> None:
"""Broadcast a ``timmy_state`` message to all connected Workshop clients.
Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its
``on_change`` callback.
"""
state = _build_world_state(presence)
message = json.dumps({"type": "timmy_state", **state["timmyState"]})
dead: list[WebSocket] = []
for ws in _ws_clients:
try:
await ws.send_text(message)
except Exception:
dead.append(ws)
for ws in dead:
if ws in _ws_clients:
_ws_clients.remove(ws)

View File

@@ -11,6 +11,7 @@ import asyncio
import hashlib
import json
import logging
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from pathlib import Path
@@ -91,12 +92,14 @@ class WorkshopHeartbeat:
self,
interval: int = HEARTBEAT_INTERVAL,
path: Path | None = None,
on_change: Callable[[dict], Awaitable[None]] | None = None,
) -> None:
self._interval = interval
self._path = path or PRESENCE_FILE
self._last_hash: str | None = None
self._task: asyncio.Task | None = None
self._trigger = asyncio.Event()
self._on_change = on_change
async def start(self) -> None:
"""Start the heartbeat background loop."""
@@ -129,13 +132,13 @@ class WorkshopHeartbeat:
except TimeoutError:
pass # Normal periodic tick
self._write_if_changed()
await self._write_if_changed()
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Workshop heartbeat error: %s", exc)
def _write_if_changed(self) -> None:
async def _write_if_changed(self) -> None:
"""Build state, compare hash, write only if changed."""
state_dict = get_state_dict()
current_hash = _state_hash(state_dict)
@@ -143,6 +146,11 @@ class WorkshopHeartbeat:
return
self._last_hash = current_hash
write_state(state_dict, self._path)
if self._on_change:
try:
await self._on_change(state_dict)
except Exception as exc:
logger.warning("on_change callback failed: %s", exc)
def _subscribe_to_events(self) -> None:
"""Subscribe to cognitive state change events on the sensory bus."""