Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
93b8475405 fix(#1356): replace HTTPServer with ThreadingHTTPServer in multi-user bridge
Some checks failed
CI / test (pull_request) Failing after 2s
Review Approval Gate / verify-review (pull_request) Failing after 8s
CI / validate (pull_request) Failing after 40s
The single-threaded HTTPServer queues requests sequentially, causing
60% timeout under 10 concurrent users (30s timeout, 7.8s avg).

Fix: use ThreadingHTTPServer (already defined in multi_user_bridge.py)
for thread-per-request concurrency.

- multi_user_bridge.py: line 2883 HTTPServer -> ThreadingHTTPServer
- world/multi_user_bridge.py: add ThreadingMixIn import + class, fix server init

Refs #1356
2026-04-13 17:23:47 -04:00
9 changed files with 84 additions and 212 deletions

View File

@@ -27,10 +27,8 @@ jobs:
username: ${{ secrets.DEPLOY_USER }}
key: ${{ secrets.DEPLOY_SSH_KEY }}
script: |
cd ~/the-nexus || git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus.git ~/the-nexus
cd ~/the-nexus || git clone http://143.198.27.163:3000/Timmy_Foundation/the-nexus.git ~/the-nexus
cd ~/the-nexus
git fetch origin main
git reset --hard origin/main
docker compose build nexus-main
docker compose up -d --force-recreate nexus-main
echo "Nexus deployed — HTTP :8080, WS :8765"
./deploy.sh main

View File

@@ -11,16 +11,11 @@ COPY nexus/ nexus/
COPY server.py ./
# Frontend assets referenced by index.html
COPY index.html help.html style.css app.js boot.js bootstrap.mjs gofai_worker.js mempalace.js service-worker.js manifest.json ./
COPY index.html help.html style.css app.js service-worker.js manifest.json ./
# Config/data
COPY portals.json vision.json robots.txt ./
# Icons
COPY icons/ icons/
# Expose HTTP (static) and WebSocket
EXPOSE 8080
EXPOSE 8765
CMD ["python3", "server.py"]

6
app.js
View File

@@ -2188,11 +2188,7 @@ function connectHermes() {
}
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
// WS gateway runs on HTTP port - 115 (8080->8765, 8081->8766)
const wsHost = window.location.hostname;
const httpPort = parseInt(window.location.port) || 8080;
const wsPort = httpPort === 8081 ? 8766 : 8765;
const wsUrl = `${protocol}//${wsHost}:${wsPort}/api/world/ws`;
const wsUrl = `${protocol}//${window.location.host}/api/world/ws`;
console.log(`Connecting to Hermes at ${wsUrl}...`);
hermesWs = new WebSocket(wsUrl);

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env bash
# deploy.sh — spin up (or update) the Nexus staging environment
# Usage: ./deploy.sh — rebuild and restart nexus-main (HTTP :8080, WS :8765)
# ./deploy.sh staging — rebuild and restart nexus-staging (HTTP :8081, WS :8766)
# Usage: ./deploy.sh — rebuild and restart nexus-main (port 4200)
# ./deploy.sh staging — rebuild and restart nexus-staging (port 4201)
set -euo pipefail
SERVICE="${1:-nexus-main}"
@@ -14,12 +14,4 @@ esac
echo "==> Deploying $SERVICE"
docker compose build "$SERVICE"
docker compose up -d --force-recreate "$SERVICE"
if [ "$SERVICE" = "nexus-main" ]; then
echo "==> HTTP: http://localhost:8080"
echo "==> WS: ws://localhost:8765"
else
echo "==> HTTP: http://localhost:8081"
echo "==> WS: ws://localhost:8766"
fi
echo "==> Done. Container: $SERVICE"

View File

@@ -6,12 +6,10 @@ services:
container_name: nexus-main
restart: unless-stopped
ports:
- "8080:8080"
- "8765:8765"
nexus-staging:
build: .
container_name: nexus-staging
restart: unless-stopped
ports:
- "8081:8080"
- "8766:8765"
- "8766:8765"

View File

@@ -2880,7 +2880,7 @@ def main():
# Start world tick system
world_tick_system.start()
server = HTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
server.serve_forever()

16
run.sh
View File

@@ -1,16 +0,0 @@
#!/usr/bin/env bash
# run.sh — run Nexus locally without Docker
# Usage: ./run.sh — HTTP :8080, WS :8765
# NEXUS_HTTP_PORT=9090 ./run.sh — custom HTTP port
set -euo pipefail
cd "$(dirname "$0")"
# Install deps if missing
if ! python3 -c "import websockets" 2>/dev/null; then
echo "==> Installing dependencies..."
pip3 install -r requirements.txt
fi
echo "==> Starting Nexus server..."
exec python3 server.py

233
server.py
View File

@@ -1,190 +1,92 @@
#!/usr/bin/env python3
"""
The Nexus — Unified HTTP + WebSocket server.
Serves static frontend files (Three.js app) over HTTP on port 8080
and runs the WebSocket gateway on port 8765.
Single-process, single-command deployment — no nginx required.
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.
"""
import asyncio
import json
import logging
import mimetypes
import os
import signal
import sys
from http import HTTPStatus
from pathlib import Path
from typing import Set
# Branch protected file - see POLICY.md
import websockets
import websockets.asyncio.server
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
HTTP_PORT = int(os.environ.get("NEXUS_HTTP_PORT", "8080"))
WS_PORT = int(os.environ.get("NEXUS_WS_PORT", "8765"))
HOST = os.environ.get("NEXUS_HOST", "0.0.0.0")
ROOT = Path(__file__).resolve().parent
PORT = 8765
HOST = "0.0.0.0" # Allow external connections if needed
# Static file extensions we're willing to serve
SAFE_SUFFIXES = {
".html", ".htm", ".css", ".js", ".mjs", ".json",
".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico",
".woff", ".woff2", ".ttf", ".eot",
".txt", ".xml", ".webmanifest",
}
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Logging setup
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
log = logging.getLogger("nexus")
logger = logging.getLogger("nexus-gateway")
# ---------------------------------------------------------------------------
# HTTP — static file server
# ---------------------------------------------------------------------------
async def http_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
"""Minimal async HTTP/1.1 server for static files."""
try:
# Read request line + headers (max 8 KB)
data = b""
while b"\r\n\r\n" not in data and len(data) < 8192:
chunk = await asyncio.wait_for(reader.read(4096), timeout=10)
if not chunk:
return
data += chunk
header_text = data.split(b"\r\n\r\n", 1)[0].decode("utf-8", errors="replace")
lines = header_text.split("\r\n")
request_line = lines[0]
parts = request_line.split(" ", 2)
if len(parts) < 2:
writer.close()
return
method, raw_path = parts[0], parts[1]
# State
clients: Set[websockets.WebSocketServerProtocol] = set()
if method not in ("GET", "HEAD"):
_write_response(writer, HTTPStatus.METHOD_NOT_ALLOWED, b"Method Not Allowed")
return
# Normalise path — prevent directory traversal
safe_path = raw_path.split("?", 1)[0].split("#", 1)[0]
safe_path = os.path.normpath(safe_path).lstrip("/")
if not safe_path:
safe_path = "index.html"
file_path = ROOT / safe_path
# Reject traversal
if not str(file_path.resolve()).startswith(str(ROOT)):
_write_response(writer, HTTPStatus.FORBIDDEN, b"Forbidden")
return
if not file_path.exists() or file_path.is_dir():
# Try index.html for directories
if file_path.is_dir() and (file_path / "index.html").exists():
file_path = file_path / "index.html"
else:
_write_response(writer, HTTPStatus.NOT_FOUND, b"Not Found")
return
suffix = file_path.suffix.lower()
if suffix not in SAFE_SUFFIXES:
_write_response(writer, HTTPStatus.FORBIDDEN, b"Forbidden")
return
content_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
body = file_path.read_bytes()
headers = {
"Content-Type": content_type,
"Content-Length": str(len(body)),
"Cache-Control": "no-cache",
"Access-Control-Allow-Origin": "*",
}
_write_response(writer, HTTPStatus.OK, body, headers, method == "HEAD")
except (asyncio.TimeoutError, ConnectionError):
pass
except Exception as exc:
log.warning("HTTP handler error: %s", exc)
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
def _write_response(writer, status, body, headers=None, head_only=False):
line = f"HTTP/1.1 {status.value} {status.phrase}\r\n"
hdr = ""
if headers:
for k, v in headers.items():
hdr += f"{k}: {v}\r\n"
response = (line + hdr + "\r\n").encode()
if not head_only:
response += body if isinstance(body, bytes) else body.encode()
writer.write(response)
# ---------------------------------------------------------------------------
# WebSocket — broadcast gateway
# ---------------------------------------------------------------------------
ws_clients: Set[websockets.asyncio.server.ServerConnection] = set()
async def ws_handler(websocket: websockets.asyncio.server.ServerConnection):
ws_clients.add(websocket)
log.info("WS client connected from %s. Total: %d", websocket.remote_address, len(ws_clients))
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
clients.add(websocket)
addr = websocket.remote_address
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
try:
async for message in websocket:
# Parse for logging/validation if it's JSON
try:
data = json.loads(message)
msg_type = data.get("type", "unknown")
if msg_type in ("agent_register", "thought", "action"):
log.debug("WS %s from %s", msg_type, websocket.remote_address)
# Optional: log specific important message types
if msg_type in ["agent_register", "thought", "action"]:
logger.debug(f"Received {msg_type} from {addr}")
except (json.JSONDecodeError, TypeError):
pass
# Broadcast to all OTHER clients
if not clients:
continue
disconnected = set()
for client in ws_clients:
# Create broadcast tasks, tracking which client each task targets
task_client_pairs = []
for client in clients:
if client != websocket and client.open:
try:
await client.send(message)
except Exception:
disconnected.add(client)
ws_clients.difference_update(disconnected)
task = asyncio.create_task(client.send(message))
task_client_pairs.append((task, client))
if task_client_pairs:
tasks = [pair[0] for pair in task_client_pairs]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
target_client = task_client_pairs[i][1]
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
disconnected.add(target_client)
if disconnected:
clients.difference_update(disconnected)
except websockets.exceptions.ConnectionClosed:
pass
except Exception as exc:
log.error("WS handler error for %s: %s", websocket.remote_address, exc)
logger.debug(f"Connection closed by client {addr}")
except Exception as e:
logger.error(f"Error handling client {addr}: {e}")
finally:
ws_clients.discard(websocket)
log.info("WS client disconnected. Total: %d", len(ws_clients))
clients.discard(websocket)
logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
# ---------------------------------------------------------------------------
# Main — run both servers concurrently
# ---------------------------------------------------------------------------
async def main():
# HTTP server
http_server = await asyncio.start_server(http_handler, HOST, HTTP_PORT)
log.info("HTTP server listening on http://%s:%d", HOST, HTTP_PORT)
# WebSocket server
ws_server = await websockets.asyncio.server.serve(ws_handler, HOST, WS_PORT)
log.info("WebSocket server listening on ws://%s:%d", HOST, WS_PORT)
log.info("Nexus is live — open http://%s:%d in a browser", HOST, HTTP_PORT)
# Graceful shutdown
"""Main server loop with graceful shutdown."""
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
stop = loop.create_future()
def shutdown():
if not stop.done():
stop.set_result(None)
@@ -193,28 +95,29 @@ async def main():
try:
loop.add_signal_handler(sig, shutdown)
except NotImplementedError:
# Signal handlers not supported on Windows
pass
await stop
log.info("Shutting down...")
http_server.close()
await http_server.wait_closed()
ws_server.close()
await ws_server.wait_closed()
remaining = {c for c in ws_clients if c.open}
async with websockets.serve(broadcast_handler, HOST, PORT):
logger.info("Gateway is ready and listening.")
await stop
logger.info("Shutting down Nexus WS gateway...")
# Close any remaining client connections (handlers may have already cleaned up)
remaining = {c for c in clients if c.open}
if remaining:
await asyncio.gather(*(c.close() for c in remaining), return_exceptions=True)
ws_clients.clear()
log.info("Shutdown complete.")
logger.info(f"Closing {len(remaining)} active connections...")
close_tasks = [client.close() for client in remaining]
await asyncio.gather(*close_tasks, return_exceptions=True)
clients.clear()
logger.info("Shutdown complete.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
except Exception as exc:
log.critical("Fatal: %s", exc)
except Exception as e:
logger.critical(f"Fatal server error: {e}")
sys.exit(1)

View File

@@ -26,11 +26,17 @@ import threading
import hashlib
import os
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from pathlib import Path
from datetime import datetime
from typing import Optional
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
"""Thread-per-request HTTP server."""
daemon_threads = True
# ── Configuration ──────────────────────────────────────────────────────
BRIDGE_PORT = int(os.environ.get('TIMMY_BRIDGE_PORT', 4004))
@@ -274,7 +280,7 @@ def main():
print(f" POST /bridge/move — Move user to room (user_id, room)")
print()
server = HTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler)
server.serve_forever()