The disconnected-client resolution was rebuilding a filtered client list (list comprehension) on every exception inside the gather loop. Moved it outside the loop so the lookup is computed once, and also moved the disconnected set initialization inside the if-block where it belongs (it was previously leaked from an outer scope).
123 lines
4.2 KiB
Python
123 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
|
|
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
|
|
the body (Evennia/Morrowind), and the visualization surface.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import signal
|
|
import sys
|
|
from typing import Set
|
|
|
|
# Branch protected file - see POLICY.md
|
|
import websockets
|
|
|
|
# --- Network configuration ---------------------------------------------------
PORT = 8765
HOST = "0.0.0.0"  # Listen on all interfaces so external clients may connect

# --- Logging -----------------------------------------------------------------
# Timestamped, level-tagged records at INFO and above for the whole process.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("nexus-gateway")
|
|
|
|
# State
|
|
# Set of currently connected client sockets, shared by every handler coroutine.
# broadcast_handler adds a socket when it connects and removes it on disconnect
# or after a failed send; main closes whatever is still present at shutdown.
clients: Set[websockets.WebSocketServerProtocol] = set()
|
|
|
|
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
    """Relay every message received from one client to all other open clients.

    The connection is added to the module-level ``clients`` set on entry and is
    always discarded again on exit. Each incoming message is forwarded
    unmodified; JSON parsing is done only to debug-log a few message types and
    never prevents the relay.

    Args:
        websocket: The connection being served. It is iterated for incoming
            messages and is excluded from its own broadcasts.
    """
    clients.add(websocket)
    addr = websocket.remote_address
    logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")

    try:
        async for message in websocket:
            # Best-effort peek at the payload, purely for logging.
            try:
                data = json.loads(message)
            except (ValueError, TypeError):
                # ValueError covers json.JSONDecodeError as well as the
                # UnicodeDecodeError raised for undecodable binary frames.
                # Such messages are still relayed as-is below.
                data = None

            # Valid JSON need not be an object (it may be a list, number, ...),
            # so only call .get() on dicts rather than letting AttributeError
            # escape and tear down this client's connection.
            if isinstance(data, dict):
                msg_type = data.get("type", "unknown")
                # Optional: log specific important message types
                if msg_type in ("agent_register", "thought", "action"):
                    logger.debug(f"Received {msg_type} from {addr}")

            # Broadcast to all OTHER clients. A single list drives both the
            # sends and the result handling, so results[i] always belongs to
            # recipients[i] even when some clients are skipped as not open.
            recipients = [
                client
                for client in clients
                if client != websocket and client.open
            ]
            if not recipients:
                continue

            # Send concurrently; failures come back as result values instead
            # of aborting the whole broadcast.
            results = await asyncio.gather(
                *(client.send(message) for client in recipients),
                return_exceptions=True,
            )

            disconnected = set()
            for target_client, result in zip(recipients, results):
                if isinstance(result, Exception):
                    logger.error(
                        f"Failed to send to a client {target_client.remote_address}: {result}"
                    )
                    disconnected.add(target_client)

            # Drop clients whose send failed so later broadcasts skip them.
            if disconnected:
                clients.difference_update(disconnected)

    except websockets.exceptions.ConnectionClosed:
        logger.debug(f"Connection closed by client {addr}")
    except Exception as e:
        logger.error(f"Error handling client {addr}: {e}")
    finally:
        clients.discard(websocket)
        logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
|
|
|
|
async def main():
    """Serve the gateway until SIGINT/SIGTERM, then close remaining clients."""
    logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")

    # A future that the signal callback resolves to request shutdown.
    event_loop = asyncio.get_running_loop()
    stop_requested = event_loop.create_future()

    def request_stop():
        # Resolve the future only once; repeated signals are ignored.
        if stop_requested.done():
            return
        stop_requested.set_result(None)

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            event_loop.add_signal_handler(signum, request_stop)
        except NotImplementedError:
            # Signal handlers not supported on Windows
            continue

    async with websockets.serve(broadcast_handler, HOST, PORT):
        logger.info("Gateway is ready and listening.")
        await stop_requested

    logger.info("Shutting down Nexus WS gateway...")

    # Close any client connections that are still registered.
    if clients:
        logger.info(f"Closing {len(clients)} active connections...")
        pending_closes = [peer.close() for peer in clients]
        await asyncio.gather(*pending_closes, return_exceptions=True)

    logger.info("Shutdown complete.")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the gateway until it stops or fails.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Interrupted by the user; exit without an error message.
        pass
    except Exception as exc:
        # Anything else is fatal: log it and exit with a non-zero status.
        logger.critical(f"Fatal server error: {exc}")
        sys.exit(1)
|