forked from Rockachopa/Timmy-time-dashboard
Every `except Exception:` handler now binds the exception (`except Exception as exc:`) and logs it: at warning level on critical paths, and at debug level where the code degrades gracefully.

Added logger setup to the 4 files that lacked it:
- src/timmy/memory/vector_store.py
- src/dashboard/middleware/csrf.py
- src/dashboard/middleware/security_headers.py
- src/spark/memory.py

31 files changed across timmy core, dashboard, infrastructure, and integrations. No `except Exception:` handler without logging remains. 1340 tests passing.
177 lines
5.2 KiB
Python
177 lines
5.2 KiB
Python
"""WebSocket manager for the live swarm dashboard.
|
|
|
|
Manages WebSocket connections and broadcasts swarm events to all
|
|
connected clients in real time. Used by the /swarm/live route
|
|
to provide a live feed of agent activity, task auctions, and
|
|
system events.
|
|
"""
|
|
|
|
import collections
|
|
import json
|
|
import logging
|
|
from dataclasses import asdict, dataclass
|
|
from datetime import UTC, datetime
|
|
|
|
from fastapi import WebSocket
|
|
|
|
# Logger for this module, obtained from the logging registry under the
# module's own __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class WSEvent:
    """One event message destined for connected WebSocket clients."""

    # Event name, e.g. "agent_joined".
    event: str
    # Event payload; it is serialised with json.dumps, so it must be JSON-safe.
    data: dict
    # Creation time as a string (the manager fills this with an ISO-8601 UTC value).
    timestamp: str

    def to_json(self) -> str:
        """Serialise the event's fields to a JSON object string."""
        payload = asdict(self)
        return json.dumps(payload)
|
|
|
|
|
|
class WebSocketManager:
|
|
"""Manages WebSocket connections and event broadcasting."""
|
|
|
|
def __init__(self) -> None:
|
|
self._connections: list[WebSocket] = []
|
|
self._event_history: collections.deque[WSEvent] = collections.deque(maxlen=100)
|
|
|
|
async def connect(self, websocket: WebSocket, *, accept: bool = True) -> None:
|
|
"""Accept a new WebSocket connection and add it to the broadcast pool.
|
|
|
|
Args:
|
|
websocket: The WebSocket to register.
|
|
accept: If False, skip the accept() call (caller already accepted).
|
|
"""
|
|
if accept:
|
|
await websocket.accept()
|
|
self._connections.append(websocket)
|
|
logger.info(
|
|
"WebSocket connected — %d active connections",
|
|
len(self._connections),
|
|
)
|
|
# Send recent history to the new client
|
|
for event in list(self._event_history)[-20:]:
|
|
try:
|
|
await websocket.send_text(event.to_json())
|
|
except Exception as exc:
|
|
logger.warning("WebSocket history send error: %s", exc)
|
|
break
|
|
|
|
def disconnect(self, websocket: WebSocket) -> None:
|
|
"""Remove a disconnected WebSocket."""
|
|
if websocket in self._connections:
|
|
self._connections.remove(websocket)
|
|
logger.info(
|
|
"WebSocket disconnected — %d active connections",
|
|
len(self._connections),
|
|
)
|
|
|
|
async def broadcast(self, event: str, data: dict | None = None) -> None:
|
|
"""Broadcast an event to all connected WebSocket clients."""
|
|
ws_event = WSEvent(
|
|
event=event,
|
|
data=data or {},
|
|
timestamp=datetime.now(UTC).isoformat(),
|
|
)
|
|
self._event_history.append(ws_event)
|
|
|
|
message = ws_event.to_json()
|
|
disconnected = []
|
|
|
|
for ws in self._connections:
|
|
try:
|
|
await ws.send_text(message)
|
|
except ConnectionError:
|
|
disconnected.append(ws)
|
|
except Exception as exc:
|
|
logger.warning("Unexpected WebSocket send error: %s", exc)
|
|
disconnected.append(ws)
|
|
|
|
# Clean up dead connections
|
|
for ws in disconnected:
|
|
self.disconnect(ws)
|
|
|
|
async def broadcast_agent_joined(self, agent_id: str, name: str) -> None:
|
|
await self.broadcast("agent_joined", {"agent_id": agent_id, "name": name})
|
|
|
|
async def broadcast_agent_left(self, agent_id: str, name: str) -> None:
|
|
await self.broadcast("agent_left", {"agent_id": agent_id, "name": name})
|
|
|
|
async def broadcast_task_posted(self, task_id: str, description: str) -> None:
|
|
await self.broadcast(
|
|
"task_posted",
|
|
{
|
|
"task_id": task_id,
|
|
"description": description,
|
|
},
|
|
)
|
|
|
|
async def broadcast_bid_submitted(self, task_id: str, agent_id: str, bid_sats: int) -> None:
|
|
await self.broadcast(
|
|
"bid_submitted",
|
|
{
|
|
"task_id": task_id,
|
|
"agent_id": agent_id,
|
|
"bid_sats": bid_sats,
|
|
},
|
|
)
|
|
|
|
async def broadcast_task_assigned(self, task_id: str, agent_id: str) -> None:
|
|
await self.broadcast(
|
|
"task_assigned",
|
|
{
|
|
"task_id": task_id,
|
|
"agent_id": agent_id,
|
|
},
|
|
)
|
|
|
|
async def broadcast_task_completed(self, task_id: str, agent_id: str, result: str) -> None:
|
|
await self.broadcast(
|
|
"task_completed",
|
|
{
|
|
"task_id": task_id,
|
|
"agent_id": agent_id,
|
|
"result": result[:200],
|
|
},
|
|
)
|
|
|
|
@property
|
|
def connection_count(self) -> int:
|
|
return len(self._connections)
|
|
|
|
async def broadcast_json(self, data: dict) -> int:
|
|
"""Broadcast raw JSON data to all connected clients.
|
|
|
|
Args:
|
|
data: Dictionary to send as JSON
|
|
|
|
Returns:
|
|
Number of clients notified
|
|
"""
|
|
message = json.dumps(data)
|
|
disconnected = []
|
|
count = 0
|
|
|
|
for ws in self._connections:
|
|
try:
|
|
await ws.send_text(message)
|
|
count += 1
|
|
except Exception as exc:
|
|
logger.warning("WebSocket direct send error: %s", exc)
|
|
disconnected.append(ws)
|
|
|
|
# Clean up dead connections
|
|
for ws in disconnected:
|
|
self.disconnect(ws)
|
|
|
|
return count
|
|
|
|
@property
|
|
def event_history(self) -> list[WSEvent]:
|
|
return list(self._event_history)
|
|
|
|
|
|
# Module-level singleton
|
|
# Created once when this module is imported; code that imports `ws_manager`
# shares this instance's connection list and event history.
ws_manager = WebSocketManager()
|