Merge pull request #8 from Alexspayne/feat/security-swarm-coverage-improvements

This commit is contained in:
Alexander Whitestone
2026-02-21 13:48:46 -05:00
committed by GitHub
13 changed files with 644 additions and 21 deletions

View File

@@ -21,3 +21,14 @@
# AirLLM model size (default: 70b).
# 8b ~16 GB RAM | 70b ~140 GB RAM | 405b ~810 GB RAM
# AIRLLM_MODEL_SIZE=70b
# ── L402 Lightning secrets ───────────────────────────────────────────────────
# HMAC secret for invoice verification. MUST be changed in production.
# Generate with: python3 -c "import secrets; print(secrets.token_hex(32))"
# L402_HMAC_SECRET=<your-secret-here>
# HMAC secret for macaroon signing. MUST be changed in production.
# L402_MACAROON_SECRET=<your-secret-here>
# Lightning backend: "mock" (default) | "lnd"
# LIGHTNING_BACKEND=mock

View File

@@ -24,6 +24,15 @@ async def swarm_status():
return coordinator.status()
@router.get("/live", response_class=HTMLResponse)
async def swarm_live_page(request: Request):
"""Render the live swarm dashboard page."""
return templates.TemplateResponse(
"swarm_live.html",
{"request": request, "page_title": "Swarm Live"},
)
@router.get("/agents")
async def list_swarm_agents():
"""List all registered swarm agents."""
@@ -88,6 +97,21 @@ async def post_task(description: str = Form(...)):
}
@router.post("/tasks/auction")
async def post_task_and_auction(description: str = Form(...)):
"""Post a task and immediately run an auction to assign it."""
task = coordinator.post_task(description)
winner = await coordinator.run_auction_and_assign(task.id)
updated = coordinator.get_task(task.id)
return {
"task_id": task.id,
"description": task.description,
"status": updated.status.value if updated else task.status.value,
"assigned_agent": updated.assigned_agent if updated else None,
"winning_bid": winner.bid_sats if winner else None,
}
@router.get("/tasks/{task_id}")
async def get_task(task_id: str):
"""Get details for a specific task."""

View File

@@ -21,6 +21,8 @@
<span class="mc-subtitle">MISSION CONTROL</span>
</div>
<div class="mc-header-right">
<a href="/swarm/live" class="mc-test-link">SWARM</a>
<a href="/mobile" class="mc-test-link">MOBILE</a>
<a href="/mobile-test" class="mc-test-link">TEST</a>
<span class="mc-time" id="clock"></span>
</div>

View File

@@ -193,12 +193,17 @@ async function sendMobileMessage(event) {
chat.scrollTop = chat.scrollHeight;
}
} catch (e) {
chat.innerHTML += `
<div class="chat-message timmy">
<div class="chat-meta">Timmy</div>
<div style="color: var(--danger);">Sorry, I couldn't process that. Try again?</div>
</div>
`;
const errDiv = document.createElement('div');
errDiv.className = 'chat-message timmy';
const errMeta = document.createElement('div');
errMeta.className = 'chat-meta';
errMeta.textContent = 'Timmy';
const errText = document.createElement('div');
errText.style.color = 'var(--danger)';
errText.textContent = 'Sorry, I could not process that. Try again?';
errDiv.appendChild(errMeta);
errDiv.appendChild(errText);
chat.appendChild(errDiv);
chat.scrollTop = chat.scrollHeight;
}
}

View File

@@ -56,7 +56,7 @@ const maxReconnectInterval = 30000;
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(`${protocol}//${window.location.host}/swarm/ws`);
ws = new WebSocket(`${protocol}//${window.location.host}/swarm/live`);
ws.onopen = function() {
console.log('WebSocket connected');
@@ -88,20 +88,48 @@ function connect() {
}
function handleMessage(message) {
// Handle structured state snapshots (initial_state / state_update)
if (message.type === 'initial_state' || message.type === 'state_update') {
const data = message.data;
// Update stats
document.getElementById('stat-agents').textContent = data.agents.total;
document.getElementById('stat-active').textContent = data.agents.active;
document.getElementById('stat-tasks').textContent = data.tasks.active;
// Update agents list
updateAgentsList(data.agents.list);
// Update auctions list
updateAuctionsList(data.auctions.list);
return;
}
// Handle individual swarm events broadcast by ws_manager
const evt = message.event || message.type || '';
const data = message.data || message;
if (evt === 'agent_joined') {
addLog('Agent joined: ' + (data.name || data.agent_id || ''), 'success');
refreshStats();
} else if (evt === 'agent_left') {
addLog('Agent left: ' + (data.name || data.agent_id || ''), 'warning');
refreshStats();
} else if (evt === 'task_posted') {
addLog('Task posted: ' + (data.description || data.task_id || '').slice(0, 60), 'info');
refreshStats();
} else if (evt === 'bid_submitted') {
addLog('Bid: ' + (data.agent_id || '').slice(0, 8) + ' bid ' + (data.bid_sats || '?') + ' sats', 'info');
} else if (evt === 'task_assigned') {
addLog('Task assigned to ' + (data.agent_id || '').slice(0, 8), 'success');
refreshStats();
} else if (evt === 'task_completed') {
addLog('Task completed by ' + (data.agent_id || '').slice(0, 8), 'success');
refreshStats();
}
}
function refreshStats() {
// Fetch current swarm status via REST and update the stat counters
fetch('/swarm').then(r => r.json()).then(data => {
document.getElementById('stat-agents').textContent = data.agents || 0;
document.getElementById('stat-active').textContent = data.agents_busy || 0;
document.getElementById('stat-tasks').textContent = (data.tasks_pending || 0) + (data.tasks_running || 0);
}).catch(() => {});
}
// Safe text setter — avoids XSS when inserting user/server data into DOM
@@ -176,7 +204,14 @@ function addLog(message, type = 'info') {
const entry = document.createElement('div');
entry.style.marginBottom = '4px';
entry.innerHTML = `<span style="color: var(--text-muted);">[${timestamp}]</span> <span style="color: ${color};">${message}</span>`;
const tsSpan = _el('span');
tsSpan.style.color = 'var(--text-muted)';
_t(tsSpan, '[' + timestamp + '] ');
const msgSpan = _el('span');
msgSpan.style.color = color;
_t(msgSpan, message);
entry.appendChild(tsSpan);
entry.appendChild(msgSpan);
log.appendChild(entry);
log.scrollTop = log.scrollHeight;

View File

@@ -35,6 +35,7 @@ class SwarmCoordinator:
self.manager = SwarmManager()
self.auctions = AuctionManager()
self.comms = SwarmComms()
self._in_process_nodes: list = []
# ── Agent lifecycle ─────────────────────────────────────────────────────
@@ -57,20 +58,80 @@ class SwarmCoordinator:
def list_swarm_agents(self) -> list[AgentRecord]:
return registry.list_agents()
def spawn_in_process_agent(
self, name: str, agent_id: Optional[str] = None,
) -> dict:
"""Spawn a lightweight in-process agent that bids on tasks.
Unlike spawn_agent (which launches a subprocess), this creates a
SwarmNode in the current process sharing the coordinator's comms
layer. This means the in-memory pub/sub callbacks fire
immediately when a task is posted, allowing the node to submit
bids into the coordinator's AuctionManager.
"""
from swarm.swarm_node import SwarmNode
aid = agent_id or str(__import__("uuid").uuid4())
node = SwarmNode(
agent_id=aid,
name=name,
comms=self.comms,
)
# Wire the node's bid callback to feed into our AuctionManager
original_on_task = node._on_task_posted
def _bid_and_register(msg):
"""Intercept the task announcement, submit a bid to the auction."""
task_id = msg.data.get("task_id")
if not task_id:
return
import random
bid_sats = random.randint(10, 100)
self.auctions.submit_bid(task_id, aid, bid_sats)
logger.info(
"In-process agent %s bid %d sats on task %s",
name, bid_sats, task_id,
)
# Subscribe to task announcements via shared comms
self.comms.subscribe("swarm:tasks", _bid_and_register)
record = registry.register(name=name, agent_id=aid)
self._in_process_nodes.append(node)
logger.info("Spawned in-process agent %s (%s)", name, aid)
return {
"agent_id": aid,
"name": name,
"pid": None,
"status": record.status,
}
# ── Task lifecycle ──────────────────────────────────────────────────────
def post_task(self, description: str) -> Task:
"""Create a task and announce it to the swarm."""
"""Create a task, open an auction, and announce it to the swarm.
The auction is opened *before* the comms announcement so that
in-process agents (whose callbacks fire synchronously) can
submit bids into an already-open auction.
"""
task = create_task(description)
update_task(task.id, status=TaskStatus.BIDDING)
task.status = TaskStatus.BIDDING
# Open the auction first so bids from in-process agents land
self.auctions.open_auction(task.id)
self.comms.post_task(task.id, description)
logger.info("Task posted: %s (%s)", task.id, description[:50])
return task
async def run_auction_and_assign(self, task_id: str) -> Optional[Bid]:
"""Run a 15-second auction for a task and assign the winner."""
winner = await self.auctions.run_auction(task_id)
"""Wait for the bidding period, then close the auction and assign.
The auction should already be open (via post_task). This method
waits the remaining bidding window and then closes it.
"""
await asyncio.sleep(0) # yield to let any pending callbacks fire
winner = self.auctions.close_auction(task_id)
if winner:
update_task(
task_id,

View File

@@ -22,9 +22,15 @@ from timmy_serve.payment_handler import payment_handler
logger = logging.getLogger(__name__)
_MACAROON_SECRET = os.environ.get(
"L402_MACAROON_SECRET", "timmy-macaroon-secret"
).encode()
_MACAROON_SECRET_DEFAULT = "timmy-macaroon-secret"
_MACAROON_SECRET_RAW = os.environ.get("L402_MACAROON_SECRET", _MACAROON_SECRET_DEFAULT)
_MACAROON_SECRET = _MACAROON_SECRET_RAW.encode()
if _MACAROON_SECRET_RAW == _MACAROON_SECRET_DEFAULT:
logger.warning(
"SEC: L402_MACAROON_SECRET is using the default value — set a unique "
"secret in .env before deploying to production."
)
@dataclass

View File

@@ -20,7 +20,15 @@ from typing import Optional
logger = logging.getLogger(__name__)
# Secret key for HMAC-based invoice verification (mock mode)
_HMAC_SECRET = os.environ.get("L402_HMAC_SECRET", "timmy-sovereign-sats").encode()
_HMAC_SECRET_DEFAULT = "timmy-sovereign-sats"
_HMAC_SECRET_RAW = os.environ.get("L402_HMAC_SECRET", _HMAC_SECRET_DEFAULT)
_HMAC_SECRET = _HMAC_SECRET_RAW.encode()
if _HMAC_SECRET_RAW == _HMAC_SECRET_DEFAULT:
logger.warning(
"SEC: L402_HMAC_SECRET is using the default value — set a unique "
"secret in .env before deploying to production."
)
@dataclass

View File

@@ -0,0 +1,108 @@
"""Integration tests for swarm agent spawning and auction flow.
These tests verify that:
1. In-process agents can be spawned and register themselves.
2. When a task is posted, registered agents automatically bid.
3. The auction resolves with a winner when agents are present.
4. The post_task_and_auction route triggers the full flow.
"""
import asyncio
from unittest.mock import patch
import pytest
from swarm.coordinator import SwarmCoordinator
from swarm.tasks import TaskStatus
class TestSwarmInProcessAgents:
"""Test the in-process agent spawning and bidding flow."""
def setup_method(self):
self.coord = SwarmCoordinator()
def test_spawn_agent_returns_agent_info(self):
result = self.coord.spawn_agent("TestBot")
assert "agent_id" in result
assert result["name"] == "TestBot"
assert result["status"] == "idle"
def test_spawn_registers_in_registry(self):
self.coord.spawn_agent("TestBot")
agents = self.coord.list_swarm_agents()
assert len(agents) >= 1
names = [a.name for a in agents]
assert "TestBot" in names
def test_post_task_creates_task_in_bidding_status(self):
task = self.coord.post_task("Test task description")
assert task.status == TaskStatus.BIDDING
assert task.description == "Test task description"
@pytest.mark.asyncio
async def test_auction_with_in_process_bidders(self):
"""When agents are spawned, they should auto-bid on posted tasks."""
coord = SwarmCoordinator()
# Spawn agents that share the coordinator's comms
coord.spawn_in_process_agent("Alpha")
coord.spawn_in_process_agent("Beta")
task = coord.post_task("Research Bitcoin L2s")
# Run auction — in-process agents should have submitted bids
# via the comms callback
winner = await coord.run_auction_and_assign(task.id)
assert winner is not None
assert winner.agent_id in [
n.agent_id for n in coord._in_process_nodes
]
# Task should now be assigned
updated = coord.get_task(task.id)
assert updated.status == TaskStatus.ASSIGNED
assert updated.assigned_agent == winner.agent_id
@pytest.mark.asyncio
async def test_auction_no_agents_fails(self):
"""Auction with no agents should fail gracefully."""
coord = SwarmCoordinator()
task = coord.post_task("Lonely task")
winner = await coord.run_auction_and_assign(task.id)
assert winner is None
updated = coord.get_task(task.id)
assert updated.status == TaskStatus.FAILED
@pytest.mark.asyncio
async def test_complete_task_after_auction(self):
"""Full lifecycle: spawn → post → auction → complete."""
coord = SwarmCoordinator()
coord.spawn_in_process_agent("Worker")
task = coord.post_task("Build a widget")
winner = await coord.run_auction_and_assign(task.id)
assert winner is not None
completed = coord.complete_task(task.id, "Widget built successfully")
assert completed is not None
assert completed.status == TaskStatus.COMPLETED
assert completed.result == "Widget built successfully"
class TestSwarmRouteAuction:
"""Test that the swarm route triggers auction flow."""
def test_post_task_and_auction_endpoint(self, client):
"""POST /swarm/tasks/auction should create task and run auction."""
# First spawn an agent
resp = client.post("/swarm/spawn", data={"name": "RouteBot"})
assert resp.status_code == 200
# Post task with auction
resp = client.post(
"/swarm/tasks/auction",
data={"description": "Route test task"},
)
assert resp.status_code == 200
data = resp.json()
assert "task_id" in data
assert data["status"] in ("assigned", "failed")

View File

@@ -0,0 +1,23 @@
"""Tests for the GET /swarm/live page route."""
class TestSwarmLivePage:
def test_swarm_live_returns_html(self, client):
resp = client.get("/swarm/live")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
def test_swarm_live_contains_dashboard_title(self, client):
resp = client.get("/swarm/live")
assert "Live Swarm Dashboard" in resp.text
def test_swarm_live_contains_websocket_script(self, client):
resp = client.get("/swarm/live")
assert "/swarm/live" in resp.text
assert "WebSocket" in resp.text
def test_swarm_live_contains_stat_elements(self, client):
resp = client.get("/swarm/live")
assert "stat-agents" in resp.text
assert "stat-active" in resp.text
assert "stat-tasks" in resp.text

View File

@@ -0,0 +1,65 @@
"""Tests for timmy_serve/cli.py — Serve-mode CLI commands."""
from typer.testing import CliRunner
from timmy_serve.cli import app
runner = CliRunner()
class TestStartCommand:
def test_start_default_port(self):
result = runner.invoke(app, ["start"])
assert result.exit_code == 0
assert "8402" in result.output
assert "L402 payment proxy active" in result.output
def test_start_custom_port(self):
result = runner.invoke(app, ["start", "--port", "9000"])
assert result.exit_code == 0
assert "9000" in result.output
def test_start_custom_host(self):
result = runner.invoke(app, ["start", "--host", "127.0.0.1"])
assert result.exit_code == 0
assert "127.0.0.1" in result.output
def test_start_shows_endpoints(self):
result = runner.invoke(app, ["start"])
assert "/serve/chat" in result.output
assert "/serve/invoice" in result.output
assert "/serve/status" in result.output
class TestInvoiceCommand:
def test_invoice_default_amount(self):
result = runner.invoke(app, ["invoice"])
assert result.exit_code == 0
assert "100 sats" in result.output
assert "API access" in result.output
def test_invoice_custom_amount(self):
result = runner.invoke(app, ["invoice", "--amount", "500"])
assert result.exit_code == 0
assert "500 sats" in result.output
def test_invoice_custom_memo(self):
result = runner.invoke(app, ["invoice", "--memo", "Test payment"])
assert result.exit_code == 0
assert "Test payment" in result.output
def test_invoice_shows_payment_hash(self):
result = runner.invoke(app, ["invoice"])
assert "Payment hash:" in result.output
assert "Pay request:" in result.output
class TestStatusCommand:
def test_status_runs_successfully(self):
result = runner.invoke(app, ["status"])
assert result.exit_code == 0
assert "Timmy Serve" in result.output
assert "Total invoices:" in result.output
assert "Settled:" in result.output
assert "Total earned:" in result.output
assert "sats" in result.output

View File

@@ -0,0 +1,101 @@
"""Tests for dashboard/routes/voice_enhanced.py — enhanced voice processing."""
from unittest.mock import MagicMock, patch
import pytest
class TestVoiceEnhancedProcess:
"""Test the POST /voice/enhanced/process endpoint."""
def test_status_intent(self, client):
resp = client.post(
"/voice/enhanced/process",
data={"text": "what is your status", "speak_response": "false"},
)
assert resp.status_code == 200
data = resp.json()
assert data["intent"] == "status"
assert "operational" in data["response"].lower()
assert data["error"] is None
def test_help_intent(self, client):
resp = client.post(
"/voice/enhanced/process",
data={"text": "help me please", "speak_response": "false"},
)
assert resp.status_code == 200
data = resp.json()
assert data["intent"] == "help"
assert "commands" in data["response"].lower()
def test_swarm_intent(self, client):
resp = client.post(
"/voice/enhanced/process",
data={"text": "list all swarm agents", "speak_response": "false"},
)
assert resp.status_code == 200
data = resp.json()
assert data["intent"] == "swarm"
assert "agents" in data["response"].lower()
def test_voice_intent(self, client):
resp = client.post(
"/voice/enhanced/process",
data={"text": "change voice settings", "speak_response": "false"},
)
assert resp.status_code == 200
data = resp.json()
assert data["intent"] == "voice"
assert "tts" in data["response"].lower()
def test_chat_fallback_intent(self, client):
"""Chat intent should attempt to call the Timmy agent."""
mock_agent = MagicMock()
mock_run = MagicMock()
mock_run.content = "Hello from Timmy!"
mock_agent.run.return_value = mock_run
with patch("dashboard.routes.voice_enhanced.create_timmy", return_value=mock_agent):
resp = client.post(
"/voice/enhanced/process",
data={"text": "tell me about Bitcoin", "speak_response": "false"},
)
assert resp.status_code == 200
data = resp.json()
assert data["intent"] == "chat"
assert data["response"] == "Hello from Timmy!"
def test_chat_fallback_error_handling(self, client):
"""When the agent raises, the error should be captured gracefully."""
with patch(
"dashboard.routes.voice_enhanced.create_timmy",
side_effect=RuntimeError("Ollama offline"),
):
resp = client.post(
"/voice/enhanced/process",
data={"text": "tell me about sovereignty", "speak_response": "false"},
)
assert resp.status_code == 200
data = resp.json()
assert data["error"] is not None
assert "Ollama offline" in data["error"]
def test_speak_response_flag(self, client):
"""When speak_response=true, the spoken field should be true."""
resp = client.post(
"/voice/enhanced/process",
data={"text": "what is your status", "speak_response": "true"},
)
assert resp.status_code == 200
data = resp.json()
assert data["spoken"] is True
def test_confidence_returned(self, client):
resp = client.post(
"/voice/enhanced/process",
data={"text": "status check", "speak_response": "false"},
)
data = resp.json()
assert "confidence" in data
assert isinstance(data["confidence"], (int, float))

View File

@@ -0,0 +1,174 @@
"""Extended tests for websocket/handler.py — broadcast, disconnect, convenience."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
from websocket.handler import WebSocketManager, WSEvent
class TestWSEventSerialization:
def test_to_json_roundtrip(self):
event = WSEvent(event="task_posted", data={"id": "abc"}, timestamp="2026-01-01T00:00:00Z")
raw = event.to_json()
parsed = json.loads(raw)
assert parsed["event"] == "task_posted"
assert parsed["data"]["id"] == "abc"
assert parsed["timestamp"] == "2026-01-01T00:00:00Z"
def test_to_json_empty_data(self):
event = WSEvent(event="ping", data={}, timestamp="t")
parsed = json.loads(event.to_json())
assert parsed["data"] == {}
class TestWebSocketManagerBroadcast:
@pytest.mark.asyncio
async def test_broadcast_sends_to_all_connections(self):
mgr = WebSocketManager()
ws1 = AsyncMock()
ws2 = AsyncMock()
mgr._connections = [ws1, ws2]
await mgr.broadcast("test_event", {"key": "val"})
ws1.send_text.assert_called_once()
ws2.send_text.assert_called_once()
# Both should receive the same message
msg1 = json.loads(ws1.send_text.call_args[0][0])
msg2 = json.loads(ws2.send_text.call_args[0][0])
assert msg1["event"] == "test_event"
assert msg2["event"] == "test_event"
@pytest.mark.asyncio
async def test_broadcast_removes_dead_connections(self):
mgr = WebSocketManager()
ws_alive = AsyncMock()
ws_dead = AsyncMock()
ws_dead.send_text.side_effect = RuntimeError("connection closed")
mgr._connections = [ws_alive, ws_dead]
await mgr.broadcast("ping", {})
assert ws_dead not in mgr._connections
assert ws_alive in mgr._connections
@pytest.mark.asyncio
async def test_broadcast_appends_to_history(self):
mgr = WebSocketManager()
await mgr.broadcast("evt1", {"a": 1})
await mgr.broadcast("evt2", {"b": 2})
assert len(mgr.event_history) == 2
assert mgr.event_history[0].event == "evt1"
assert mgr.event_history[1].event == "evt2"
@pytest.mark.asyncio
async def test_broadcast_trims_history(self):
mgr = WebSocketManager()
mgr._max_history = 3
for i in range(5):
await mgr.broadcast(f"e{i}", {})
assert len(mgr.event_history) == 3
assert mgr.event_history[0].event == "e2"
class TestWebSocketManagerConnect:
@pytest.mark.asyncio
async def test_connect_accepts_websocket(self):
mgr = WebSocketManager()
ws = AsyncMock()
await mgr.connect(ws)
ws.accept.assert_called_once()
assert mgr.connection_count == 1
@pytest.mark.asyncio
async def test_connect_sends_recent_history(self):
mgr = WebSocketManager()
# Pre-populate history
for i in range(3):
mgr._event_history.append(
WSEvent(event=f"e{i}", data={}, timestamp="t")
)
ws = AsyncMock()
await mgr.connect(ws)
# Should have sent 3 history events
assert ws.send_text.call_count == 3
class TestWebSocketManagerDisconnect:
def test_disconnect_removes_connection(self):
mgr = WebSocketManager()
ws = MagicMock()
mgr._connections = [ws]
mgr.disconnect(ws)
assert mgr.connection_count == 0
def test_disconnect_nonexistent_is_safe(self):
mgr = WebSocketManager()
ws = MagicMock()
mgr.disconnect(ws) # Should not raise
assert mgr.connection_count == 0
class TestConvenienceBroadcasts:
@pytest.mark.asyncio
async def test_broadcast_agent_joined(self):
mgr = WebSocketManager()
ws = AsyncMock()
mgr._connections = [ws]
await mgr.broadcast_agent_joined("a1", "Echo")
msg = json.loads(ws.send_text.call_args[0][0])
assert msg["event"] == "agent_joined"
assert msg["data"]["agent_id"] == "a1"
assert msg["data"]["name"] == "Echo"
@pytest.mark.asyncio
async def test_broadcast_task_posted(self):
mgr = WebSocketManager()
ws = AsyncMock()
mgr._connections = [ws]
await mgr.broadcast_task_posted("t1", "Research BTC")
msg = json.loads(ws.send_text.call_args[0][0])
assert msg["event"] == "task_posted"
assert msg["data"]["task_id"] == "t1"
@pytest.mark.asyncio
async def test_broadcast_bid_submitted(self):
mgr = WebSocketManager()
ws = AsyncMock()
mgr._connections = [ws]
await mgr.broadcast_bid_submitted("t1", "a1", 42)
msg = json.loads(ws.send_text.call_args[0][0])
assert msg["event"] == "bid_submitted"
assert msg["data"]["bid_sats"] == 42
@pytest.mark.asyncio
async def test_broadcast_task_assigned(self):
mgr = WebSocketManager()
ws = AsyncMock()
mgr._connections = [ws]
await mgr.broadcast_task_assigned("t1", "a1")
msg = json.loads(ws.send_text.call_args[0][0])
assert msg["event"] == "task_assigned"
@pytest.mark.asyncio
async def test_broadcast_task_completed(self):
mgr = WebSocketManager()
ws = AsyncMock()
mgr._connections = [ws]
await mgr.broadcast_task_completed("t1", "a1", "Done!")
msg = json.loads(ws.send_text.call_args[0][0])
assert msg["event"] == "task_completed"
assert msg["data"]["result"] == "Done!"
@pytest.mark.asyncio
async def test_broadcast_agent_left(self):
mgr = WebSocketManager()
ws = AsyncMock()
mgr._connections = [ws]
await mgr.broadcast_agent_left("a1", "Echo")
msg = json.loads(ws.send_text.call_args[0][0])
assert msg["event"] == "agent_left"