Refactor: Nexus WebSocket Gateway Improvements (#838)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Co-authored-by: manus <manus@timmy.local> Co-committed-by: manus <manus@timmy.local>
This commit was merged in pull request #838.
This commit is contained in:
122
server.py
122
server.py
@@ -1,37 +1,119 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
|
||||||
|
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
|
||||||
|
the body (Evennia/Morrowind), and the visualization surface.
|
||||||
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import websockets
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
from typing import Set
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
import websockets
|
||||||
clients = set()
|
|
||||||
|
|
||||||
async def broadcast_handler(websocket):
|
# Configuration
|
||||||
|
PORT = 8765
|
||||||
|
HOST = "0.0.0.0" # Allow external connections if needed
|
||||||
|
|
||||||
|
# Logging setup
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s [%(levelname)s] %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger("nexus-gateway")
|
||||||
|
|
||||||
|
# State
|
||||||
|
clients: Set[websockets.WebSocketServerProtocol] = set()
|
||||||
|
|
||||||
|
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||||
|
"""Handles individual client connections and message broadcasting."""
|
||||||
clients.add(websocket)
|
clients.add(websocket)
|
||||||
logging.info(f"Client connected. Total clients: {len(clients)}")
|
addr = websocket.remote_address
|
||||||
|
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async for message in websocket:
|
async for message in websocket:
|
||||||
|
# Parse for logging/validation if it's JSON
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
msg_type = data.get("type", "unknown")
|
||||||
|
# Optional: log specific important message types
|
||||||
|
if msg_type in ["agent_register", "thought", "action"]:
|
||||||
|
logger.debug(f"Received {msg_type} from {addr}")
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
# Broadcast to all OTHER clients
|
# Broadcast to all OTHER clients
|
||||||
|
if not clients:
|
||||||
|
continue
|
||||||
|
|
||||||
disconnected = set()
|
disconnected = set()
|
||||||
|
# Create broadcast tasks for efficiency
|
||||||
|
tasks = []
|
||||||
for client in clients:
|
for client in clients:
|
||||||
if client != websocket:
|
if client != websocket and client.open:
|
||||||
try:
|
tasks.append(asyncio.create_task(client.send(message)))
|
||||||
await client.send(message)
|
|
||||||
except Exception as e:
|
if tasks:
|
||||||
logging.error(f"Failed to send to a client: {e}")
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
disconnected.add(client)
|
for i, result in enumerate(results):
|
||||||
clients.difference_update(disconnected)
|
if isinstance(result, Exception):
|
||||||
|
# Find the client that failed
|
||||||
|
target_client = [c for c in clients if c != websocket][i]
|
||||||
|
logger.error(f"Failed to send to a client {target_client.remote_address}: {result}")
|
||||||
|
disconnected.add(target_client)
|
||||||
|
|
||||||
|
if disconnected:
|
||||||
|
clients.difference_update(disconnected)
|
||||||
|
|
||||||
except websockets.exceptions.ConnectionClosed:
|
except websockets.exceptions.ConnectionClosed:
|
||||||
pass
|
logger.debug(f"Connection closed by client {addr}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error handling client {addr}: {e}")
|
||||||
finally:
|
finally:
|
||||||
clients.discard(websocket) # discard is safe if not present
|
clients.discard(websocket)
|
||||||
logging.info(f"Client disconnected. Total clients: {len(clients)}")
|
logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
port = 8765
|
"""Main server loop with graceful shutdown."""
|
||||||
logging.info(f"Starting WS gateway on ws://localhost:{port}")
|
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
|
||||||
async with websockets.serve(broadcast_handler, "localhost", port):
|
|
||||||
await asyncio.Future() # Run forever
|
# Set up signal handlers for graceful shutdown
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
stop = loop.create_future()
|
||||||
|
|
||||||
|
def shutdown():
|
||||||
|
if not stop.done():
|
||||||
|
stop.set_result(None)
|
||||||
|
|
||||||
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||||
|
try:
|
||||||
|
loop.add_signal_handler(sig, shutdown)
|
||||||
|
except NotImplementedError:
|
||||||
|
# Signal handlers not supported on Windows
|
||||||
|
pass
|
||||||
|
|
||||||
|
async with websockets.serve(broadcast_handler, HOST, PORT):
|
||||||
|
logger.info("Gateway is ready and listening.")
|
||||||
|
await stop
|
||||||
|
|
||||||
|
logger.info("Shutting down Nexus WS gateway...")
|
||||||
|
# Close all client connections
|
||||||
|
if clients:
|
||||||
|
logger.info(f"Closing {len(clients)} active connections...")
|
||||||
|
close_tasks = [client.close() for client in clients]
|
||||||
|
await asyncio.gather(*close_tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
logger.info("Shutdown complete.")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f"Fatal server error: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|||||||
Reference in New Issue
Block a user