From 86224d042de523607adeb60dc5605b6dd6f01519 Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Thu, 19 Mar 2026 01:54:06 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Workshop=20Phase=204=20=E2=80=94=20visi?= =?UTF-8?q?tor=20chat=20via=20WebSocket=20bark=20engine=20(#394)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Kimi Agent Co-committed-by: Kimi Agent --- src/dashboard/routes/world.py | 86 ++++++++++++++++++++- tests/dashboard/test_world_api.py | 123 ++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+), 3 deletions(-) diff --git a/src/dashboard/routes/world.py b/src/dashboard/routes/world.py index be4d4c4..7321ede 100644 --- a/src/dashboard/routes/world.py +++ b/src/dashboard/routes/world.py @@ -5,7 +5,8 @@ The primary consumer is the browser on first load — before any WebSocket events arrive, the client needs a full state snapshot. The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever -the heartbeat detects a state change. +the heartbeat detects a state change. It also accepts ``visitor_message`` +frames from the 3D client and responds with ``timmy_speech`` barks. Source of truth: ``~/.timmy/presence.json`` written by :class:`~timmy.workshop_state.WorkshopHeartbeat`. @@ -13,9 +14,11 @@ Falls back to a live ``get_state_dict()`` call if the file is stale or missing. """ +import asyncio import json import logging import time +from collections import deque from datetime import UTC, datetime from fastapi import APIRouter, WebSocket @@ -35,6 +38,13 @@ _ws_clients: list[WebSocket] = [] _STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild +# Recent conversation buffer — kept in memory for the Workshop overlay. +# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs. +_MAX_EXCHANGES = 3 +_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES) + +_WORKSHOP_SESSION_ID = "workshop" + def _read_presence_file() -> dict | None: """Read presence.json if it exists and is fresh enough.""" @@ -116,7 +126,8 @@ async def world_ws(websocket: WebSocket) -> None: """Accept a Workshop client and keep it alive for state broadcasts. Sends a full ``world_state`` snapshot immediately on connect so the - client never starts from a blank slate. + client never starts from a blank slate. Incoming frames are parsed + as JSON — ``visitor_message`` triggers a bark response. """ await websocket.accept() _ws_clients.append(websocket) @@ -130,7 +141,8 @@ async def world_ws(websocket: WebSocket) -> None: logger.warning("Failed to send WS snapshot: %s", exc) try: while True: - await websocket.receive_text() # keep-alive + raw = await websocket.receive_text() + await _handle_client_message(raw) except Exception: pass finally: @@ -156,3 +168,71 @@ async def broadcast_world_state(presence: dict) -> None: for ws in dead: if ws in _ws_clients: _ws_clients.remove(ws) + + +# --------------------------------------------------------------------------- +# Visitor chat — bark engine +# --------------------------------------------------------------------------- + + +async def _handle_client_message(raw: str) -> None: + """Dispatch an incoming WebSocket frame from the Workshop client.""" + try: + data = json.loads(raw) + except (json.JSONDecodeError, TypeError): + return # ignore non-JSON keep-alive pings + + if data.get("type") == "visitor_message": + text = (data.get("text") or "").strip() + if text: + asyncio.create_task(_bark_and_broadcast(text)) + + +async def _bark_and_broadcast(visitor_text: str) -> None: + """Generate a bark response and broadcast it to all Workshop clients.""" + # Signal "thinking" state + await _broadcast_speech({"type": "timmy_thinking"}) + + reply = await _generate_bark(visitor_text) + + # Store exchange in conversation buffer + _conversation.append({"visitor": visitor_text, "timmy": reply}) + + # Broadcast speech bubble + conversation history + await _broadcast_speech( + { + "type": "timmy_speech", + "text": reply, + "recentExchanges": list(_conversation), + } + ) + + +async def _generate_bark(visitor_text: str) -> str: + """Generate a short in-character bark response. + + Uses the existing Timmy session with a dedicated workshop session ID. + Gracefully degrades to a canned response if inference fails. + """ + try: + from timmy import session as _session + + response = await _session.chat(visitor_text, session_id=_WORKSHOP_SESSION_ID) + return response + except Exception as exc: + logger.warning("Bark generation failed: %s", exc) + return "Hmm, my thoughts are a bit tangled right now." + + +async def _broadcast_speech(payload: dict) -> None: + """Broadcast a speech message to all connected Workshop clients.""" + message = json.dumps(payload) + dead: list[WebSocket] = [] + for ws in _ws_clients: + try: + await ws.send_text(message) + except Exception: + dead.append(ws) + for ws in dead: + if ws in _ws_clients: + _ws_clients.remove(ws) diff --git a/tests/dashboard/test_world_api.py b/tests/dashboard/test_world_api.py index ddbbb80..4037ee3 100644 --- a/tests/dashboard/test_world_api.py +++ b/tests/dashboard/test_world_api.py @@ -8,7 +8,12 @@ import pytest from dashboard.routes.world import ( _STALE_THRESHOLD, + _bark_and_broadcast, + _broadcast_speech, _build_world_state, + _conversation, + _generate_bark, + _handle_client_message, _read_presence_file, broadcast_world_state, ) @@ -245,3 +250,121 @@ def test_world_ws_sends_snapshot_on_connect(client, tmp_path): assert msg["timmyState"]["mood"] == "exploring" assert msg["timmyState"]["activity"] == "testing" assert "updatedAt" in msg + + +# --------------------------------------------------------------------------- +# Visitor chat — bark engine +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_handle_client_message_ignores_non_json(): + """Non-JSON messages are silently ignored.""" + await _handle_client_message("not json") # should not raise + + +@pytest.mark.asyncio +async def test_handle_client_message_ignores_unknown_type(): + """Unknown message types are ignored.""" + await _handle_client_message(json.dumps({"type": "unknown"})) + + +@pytest.mark.asyncio +async def test_handle_client_message_ignores_empty_text(): + """Empty visitor_message text is ignored.""" + await _handle_client_message(json.dumps({"type": "visitor_message", "text": " "})) + + +@pytest.mark.asyncio +async def test_generate_bark_returns_response(): + """_generate_bark returns the chat response.""" + with patch("timmy.session.chat", new_callable=AsyncMock) as mock_chat: + mock_chat.return_value = "Woof! Good to see you." + result = await _generate_bark("Hey Timmy!") + + assert result == "Woof! Good to see you." + mock_chat.assert_called_once_with("Hey Timmy!", session_id="workshop") + + +@pytest.mark.asyncio +async def test_generate_bark_fallback_on_error(): + """_generate_bark returns canned response when chat fails.""" + with patch( + "timmy.session.chat", + new_callable=AsyncMock, + side_effect=RuntimeError("no model"), + ): + result = await _generate_bark("Hello?") + + assert "tangled" in result + + +@pytest.mark.asyncio +async def test_bark_and_broadcast_sends_thinking_then_speech(): + """_bark_and_broadcast sends thinking indicator then speech.""" + from dashboard.routes.world import _ws_clients + + ws = AsyncMock() + _ws_clients.append(ws) + _conversation.clear() + try: + with patch( + "timmy.session.chat", + new_callable=AsyncMock, + return_value="All good here!", + ): + await _bark_and_broadcast("How are you?") + + # Should have sent two messages: thinking + speech + assert ws.send_text.call_count == 2 + thinking = json.loads(ws.send_text.call_args_list[0][0][0]) + speech = json.loads(ws.send_text.call_args_list[1][0][0]) + + assert thinking["type"] == "timmy_thinking" + assert speech["type"] == "timmy_speech" + assert speech["text"] == "All good here!" + assert len(speech["recentExchanges"]) == 1 + assert speech["recentExchanges"][0]["visitor"] == "How are you?" + finally: + _ws_clients.clear() + _conversation.clear() + + +@pytest.mark.asyncio +async def test_broadcast_speech_removes_dead_clients(): + """Dead clients are cleaned up during speech broadcast.""" + from dashboard.routes.world import _ws_clients + + dead = AsyncMock() + dead.send_text.side_effect = ConnectionError("gone") + _ws_clients.append(dead) + try: + await _broadcast_speech({"type": "timmy_speech", "text": "test"}) + assert dead not in _ws_clients + finally: + _ws_clients.clear() + + +@pytest.mark.asyncio +async def test_conversation_buffer_caps_at_max(): + """Conversation buffer only keeps the last _MAX_EXCHANGES entries.""" + from dashboard.routes.world import _MAX_EXCHANGES, _ws_clients + + ws = AsyncMock() + _ws_clients.append(ws) + _conversation.clear() + try: + with patch( + "timmy.session.chat", + new_callable=AsyncMock, + return_value="reply", + ): + for i in range(_MAX_EXCHANGES + 2): + await _bark_and_broadcast(f"msg {i}") + + assert len(_conversation) == _MAX_EXCHANGES + # Oldest messages should have been evicted + assert _conversation[0]["visitor"] == f"msg {_MAX_EXCHANGES + 2 - _MAX_EXCHANGES}" + finally: + _ws_clients.clear() + _conversation.clear()