From 2daedfb2a0317dfc0c28098ffb34beb22f4b9b47 Mon Sep 17 00:00:00 2001 From: manus Date: Sun, 5 Apr 2026 20:28:33 +0000 Subject: [PATCH] Refactor: Nexus WebSocket Gateway Improvements (#838) Co-authored-by: manus Co-committed-by: manus --- server.py | 122 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 102 insertions(+), 20 deletions(-) diff --git a/server.py b/server.py index 11cc6db..1dab0d1 100644 --- a/server.py +++ b/server.py @@ -1,37 +1,119 @@ #!/usr/bin/env python3 +""" +The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness. +This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py), +the body (Evennia/Morrowind), and the visualization surface. +""" import asyncio -import websockets +import json import logging +import signal +import sys +from typing import Set -logging.basicConfig(level=logging.INFO) -clients = set() +import websockets -async def broadcast_handler(websocket): +# Configuration +PORT = 8765 +HOST = "0.0.0.0" # Allow external connections if needed + +# Logging setup +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger("nexus-gateway") + +# State +clients: Set[websockets.WebSocketServerProtocol] = set() + +async def broadcast_handler(websocket: websockets.WebSocketServerProtocol): + """Handles individual client connections and message broadcasting.""" clients.add(websocket) - logging.info(f"Client connected. Total clients: {len(clients)}") + addr = websocket.remote_address + logger.info(f"Client connected from {addr}. Total clients: {len(clients)}") + try: async for message in websocket: + # Parse for logging/validation if it's JSON + try: + data = json.loads(message) + msg_type = data.get("type", "unknown") + # Optional: log specific important message types + if msg_type in ["agent_register", "thought", "action"]: + logger.debug(f"Received {msg_type} from {addr}") + except (json.JSONDecodeError, TypeError): + pass + # Broadcast to all OTHER clients + if not clients: + continue + disconnected = set() + # Create broadcast tasks for efficiency + tasks = [] for client in clients: - if client != websocket: - try: - await client.send(message) - except Exception as e: - logging.error(f"Failed to send to a client: {e}") - disconnected.add(client) - clients.difference_update(disconnected) + if client != websocket and client.open: + tasks.append(asyncio.create_task(client.send(message))) + + if tasks: + results = await asyncio.gather(*tasks, return_exceptions=True) + for i, result in enumerate(results): + if isinstance(result, Exception): + # Find the client that failed + target_client = [c for c in clients if c != websocket][i] + logger.error(f"Failed to send to a client {target_client.remote_address}: {result}") + disconnected.add(target_client) + + if disconnected: + clients.difference_update(disconnected) + except websockets.exceptions.ConnectionClosed: - pass + logger.debug(f"Connection closed by client {addr}") + except Exception as e: + logger.error(f"Error handling client {addr}: {e}") finally: - clients.discard(websocket) # discard is safe if not present - logging.info(f"Client disconnected. Total clients: {len(clients)}") + clients.discard(websocket) + logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}") async def main(): - port = 8765 - logging.info(f"Starting WS gateway on ws://localhost:{port}") - async with websockets.serve(broadcast_handler, "localhost", port): - await asyncio.Future() # Run forever + """Main server loop with graceful shutdown.""" + logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}") + + # Set up signal handlers for graceful shutdown + loop = asyncio.get_running_loop() + stop = loop.create_future() + + def shutdown(): + if not stop.done(): + stop.set_result(None) + + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, shutdown) + except NotImplementedError: + # Signal handlers not supported on Windows + pass + + async with websockets.serve(broadcast_handler, HOST, PORT): + logger.info("Gateway is ready and listening.") + await stop + + logger.info("Shutting down Nexus WS gateway...") + # Close all client connections + if clients: + logger.info(f"Closing {len(clients)} active connections...") + close_tasks = [client.close() for client in clients] + await asyncio.gather(*close_tasks, return_exceptions=True) + + logger.info("Shutdown complete.") if __name__ == "__main__": - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + pass + except Exception as e: + logger.critical(f"Fatal server error: {e}") + sys.exit(1)