forked from Rockachopa/Timmy-time-dashboard
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
238 lines
7.9 KiB
Python
238 lines
7.9 KiB
Python
"""Workshop world state API and WebSocket relay.
|
|
|
|
Serves Timmy's current presence state to the Workshop 3D renderer.
|
|
The primary consumer is the browser on first load — before any
|
|
WebSocket events arrive, the client needs a full state snapshot.
|
|
|
|
The ``/api/world/ws`` WebSocket endpoint streams ``timmy_state`` messages
whenever the heartbeat detects a state change. It also accepts
``visitor_message`` frames from the 3D client and responds with
``timmy_speech`` barks.
|
|
|
|
Source of truth: ``~/.timmy/presence.json`` written by
|
|
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
|
|
Falls back to a live ``get_state_dict()`` call if the file is stale
|
|
or missing.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from collections import deque
|
|
from datetime import UTC, datetime
|
|
|
|
from fastapi import APIRouter, WebSocket
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from timmy.workshop_state import PRESENCE_FILE
|
|
|
|
# Module-wide logger, plus the router that mounts every route under /api/world.
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/world", tags=["world"])

# ---------------------------------------------------------------------------
# WebSocket relay for live state changes
# ---------------------------------------------------------------------------

# Sockets of the Workshop clients that are currently connected.
_ws_clients: list[WebSocket] = []

# Maximum age of presence.json (seconds) before a live rebuild is used instead.
_STALE_THRESHOLD = 90

# In-memory conversation buffer for the Workshop overlay: only the newest
# _MAX_EXCHANGES visitor/Timmy exchanges are retained.
_MAX_EXCHANGES = 3
_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES)

# Session ID handed to the chat session when answering Workshop visitors.
_WORKSHOP_SESSION_ID = "workshop"
|
|
|
|
|
|
def _read_presence_file() -> dict | None:
    """Return the parsed contents of presence.json, or ``None`` if unusable.

    The file is treated as unusable when it is missing, older than
    ``_STALE_THRESHOLD`` seconds (judged by its mtime), unreadable, not
    valid UTF-8 JSON, or when its top-level JSON value is not an object.

    Returns:
        The decoded presence dict, or ``None`` so the caller can fall back
        to another source.
    """
    try:
        # stat() directly rather than exists() + stat(): a missing file is
        # reported by the same call, with no gap in which it could vanish.
        modified_at = PRESENCE_FILE.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        return None
    except OSError as exc:
        logger.warning("Failed to read presence.json: %s", exc)
        return None

    age = time.time() - modified_at
    if age > _STALE_THRESHOLD:
        logger.debug("presence.json is stale (%.0fs old)", age)
        return None

    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        payload = json.loads(PRESENCE_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Failed to read presence.json: %s", exc)
        return None

    if not isinstance(payload, dict):
        # Callers use dict.get() on the result; reject lists, scalars and null.
        logger.warning(
            "Failed to read presence.json: expected a JSON object, got %s",
            type(payload).__name__,
        )
        return None
    return payload
|
|
|
|
|
|
def _build_world_state(presence: dict) -> dict:
|
|
"""Transform presence dict into the world/state API response."""
|
|
return {
|
|
"timmyState": {
|
|
"mood": presence.get("mood", "calm"),
|
|
"activity": presence.get("current_focus", "idle"),
|
|
"energy": presence.get("energy", 0.5),
|
|
"confidence": presence.get("confidence", 0.7),
|
|
},
|
|
"activeThreads": presence.get("active_threads", []),
|
|
"recentEvents": presence.get("recent_events", []),
|
|
"concerns": presence.get("concerns", []),
|
|
"visitorPresent": False,
|
|
"updatedAt": presence.get("liveness", datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")),
|
|
"version": presence.get("version", 1),
|
|
}
|
|
|
|
|
|
def _get_current_state() -> dict:
    """Assemble the world-state dict from the best source available.

    Sources are tried in order: a fresh presence file, then a live
    ``get_state_dict()`` call, then a hard-coded calm placeholder if the
    live build raises.
    """
    presence = _read_presence_file()
    if presence is not None:
        return _build_world_state(presence)

    try:
        # Imported at call time, only when the file-based path is unavailable.
        from timmy.workshop_state import get_state_dict

        presence = get_state_dict()
    except Exception as exc:
        logger.warning("Live state build failed: %s", exc)
        now_stamp = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
        presence = {
            "version": 1,
            "liveness": now_stamp,
            "mood": "calm",
            "current_focus": "",
            "active_threads": [],
            "recent_events": [],
            "concerns": [],
        }

    return _build_world_state(presence)
|
|
|
|
|
|
@router.get("/state")
async def get_world_state() -> JSONResponse:
    """Serve Timmy's current world state so the Workshop can bootstrap.

    The payload comes from ``_get_current_state()``; the response carries
    a ``Cache-Control`` header that disables caching.
    """
    payload = _get_current_state()
    no_cache_headers = {"Cache-Control": "no-cache, no-store"}
    return JSONResponse(content=payload, headers=no_cache_headers)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WebSocket endpoint — streams timmy_state changes to Workshop clients
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@router.websocket("/ws")
async def world_ws(websocket: WebSocket) -> None:
    """Accept a Workshop client and keep it alive for state broadcasts.

    Sends a full ``world_state`` snapshot immediately on connect so the
    client never starts from a blank slate. Every incoming text frame is
    handed to ``_handle_client_message``. The socket is registered in
    ``_ws_clients`` for the lifetime of the connection and is always
    removed on exit, whether the client left normally, an error occurred,
    or the handler was cancelled.
    """
    # Function-scope import, matching the lazy-import style used elsewhere
    # in this module.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    _ws_clients.append(websocket)
    logger.info("World WS connected — %d clients", len(_ws_clients))

    try:
        # Send full world-state snapshot so client bootstraps instantly.
        # This sits inside the outer try so that a cancellation during the
        # send still reaches the cleanup in ``finally``.
        try:
            snapshot = _get_current_state()
            await websocket.send_text(json.dumps({"type": "world_state", **snapshot}))
        except Exception as exc:
            # Best effort: a failed snapshot does not end the session.
            logger.warning("Failed to send WS snapshot: %s", exc)

        while True:
            raw = await websocket.receive_text()
            await _handle_client_message(raw)
    except WebSocketDisconnect:
        # Normal client departure; nothing to report.
        pass
    except Exception as exc:
        # Anything else is unexpected: end the session, but leave a trace
        # instead of swallowing the error silently.
        logger.warning("World WS connection closed on error: %s", exc)
    finally:
        if websocket in _ws_clients:
            _ws_clients.remove(websocket)
        logger.info("World WS disconnected — %d clients", len(_ws_clients))
|
|
|
|
|
|
async def _broadcast(message: str) -> None:
    """Send *message* to every connected Workshop client, pruning dead ones.

    Args:
        message: Already-serialised text frame to deliver.

    A client whose ``send_text`` raises is treated as dead and removed
    from ``_ws_clients`` after the send pass.
    """
    dead: list[WebSocket] = []

    # Iterate over a snapshot: each ``await`` yields to the event loop, and
    # other coroutines may add or remove clients meanwhile. Mutating a list
    # while iterating it makes the loop skip entries.
    for ws in tuple(_ws_clients):
        try:
            await ws.send_text(message)
        except Exception as exc:
            logger.debug("Dropping unreachable World WS client: %s", exc)
            dead.append(ws)

    for ws in dead:
        # The client may already have been removed by its own handler.
        if ws in _ws_clients:
            _ws_clients.remove(ws)
|
|
|
|
|
|
async def broadcast_world_state(presence: dict) -> None:
    """Push a ``timmy_state`` frame to every connected Workshop client.

    Only the ``timmyState`` section of the world state built from
    *presence* is sent, flattened next to the ``type`` field.

    Intended as the ``on_change`` callback of
    :class:`~timmy.workshop_state.WorkshopHeartbeat`.
    """
    timmy_state = _build_world_state(presence)["timmyState"]
    frame = {"type": "timmy_state"}
    frame.update(timmy_state)
    await _broadcast(json.dumps(frame))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Visitor chat — bark engine
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Strong references to in-flight bark tasks. The event loop keeps only weak
# references to tasks, so an otherwise unreferenced task may be garbage
# collected before it finishes.
_pending_bark_tasks: set[asyncio.Task] = set()


async def _handle_client_message(raw: str) -> None:
    """Dispatch an incoming WebSocket frame from the Workshop client.

    Args:
        raw: Text frame as received from the socket.

    Frames that are not JSON, are not a JSON object, or carry no non-blank
    string ``text`` are ignored. A ``visitor_message`` with text schedules
    a background bark; this coroutine does not wait for it to finish.
    """
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return  # ignore non-JSON keep-alive pings

    # Valid JSON is not necessarily an object ("123", "[]", "null"); without
    # this guard ``data.get`` raises and tears down the caller's connection.
    if not isinstance(data, dict) or data.get("type") != "visitor_message":
        return

    text = data.get("text")
    if not isinstance(text, str):
        return  # missing, null, or a non-string payload
    text = text.strip()
    if not text:
        return

    task = asyncio.create_task(_bark_and_broadcast(text))
    _pending_bark_tasks.add(task)
    task.add_done_callback(_pending_bark_tasks.discard)
    task.add_done_callback(_log_bark_failure)
|
|
|
|
|
|
def _log_bark_failure(task: asyncio.Task) -> None:
    """Done-callback that reports an exception raised by a bark task.

    Cancelled tasks and tasks that finished without an exception produce
    no log output.
    """
    if not task.cancelled():
        failure = task.exception()
        if failure is not None:
            logger.error("Bark task failed: %s", failure)
|
|
|
|
|
|
async def _bark_and_broadcast(visitor_text: str) -> None:
    """Produce Timmy's reply to *visitor_text* and fan it out to all clients.

    Broadcasts a ``timmy_thinking`` frame first, then generates the reply,
    records the exchange in the conversation buffer, and finally
    broadcasts a ``timmy_speech`` frame carrying the reply and the
    buffered exchanges.
    """
    thinking_frame = json.dumps({"type": "timmy_thinking"})
    await _broadcast(thinking_frame)

    reply = await _generate_bark(visitor_text)
    _conversation.append({"visitor": visitor_text, "timmy": reply})

    speech = {
        "type": "timmy_speech",
        "text": reply,
        # Copy the deque into a list so it can be JSON-serialised.
        "recentExchanges": list(_conversation),
    }
    await _broadcast(json.dumps(speech))
|
|
|
|
|
|
async def _generate_bark(visitor_text: str) -> str:
    """Produce a short reply to *visitor_text* via the Timmy chat session.

    The chat call uses the dedicated Workshop session ID. If importing or
    calling the session raises, the failure is logged and a canned line
    is returned instead.
    """
    try:
        # Imported at call time; an import failure is handled like any
        # other inference failure.
        from timmy import session as timmy_session

        return await timmy_session.chat(visitor_text, session_id=_WORKSHOP_SESSION_ID)
    except Exception as exc:
        logger.warning("Bark generation failed: %s", exc)
    return "Hmm, my thoughts are a bit tangled right now."
|