forked from Rockachopa/Timmy-time-dashboard
Model upgrade: - qwen2.5:14b → qwen3.5:latest across config, tools, and docs - Added qwen3.5 to multimodal model registry Self-hosted Gitea CI: - .gitea/workflows/tests.yml: lint + test jobs via act_runner - Unified Dockerfile: pre-baked deps from poetry.lock for fast CI - sitepackages=true in tox for ~2s dep resolution (was ~40s) - OLLAMA_URL set to dead port in CI to prevent real LLM calls Test isolation fixes: - Smoke test fixture mocks create_timmy (was hitting real Ollama) - WebSocket sends initial_state before joining broadcast pool (race fix) - Tests use settings.ollama_model/url instead of hardcoded values - skip_ci marker for Ollama-dependent tests, excluded in CI tox envs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
175 lines
5.1 KiB
Python
175 lines
5.1 KiB
Python
"""WebSocket manager for the live swarm dashboard.
|
|
|
|
Manages WebSocket connections and broadcasts swarm events to all
|
|
connected clients in real time. Used by the /swarm/live route
|
|
to provide a live feed of agent activity, task auctions, and
|
|
system events.
|
|
"""
|
|
|
|
import collections
|
|
import json
|
|
import logging
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import UTC, datetime
|
|
|
|
from fastapi import WebSocket
|
|
|
|
# Module-level logger, named after this module's __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class WSEvent:
    """A single dashboard event in the form that is pushed to clients.

    Attributes:
        event: Name of the event (for example ``"agent_joined"``).
        data: Payload dictionary; it must be JSON-serializable for
            ``to_json`` to succeed.
        timestamp: Timestamp string. The manager in this module fills it
            with an ISO-8601 UTC time.
    """

    event: str
    data: dict
    timestamp: str

    def to_json(self) -> str:
        """Return this event's fields encoded as a JSON object string."""
        # asdict() converts the dataclass (recursively) into plain dicts,
        # which is the shape json.dumps() knows how to encode.
        field_map = asdict(self)
        return json.dumps(field_map)
|
|
|
|
|
|
class WebSocketManager:
    """Manages WebSocket connections and event broadcasting.

    Keeps a pool of connected clients and a bounded history of the most
    recent events. Every broadcast is delivered to all pooled clients;
    clients whose send fails are removed from the pool.
    """

    def __init__(self) -> None:
        self._connections: list[WebSocket] = []
        # Bounded so history cannot grow without limit: only the 100 most
        # recent events are retained.
        self._event_history: collections.deque[WSEvent] = collections.deque(maxlen=100)

    async def connect(self, websocket: WebSocket, *, accept: bool = True) -> None:
        """Accept a new WebSocket connection and add it to the broadcast pool.

        After registration the client is sent up to the 20 most recent
        events from the history so it can catch up.

        Args:
            websocket: The WebSocket to register.
            accept: If False, skip the accept() call (caller already accepted).
        """
        if accept:
            await websocket.accept()
        self._connections.append(websocket)
        logger.info(
            "WebSocket connected — %d active connections",
            len(self._connections),
        )
        # Send recent history to the new client. list() snapshots the deque
        # so broadcasts that happen while we await do not change what is
        # replayed (and cannot invalidate the iteration).
        for event in list(self._event_history)[-20:]:
            try:
                await websocket.send_text(event.to_json())
            except Exception:
                # Replay is best-effort: stop at the first failure. The
                # socket stays registered; if it is really dead the next
                # broadcast will prune it.
                logger.debug("WebSocket history replay aborted", exc_info=True)
                break

    def disconnect(self, websocket: WebSocket) -> None:
        """Remove a disconnected WebSocket.

        Safe to call for a socket that is not (or no longer) in the pool.
        """
        if websocket in self._connections:
            self._connections.remove(websocket)
        logger.info(
            "WebSocket disconnected — %d active connections",
            len(self._connections),
        )

    async def _send_to_all(self, message: str) -> int:
        """Send ``message`` to every pooled client and prune the dead ones.

        Args:
            message: Text frame to send.

        Returns:
            Number of clients the message was sent to without error.
        """
        delivered = 0
        dead: list[WebSocket] = []

        # Iterate over a snapshot: every ``await`` below yields to the event
        # loop, where connect()/disconnect() may mutate the live list and
        # would otherwise make this loop skip clients.
        for ws in tuple(self._connections):
            try:
                await ws.send_text(message)
            except ConnectionError:
                dead.append(ws)
            except Exception:
                logger.warning("Unexpected WebSocket send error", exc_info=True)
                dead.append(ws)
            else:
                delivered += 1

        # Clean up dead connections
        for ws in dead:
            self.disconnect(ws)

        return delivered

    async def broadcast(self, event: str, data: dict | None = None) -> None:
        """Broadcast an event to all connected WebSocket clients.

        The event is timestamped with the current UTC time, recorded in the
        history and then sent to every client.

        Args:
            event: Event name.
            data: Optional payload; a missing or empty payload is sent as ``{}``.

        Raises:
            TypeError: If ``data`` is not JSON-serializable. In that case the
                event is neither recorded nor sent.
        """
        ws_event = WSEvent(
            event=event,
            data=data or {},
            timestamp=datetime.now(UTC).isoformat(),
        )
        # Serialize before recording: an unserializable event stored in the
        # history would make every later connect() replay stop at it.
        message = ws_event.to_json()
        self._event_history.append(ws_event)

        await self._send_to_all(message)

    async def broadcast_agent_joined(self, agent_id: str, name: str) -> None:
        """Broadcast an ``agent_joined`` event for the given agent."""
        await self.broadcast("agent_joined", {"agent_id": agent_id, "name": name})

    async def broadcast_agent_left(self, agent_id: str, name: str) -> None:
        """Broadcast an ``agent_left`` event for the given agent."""
        await self.broadcast("agent_left", {"agent_id": agent_id, "name": name})

    async def broadcast_task_posted(self, task_id: str, description: str) -> None:
        """Broadcast a ``task_posted`` event with the task's description."""
        await self.broadcast(
            "task_posted",
            {
                "task_id": task_id,
                "description": description,
            },
        )

    async def broadcast_bid_submitted(self, task_id: str, agent_id: str, bid_sats: int) -> None:
        """Broadcast a ``bid_submitted`` event for an agent's bid on a task."""
        await self.broadcast(
            "bid_submitted",
            {
                "task_id": task_id,
                "agent_id": agent_id,
                "bid_sats": bid_sats,
            },
        )

    async def broadcast_task_assigned(self, task_id: str, agent_id: str) -> None:
        """Broadcast a ``task_assigned`` event linking a task to an agent."""
        await self.broadcast(
            "task_assigned",
            {
                "task_id": task_id,
                "agent_id": agent_id,
            },
        )

    async def broadcast_task_completed(self, task_id: str, agent_id: str, result: str) -> None:
        """Broadcast a ``task_completed`` event.

        Only the first 200 characters of ``result`` are included in the
        payload.
        """
        await self.broadcast(
            "task_completed",
            {
                "task_id": task_id,
                "agent_id": agent_id,
                "result": result[:200],
            },
        )

    @property
    def connection_count(self) -> int:
        """Number of WebSockets currently in the broadcast pool."""
        return len(self._connections)

    async def broadcast_json(self, data: dict) -> int:
        """Broadcast raw JSON data to all connected clients.

        Unlike ``broadcast``, the payload is sent as-is (no event wrapper)
        and is not recorded in the event history.

        Args:
            data: Dictionary to send as JSON

        Returns:
            Number of clients notified
        """
        return await self._send_to_all(json.dumps(data))

    @property
    def event_history(self) -> list[WSEvent]:
        """A copy of the retained events, oldest first."""
        return list(self._event_history)
|
|
|
|
|
|
# Module-level singleton
|
|
# Created once when this module is imported. Presumably other modules import
# `ws_manager` to share one connection pool and history — confirm against callers.
ws_manager = WebSocketManager()
|