Files
Timmy-time-dashboard/tests/dashboard/test_world_api.py
Kimi Agent 3afb62afb7
All checks were successful
Tests / lint (push) Successful in 4s
Tests / test (push) Successful in 1m2s
fix: add self_reflect tool for past behavior review (#417)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 09:39:14 -04:00

721 lines
24 KiB
Python

"""Tests for GET /api/world/state endpoint and /api/world/ws relay."""
import asyncio
import json
import logging
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from dashboard.routes.world import (
_GROUND_TTL,
_REMIND_AFTER,
_STALE_THRESHOLD,
_bark_and_broadcast,
_broadcast,
_build_commitment_context,
_build_world_state,
_commitments,
_conversation,
_extract_commitments,
_generate_bark,
_handle_client_message,
_heartbeat,
_log_bark_failure,
_read_presence_file,
_record_commitments,
_refresh_ground,
_tick_commitments,
broadcast_world_state,
close_commitment,
get_commitments,
reset_commitments,
reset_conversation_ground,
)
# ---------------------------------------------------------------------------
# _build_world_state
# ---------------------------------------------------------------------------
def test_build_world_state_maps_fields():
    """Presence fields are carried over onto the world-state payload."""
    liveness = "2026-03-19T02:00:00Z"
    thread = {"type": "thinking", "ref": "test", "status": "active"}

    state = _build_world_state(
        {
            "version": 1,
            "liveness": liveness,
            "mood": "exploring",
            "current_focus": "reviewing PR",
            "energy": 0.8,
            "confidence": 0.9,
            "active_threads": [thread],
            "recent_events": [],
            "concerns": [],
        }
    )

    # Per-agent values live under "timmyState".
    timmy = state["timmyState"]
    assert timmy["mood"] == "exploring"
    assert timmy["activity"] == "reviewing PR"
    assert timmy["energy"] == 0.8
    assert timmy["confidence"] == 0.9

    # Envelope values sit at the top level.
    assert state["updatedAt"] == liveness
    assert state["version"] == 1
    assert state["visitorPresent"] is False
    assert len(state["activeThreads"]) == 1
def test_build_world_state_defaults():
    """An empty presence dict still produces the default mood, energy and version."""
    state = _build_world_state({})
    timmy = state["timmyState"]

    assert timmy["mood"] == "calm"
    assert timmy["energy"] == 0.5
    assert state["version"] == 1
# ---------------------------------------------------------------------------
# _read_presence_file
# ---------------------------------------------------------------------------
def test_read_presence_file_missing(tmp_path):
    """A presence path that does not exist reads as None."""
    absent = tmp_path / "nope.json"
    with patch("dashboard.routes.world.PRESENCE_FILE", absent):
        outcome = _read_presence_file()
    assert outcome is None
def test_read_presence_file_stale(tmp_path):
    """A presence file older than _STALE_THRESHOLD reads as None."""
    import os

    presence_path = tmp_path / "presence.json"
    presence_path.write_text(json.dumps({"version": 1}))

    # Backdate atime and mtime to 10 seconds past the staleness cutoff.
    backdated = time.time() - _STALE_THRESHOLD - 10
    os.utime(presence_path, (backdated, backdated))

    with patch("dashboard.routes.world.PRESENCE_FILE", presence_path):
        outcome = _read_presence_file()
    assert outcome is None
def test_read_presence_file_fresh(tmp_path):
    """A just-written presence file is parsed and returned."""
    presence_path = tmp_path / "presence.json"
    contents = json.dumps({"version": 1, "mood": "focused"})
    presence_path.write_text(contents)

    with patch("dashboard.routes.world.PRESENCE_FILE", presence_path):
        loaded = _read_presence_file()

    assert loaded is not None
    assert loaded["version"] == 1
def test_read_presence_file_bad_json(tmp_path):
    """A presence file holding malformed JSON reads as None."""
    presence_path = tmp_path / "presence.json"
    presence_path.write_text("not json {{{")

    with patch("dashboard.routes.world.PRESENCE_FILE", presence_path):
        outcome = _read_presence_file()
    assert outcome is None
# ---------------------------------------------------------------------------
# Full endpoint via TestClient
# ---------------------------------------------------------------------------
@pytest.fixture
def client():
    """Return a TestClient for a bare FastAPI app that mounts only the world router."""
    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    from dashboard.routes.world import router

    world_app = FastAPI()
    world_app.include_router(router)
    return TestClient(world_app)
def test_world_state_endpoint_with_file(client, tmp_path):
    """GET /api/world/state serves the presence file contents when it is fresh."""
    presence_path = tmp_path / "presence.json"
    payload = {
        "version": 1,
        "liveness": "2026-03-19T02:00:00Z",
        "mood": "exploring",
        "current_focus": "testing",
        "active_threads": [],
        "recent_events": [],
        "concerns": [],
    }
    presence_path.write_text(json.dumps(payload))

    with patch("dashboard.routes.world.PRESENCE_FILE", presence_path):
        response = client.get("/api/world/state")

    assert response.status_code == 200
    body = response.json()
    assert body["timmyState"]["mood"] == "exploring"
    assert body["timmyState"]["activity"] == "testing"
    # The response must opt out of caching.
    assert response.headers["cache-control"] == "no-cache, no-store"
def test_world_state_endpoint_fallback(client, tmp_path):
    """With no presence file, the endpoint serves the live workshop state."""
    live_state = {
        "version": 1,
        "liveness": "2026-03-19T02:00:00Z",
        "mood": "calm",
        "current_focus": "",
        "active_threads": [],
        "recent_events": [],
        "concerns": [],
    }
    with (
        patch("dashboard.routes.world.PRESENCE_FILE", tmp_path / "nope.json"),
        patch("timmy.workshop_state.get_state_dict", return_value=live_state),
    ):
        response = client.get("/api/world/state")

    assert response.status_code == 200
    assert response.json()["timmyState"]["mood"] == "calm"
def test_world_state_endpoint_full_fallback(client, tmp_path):
    """If the file is missing and the live state raises, defaults are still served."""
    missing_file = patch("dashboard.routes.world.PRESENCE_FILE", tmp_path / "nope.json")
    broken_live_state = patch(
        "timmy.workshop_state.get_state_dict",
        side_effect=RuntimeError("boom"),
    )

    with missing_file, broken_live_state:
        response = client.get("/api/world/state")

    assert response.status_code == 200
    body = response.json()
    assert body["timmyState"]["mood"] == "calm"
    assert body["version"] == 1
# ---------------------------------------------------------------------------
# broadcast_world_state
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_broadcast_world_state_sends_timmy_state():
    """A connected client receives exactly one timmy_state frame per broadcast."""
    from dashboard.routes.world import _ws_clients

    socket = AsyncMock()
    _ws_clients.append(socket)
    try:
        await broadcast_world_state(
            {
                "version": 1,
                "mood": "exploring",
                "current_focus": "testing",
                "energy": 0.8,
                "confidence": 0.9,
            }
        )

        socket.send_text.assert_called_once()
        frame = json.loads(socket.send_text.call_args.args[0])
        assert frame["type"] == "timmy_state"
        assert frame["mood"] == "exploring"
        assert frame["activity"] == "testing"
    finally:
        # The client list is module-level state; empty it for later tests.
        _ws_clients.clear()
@pytest.mark.asyncio
async def test_broadcast_world_state_removes_dead_clients():
    """A client whose send raises is dropped from the client list."""
    from dashboard.routes.world import _ws_clients

    broken_socket = AsyncMock()
    broken_socket.send_text.side_effect = ConnectionError("gone")
    _ws_clients.append(broken_socket)
    try:
        await broadcast_world_state({"mood": "idle"})
        assert broken_socket not in _ws_clients
    finally:
        _ws_clients.clear()
def test_world_ws_endpoint_accepts_connection(client):
    """Opening /api/world/ws completes without raising."""
    session = client.websocket_connect("/api/world/ws")
    with session:
        # Nothing to exchange: entering and leaving the context is the whole test.
        pass
def test_world_ws_sends_snapshot_on_connect(client, tmp_path):
    """The first frame on a new WebSocket is a world_state snapshot."""
    presence_path = tmp_path / "presence.json"
    payload = {
        "version": 1,
        "liveness": "2026-03-19T02:00:00Z",
        "mood": "exploring",
        "current_focus": "testing",
        "active_threads": [],
        "recent_events": [],
        "concerns": [],
    }
    presence_path.write_text(json.dumps(payload))

    with (
        patch("dashboard.routes.world.PRESENCE_FILE", presence_path),
        client.websocket_connect("/api/world/ws") as session,
    ):
        snapshot = json.loads(session.receive_text())

        assert snapshot["type"] == "world_state"
        assert snapshot["timmyState"]["mood"] == "exploring"
        assert snapshot["timmyState"]["activity"] == "testing"
        assert "updatedAt" in snapshot
# ---------------------------------------------------------------------------
# Visitor chat — bark engine
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_handle_client_message_ignores_non_json():
    """A frame that is not JSON is dropped without raising."""
    garbage = "not json"
    await _handle_client_message(garbage)  # passing means no exception escaped
@pytest.mark.asyncio
async def test_handle_client_message_ignores_unknown_type():
    """A JSON frame with an unrecognised "type" is dropped without raising."""
    frame = json.dumps({"type": "unknown"})
    await _handle_client_message(frame)
@pytest.mark.asyncio
async def test_handle_client_message_ignores_empty_text():
    """A visitor_message whose text is only whitespace is dropped without raising."""
    frame = json.dumps({"type": "visitor_message", "text": " "})
    await _handle_client_message(frame)
@pytest.mark.asyncio
async def test_generate_bark_returns_response():
    """_generate_bark hands back whatever the chat session replied."""
    reset_conversation_ground()
    canned_reply = "Woof! Good to see you."

    with patch(
        "timmy.session.chat",
        new_callable=AsyncMock,
        return_value=canned_reply,
    ) as chat_stub:
        reply = await _generate_bark("Hey Timmy!")

    assert reply == canned_reply
    # With no ground topic set, the visitor text is forwarded untouched.
    chat_stub.assert_called_once_with("Hey Timmy!", session_id="workshop")
@pytest.mark.asyncio
async def test_generate_bark_fallback_on_error():
    """When the chat session raises, _generate_bark returns its canned fallback line."""
    reset_conversation_ground()
    failing_chat = patch(
        "timmy.session.chat",
        new_callable=AsyncMock,
        side_effect=RuntimeError("no model"),
    )

    with failing_chat:
        reply = await _generate_bark("Hello?")

    # The fallback text is recognised by this word.
    assert "tangled" in reply
@pytest.mark.asyncio
async def test_bark_and_broadcast_sends_thinking_then_speech():
    """_bark_and_broadcast sends a thinking frame, then a speech frame.

    The speech frame must carry the reply text and, in ``recentExchanges``,
    the exchange that was just recorded.
    """
    from dashboard.routes.world import _ws_clients

    ws = AsyncMock()
    _ws_clients.append(ws)
    _conversation.clear()
    reset_conversation_ground()
    try:
        with patch(
            "timmy.session.chat",
            new_callable=AsyncMock,
            return_value="All good here!",
        ):
            await _bark_and_broadcast("How are you?")

        # Should have sent two messages: thinking + speech
        assert ws.send_text.call_count == 2
        thinking = json.loads(ws.send_text.call_args_list[0][0][0])
        speech = json.loads(ws.send_text.call_args_list[1][0][0])

        assert thinking["type"] == "timmy_thinking"
        assert speech["type"] == "timmy_speech"
        assert speech["text"] == "All good here!"
        assert len(speech["recentExchanges"]) == 1
        assert speech["recentExchanges"][0]["visitor"] == "How are you?"
    finally:
        _ws_clients.clear()
        _conversation.clear()
        # _bark_and_broadcast anchors the module-level ground topic on the
        # visitor message; clear it so it does not leak into later tests.
        reset_conversation_ground()
@pytest.mark.asyncio
async def test_broadcast_removes_dead_clients():
    """_broadcast drops a client whose send_text raises."""
    from dashboard.routes.world import _ws_clients

    broken_socket = AsyncMock()
    broken_socket.send_text.side_effect = ConnectionError("gone")
    _ws_clients.append(broken_socket)

    frame = json.dumps({"type": "timmy_speech", "text": "test"})
    try:
        await _broadcast(frame)
        assert broken_socket not in _ws_clients
    finally:
        _ws_clients.clear()
@pytest.mark.asyncio
async def test_conversation_buffer_caps_at_max():
    """The conversation buffer keeps only the newest _MAX_EXCHANGES exchanges."""
    from dashboard.routes.world import _MAX_EXCHANGES, _ws_clients

    # Number of exchanges sent beyond the cap, which is also how many of
    # the oldest exchanges must be evicted.
    overflow = 2

    ws = AsyncMock()
    _ws_clients.append(ws)
    _conversation.clear()
    reset_conversation_ground()
    try:
        with patch(
            "timmy.session.chat",
            new_callable=AsyncMock,
            return_value="reply",
        ):
            for i in range(_MAX_EXCHANGES + overflow):
                await _bark_and_broadcast(f"msg {i}")

        assert len(_conversation) == _MAX_EXCHANGES
        # "msg 0" .. "msg {overflow - 1}" were evicted, so the oldest
        # survivor is "msg {overflow}".
        assert _conversation[0]["visitor"] == f"msg {overflow}"
    finally:
        _ws_clients.clear()
        _conversation.clear()
        # _bark_and_broadcast anchors the module-level ground topic on the
        # first visitor message; clear it so it does not leak into later tests.
        reset_conversation_ground()
def test_log_bark_failure_logs_exception(caplog):
    """_log_bark_failure logs the exception held by a failed bark task.

    The task runs to failure on a private event loop, which is always
    closed before the finished task is handed to the callback.
    """

    async def _fail():
        raise RuntimeError("bark boom")

    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(_fail())
        # Spin the loop briefly so the task runs and stores its exception.
        loop.run_until_complete(asyncio.sleep(0.01))
    finally:
        # Close the loop even if scheduling or running raised, so a failure
        # here does not leave an unclosed event loop behind.
        loop.close()

    # Guard the precondition: the callback is only meaningful for a finished task.
    assert task.done()

    with caplog.at_level(logging.ERROR):
        _log_bark_failure(task)
    assert "bark boom" in caplog.text
def test_log_bark_failure_ignores_cancelled():
    """A task that reports itself cancelled is passed over without raising."""
    cancelled_task = MagicMock(
        spec=asyncio.Task,
        **{"cancelled.return_value": True},
    )
    _log_bark_failure(cancelled_task)  # passing means no exception escaped
# ---------------------------------------------------------------------------
# Conversation grounding (#322)
# ---------------------------------------------------------------------------
class TestConversationGrounding:
    """Grounding keeps a workshop conversation anchored to its opening topic."""

    def setup_method(self):
        # Start every test with no anchor.
        reset_conversation_ground()

    def teardown_method(self):
        # Leave no anchor behind for whichever test runs next.
        reset_conversation_ground()

    def test_refresh_ground_sets_topic_on_first_message(self):
        """The first visitor message is stored as the anchor, with a timestamp."""
        import dashboard.routes.world as world

        opener = "Tell me about the Bible"
        _refresh_ground(opener)

        assert world._ground_topic == opener
        assert world._ground_set_at > 0

    def test_refresh_ground_keeps_topic_on_subsequent_messages(self):
        """A follow-up message leaves the existing anchor in place."""
        import dashboard.routes.world as world

        opener = "Tell me about the Bible"
        for message in (opener, "What about Genesis?"):
            _refresh_ground(message)

        assert world._ground_topic == opener

    def test_refresh_ground_resets_after_ttl(self):
        """Once the anchor is older than _GROUND_TTL, the next message replaces it."""
        import dashboard.routes.world as world

        _refresh_ground("Tell me about the Bible")

        # Pretend the anchor was set just over one TTL ago.
        world._ground_set_at = time.time() - _GROUND_TTL - 1

        replacement = "Now tell me about cooking"
        _refresh_ground(replacement)
        assert world._ground_topic == replacement

    def test_refresh_ground_truncates_long_messages(self):
        """A 200-character message is stored as a 120-character anchor."""
        import dashboard.routes.world as world

        _refresh_ground("x" * 200)
        assert len(world._ground_topic) == 120

    def test_reset_conversation_ground_clears_state(self):
        """reset_conversation_ground empties both the topic and its timestamp."""
        import dashboard.routes.world as world

        _refresh_ground("Some topic")
        reset_conversation_ground()

        assert world._ground_topic is None
        assert world._ground_set_at == 0.0

    @pytest.mark.asyncio
    async def test_generate_bark_prepends_ground_topic(self):
        """A message that differs from the anchor is sent with the topic tag added."""
        _refresh_ground("Tell me about prayer")

        with patch(
            "timmy.session.chat",
            new_callable=AsyncMock,
            return_value="Great question!",
        ) as chat_stub:
            await _generate_bark("What else can you share?")

        prompt = chat_stub.call_args.args[0]
        assert "[Workshop conversation topic: Tell me about prayer]" in prompt
        assert "What else can you share?" in prompt

    @pytest.mark.asyncio
    async def test_generate_bark_no_prefix_for_first_message(self):
        """The message that is itself the anchor is sent without a topic tag."""
        opener = "Tell me about prayer"
        _refresh_ground(opener)

        with patch(
            "timmy.session.chat",
            new_callable=AsyncMock,
            return_value="Sure!",
        ) as chat_stub:
            await _generate_bark(opener)

        prompt = chat_stub.call_args.args[0]
        assert "[Workshop conversation topic:" not in prompt
        assert prompt == "Tell me about prayer"

    @pytest.mark.asyncio
    async def test_bark_and_broadcast_sets_ground(self):
        """_bark_and_broadcast anchors the topic without an explicit refresh call."""
        import dashboard.routes.world as world
        from dashboard.routes.world import _ws_clients

        socket = AsyncMock()
        _ws_clients.append(socket)
        _conversation.clear()
        try:
            with patch(
                "timmy.session.chat",
                new_callable=AsyncMock,
                return_value="Interesting!",
            ):
                await _bark_and_broadcast("What is grace?")
            assert world._ground_topic == "What is grace?"
        finally:
            _ws_clients.clear()
            _conversation.clear()
# ---------------------------------------------------------------------------
# Conversation grounding — commitment tracking (rescued from PR #408)
# ---------------------------------------------------------------------------
@pytest.fixture
def _clean_commitments():
    """Opt-in fixture: clear tracked commitments before the test and again after it."""
    reset_commitments()  # setup
    yield
    reset_commitments()  # teardown
class TestExtractCommitments:
    """_extract_commitments pulls promise-like phrases out of reply text."""

    def test_extracts_ill_pattern(self):
        """An "I'll ..." sentence yields one commitment."""
        found = _extract_commitments("I'll draft the skeleton ticket in 30 minutes.")
        assert len(found) == 1
        assert "draft the skeleton ticket" in found[0]

    def test_extracts_i_will_pattern(self):
        """An "I will ..." sentence yields one commitment."""
        found = _extract_commitments("I will review that PR tomorrow.")
        assert len(found) == 1
        assert "review that PR tomorrow" in found[0]

    def test_extracts_let_me_pattern(self):
        """A "Let me ..." sentence yields one commitment."""
        found = _extract_commitments("Let me write up a summary for you.")
        assert len(found) == 1
        assert "write up a summary" in found[0]

    def test_skips_short_matches(self):
        """A match of five characters or fewer is discarded."""
        # "do it" is 5 chars — should be skipped (needs > 5)
        found = _extract_commitments("I'll do it.")
        assert found == []

    def test_no_commitments_in_normal_text(self):
        """Text with no promise phrasing yields nothing."""
        found = _extract_commitments("The weather is nice today.")
        assert found == []

    def test_truncates_long_commitments(self):
        """A 200-character phrase comes back capped at 120 characters."""
        sentence = "I'll " + "a" * 200 + "."
        found = _extract_commitments(sentence)
        assert len(found) == 1
        assert len(found[0]) == 120
class TestRecordCommitments:
    """_record_commitments stores commitments found in a reply."""

    def test_records_new_commitment(self, _clean_commitments):
        """A fresh commitment is stored with its message counter at zero."""
        _record_commitments("I'll draft the ticket now.")

        stored = get_commitments()
        assert len(stored) == 1
        assert get_commitments()[0]["messages_since"] == 0

    def test_avoids_duplicate_commitments(self, _clean_commitments):
        """Recording the same reply twice keeps a single entry."""
        reply = "I'll draft the ticket now."
        for _ in range(2):
            _record_commitments(reply)

        assert len(get_commitments()) == 1

    def test_caps_at_max(self, _clean_commitments):
        """The store never grows past _MAX_COMMITMENTS entries."""
        from dashboard.routes.world import _MAX_COMMITMENTS

        attempts = _MAX_COMMITMENTS + 3
        for i in range(attempts):
            _record_commitments(f"I'll handle commitment number {i} right away.")

        assert len(get_commitments()) <= _MAX_COMMITMENTS
class TestTickAndContext:
    """_tick_commitments ages commitments; _build_commitment_context reports old ones."""

    def test_tick_increments_messages_since(self, _clean_commitments):
        """Each tick adds one to a commitment's messages_since counter."""
        _commitments.append({"text": "write the docs", "created_at": 0, "messages_since": 0})

        for _ in range(2):
            _tick_commitments()

        assert _commitments[0]["messages_since"] == 2

    def test_context_empty_when_no_overdue(self, _clean_commitments):
        """A commitment at zero messages produces an empty context string."""
        _commitments.append({"text": "write the docs", "created_at": 0, "messages_since": 0})

        assert _build_commitment_context() == ""

    def test_context_surfaces_overdue_commitments(self, _clean_commitments):
        """A commitment at _REMIND_AFTER messages appears in the context."""
        overdue = {
            "text": "draft the skeleton ticket",
            "created_at": 0,
            "messages_since": _REMIND_AFTER,
        }
        _commitments.append(overdue)

        context = _build_commitment_context()
        assert "draft the skeleton ticket" in context
        assert "Open commitments" in context

    def test_context_only_includes_overdue(self, _clean_commitments):
        """A commitment one message old is left out; one at _REMIND_AFTER is included."""
        recent = {"text": "recent thing", "created_at": 0, "messages_since": 1}
        overdue = {
            "text": "old thing",
            "created_at": 0,
            "messages_since": _REMIND_AFTER,
        }
        _commitments.append(recent)
        _commitments.append(overdue)

        context = _build_commitment_context()
        assert "old thing" in context
        assert "recent thing" not in context
class TestCloseCommitment:
    """close_commitment removes a commitment by index and reports whether it did."""

    def test_close_valid_index(self, _clean_commitments):
        """Closing an index that exists returns True and removes the entry."""
        _commitments.append({"text": "write the docs", "created_at": 0, "messages_since": 0})

        closed = close_commitment(0)

        assert closed is True
        assert len(get_commitments()) == 0

    def test_close_invalid_index(self, _clean_commitments):
        """Closing an index that does not exist returns False."""
        closed = close_commitment(99)
        assert closed is False
class TestGroundingIntegration:
    """End-to-end checks of commitment tracking through the bark pipeline.

    Besides commitments (handled by the ``_clean_commitments`` fixture),
    each test also resets the module-level ground topic so neither test
    inherits a stale anchor or leaves one behind.
    """

    @pytest.mark.asyncio
    async def test_bark_records_commitments_from_reply(self, _clean_commitments):
        """A promise in the reply is recorded as an open commitment."""
        from dashboard.routes.world import _ws_clients

        ws = AsyncMock()
        _ws_clients.append(ws)
        _conversation.clear()
        reset_conversation_ground()
        try:
            with patch(
                "timmy.session.chat",
                new_callable=AsyncMock,
                return_value="I'll draft the ticket for you!",
            ):
                await _bark_and_broadcast("Can you help?")

            assert len(get_commitments()) == 1
            assert "draft the ticket" in get_commitments()[0]["text"]
        finally:
            _ws_clients.clear()
            _conversation.clear()
            # _bark_and_broadcast anchors the ground topic on the visitor
            # message; clear it so it does not leak into later tests.
            reset_conversation_ground()

    @pytest.mark.asyncio
    async def test_bark_prepends_context_after_n_messages(self, _clean_commitments):
        """After _REMIND_AFTER messages, commitment context is prepended."""
        # Reset before seeding the commitment so the reset cannot disturb it.
        reset_conversation_ground()
        _commitments.append(
            {
                "text": "draft the skeleton ticket",
                "created_at": 0,
                "messages_since": _REMIND_AFTER - 1,
            }
        )
        try:
            with patch(
                "timmy.session.chat",
                new_callable=AsyncMock,
                return_value="Sure thing!",
            ) as mock_chat:
                # First call: the commitment is still one message short of
                # the reminder threshold.
                await _generate_bark("Any updates?")
                # _generate_bark doesn't tick — _bark_and_broadcast does.
                # We pre-set messages_since to _REMIND_AFTER - 1, so tick
                # once by hand to make the commitment overdue.
                _tick_commitments()
                await _generate_bark("Any updates?")

            # Second call should have context prepended
            last_call = mock_chat.call_args_list[-1]
            sent_text = last_call[0][0]
            assert "draft the skeleton ticket" in sent_text
            assert "Open commitments" in sent_text
        finally:
            reset_conversation_ground()
# ---------------------------------------------------------------------------
# WebSocket heartbeat ping (rescued from PR #399)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_heartbeat_sends_ping():
    """After one sleep interval the heartbeat writes a single ping frame."""
    socket = AsyncMock()
    sleeps_seen = []

    async def sleep_once_then_fail(_interval):
        # The first sleep returns normally; any later sleep raises, which
        # ends the heartbeat loop.
        sleeps_seen.append(_interval)
        if len(sleeps_seen) > 1:
            raise ConnectionError("stop")

    with patch("dashboard.routes.world.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        mock_sleep.side_effect = sleep_once_then_fail
        await _heartbeat(socket)

    socket.send_text.assert_called_once()
    frame = json.loads(socket.send_text.call_args.args[0])
    assert frame["type"] == "ping"
@pytest.mark.asyncio
async def test_heartbeat_exits_on_dead_connection():
    """A send that raises ConnectionError ends the heartbeat without propagating."""
    broken_socket = AsyncMock()
    broken_socket.send_text.side_effect = ConnectionError("gone")

    # Replace sleep with a no-op mock so the heartbeat does not really wait.
    instant_sleep = patch("dashboard.routes.world.asyncio.sleep", new_callable=AsyncMock)
    with instant_sleep:
        await _heartbeat(broken_socket)  # passing means no exception escaped