forked from Rockachopa/Timmy-time-dashboard
test: close coverage gaps for timmy_serve CLI, voice_enhanced, WebSocket, and swarm live page
- Add 8 tests for timmy_serve/cli.py (start, invoice, status commands) covering default args, custom args, and output validation - Add 8 tests for voice_enhanced route covering all intent types (status, help, swarm, voice, chat fallback) plus error handling - Add 17 tests for websocket/handler.py covering broadcast to multiple clients, dead connection cleanup, history trimming, connect/disconnect, and all convenience broadcast methods - Add 4 tests for the new GET /swarm/live page route Total new tests: 37
This commit is contained in:
23
tests/test_swarm_live_page.py
Normal file
23
tests/test_swarm_live_page.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""Tests for the GET /swarm/live page route."""
|
||||
|
||||
|
||||
class TestSwarmLivePage:
|
||||
def test_swarm_live_returns_html(self, client):
|
||||
resp = client.get("/swarm/live")
|
||||
assert resp.status_code == 200
|
||||
assert "text/html" in resp.headers["content-type"]
|
||||
|
||||
def test_swarm_live_contains_dashboard_title(self, client):
|
||||
resp = client.get("/swarm/live")
|
||||
assert "Live Swarm Dashboard" in resp.text
|
||||
|
||||
def test_swarm_live_contains_websocket_script(self, client):
|
||||
resp = client.get("/swarm/live")
|
||||
assert "/swarm/live" in resp.text
|
||||
assert "WebSocket" in resp.text
|
||||
|
||||
def test_swarm_live_contains_stat_elements(self, client):
|
||||
resp = client.get("/swarm/live")
|
||||
assert "stat-agents" in resp.text
|
||||
assert "stat-active" in resp.text
|
||||
assert "stat-tasks" in resp.text
|
||||
65
tests/test_timmy_serve_cli.py
Normal file
65
tests/test_timmy_serve_cli.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Tests for timmy_serve/cli.py — Serve-mode CLI commands."""
|
||||
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from timmy_serve.cli import app
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
class TestStartCommand:
|
||||
def test_start_default_port(self):
|
||||
result = runner.invoke(app, ["start"])
|
||||
assert result.exit_code == 0
|
||||
assert "8402" in result.output
|
||||
assert "L402 payment proxy active" in result.output
|
||||
|
||||
def test_start_custom_port(self):
|
||||
result = runner.invoke(app, ["start", "--port", "9000"])
|
||||
assert result.exit_code == 0
|
||||
assert "9000" in result.output
|
||||
|
||||
def test_start_custom_host(self):
|
||||
result = runner.invoke(app, ["start", "--host", "127.0.0.1"])
|
||||
assert result.exit_code == 0
|
||||
assert "127.0.0.1" in result.output
|
||||
|
||||
def test_start_shows_endpoints(self):
|
||||
result = runner.invoke(app, ["start"])
|
||||
assert "/serve/chat" in result.output
|
||||
assert "/serve/invoice" in result.output
|
||||
assert "/serve/status" in result.output
|
||||
|
||||
|
||||
class TestInvoiceCommand:
|
||||
def test_invoice_default_amount(self):
|
||||
result = runner.invoke(app, ["invoice"])
|
||||
assert result.exit_code == 0
|
||||
assert "100 sats" in result.output
|
||||
assert "API access" in result.output
|
||||
|
||||
def test_invoice_custom_amount(self):
|
||||
result = runner.invoke(app, ["invoice", "--amount", "500"])
|
||||
assert result.exit_code == 0
|
||||
assert "500 sats" in result.output
|
||||
|
||||
def test_invoice_custom_memo(self):
|
||||
result = runner.invoke(app, ["invoice", "--memo", "Test payment"])
|
||||
assert result.exit_code == 0
|
||||
assert "Test payment" in result.output
|
||||
|
||||
def test_invoice_shows_payment_hash(self):
|
||||
result = runner.invoke(app, ["invoice"])
|
||||
assert "Payment hash:" in result.output
|
||||
assert "Pay request:" in result.output
|
||||
|
||||
|
||||
class TestStatusCommand:
|
||||
def test_status_runs_successfully(self):
|
||||
result = runner.invoke(app, ["status"])
|
||||
assert result.exit_code == 0
|
||||
assert "Timmy Serve" in result.output
|
||||
assert "Total invoices:" in result.output
|
||||
assert "Settled:" in result.output
|
||||
assert "Total earned:" in result.output
|
||||
assert "sats" in result.output
|
||||
101
tests/test_voice_enhanced.py
Normal file
101
tests/test_voice_enhanced.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Tests for dashboard/routes/voice_enhanced.py — enhanced voice processing."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestVoiceEnhancedProcess:
|
||||
"""Test the POST /voice/enhanced/process endpoint."""
|
||||
|
||||
def test_status_intent(self, client):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "what is your status", "speak_response": "false"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["intent"] == "status"
|
||||
assert "operational" in data["response"].lower()
|
||||
assert data["error"] is None
|
||||
|
||||
def test_help_intent(self, client):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "help me please", "speak_response": "false"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["intent"] == "help"
|
||||
assert "commands" in data["response"].lower()
|
||||
|
||||
def test_swarm_intent(self, client):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "list all swarm agents", "speak_response": "false"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["intent"] == "swarm"
|
||||
assert "agents" in data["response"].lower()
|
||||
|
||||
def test_voice_intent(self, client):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "change voice settings", "speak_response": "false"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["intent"] == "voice"
|
||||
assert "tts" in data["response"].lower()
|
||||
|
||||
def test_chat_fallback_intent(self, client):
|
||||
"""Chat intent should attempt to call the Timmy agent."""
|
||||
mock_agent = MagicMock()
|
||||
mock_run = MagicMock()
|
||||
mock_run.content = "Hello from Timmy!"
|
||||
mock_agent.run.return_value = mock_run
|
||||
|
||||
with patch("dashboard.routes.voice_enhanced.create_timmy", return_value=mock_agent):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "tell me about Bitcoin", "speak_response": "false"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["intent"] == "chat"
|
||||
assert data["response"] == "Hello from Timmy!"
|
||||
|
||||
def test_chat_fallback_error_handling(self, client):
|
||||
"""When the agent raises, the error should be captured gracefully."""
|
||||
with patch(
|
||||
"dashboard.routes.voice_enhanced.create_timmy",
|
||||
side_effect=RuntimeError("Ollama offline"),
|
||||
):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "tell me about sovereignty", "speak_response": "false"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["error"] is not None
|
||||
assert "Ollama offline" in data["error"]
|
||||
|
||||
def test_speak_response_flag(self, client):
|
||||
"""When speak_response=true, the spoken field should be true."""
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "what is your status", "speak_response": "true"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["spoken"] is True
|
||||
|
||||
def test_confidence_returned(self, client):
|
||||
resp = client.post(
|
||||
"/voice/enhanced/process",
|
||||
data={"text": "status check", "speak_response": "false"},
|
||||
)
|
||||
data = resp.json()
|
||||
assert "confidence" in data
|
||||
assert isinstance(data["confidence"], (int, float))
|
||||
174
tests/test_websocket_extended.py
Normal file
174
tests/test_websocket_extended.py
Normal file
@@ -0,0 +1,174 @@
|
||||
"""Extended tests for websocket/handler.py — broadcast, disconnect, convenience."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from websocket.handler import WebSocketManager, WSEvent
|
||||
|
||||
|
||||
class TestWSEventSerialization:
|
||||
def test_to_json_roundtrip(self):
|
||||
event = WSEvent(event="task_posted", data={"id": "abc"}, timestamp="2026-01-01T00:00:00Z")
|
||||
raw = event.to_json()
|
||||
parsed = json.loads(raw)
|
||||
assert parsed["event"] == "task_posted"
|
||||
assert parsed["data"]["id"] == "abc"
|
||||
assert parsed["timestamp"] == "2026-01-01T00:00:00Z"
|
||||
|
||||
def test_to_json_empty_data(self):
|
||||
event = WSEvent(event="ping", data={}, timestamp="t")
|
||||
parsed = json.loads(event.to_json())
|
||||
assert parsed["data"] == {}
|
||||
|
||||
|
||||
class TestWebSocketManagerBroadcast:
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_sends_to_all_connections(self):
|
||||
mgr = WebSocketManager()
|
||||
ws1 = AsyncMock()
|
||||
ws2 = AsyncMock()
|
||||
mgr._connections = [ws1, ws2]
|
||||
|
||||
await mgr.broadcast("test_event", {"key": "val"})
|
||||
|
||||
ws1.send_text.assert_called_once()
|
||||
ws2.send_text.assert_called_once()
|
||||
# Both should receive the same message
|
||||
msg1 = json.loads(ws1.send_text.call_args[0][0])
|
||||
msg2 = json.loads(ws2.send_text.call_args[0][0])
|
||||
assert msg1["event"] == "test_event"
|
||||
assert msg2["event"] == "test_event"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_removes_dead_connections(self):
|
||||
mgr = WebSocketManager()
|
||||
ws_alive = AsyncMock()
|
||||
ws_dead = AsyncMock()
|
||||
ws_dead.send_text.side_effect = RuntimeError("connection closed")
|
||||
mgr._connections = [ws_alive, ws_dead]
|
||||
|
||||
await mgr.broadcast("ping", {})
|
||||
|
||||
assert ws_dead not in mgr._connections
|
||||
assert ws_alive in mgr._connections
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_appends_to_history(self):
|
||||
mgr = WebSocketManager()
|
||||
await mgr.broadcast("evt1", {"a": 1})
|
||||
await mgr.broadcast("evt2", {"b": 2})
|
||||
|
||||
assert len(mgr.event_history) == 2
|
||||
assert mgr.event_history[0].event == "evt1"
|
||||
assert mgr.event_history[1].event == "evt2"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_trims_history(self):
|
||||
mgr = WebSocketManager()
|
||||
mgr._max_history = 3
|
||||
for i in range(5):
|
||||
await mgr.broadcast(f"e{i}", {})
|
||||
assert len(mgr.event_history) == 3
|
||||
assert mgr.event_history[0].event == "e2"
|
||||
|
||||
|
||||
class TestWebSocketManagerConnect:
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_accepts_websocket(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
await mgr.connect(ws)
|
||||
ws.accept.assert_called_once()
|
||||
assert mgr.connection_count == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_sends_recent_history(self):
|
||||
mgr = WebSocketManager()
|
||||
# Pre-populate history
|
||||
for i in range(3):
|
||||
mgr._event_history.append(
|
||||
WSEvent(event=f"e{i}", data={}, timestamp="t")
|
||||
)
|
||||
ws = AsyncMock()
|
||||
await mgr.connect(ws)
|
||||
# Should have sent 3 history events
|
||||
assert ws.send_text.call_count == 3
|
||||
|
||||
|
||||
class TestWebSocketManagerDisconnect:
|
||||
def test_disconnect_removes_connection(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = MagicMock()
|
||||
mgr._connections = [ws]
|
||||
mgr.disconnect(ws)
|
||||
assert mgr.connection_count == 0
|
||||
|
||||
def test_disconnect_nonexistent_is_safe(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = MagicMock()
|
||||
mgr.disconnect(ws) # Should not raise
|
||||
assert mgr.connection_count == 0
|
||||
|
||||
|
||||
class TestConvenienceBroadcasts:
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_agent_joined(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
mgr._connections = [ws]
|
||||
await mgr.broadcast_agent_joined("a1", "Echo")
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["event"] == "agent_joined"
|
||||
assert msg["data"]["agent_id"] == "a1"
|
||||
assert msg["data"]["name"] == "Echo"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_task_posted(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
mgr._connections = [ws]
|
||||
await mgr.broadcast_task_posted("t1", "Research BTC")
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["event"] == "task_posted"
|
||||
assert msg["data"]["task_id"] == "t1"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_bid_submitted(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
mgr._connections = [ws]
|
||||
await mgr.broadcast_bid_submitted("t1", "a1", 42)
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["event"] == "bid_submitted"
|
||||
assert msg["data"]["bid_sats"] == 42
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_task_assigned(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
mgr._connections = [ws]
|
||||
await mgr.broadcast_task_assigned("t1", "a1")
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["event"] == "task_assigned"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_task_completed(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
mgr._connections = [ws]
|
||||
await mgr.broadcast_task_completed("t1", "a1", "Done!")
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["event"] == "task_completed"
|
||||
assert msg["data"]["result"] == "Done!"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_agent_left(self):
|
||||
mgr = WebSocketManager()
|
||||
ws = AsyncMock()
|
||||
mgr._connections = [ws]
|
||||
await mgr.broadcast_agent_left("a1", "Echo")
|
||||
msg = json.loads(ws.send_text.call_args[0][0])
|
||||
assert msg["event"] == "agent_left"
|
||||
Reference in New Issue
Block a user