- Per-user session state with isolated message history - Crisis detection with multi-turn 988 delivery tracking - HTTP POST /bridge/chat (curl-testable) + WebSocket per user - Room occupancy tracking across concurrent sessions - Session eviction when max capacity reached - Health and sessions list endpoints
447 lines
16 KiB
Python
447 lines
16 KiB
Python
#!/usr/bin/env python3
"""
Multi-User AI Bridge for Nexus.

HTTP + WebSocket bridge that manages concurrent user sessions with full isolation.
Each user gets their own session state, message history, and AI routing.

Endpoints:
    POST /bridge/chat         — Send a chat message (curl-testable)
    GET  /bridge/sessions     — List active sessions
    GET  /bridge/health       — Health check
    WS   /bridge/ws/{user_id} — Real-time streaming per user

Session isolation:
    - Each user_id gets independent message history (configurable window)
    - Crisis detection runs per-session with multi-turn tracking
    - Room state tracked per-user for multi-user world awareness
"""

from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
try:
|
|
from aiohttp import web, WSMsgType
|
|
except ImportError:
|
|
web = None
|
|
WSMsgType = None
|
|
|
|
logger = logging.getLogger("multi_user_bridge")
|
|
|
|
# ── Crisis Detection ──────────────────────────────────────────
|
|
|
|
# Case-insensitive phrases that mark a message as a possible crisis signal.
# Optional whitespace is tolerated between words ("kill myself" / "kill my self")
# and both the ASCII and the typographic apostrophe are accepted in "don't".
CRISIS_PATTERNS = [
    re.compile(r"\b(?:suicide|kill\s*(?:my\s*)?self|end\s*(?:my\s*)?life)\b", re.I),
    re.compile(r"\b(?:want\s*to\s*die|don['\u2019]?t\s*want\s*to\s*(?:live|be\s*alive))\b", re.I),
    re.compile(r"\b(?:self[\s-]?harm|cutting\s*(?:my\s*)?self)\b", re.I),
]

# Resource text prepended to the reply when CrisisState.check() returns True.
CRISIS_988_MESSAGE = (
    "If you're in crisis, please reach out:\n"
    "• 988 Suicide & Crisis Lifeline: call or text 988 (US)\n"
    "• Crisis Text Line: text HOME to 741741\n"
    "• International: https://findahelpline.com/\n"
    "You are not alone. Help is available right now."
)


@dataclass
class CrisisState:
    """Tracks multi-turn crisis detection per session."""

    # Number of consecutive flagged messages in the current streak.
    turn_count: int = 0
    # time.time() of the first flagged message in the current window.
    first_flagged_at: Optional[float] = None
    # Whether the 988 message has been delivered at least once.
    delivered_988: bool = False
    # Flagged messages, each truncated to 200 characters.
    flagged_messages: list[str] = field(default_factory=list)

    # Un-annotated on purpose: these stay class-level constants rather than
    # becoming dataclass fields.
    CRISIS_TURN_WINDOW = 3  # consecutive turns before escalating
    CRISIS_WINDOW_SECONDS = 300  # 5 minutes

    def check(self, message: str) -> bool:
        """Record *message* and report whether the 988 text should be sent now.

        A message that matches none of ``CRISIS_PATTERNS`` ends the current
        streak (turn counter and window start are reset) and returns False.

        For a matching message:

        * If 988 info was already delivered, it is delivered again only once
          the current window has expired; a fresh window then starts at this
          message.
        * Otherwise it is delivered when the streak reaches
          ``CRISIS_TURN_WINDOW`` flagged turns inside ``CRISIS_WINDOW_SECONDS``.
          A streak that outlives its window without a delivery restarts at
          this message, so a later burst of flagged turns can still escalate
          instead of being blocked forever by a stale timestamp.

        Args:
            message: Raw user text for this turn.

        Returns:
            True when the caller should deliver ``CRISIS_988_MESSAGE``.
        """
        if not any(pattern.search(message) for pattern in CRISIS_PATTERNS):
            self.turn_count = 0
            self.first_flagged_at = None
            return False

        now = time.time()
        self.turn_count += 1
        self.flagged_messages.append(message[:200])
        if self.first_flagged_at is None:
            self.first_flagged_at = now

        window_expired = (now - self.first_flagged_at) > self.CRISIS_WINDOW_SECONDS

        if self.delivered_988:
            if not window_expired:
                return False
            # Re-deliver: the previous window lapsed and the user is still
            # (or again) sending flagged messages.
            self.first_flagged_at = now
            self.turn_count = 1
            return True

        if window_expired:
            # Stale streak that never escalated: start a new window here.
            self.first_flagged_at = now
            self.turn_count = 1

        if self.turn_count >= self.CRISIS_TURN_WINDOW:
            self.delivered_988 = True
            return True
        return False
|
|
|
|
|
|
# ── Session Management ────────────────────────────────────────
|
|
|
|
@dataclass
class UserSession:
    """Isolated session state for a single user."""

    user_id: str
    username: str
    room: str = "The Tower"
    # Chronological list of message dicts produced by add_message().
    message_history: list[dict] = field(default_factory=list)
    # Open WebSocket objects currently attached to this user.
    ws_connections: list = field(default_factory=list)
    # Resolved lazily at instantiation so defining this class does not require
    # CrisisState to be bound yet.
    crisis_state: CrisisState = field(default_factory=lambda: CrisisState())
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    # NOTE(review): incremented for every stored message, assistant replies
    # included — confirm that is the intended meaning of "command".
    command_count: int = 0

    def add_message(self, role: str, content: str) -> dict:
        """Append a message to this user's history and return the stored dict.

        The entry records the role, the content, a UTC ISO-8601 timestamp and
        the room the user is in at that moment. ``last_active`` is refreshed
        and ``command_count`` is incremented.
        """
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "room": self.room,
        }
        self.message_history.append(entry)
        self.last_active = time.time()
        self.command_count += 1
        return entry

    def get_history(self, window: int = 20) -> list[dict]:
        """Return the most recent *window* messages, oldest first.

        A non-positive *window* yields an empty list. Without this guard,
        ``history[-0:]`` would return the entire history and a negative
        window would silently drop the oldest entries instead.
        """
        if window <= 0:
            return []
        return self.message_history[-window:]

    def to_dict(self) -> dict:
        """Return a JSON-serialisable summary of the session (no message bodies)."""

        def iso_utc(epoch_seconds: float) -> str:
            return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()

        return {
            "user_id": self.user_id,
            "username": self.username,
            "room": self.room,
            "message_count": len(self.message_history),
            "command_count": self.command_count,
            "connected_ws": len(self.ws_connections),
            "created_at": iso_utc(self.created_at),
            "last_active": iso_utc(self.last_active),
        }
|
|
|
|
|
|
class SessionManager:
    """Manages isolated user sessions and a room -> occupants index.

    Sessions are keyed by ``user_id``. When a new session would exceed
    ``max_sessions``, the session with the oldest ``last_active`` timestamp
    is evicted first.
    """

    def __init__(self, max_sessions: int = 100, history_window: int = 50):
        self._sessions: dict[str, UserSession] = {}
        self._max_sessions = max_sessions
        # NOTE(review): stored but not read by any code visible in this
        # module — confirm whether history trimming was meant to use it.
        self._history_window = history_window
        # Kept as a defaultdict so callers that index a room directly still
        # get a set; rooms whose set becomes empty are pruned by _leave_room().
        self._room_occupants: dict[str, set[str]] = defaultdict(set)

    def _leave_room(self, room: str, user_id: str) -> None:
        """Drop *user_id* from *room* and forget the room once it is empty."""
        occupants = self._room_occupants.get(room)
        if occupants is None:
            return
        occupants.discard(user_id)
        if not occupants:
            # Room names come from clients; pruning stops the index from
            # accumulating one empty set per room name ever seen.
            del self._room_occupants[room]

    def get_or_create(self, user_id: str, username: str = "", room: str = "") -> UserSession:
        """Get the existing session for *user_id* or create a new one.

        For a new session, *username* defaults to *user_id* and *room* to
        "The Tower". For an existing session, a non-empty *username* replaces
        the stored one, a non-empty *room* different from the current one
        moves the user, and ``last_active`` is refreshed.
        """
        session = self._sessions.get(user_id)
        if session is None:
            if len(self._sessions) >= self._max_sessions:
                self._evict_oldest()
            session = UserSession(
                user_id=user_id,
                username=username or user_id,
                room=room or "The Tower",
            )
            self._sessions[user_id] = session
            self._room_occupants[session.room].add(user_id)
            logger.info("Session created: %s in room %s", user_id, session.room)
            return session

        if username:
            session.username = username
        if room and room != session.room:
            self._leave_room(session.room, user_id)
            session.room = room
            self._room_occupants[room].add(user_id)
        session.last_active = time.time()
        return session

    def get(self, user_id: str) -> Optional[UserSession]:
        """Return the session for *user_id*, or None if there is none."""
        return self._sessions.get(user_id)

    def remove(self, user_id: str) -> bool:
        """Delete the session for *user_id*; return True if one existed."""
        session = self._sessions.pop(user_id, None)
        if session is None:
            return False
        self._leave_room(session.room, user_id)
        logger.info("Session removed: %s", user_id)
        return True

    def get_room_occupants(self, room: str) -> list[str]:
        """Return the user ids currently in *room*, sorted for stable output.

        Uses ``.get`` so that querying an unknown room does not create an
        entry in the defaultdict.
        """
        return sorted(self._room_occupants.get(room, ()))

    def list_sessions(self) -> list[dict]:
        """Return ``to_dict()`` summaries for every active session."""
        return [session.to_dict() for session in self._sessions.values()]

    def _evict_oldest(self):
        """Remove the session with the smallest ``last_active`` (no-op if empty)."""
        if not self._sessions:
            return
        stalest = min(self._sessions.values(), key=lambda s: s.last_active)
        self.remove(stalest.user_id)

    @property
    def active_count(self) -> int:
        """Number of sessions currently held."""
        return len(self._sessions)
|
|
|
|
|
|
# ── Bridge Server ─────────────────────────────────────────────
|
|
|
|
class MultiUserBridge:
    """HTTP + WebSocket multi-user bridge.

    Routes registered by :meth:`create_app`:

    * ``POST /bridge/chat``          - one chat turn over plain HTTP
    * ``GET  /bridge/sessions``      - list active sessions
    * ``GET  /bridge/health``        - status, uptime and session count
    * ``GET  /bridge/ws/{user_id}``  - per-user WebSocket

    Request payloads are treated as untrusted: bodies that are not JSON
    objects are rejected, and fields that are not strings are treated as
    missing instead of raising inside a handler.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 4004):
        self.host = host
        self.port = port
        self.sessions = SessionManager()
        self._app: Optional[web.Application] = None
        self._start_time = time.time()

    def create_app(self) -> web.Application:
        """Build the aiohttp application and register all bridge routes.

        Raises:
            RuntimeError: if aiohttp could not be imported.
        """
        if web is None:
            raise RuntimeError("aiohttp required: pip install aiohttp")

        self._app = web.Application()
        router = self._app.router
        router.add_post("/bridge/chat", self.handle_chat)
        router.add_get("/bridge/sessions", self.handle_sessions)
        router.add_get("/bridge/health", self.handle_health)
        router.add_get("/bridge/ws/{user_id}", self.handle_ws)
        return self._app

    # ── helpers ───────────────────────────────────────────────

    @staticmethod
    def _as_text(value: object) -> str:
        """Return *value* unchanged if it is a string, otherwise ``""``."""
        return value if isinstance(value, str) else ""

    @staticmethod
    def _discard_ws(session: UserSession, ws) -> None:
        """Remove *ws* from the session's connection list if still present."""
        try:
            session.ws_connections.remove(ws)
        except ValueError:
            # Already removed by the other cleanup path (broadcast pruning
            # vs. the handler's own finally block).
            pass

    def _process_chat(self, session: UserSession, message: str) -> tuple[str, bool]:
        """Run one chat turn and return ``(reply, crisis_triggered)``.

        Records the user message, runs crisis detection, generates the reply,
        prefixes ``CRISIS_988_MESSAGE`` when triggered, and records the reply.
        """
        session.add_message("user", message)
        crisis_triggered = session.crisis_state.check(message)
        reply = self._generate_response(session, message)
        if crisis_triggered:
            reply = CRISIS_988_MESSAGE + "\n\n" + reply
        session.add_message("assistant", reply)
        return reply, crisis_triggered

    # ── HTTP handlers ─────────────────────────────────────────

    async def handle_health(self, request: web.Request) -> web.Response:
        """GET /bridge/health — status, uptime and active session count."""
        uptime = time.time() - self._start_time
        return web.json_response({
            "status": "ok",
            "uptime_seconds": round(uptime, 1),
            "active_sessions": self.sessions.active_count,
        })

    async def handle_sessions(self, request: web.Request) -> web.Response:
        """GET /bridge/sessions — summaries of all active sessions."""
        return web.json_response({
            "sessions": self.sessions.list_sessions(),
            "total": self.sessions.active_count,
        })

    async def handle_chat(self, request: web.Request) -> web.Response:
        """
        POST /bridge/chat
        Body: {"user_id": "...", "username": "...", "message": "...", "room": "..."}

        Responds 400 when the body is not a JSON object or when ``user_id`` /
        ``message`` are missing, empty, or not strings. An omitted ``username``
        leaves an existing session's name untouched.
        """
        try:
            data = await request.json()
        except Exception:
            return web.json_response({"error": "invalid JSON"}, status=400)
        if not isinstance(data, dict):
            return web.json_response({"error": "JSON object required"}, status=400)

        user_id = self._as_text(data.get("user_id")).strip()
        message = self._as_text(data.get("message")).strip()
        username = self._as_text(data.get("username"))
        room = self._as_text(data.get("room"))

        if not user_id:
            return web.json_response({"error": "user_id required"}, status=400)
        if not message:
            return web.json_response({"error": "message required"}, status=400)

        session = self.sessions.get_or_create(user_id, username, room)
        full_response, crisis_triggered = self._process_chat(session, message)
        occupants = self.sessions.get_room_occupants(session.room)

        # Mirror the reply to any WebSocket connections this user has open.
        await self._broadcast_to_user(session, {
            "type": "chat_response",
            "user_id": user_id,
            "room": session.room,
            "message": full_response,
            "occupants": occupants,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

        return web.json_response({
            "response": full_response,
            "user_id": user_id,
            "room": session.room,
            "crisis_detected": crisis_triggered,
            "session_messages": len(session.message_history),
            "room_occupants": occupants,
        })

    # ── WebSocket handling ────────────────────────────────────

    async def handle_ws(self, request: web.Request) -> web.WebSocketResponse:
        """WebSocket endpoint for real-time streaming per user."""
        user_id = request.match_info["user_id"]
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        session = self.sessions.get_or_create(user_id)
        session.ws_connections.append(ws)
        logger.info("WS connected: %s (%d connections)", user_id, len(session.ws_connections))

        try:
            # Inside the try so a failed welcome still unregisters the socket.
            await ws.send_json({
                "type": "connected",
                "user_id": user_id,
                "room": session.room,
                "occupants": self.sessions.get_room_occupants(session.room),
            })

            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except json.JSONDecodeError:
                        await ws.send_json({"error": "invalid JSON"})
                        continue
                    await self._handle_ws_message(session, data, ws)
                elif msg.type in (WSMsgType.ERROR, WSMsgType.CLOSE):
                    break
        finally:
            # A broadcast may already have pruned this socket as dead.
            self._discard_ws(session, ws)
            logger.info("WS disconnected: %s", user_id)

        return ws

    async def _handle_ws_message(self, session: UserSession, data: dict, ws):
        """Handle one decoded WS frame from a user.

        Supported ``type`` values are ``"chat"`` (the default) and ``"move"``.
        Frames that are not JSON objects get an error reply; chat frames with
        an empty or non-string message and move frames with an empty,
        non-string or unchanged room are ignored.
        """
        if not isinstance(data, dict):
            await ws.send_json({"error": "JSON object required"})
            return

        msg_type = data.get("type", "chat")

        if msg_type == "chat":
            # Stripped for parity with the HTTP endpoint.
            message = self._as_text(data.get("message")).strip()
            if not message:
                return
            response, crisis = self._process_chat(session, message)
            await ws.send_json({
                "type": "chat_response",
                "message": response,
                "crisis_detected": crisis,
                "room": session.room,
                "occupants": self.sessions.get_room_occupants(session.room),
            })
        elif msg_type == "move":
            new_room = self._as_text(data.get("room"))
            if new_room and new_room != session.room:
                room_index = self.sessions._room_occupants
                room_index[session.room].discard(session.user_id)
                session.room = new_room
                room_index[new_room].add(session.user_id)
                await ws.send_json({
                    "type": "room_changed",
                    "room": new_room,
                    "occupants": self.sessions.get_room_occupants(new_room),
                })

    def _generate_response(self, session: UserSession, message: str) -> str:
        """
        Placeholder response generator.
        Real implementation routes to AI model via Hermes/Evennia command adapter.

        Recognises the MUD-like commands ``look``/``l``, ``say <text>`` and
        ``who`` (case-insensitive, surrounding whitespace ignored); anything
        else is echoed back with session context.
        """
        text = message.strip()
        msg_lower = text.lower()

        # MUD-like command handling
        if msg_lower in ("look", "l"):
            occupants = self.sessions.get_room_occupants(session.room)
            others = [o for o in occupants if o != session.user_id]
            others_str = ", ".join(others) if others else "no one else"
            return f"You are in {session.room}. You see: {others_str}."

        if msg_lower.startswith("say "):
            # Slice the stripped text: slicing the raw message would cut into
            # the command word whenever the input had leading whitespace.
            speech = text[4:]
            # Broadcast to room occupants (non-WS for now)
            return f'You say: "{speech}"'

        if msg_lower == "who":
            all_sessions = self.sessions.list_sessions()
            lines = [f" {s['username']} ({s['room']}) — {s['command_count']} commands" for s in all_sessions]
            return f"Online ({len(all_sessions)}):\n" + "\n".join(lines)

        # Default echo with session context
        history_len = len(session.message_history)
        return f"[{session.user_id}@{session.room}] received: {message} (msg #{history_len})"

    async def _broadcast_to_user(self, session: UserSession, event: dict):
        """Send *event* to all WS connections for a user, pruning dead ones.

        Iterates over a snapshot because other coroutines may add or remove
        connections while a send is awaited.
        """
        for ws in list(session.ws_connections):
            try:
                await ws.send_json(event)
            except Exception:
                # Best-effort delivery: a failed send means the socket is gone.
                logger.debug("Dropping dead WS for %s", session.user_id, exc_info=True)
                self._discard_ws(session, ws)

    async def start(self):
        """Start the bridge server and return the aiohttp ``AppRunner``."""
        app = self.create_app()
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, self.host, self.port)
        await site.start()
        logger.info("Multi-user bridge listening on %s:%s", self.host, self.port)
        return runner
|
|
|
|
|
|
def main():
    """Parse ``--host`` / ``--port`` and run the bridge until interrupted."""
    import argparse

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

    parser = argparse.ArgumentParser(description="Nexus Multi-User AI Bridge")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=4004)
    args = parser.parse_args()

    bridge = MultiUserBridge(host=args.host, port=args.port)

    async def run():
        runner = await bridge.start()
        try:
            # Park until the task is cancelled (Ctrl-C under asyncio.run).
            await asyncio.Event().wait()
        finally:
            # On Ctrl-C asyncio.run cancels this task rather than raising
            # KeyboardInterrupt inside it, so cleanup must live in `finally`.
            await runner.cleanup()

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        logger.info("Multi-user bridge stopped")


if __name__ == "__main__":
    main()
|