Files
the-nexus/server.py
manus 2daedfb2a0
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Refactor: Nexus WebSocket Gateway Improvements (#838)
Co-authored-by: manus <manus@timmy.local>
Co-committed-by: manus <manus@timmy.local>
2026-04-05 20:28:33 +00:00

120 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.
"""
import asyncio
import json
import logging
import signal
import sys
from typing import Set
import websockets
# --- Network configuration -------------------------------------------------
PORT = 8765
HOST = "0.0.0.0"  # Bind on every interface so non-local peers can connect

# --- Logging ---------------------------------------------------------------
# Root logger is configured once at import; the gateway logs through its own
# named logger so its records can be told apart from library output.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("nexus-gateway")

# --- Shared state ----------------------------------------------------------
# Set of currently tracked websocket connections (starts empty).
clients: Set[websockets.WebSocketServerProtocol] = set()
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
    """Serve one client connection: relay everything it sends to all other clients.

    The connection is added to the module-level ``clients`` set for the
    lifetime of the handler and always removed again on exit, whether the
    peer closed normally or an unexpected error occurred.

    Args:
        websocket: The connection handed over by ``websockets.serve``.
    """
    clients.add(websocket)
    addr = websocket.remote_address
    logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
    try:
        async for message in websocket:
            # Inspection is best-effort and must never interrupt relaying.
            _log_message_type(message, addr)
            await _relay_to_others(websocket, message)
    except websockets.exceptions.ConnectionClosed:
        logger.debug(f"Connection closed by client {addr}")
    except Exception as e:
        logger.error(f"Error handling client {addr}: {e}")
    finally:
        clients.discard(websocket)
        logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")


def _log_message_type(message, addr) -> None:
    """Debug-log the ``type`` of *message* if it is a JSON object of a notable type.

    Anything that is not parseable JSON, or parses to something other than an
    object, is silently ignored: the gateway relays opaque payloads and only
    peeks at them for logging.
    """
    try:
        data = json.loads(message)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for undecodable binary frames;
        # RecursionError guards against pathologically nested input.
        return
    if not isinstance(data, dict):
        # Valid JSON such as a list, string or number has no "type" field.
        return
    msg_type = data.get("type", "unknown")
    if msg_type in ("agent_register", "thought", "action"):
        logger.debug(f"Received {msg_type} from {addr}")


async def _relay_to_others(sender, message) -> None:
    """Send *message* to every tracked client except *sender*, pruning failed peers.

    The recipient list is snapshotted before awaiting so that each send result
    is matched to exactly the client it was sent to, even if ``clients``
    changes while the sends are in flight.
    """
    # NOTE(review): ``open`` is provided by the legacy protocol class; if the
    # installed websockets connection type lacks it, assume the peer is open
    # and let a failing send() prune it below.
    recipients = [
        peer
        for peer in clients
        if peer is not sender and getattr(peer, "open", True)
    ]
    if not recipients:
        return

    # gather() returns results in the same order as the awaitables passed in,
    # so zipping with ``recipients`` pairs each outcome with its client.
    outcomes = await asyncio.gather(
        *(peer.send(message) for peer in recipients),
        return_exceptions=True,
    )

    failed = set()
    for peer, outcome in zip(recipients, outcomes):
        if isinstance(outcome, Exception):
            logger.error(f"Failed to send to a client {peer.remote_address}: {outcome}")
            failed.add(peer)
    if failed:
        clients.difference_update(failed)
async def main():
    """Run the gateway until SIGINT or SIGTERM is received, then shut down.

    A future is used as the stop flag: the signal handler resolves it, which
    lets the ``async with`` block around the server finish.
    """
    logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")

    event_loop = asyncio.get_running_loop()
    stop = event_loop.create_future()

    def shutdown():
        # Both signals share this callback, so it may fire more than once.
        if stop.done():
            return
        stop.set_result(None)

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            event_loop.add_signal_handler(signum, shutdown)
        except NotImplementedError:
            # Signal handlers not supported on Windows
            continue

    async with websockets.serve(broadcast_handler, HOST, PORT):
        logger.info("Gateway is ready and listening.")
        await stop

    # NOTE(review): the position of this shutdown section relative to the
    # ``async with`` above was reconstructed from flattened source — confirm.
    logger.info("Shutting down Nexus WS gateway...")
    if clients:
        logger.info(f"Closing {len(clients)} active connections...")
        pending_closes = [conn.close() for conn in clients]
        await asyncio.gather(*pending_closes, return_exceptions=True)
    logger.info("Shutdown complete.")
if __name__ == "__main__":
    # Script entry point: run the gateway and translate failures into an
    # exit status.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is swallowed so the process ends without a traceback.
        pass
    except Exception as exc:
        logger.critical(f"Fatal server error: {exc}")
        # Equivalent to sys.exit(1): signal failure to the calling process.
        raise SystemExit(1)