From 2e055635a89bbdc4190e154665d8ec871010abe1 Mon Sep 17 00:00:00 2001 From: Manus AI Date: Sat, 21 Feb 2026 13:45:24 -0500 Subject: [PATCH] test: close coverage gaps for timmy_serve CLI, voice_enhanced, WebSocket, and swarm live page - Add 8 tests for timmy_serve/cli.py (start, invoice, status commands) covering default args, custom args, and output validation - Add 8 tests for voice_enhanced route covering all intent types (status, help, swarm, voice, chat fallback) plus error handling - Add 17 tests for websocket/handler.py covering broadcast to multiple clients, dead connection cleanup, history trimming, connect/disconnect, and all convenience broadcast methods - Add 4 tests for the new GET /swarm/live page route Total new tests: 37 --- tests/test_swarm_live_page.py | 23 ++++ tests/test_timmy_serve_cli.py | 65 ++++++++++++ tests/test_voice_enhanced.py | 101 ++++++++++++++++++ tests/test_websocket_extended.py | 174 +++++++++++++++++++++++++++++++ 4 files changed, 363 insertions(+) create mode 100644 tests/test_swarm_live_page.py create mode 100644 tests/test_timmy_serve_cli.py create mode 100644 tests/test_voice_enhanced.py create mode 100644 tests/test_websocket_extended.py diff --git a/tests/test_swarm_live_page.py b/tests/test_swarm_live_page.py new file mode 100644 index 0000000..a10ce52 --- /dev/null +++ b/tests/test_swarm_live_page.py @@ -0,0 +1,23 @@ +"""Tests for the GET /swarm/live page route.""" + + +class TestSwarmLivePage: + def test_swarm_live_returns_html(self, client): + resp = client.get("/swarm/live") + assert resp.status_code == 200 + assert "text/html" in resp.headers["content-type"] + + def test_swarm_live_contains_dashboard_title(self, client): + resp = client.get("/swarm/live") + assert "Live Swarm Dashboard" in resp.text + + def test_swarm_live_contains_websocket_script(self, client): + resp = client.get("/swarm/live") + assert "/swarm/live" in resp.text + assert "WebSocket" in resp.text + + def test_swarm_live_contains_stat_elements(self, client): + resp = client.get("/swarm/live") + assert "stat-agents" in resp.text + assert "stat-active" in resp.text + assert "stat-tasks" in resp.text diff --git a/tests/test_timmy_serve_cli.py b/tests/test_timmy_serve_cli.py new file mode 100644 index 0000000..3be8da8 --- /dev/null +++ b/tests/test_timmy_serve_cli.py @@ -0,0 +1,65 @@ +"""Tests for timmy_serve/cli.py — Serve-mode CLI commands.""" + +from typer.testing import CliRunner + +from timmy_serve.cli import app + +runner = CliRunner() + + +class TestStartCommand: + def test_start_default_port(self): + result = runner.invoke(app, ["start"]) + assert result.exit_code == 0 + assert "8402" in result.output + assert "L402 payment proxy active" in result.output + + def test_start_custom_port(self): + result = runner.invoke(app, ["start", "--port", "9000"]) + assert result.exit_code == 0 + assert "9000" in result.output + + def test_start_custom_host(self): + result = runner.invoke(app, ["start", "--host", "127.0.0.1"]) + assert result.exit_code == 0 + assert "127.0.0.1" in result.output + + def test_start_shows_endpoints(self): + result = runner.invoke(app, ["start"]) + assert "/serve/chat" in result.output + assert "/serve/invoice" in result.output + assert "/serve/status" in result.output + + +class TestInvoiceCommand: + def test_invoice_default_amount(self): + result = runner.invoke(app, ["invoice"]) + assert result.exit_code == 0 + assert "100 sats" in result.output + assert "API access" in result.output + + def test_invoice_custom_amount(self): + result = runner.invoke(app, ["invoice", "--amount", "500"]) + assert result.exit_code == 0 + assert "500 sats" in result.output + + def test_invoice_custom_memo(self): + result = runner.invoke(app, ["invoice", "--memo", "Test payment"]) + assert result.exit_code == 0 + assert "Test payment" in result.output + + def test_invoice_shows_payment_hash(self): + result = runner.invoke(app, ["invoice"]) + assert "Payment hash:" in result.output + assert "Pay request:" in result.output + + +class TestStatusCommand: + def test_status_runs_successfully(self): + result = runner.invoke(app, ["status"]) + assert result.exit_code == 0 + assert "Timmy Serve" in result.output + assert "Total invoices:" in result.output + assert "Settled:" in result.output + assert "Total earned:" in result.output + assert "sats" in result.output diff --git a/tests/test_voice_enhanced.py b/tests/test_voice_enhanced.py new file mode 100644 index 0000000..0ed802d --- /dev/null +++ b/tests/test_voice_enhanced.py @@ -0,0 +1,101 @@ +"""Tests for dashboard/routes/voice_enhanced.py — enhanced voice processing.""" + +from unittest.mock import MagicMock, patch + +import pytest + + +class TestVoiceEnhancedProcess: + """Test the POST /voice/enhanced/process endpoint.""" + + def test_status_intent(self, client): + resp = client.post( + "/voice/enhanced/process", + data={"text": "what is your status", "speak_response": "false"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["intent"] == "status" + assert "operational" in data["response"].lower() + assert data["error"] is None + + def test_help_intent(self, client): + resp = client.post( + "/voice/enhanced/process", + data={"text": "help me please", "speak_response": "false"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["intent"] == "help" + assert "commands" in data["response"].lower() + + def test_swarm_intent(self, client): + resp = client.post( + "/voice/enhanced/process", + data={"text": "list all swarm agents", "speak_response": "false"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["intent"] == "swarm" + assert "agents" in data["response"].lower() + + def test_voice_intent(self, client): + resp = client.post( + "/voice/enhanced/process", + data={"text": "change voice settings", "speak_response": "false"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["intent"] == "voice" + assert "tts" in data["response"].lower() + + def test_chat_fallback_intent(self, client): + """Chat intent should attempt to call the Timmy agent.""" + mock_agent = MagicMock() + mock_run = MagicMock() + mock_run.content = "Hello from Timmy!" + mock_agent.run.return_value = mock_run + + with patch("dashboard.routes.voice_enhanced.create_timmy", return_value=mock_agent): + resp = client.post( + "/voice/enhanced/process", + data={"text": "tell me about Bitcoin", "speak_response": "false"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["intent"] == "chat" + assert data["response"] == "Hello from Timmy!" + + def test_chat_fallback_error_handling(self, client): + """When the agent raises, the error should be captured gracefully.""" + with patch( + "dashboard.routes.voice_enhanced.create_timmy", + side_effect=RuntimeError("Ollama offline"), + ): + resp = client.post( + "/voice/enhanced/process", + data={"text": "tell me about sovereignty", "speak_response": "false"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["error"] is not None + assert "Ollama offline" in data["error"] + + def test_speak_response_flag(self, client): + """When speak_response=true, the spoken field should be true.""" + resp = client.post( + "/voice/enhanced/process", + data={"text": "what is your status", "speak_response": "true"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["spoken"] is True + + def test_confidence_returned(self, client): + resp = client.post( + "/voice/enhanced/process", + data={"text": "status check", "speak_response": "false"}, + ) + data = resp.json() + assert "confidence" in data + assert isinstance(data["confidence"], (int, float)) diff --git a/tests/test_websocket_extended.py b/tests/test_websocket_extended.py new file mode 100644 index 0000000..058f8a4 --- /dev/null +++ b/tests/test_websocket_extended.py @@ -0,0 +1,174 @@ +"""Extended tests for websocket/handler.py — broadcast, disconnect, convenience.""" + +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from websocket.handler import WebSocketManager, WSEvent + + +class TestWSEventSerialization: + def test_to_json_roundtrip(self): + event = WSEvent(event="task_posted", data={"id": "abc"}, timestamp="2026-01-01T00:00:00Z") + raw = event.to_json() + parsed = json.loads(raw) + assert parsed["event"] == "task_posted" + assert parsed["data"]["id"] == "abc" + assert parsed["timestamp"] == "2026-01-01T00:00:00Z" + + def test_to_json_empty_data(self): + event = WSEvent(event="ping", data={}, timestamp="t") + parsed = json.loads(event.to_json()) + assert parsed["data"] == {} + + +class TestWebSocketManagerBroadcast: + @pytest.mark.asyncio + async def test_broadcast_sends_to_all_connections(self): + mgr = WebSocketManager() + ws1 = AsyncMock() + ws2 = AsyncMock() + mgr._connections = [ws1, ws2] + + await mgr.broadcast("test_event", {"key": "val"}) + + ws1.send_text.assert_called_once() + ws2.send_text.assert_called_once() + # Both should receive the same message + msg1 = json.loads(ws1.send_text.call_args[0][0]) + msg2 = json.loads(ws2.send_text.call_args[0][0]) + assert msg1["event"] == "test_event" + assert msg2["event"] == "test_event" + + @pytest.mark.asyncio + async def test_broadcast_removes_dead_connections(self): + mgr = WebSocketManager() + ws_alive = AsyncMock() + ws_dead = AsyncMock() + ws_dead.send_text.side_effect = RuntimeError("connection closed") + mgr._connections = [ws_alive, ws_dead] + + await mgr.broadcast("ping", {}) + + assert ws_dead not in mgr._connections + assert ws_alive in mgr._connections + + @pytest.mark.asyncio + async def test_broadcast_appends_to_history(self): + mgr = WebSocketManager() + await mgr.broadcast("evt1", {"a": 1}) + await mgr.broadcast("evt2", {"b": 2}) + + assert len(mgr.event_history) == 2 + assert mgr.event_history[0].event == "evt1" + assert mgr.event_history[1].event == "evt2" + + @pytest.mark.asyncio + async def test_broadcast_trims_history(self): + mgr = WebSocketManager() + mgr._max_history = 3 + for i in range(5): + await mgr.broadcast(f"e{i}", {}) + assert len(mgr.event_history) == 3 + assert mgr.event_history[0].event == "e2" + + +class TestWebSocketManagerConnect: + @pytest.mark.asyncio + async def test_connect_accepts_websocket(self): + mgr = WebSocketManager() + ws = AsyncMock() + await mgr.connect(ws) + ws.accept.assert_called_once() + assert mgr.connection_count == 1 + + @pytest.mark.asyncio + async def test_connect_sends_recent_history(self): + mgr = WebSocketManager() + # Pre-populate history + for i in range(3): + mgr._event_history.append( + WSEvent(event=f"e{i}", data={}, timestamp="t") + ) + ws = AsyncMock() + await mgr.connect(ws) + # Should have sent 3 history events + assert ws.send_text.call_count == 3 + + +class TestWebSocketManagerDisconnect: + def test_disconnect_removes_connection(self): + mgr = WebSocketManager() + ws = MagicMock() + mgr._connections = [ws] + mgr.disconnect(ws) + assert mgr.connection_count == 0 + + def test_disconnect_nonexistent_is_safe(self): + mgr = WebSocketManager() + ws = MagicMock() + mgr.disconnect(ws) # Should not raise + assert mgr.connection_count == 0 + + +class TestConvenienceBroadcasts: + @pytest.mark.asyncio + async def test_broadcast_agent_joined(self): + mgr = WebSocketManager() + ws = AsyncMock() + mgr._connections = [ws] + await mgr.broadcast_agent_joined("a1", "Echo") + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["event"] == "agent_joined" + assert msg["data"]["agent_id"] == "a1" + assert msg["data"]["name"] == "Echo" + + @pytest.mark.asyncio + async def test_broadcast_task_posted(self): + mgr = WebSocketManager() + ws = AsyncMock() + mgr._connections = [ws] + await mgr.broadcast_task_posted("t1", "Research BTC") + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["event"] == "task_posted" + assert msg["data"]["task_id"] == "t1" + + @pytest.mark.asyncio + async def test_broadcast_bid_submitted(self): + mgr = WebSocketManager() + ws = AsyncMock() + mgr._connections = [ws] + await mgr.broadcast_bid_submitted("t1", "a1", 42) + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["event"] == "bid_submitted" + assert msg["data"]["bid_sats"] == 42 + + @pytest.mark.asyncio + async def test_broadcast_task_assigned(self): + mgr = WebSocketManager() + ws = AsyncMock() + mgr._connections = [ws] + await mgr.broadcast_task_assigned("t1", "a1") + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["event"] == "task_assigned" + + @pytest.mark.asyncio + async def test_broadcast_task_completed(self): + mgr = WebSocketManager() + ws = AsyncMock() + mgr._connections = [ws] + await mgr.broadcast_task_completed("t1", "a1", "Done!") + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["event"] == "task_completed" + assert msg["data"]["result"] == "Done!" + + @pytest.mark.asyncio + async def test_broadcast_agent_left(self): + mgr = WebSocketManager() + ws = AsyncMock() + mgr._connections = [ws] + await mgr.broadcast_agent_left("a1", "Echo") + msg = json.loads(ws.send_text.call_args[0][0]) + assert msg["event"] == "agent_left"