forked from Rockachopa/Timmy-time-dashboard
feat: Workshop Phase 4 — visitor chat via WebSocket bark engine (#394)
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit is contained in:
@@ -5,7 +5,8 @@ The primary consumer is the browser on first load — before any
|
||||
WebSocket events arrive, the client needs a full state snapshot.
|
||||
|
||||
The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever
|
||||
the heartbeat detects a state change.
|
||||
the heartbeat detects a state change. It also accepts ``visitor_message``
|
||||
frames from the 3D client and responds with ``timmy_speech`` barks.
|
||||
|
||||
Source of truth: ``~/.timmy/presence.json`` written by
|
||||
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
|
||||
@@ -13,9 +14,11 @@ Falls back to a live ``get_state_dict()`` call if the file is stale
|
||||
or missing.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from collections import deque
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from fastapi import APIRouter, WebSocket
|
||||
@@ -35,6 +38,13 @@ _ws_clients: list[WebSocket] = []
|
||||
|
||||
_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild
|
||||
|
||||
# Recent conversation buffer — kept in memory for the Workshop overlay.
|
||||
# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs.
|
||||
_MAX_EXCHANGES = 3
|
||||
_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES)
|
||||
|
||||
_WORKSHOP_SESSION_ID = "workshop"
|
||||
|
||||
|
||||
def _read_presence_file() -> dict | None:
|
||||
"""Read presence.json if it exists and is fresh enough."""
|
||||
@@ -116,7 +126,8 @@ async def world_ws(websocket: WebSocket) -> None:
|
||||
"""Accept a Workshop client and keep it alive for state broadcasts.
|
||||
|
||||
Sends a full ``world_state`` snapshot immediately on connect so the
|
||||
client never starts from a blank slate.
|
||||
client never starts from a blank slate. Incoming frames are parsed
|
||||
as JSON — ``visitor_message`` triggers a bark response.
|
||||
"""
|
||||
await websocket.accept()
|
||||
_ws_clients.append(websocket)
|
||||
@@ -130,7 +141,8 @@ async def world_ws(websocket: WebSocket) -> None:
|
||||
logger.warning("Failed to send WS snapshot: %s", exc)
|
||||
try:
|
||||
while True:
|
||||
await websocket.receive_text() # keep-alive
|
||||
raw = await websocket.receive_text()
|
||||
await _handle_client_message(raw)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
@@ -156,3 +168,71 @@ async def broadcast_world_state(presence: dict) -> None:
|
||||
for ws in dead:
|
||||
if ws in _ws_clients:
|
||||
_ws_clients.remove(ws)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Visitor chat — bark engine
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _handle_client_message(raw: str) -> None:
|
||||
"""Dispatch an incoming WebSocket frame from the Workshop client."""
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return # ignore non-JSON keep-alive pings
|
||||
|
||||
if data.get("type") == "visitor_message":
|
||||
text = (data.get("text") or "").strip()
|
||||
if text:
|
||||
asyncio.create_task(_bark_and_broadcast(text))
|
||||
|
||||
|
||||
async def _bark_and_broadcast(visitor_text: str) -> None:
|
||||
"""Generate a bark response and broadcast it to all Workshop clients."""
|
||||
# Signal "thinking" state
|
||||
await _broadcast_speech({"type": "timmy_thinking"})
|
||||
|
||||
reply = await _generate_bark(visitor_text)
|
||||
|
||||
# Store exchange in conversation buffer
|
||||
_conversation.append({"visitor": visitor_text, "timmy": reply})
|
||||
|
||||
# Broadcast speech bubble + conversation history
|
||||
await _broadcast_speech(
|
||||
{
|
||||
"type": "timmy_speech",
|
||||
"text": reply,
|
||||
"recentExchanges": list(_conversation),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
async def _generate_bark(visitor_text: str) -> str:
|
||||
"""Generate a short in-character bark response.
|
||||
|
||||
Uses the existing Timmy session with a dedicated workshop session ID.
|
||||
Gracefully degrades to a canned response if inference fails.
|
||||
"""
|
||||
try:
|
||||
from timmy import session as _session
|
||||
|
||||
response = await _session.chat(visitor_text, session_id=_WORKSHOP_SESSION_ID)
|
||||
return response
|
||||
except Exception as exc:
|
||||
logger.warning("Bark generation failed: %s", exc)
|
||||
return "Hmm, my thoughts are a bit tangled right now."
|
||||
|
||||
|
||||
async def _broadcast_speech(payload: dict) -> None:
|
||||
"""Broadcast a speech message to all connected Workshop clients."""
|
||||
message = json.dumps(payload)
|
||||
dead: list[WebSocket] = []
|
||||
for ws in _ws_clients:
|
||||
try:
|
||||
await ws.send_text(message)
|
||||
except Exception:
|
||||
dead.append(ws)
|
||||
for ws in dead:
|
||||
if ws in _ws_clients:
|
||||
_ws_clients.remove(ws)
|
||||
|
||||
@@ -8,7 +8,12 @@ import pytest
|
||||
|
||||
from dashboard.routes.world import (
|
||||
_STALE_THRESHOLD,
|
||||
_bark_and_broadcast,
|
||||
_broadcast_speech,
|
||||
_build_world_state,
|
||||
_conversation,
|
||||
_generate_bark,
|
||||
_handle_client_message,
|
||||
_read_presence_file,
|
||||
broadcast_world_state,
|
||||
)
|
||||
@@ -245,3 +250,121 @@ def test_world_ws_sends_snapshot_on_connect(client, tmp_path):
|
||||
assert msg["timmyState"]["mood"] == "exploring"
|
||||
assert msg["timmyState"]["activity"] == "testing"
|
||||
assert "updatedAt" in msg
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Visitor chat — bark engine
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_client_message_ignores_non_json():
|
||||
"""Non-JSON messages are silently ignored."""
|
||||
await _handle_client_message("not json") # should not raise
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_client_message_ignores_unknown_type():
|
||||
"""Unknown message types are ignored."""
|
||||
await _handle_client_message(json.dumps({"type": "unknown"}))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_client_message_ignores_empty_text():
|
||||
"""Empty visitor_message text is ignored."""
|
||||
await _handle_client_message(json.dumps({"type": "visitor_message", "text": " "}))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_bark_returns_response():
|
||||
"""_generate_bark returns the chat response."""
|
||||
with patch("timmy.session.chat", new_callable=AsyncMock) as mock_chat:
|
||||
mock_chat.return_value = "Woof! Good to see you."
|
||||
result = await _generate_bark("Hey Timmy!")
|
||||
|
||||
assert result == "Woof! Good to see you."
|
||||
mock_chat.assert_called_once_with("Hey Timmy!", session_id="workshop")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_bark_fallback_on_error():
|
||||
"""_generate_bark returns canned response when chat fails."""
|
||||
with patch(
|
||||
"timmy.session.chat",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=RuntimeError("no model"),
|
||||
):
|
||||
result = await _generate_bark("Hello?")
|
||||
|
||||
assert "tangled" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bark_and_broadcast_sends_thinking_then_speech():
|
||||
"""_bark_and_broadcast sends thinking indicator then speech."""
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
ws = AsyncMock()
|
||||
_ws_clients.append(ws)
|
||||
_conversation.clear()
|
||||
try:
|
||||
with patch(
|
||||
"timmy.session.chat",
|
||||
new_callable=AsyncMock,
|
||||
return_value="All good here!",
|
||||
):
|
||||
await _bark_and_broadcast("How are you?")
|
||||
|
||||
# Should have sent two messages: thinking + speech
|
||||
assert ws.send_text.call_count == 2
|
||||
thinking = json.loads(ws.send_text.call_args_list[0][0][0])
|
||||
speech = json.loads(ws.send_text.call_args_list[1][0][0])
|
||||
|
||||
assert thinking["type"] == "timmy_thinking"
|
||||
assert speech["type"] == "timmy_speech"
|
||||
assert speech["text"] == "All good here!"
|
||||
assert len(speech["recentExchanges"]) == 1
|
||||
assert speech["recentExchanges"][0]["visitor"] == "How are you?"
|
||||
finally:
|
||||
_ws_clients.clear()
|
||||
_conversation.clear()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_speech_removes_dead_clients():
|
||||
"""Dead clients are cleaned up during speech broadcast."""
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
dead = AsyncMock()
|
||||
dead.send_text.side_effect = ConnectionError("gone")
|
||||
_ws_clients.append(dead)
|
||||
try:
|
||||
await _broadcast_speech({"type": "timmy_speech", "text": "test"})
|
||||
assert dead not in _ws_clients
|
||||
finally:
|
||||
_ws_clients.clear()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversation_buffer_caps_at_max():
|
||||
"""Conversation buffer only keeps the last _MAX_EXCHANGES entries."""
|
||||
from dashboard.routes.world import _MAX_EXCHANGES, _ws_clients
|
||||
|
||||
ws = AsyncMock()
|
||||
_ws_clients.append(ws)
|
||||
_conversation.clear()
|
||||
try:
|
||||
with patch(
|
||||
"timmy.session.chat",
|
||||
new_callable=AsyncMock,
|
||||
return_value="reply",
|
||||
):
|
||||
for i in range(_MAX_EXCHANGES + 2):
|
||||
await _bark_and_broadcast(f"msg {i}")
|
||||
|
||||
assert len(_conversation) == _MAX_EXCHANGES
|
||||
# Oldest messages should have been evicted
|
||||
assert _conversation[0]["visitor"] == f"msg {_MAX_EXCHANGES + 2 - _MAX_EXCHANGES}"
|
||||
finally:
|
||||
_ws_clients.clear()
|
||||
_conversation.clear()
|
||||
|
||||
Reference in New Issue
Block a user