Files
the-nexus/nexus/multi_user_bridge.py
Alexander Whitestone bb21d6c790 feat: add /bridge/stats endpoint, go/emote MUD commands
- /bridge/stats returns uptime, sessions, messages, rooms, WS connections
- 'go <room>' moves user between rooms via HTTP (was WS-only)
- 'emote <action>' broadcasts third-person action to room occupants
- Both commands notify other room occupants via room_events
2026-04-13 15:08:38 -04:00

636 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Multi-User AI Bridge for Nexus.
HTTP + WebSocket bridge that manages concurrent user sessions with full isolation.
Each user gets their own session state, message history, and AI routing.
Endpoints:
POST /bridge/chat — Send a chat message (curl-testable)
GET /bridge/sessions — List active sessions
GET /bridge/rooms — List all rooms with occupants
GET /bridge/stats — Aggregate bridge statistics
GET /bridge/health — Health check
WS /bridge/ws/{user_id} — Real-time streaming per user
Session isolation:
- Each user_id gets independent message history (configurable window)
- Crisis detection runs per-session with multi-turn tracking
- Room state tracked per-user for multi-user world awareness
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
try:
from aiohttp import web, WSMsgType
except ImportError:
web = None
WSMsgType = None
logger = logging.getLogger("multi_user_bridge")
# ── Crisis Detection ──────────────────────────────────────────
CRISIS_PATTERNS = [
re.compile(r"\b(?:suicide|kill\s*(?:my)?self|end\s*(?:my\s*)?life)\b", re.I),
re.compile(r"\b(?:want\s*to\s*die|don'?t\s*want\s*to\s*(?:live|be\s*alive))\b", re.I),
re.compile(r"\b(?:self[\s-]?harm|cutting\s*(?:my)?self)\b", re.I),
]
CRISIS_988_MESSAGE = (
"If you're in crisis, please reach out:\n"
"• 988 Suicide & Crisis Lifeline: call or text 988 (US)\n"
"• Crisis Text Line: text HOME to 741741\n"
"• International: https://findahelpline.com/\n"
"You are not alone. Help is available right now."
)
@dataclass
class CrisisState:
"""Tracks multi-turn crisis detection per session."""
turn_count: int = 0
first_flagged_at: Optional[float] = None
delivered_988: bool = False
flagged_messages: list[str] = field(default_factory=list)
CRISIS_TURN_WINDOW = 3 # consecutive turns before escalating
CRISIS_WINDOW_SECONDS = 300 # 5 minutes
def check(self, message: str) -> bool:
"""Returns True if 988 message should be delivered."""
is_crisis = any(p.search(message) for p in CRISIS_PATTERNS)
if not is_crisis:
self.turn_count = 0
self.first_flagged_at = None
return False
now = time.time()
self.turn_count += 1
self.flagged_messages.append(message[:200])
if self.first_flagged_at is None:
self.first_flagged_at = now
# Deliver 988 if: not yet delivered, within window, enough turns
if (
not self.delivered_988
and self.turn_count >= self.CRISIS_TURN_WINDOW
and (now - self.first_flagged_at) <= self.CRISIS_WINDOW_SECONDS
):
self.delivered_988 = True
return True
# Re-deliver if window expired and new crisis detected
if self.delivered_988 and (now - self.first_flagged_at) > self.CRISIS_WINDOW_SECONDS:
self.first_flagged_at = now
self.turn_count = 1
self.delivered_988 = True
return True
return False
# ── Rate Limiting ──────────────────────────────────────────────
class RateLimiter:
"""Per-user token-bucket rate limiter.
Allows `max_tokens` requests per `window_seconds` per user.
Tokens refill at a steady rate. Requests beyond the bucket
capacity are rejected with 429.
"""
def __init__(self, max_tokens: int = 60, window_seconds: float = 60.0):
self._max_tokens = max_tokens
self._window = window_seconds
self._buckets: dict[str, tuple[float, float]] = {}
def check(self, user_id: str) -> bool:
"""Returns True if the request is allowed (a token was consumed)."""
now = time.time()
tokens, last_refill = self._buckets.get(user_id, (self._max_tokens, now))
elapsed = now - last_refill
tokens = min(self._max_tokens, tokens + elapsed * (self._max_tokens / self._window))
if tokens < 1.0:
self._buckets[user_id] = (tokens, now)
return False
self._buckets[user_id] = (tokens - 1.0, now)
return True
def remaining(self, user_id: str) -> int:
"""Return remaining tokens for a user."""
now = time.time()
tokens, last_refill = self._buckets.get(user_id, (self._max_tokens, now))
elapsed = now - last_refill
tokens = min(self._max_tokens, tokens + elapsed * (self._max_tokens / self._window))
return int(tokens)
def reset(self, user_id: str):
"""Reset a user's bucket to full."""
self._buckets.pop(user_id, None)
# ── Session Management ────────────────────────────────────────
@dataclass
class UserSession:
"""Isolated session state for a single user."""
user_id: str
username: str
room: str = "The Tower"
message_history: list[dict] = field(default_factory=list)
ws_connections: list = field(default_factory=list)
room_events: list[dict] = field(default_factory=list)
crisis_state: CrisisState = field(default_factory=CrisisState)
created_at: float = field(default_factory=time.time)
last_active: float = field(default_factory=time.time)
command_count: int = 0
def add_message(self, role: str, content: str) -> dict:
"""Add a message to this user's history."""
msg = {
"role": role,
"content": content,
"timestamp": datetime.now(timezone.utc).isoformat(),
"room": self.room,
}
self.message_history.append(msg)
self.last_active = time.time()
self.command_count += 1
return msg
def get_history(self, window: int = 20) -> list[dict]:
"""Return recent message history."""
return self.message_history[-window:]
def to_dict(self) -> dict:
return {
"user_id": self.user_id,
"username": self.username,
"room": self.room,
"message_count": len(self.message_history),
"command_count": self.command_count,
"connected_ws": len(self.ws_connections),
"created_at": datetime.fromtimestamp(self.created_at, tz=timezone.utc).isoformat(),
"last_active": datetime.fromtimestamp(self.last_active, tz=timezone.utc).isoformat(),
}
class SessionManager:
"""Manages isolated user sessions."""
def __init__(self, max_sessions: int = 100, history_window: int = 50):
self._sessions: dict[str, UserSession] = {}
self._max_sessions = max_sessions
self._history_window = history_window
self._room_occupants: dict[str, set[str]] = defaultdict(set)
def get_or_create(self, user_id: str, username: str = "", room: str = "") -> UserSession:
"""Get existing session or create new one."""
if user_id not in self._sessions:
if len(self._sessions) >= self._max_sessions:
self._evict_oldest()
session = UserSession(
user_id=user_id,
username=username or user_id,
room=room or "The Tower",
)
self._sessions[user_id] = session
self._room_occupants[session.room].add(user_id)
logger.info(f"Session created: {user_id} in room {session.room}")
else:
session = self._sessions[user_id]
session.username = username or session.username
if room and room != session.room:
self._room_occupants[session.room].discard(user_id)
session.room = room
self._room_occupants[room].add(user_id)
session.last_active = time.time()
return session
def get(self, user_id: str) -> Optional[UserSession]:
return self._sessions.get(user_id)
def remove(self, user_id: str) -> bool:
session = self._sessions.pop(user_id, None)
if session:
self._room_occupants[session.room].discard(user_id)
logger.info(f"Session removed: {user_id}")
return True
return False
def get_room_occupants(self, room: str) -> list[str]:
return list(self._room_occupants.get(room, set()))
def list_sessions(self) -> list[dict]:
return [s.to_dict() for s in self._sessions.values()]
def _evict_oldest(self):
if not self._sessions:
return
oldest = min(self._sessions.values(), key=lambda s: s.last_active)
self.remove(oldest.user_id)
@property
def active_count(self) -> int:
return len(self._sessions)
# ── Bridge Server ─────────────────────────────────────────────
class MultiUserBridge:
"""HTTP + WebSocket multi-user bridge."""
def __init__(self, host: str = "127.0.0.1", port: int = 4004,
rate_limit: int = 60, rate_window: float = 60.0):
self.host = host
self.port = port
self.sessions = SessionManager()
self.rate_limiter = RateLimiter(max_tokens=rate_limit, window_seconds=rate_window)
self._app: Optional[web.Application] = None
self._start_time = time.time()
def create_app(self) -> web.Application:
if web is None:
raise RuntimeError("aiohttp required: pip install aiohttp")
self._app = web.Application()
self._app.router.add_post("/bridge/chat", self.handle_chat)
self._app.router.add_get("/bridge/sessions", self.handle_sessions)
self._app.router.add_get("/bridge/health", self.handle_health)
self._app.router.add_get("/bridge/rooms", self.handle_rooms)
self._app.router.add_get("/bridge/stats", self.handle_stats)
self._app.router.add_get("/bridge/room_events/{user_id}", self.handle_room_events)
self._app.router.add_get("/bridge/ws/{user_id}", self.handle_ws)
return self._app
async def handle_health(self, request: web.Request) -> web.Response:
uptime = time.time() - self._start_time
return web.json_response({
"status": "ok",
"uptime_seconds": round(uptime, 1),
"active_sessions": self.sessions.active_count,
})
async def handle_sessions(self, request: web.Request) -> web.Response:
return web.json_response({
"sessions": self.sessions.list_sessions(),
"total": self.sessions.active_count,
})
async def handle_rooms(self, request: web.Request) -> web.Response:
"""GET /bridge/rooms — List all rooms with occupants."""
rooms = {}
for room_name, user_ids in self.sessions._room_occupants.items():
if user_ids:
occupants = []
for uid in user_ids:
session = self.sessions.get(uid)
if session:
occupants.append({
"user_id": uid,
"username": session.username,
"last_active": datetime.fromtimestamp(
session.last_active, tz=timezone.utc
).isoformat(),
})
rooms[room_name] = {
"occupants": occupants,
"count": len(occupants),
}
return web.json_response({
"rooms": rooms,
"total_rooms": len(rooms),
"total_users": self.sessions.active_count,
})
async def handle_stats(self, request: web.Request) -> web.Response:
"""GET /bridge/stats — Aggregate bridge statistics."""
uptime = time.time() - self._start_time
total_messages = sum(len(s.message_history) for s in self.sessions._sessions.values())
total_commands = sum(s.command_count for s in self.sessions._sessions.values())
rooms = {r: len(users) for r, users in self.sessions._room_occupants.items() if users}
ws_connections = sum(len(s.ws_connections) for s in self.sessions._sessions.values())
return web.json_response({
"uptime_seconds": round(uptime, 1),
"active_sessions": self.sessions.active_count,
"total_messages": total_messages,
"total_commands": total_commands,
"rooms": rooms,
"room_count": len(rooms),
"ws_connections": ws_connections,
})
async def handle_room_events(self, request: web.Request) -> web.Response:
"""GET /bridge/room_events/{user_id} — Drain pending room events for a user."""
user_id = request.match_info["user_id"]
session = self.sessions.get(user_id)
if not session:
return web.json_response({"error": "session not found"}, status=404)
events = list(session.room_events)
session.room_events.clear()
return web.json_response({
"user_id": user_id,
"events": events,
"count": len(events),
})
async def handle_chat(self, request: web.Request) -> web.Response:
"""
POST /bridge/chat
Body: {"user_id": "...", "username": "...", "message": "...", "room": "..."}
"""
try:
data = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON"}, status=400)
user_id = data.get("user_id", "").strip()
message = data.get("message", "").strip()
username = data.get("username", user_id)
room = data.get("room", "")
if not user_id:
return web.json_response({"error": "user_id required"}, status=400)
if not message:
return web.json_response({"error": "message required"}, status=400)
# Rate limiting
if not self.rate_limiter.check(user_id):
return web.json_response(
{"error": "rate limit exceeded", "user_id": user_id},
status=429,
headers={
"X-RateLimit-Limit": str(self.rate_limiter._max_tokens),
"X-RateLimit-Remaining": "0",
"Retry-After": "1",
},
)
session = self.sessions.get_or_create(user_id, username, room)
session.add_message("user", message)
# Crisis detection
crisis_triggered = session.crisis_state.check(message)
# Build response
response_parts = []
if crisis_triggered:
response_parts.append(CRISIS_988_MESSAGE)
# Generate echo response (placeholder — real AI routing goes here)
ai_response = self._generate_response(session, message)
response_parts.append(ai_response)
full_response = "\n\n".join(response_parts)
session.add_message("assistant", full_response)
# Broadcast to any WS connections
ws_event = {
"type": "chat_response",
"user_id": user_id,
"room": session.room,
"message": full_response,
"occupants": self.sessions.get_room_occupants(session.room),
"timestamp": datetime.now(timezone.utc).isoformat(),
}
await self._broadcast_to_user(session, ws_event)
# Deliver room events to other users' WS connections (non-destructive)
for other_session in self.sessions._sessions.values():
if other_session.user_id != user_id and other_session.room_events:
for event in other_session.room_events:
if event.get("from_user") == user_id:
await self._broadcast_to_user(other_session, event)
return web.json_response({
"response": full_response,
"user_id": user_id,
"room": session.room,
"crisis_detected": crisis_triggered,
"session_messages": len(session.message_history),
"room_occupants": self.sessions.get_room_occupants(session.room),
}, headers={
"X-RateLimit-Limit": str(self.rate_limiter._max_tokens),
"X-RateLimit-Remaining": str(self.rate_limiter.remaining(user_id)),
})
async def handle_ws(self, request: web.Request) -> web.WebSocketResponse:
"""WebSocket endpoint for real-time streaming per user."""
user_id = request.match_info["user_id"]
ws = web.WebSocketResponse()
await ws.prepare(request)
session = self.sessions.get_or_create(user_id)
session.ws_connections.append(ws)
logger.info(f"WS connected: {user_id} ({len(session.ws_connections)} connections)")
# Send welcome
await ws.send_json({
"type": "connected",
"user_id": user_id,
"room": session.room,
"occupants": self.sessions.get_room_occupants(session.room),
})
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
try:
data = json.loads(msg.data)
await self._handle_ws_message(session, data, ws)
except json.JSONDecodeError:
await ws.send_json({"error": "invalid JSON"})
elif msg.type in (WSMsgType.ERROR, WSMsgType.CLOSE):
break
finally:
session.ws_connections.remove(ws)
logger.info(f"WS disconnected: {user_id}")
return ws
async def _handle_ws_message(self, session: UserSession, data: dict, ws):
"""Handle incoming WS message from a user."""
msg_type = data.get("type", "chat")
if msg_type == "chat":
message = data.get("message", "")
if not message:
return
session.add_message("user", message)
crisis = session.crisis_state.check(message)
response = self._generate_response(session, message)
if crisis:
response = CRISIS_988_MESSAGE + "\n\n" + response
session.add_message("assistant", response)
await ws.send_json({
"type": "chat_response",
"message": response,
"crisis_detected": crisis,
"room": session.room,
"occupants": self.sessions.get_room_occupants(session.room),
})
elif msg_type == "move":
new_room = data.get("room", "")
if new_room and new_room != session.room:
self.sessions._room_occupants[session.room].discard(session.user_id)
session.room = new_room
self.sessions._room_occupants[new_room].add(session.user_id)
await ws.send_json({
"type": "room_changed",
"room": new_room,
"occupants": self.sessions.get_room_occupants(new_room),
})
def _generate_response(self, session: UserSession, message: str) -> str:
"""
Placeholder response generator.
Real implementation routes to AI model via Hermes/Evennia command adapter.
"""
msg_lower = message.lower().strip()
# MUD-like command handling
if msg_lower in ("look", "l"):
occupants = self.sessions.get_room_occupants(session.room)
others = [o for o in occupants if o != session.user_id]
others_str = ", ".join(others) if others else "no one else"
return f"You are in {session.room}. You see: {others_str}."
if msg_lower.startswith("say "):
speech = message[4:]
# Broadcast to other occupants in same room
occupants = self.sessions.get_room_occupants(session.room)
others = [o for o in occupants if o != session.user_id]
if others:
broadcast = {
"type": "room_broadcast",
"from_user": session.user_id,
"from_username": session.username,
"room": session.room,
"message": f'{session.username} says: "{speech}"',
}
for other_id in others:
other_session = self.sessions.get(other_id)
if other_session:
other_session.room_events.append(broadcast)
return f'You say: \"{speech}\"'
if msg_lower.startswith("go ") or msg_lower.startswith("move "):
# Move to a new room (HTTP equivalent of WS move)
parts = message.split(None, 1)
if len(parts) < 2 or not parts[1].strip():
return "Go where? Usage: go <room>"
new_room = parts[1].strip()
old_room = session.room
if new_room == old_room:
return f"You're already in {new_room}."
# Update room tracking
self.sessions._room_occupants[old_room].discard(session.user_id)
session.room = new_room
self.sessions._room_occupants[new_room].add(session.user_id)
# Notify occupants in old room
old_occupants = self.sessions.get_room_occupants(old_room)
for other_id in old_occupants:
other_session = self.sessions.get(other_id)
if other_session:
other_session.room_events.append({
"type": "room_broadcast",
"from_user": session.user_id,
"from_username": session.username,
"room": old_room,
"message": f"{session.username} leaves for {new_room}.",
})
return f"You leave {old_room} and arrive in {new_room}."
if msg_lower.startswith("emote ") or msg_lower.startswith("/me "):
# Emote — broadcast action to room
action = message.split(None, 1)[1] if len(message.split(None, 1)) > 1 else ""
if not action:
return "Emote what? Usage: emote <action>"
occupants = self.sessions.get_room_occupants(session.room)
others = [o for o in occupants if o != session.user_id]
for other_id in others:
other_session = self.sessions.get(other_id)
if other_session:
other_session.room_events.append({
"type": "room_broadcast",
"from_user": session.user_id,
"from_username": session.username,
"room": session.room,
"message": f"{session.username} {action}",
})
return f"You {action}"
if msg_lower == "who":
all_sessions = self.sessions.list_sessions()
lines = [f" {s['username']} ({s['room']}) — {s['command_count']} commands" for s in all_sessions]
return f"Online ({len(all_sessions)}):\n" + "\n".join(lines)
# Default echo with session context
history_len = len(session.message_history)
return f"[{session.user_id}@{session.room}] received: {message} (msg #{history_len})"
async def _broadcast_to_user(self, session: UserSession, event: dict):
"""Send event to all WS connections for a user."""
dead = []
for ws in session.ws_connections:
try:
await ws.send_json(event)
except Exception:
dead.append(ws)
for ws in dead:
session.ws_connections.remove(ws)
async def start(self):
"""Start the bridge server."""
app = self.create_app()
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self.host, self.port)
await site.start()
logger.info(f"Multi-user bridge listening on {self.host}:{self.port}")
return runner
def main():
import argparse
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
parser = argparse.ArgumentParser(description="Nexus Multi-User AI Bridge")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=4004)
args = parser.parse_args()
bridge = MultiUserBridge(host=args.host, port=args.port)
async def run():
runner = await bridge.start()
try:
while True:
await asyncio.sleep(3600)
except KeyboardInterrupt:
await runner.cleanup()
asyncio.run(run())
if __name__ == "__main__":
main()