#!/usr/bin/env python3 """ The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness. This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py), the body (Evennia/Morrowind), and the visualization surface. """ import asyncio import json import logging import signal import sys from typing import Set import websockets # Configuration PORT = 8765 HOST = "0.0.0.0" # Allow external connections if needed # Logging setup logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger("nexus-gateway") # State clients: Set[websockets.WebSocketServerProtocol] = set() async def broadcast_handler(websocket: websockets.WebSocketServerProtocol): """Handles individual client connections and message broadcasting.""" clients.add(websocket) addr = websocket.remote_address logger.info(f"Client connected from {addr}. Total clients: {len(clients)}") try: async for message in websocket: # Parse for logging/validation if it's JSON try: data = json.loads(message) msg_type = data.get("type", "unknown") # Optional: log specific important message types if msg_type in ["agent_register", "thought", "action"]: logger.debug(f"Received {msg_type} from {addr}") except (json.JSONDecodeError, TypeError): pass # Broadcast to all OTHER clients if not clients: continue disconnected = set() # Create broadcast tasks for efficiency tasks = [] for client in clients: if client != websocket and client.open: tasks.append(asyncio.create_task(client.send(message))) if tasks: results = await asyncio.gather(*tasks, return_exceptions=True) for i, result in enumerate(results): if isinstance(result, Exception): # Find the client that failed target_client = [c for c in clients if c != websocket][i] logger.error(f"Failed to send to a client {target_client.remote_address}: {result}") disconnected.add(target_client) if disconnected: clients.difference_update(disconnected) except websockets.exceptions.ConnectionClosed: logger.debug(f"Connection closed by client {addr}") except Exception as e: logger.error(f"Error handling client {addr}: {e}") finally: clients.discard(websocket) logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}") async def main(): """Main server loop with graceful shutdown.""" logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}") # Set up signal handlers for graceful shutdown loop = asyncio.get_running_loop() stop = loop.create_future() def shutdown(): if not stop.done(): stop.set_result(None) for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, shutdown) except NotImplementedError: # Signal handlers not supported on Windows pass async with websockets.serve(broadcast_handler, HOST, PORT): logger.info("Gateway is ready and listening.") await stop logger.info("Shutting down Nexus WS gateway...") # Close all client connections if clients: logger.info(f"Closing {len(clients)} active connections...") close_tasks = [client.close() for client in clients] await asyncio.gather(*close_tasks, return_exceptions=True) logger.info("Shutdown complete.") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass except Exception as e: logger.critical(f"Fatal server error: {e}") sys.exit(1)