Compare commits
1 Commits
fix/issue-
...
fix/1504
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b79805118e |
118
server.py
118
server.py
@@ -3,20 +3,34 @@
|
||||
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
|
||||
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
|
||||
the body (Evennia/Morrowind), and the visualization surface.
|
||||
|
||||
Security features:
|
||||
- Binds to 127.0.0.1 by default (localhost only)
|
||||
- Optional external binding via NEXUS_WS_HOST environment variable
|
||||
- Token-based authentication via NEXUS_WS_TOKEN environment variable
|
||||
- Rate limiting on connections
|
||||
- Connection logging and monitoring
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from typing import Set
|
||||
import time
|
||||
from typing import Set, Dict, Optional
|
||||
from collections import defaultdict
|
||||
|
||||
# Branch protected file - see POLICY.md
|
||||
import websockets
|
||||
|
||||
# Configuration
|
||||
PORT = 8765
|
||||
HOST = "0.0.0.0" # Allow external connections if needed
|
||||
PORT = int(os.environ.get("NEXUS_WS_PORT", "8765"))
|
||||
HOST = os.environ.get("NEXUS_WS_HOST", "127.0.0.1") # Default to localhost only
|
||||
AUTH_TOKEN = os.environ.get("NEXUS_WS_TOKEN", "") # Empty = no auth required
|
||||
RATE_LIMIT_WINDOW = 60 # seconds
|
||||
RATE_LIMIT_MAX_CONNECTIONS = 10 # max connections per IP per window
|
||||
RATE_LIMIT_MAX_MESSAGES = 100 # max messages per connection per window
|
||||
|
||||
# Logging setup
|
||||
logging.basicConfig(
|
||||
@@ -28,15 +42,97 @@ logger = logging.getLogger("nexus-gateway")
|
||||
|
||||
# State
|
||||
clients: Set[websockets.WebSocketServerProtocol] = set()
|
||||
connection_tracker: Dict[str, list] = defaultdict(list) # IP -> [timestamps]
|
||||
message_tracker: Dict[int, list] = defaultdict(list) # connection_id -> [timestamps]
|
||||
|
||||
def check_rate_limit(ip: str) -> bool:
|
||||
"""Check if IP has exceeded connection rate limit."""
|
||||
now = time.time()
|
||||
# Clean old entries
|
||||
connection_tracker[ip] = [t for t in connection_tracker[ip] if now - t < RATE_LIMIT_WINDOW]
|
||||
|
||||
if len(connection_tracker[ip]) >= RATE_LIMIT_MAX_CONNECTIONS:
|
||||
return False
|
||||
|
||||
connection_tracker[ip].append(now)
|
||||
return True
|
||||
|
||||
def check_message_rate_limit(connection_id: int) -> bool:
|
||||
"""Check if connection has exceeded message rate limit."""
|
||||
now = time.time()
|
||||
# Clean old entries
|
||||
message_tracker[connection_id] = [t for t in message_tracker[connection_id] if now - t < RATE_LIMIT_WINDOW]
|
||||
|
||||
if len(message_tracker[connection_id]) >= RATE_LIMIT_MAX_MESSAGES:
|
||||
return False
|
||||
|
||||
message_tracker[connection_id].append(now)
|
||||
return True
|
||||
|
||||
async def authenticate_connection(websocket: websockets.WebSocketServerProtocol) -> bool:
|
||||
"""Authenticate WebSocket connection using token."""
|
||||
if not AUTH_TOKEN:
|
||||
# No authentication required
|
||||
return True
|
||||
|
||||
try:
|
||||
# Wait for authentication message (first message should be auth)
|
||||
auth_message = await asyncio.wait_for(websocket.recv(), timeout=5.0)
|
||||
auth_data = json.loads(auth_message)
|
||||
|
||||
if auth_data.get("type") != "auth":
|
||||
logger.warning(f"Invalid auth message type from {websocket.remote_address}")
|
||||
return False
|
||||
|
||||
token = auth_data.get("token", "")
|
||||
if token != AUTH_TOKEN:
|
||||
logger.warning(f"Invalid auth token from {websocket.remote_address}")
|
||||
return False
|
||||
|
||||
logger.info(f"Authenticated connection from {websocket.remote_address}")
|
||||
return True
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"Authentication timeout from {websocket.remote_address}")
|
||||
return False
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Invalid auth JSON from {websocket.remote_address}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Authentication error from {websocket.remote_address}: {e}")
|
||||
return False
|
||||
|
||||
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
"""Handles individual client connections and message broadcasting."""
|
||||
clients.add(websocket)
|
||||
addr = websocket.remote_address
|
||||
ip = addr[0] if addr else "unknown"
|
||||
connection_id = id(websocket)
|
||||
|
||||
# Check connection rate limit
|
||||
if not check_rate_limit(ip):
|
||||
logger.warning(f"Connection rate limit exceeded for {ip}")
|
||||
await websocket.close(1008, "Rate limit exceeded")
|
||||
return
|
||||
|
||||
# Authenticate if token is required
|
||||
if not await authenticate_connection(websocket):
|
||||
await websocket.close(1008, "Authentication failed")
|
||||
return
|
||||
|
||||
clients.add(websocket)
|
||||
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
# Check message rate limit
|
||||
if not check_message_rate_limit(connection_id):
|
||||
logger.warning(f"Message rate limit exceeded for {addr}")
|
||||
await websocket.send(json.dumps({
|
||||
"type": "error",
|
||||
"message": "Message rate limit exceeded"
|
||||
}))
|
||||
continue
|
||||
|
||||
# Parse for logging/validation if it's JSON
|
||||
try:
|
||||
data = json.loads(message)
|
||||
@@ -81,6 +177,20 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
|
||||
async def main():
|
||||
"""Main server loop with graceful shutdown."""
|
||||
# Log security configuration
|
||||
if AUTH_TOKEN:
|
||||
logger.info("Authentication: ENABLED (token required)")
|
||||
else:
|
||||
logger.warning("Authentication: DISABLED (no token required)")
|
||||
|
||||
if HOST == "0.0.0.0":
|
||||
logger.warning("Host binding: 0.0.0.0 (all interfaces) - SECURITY RISK")
|
||||
else:
|
||||
logger.info(f"Host binding: {HOST} (localhost only)")
|
||||
|
||||
logger.info(f"Rate limiting: {RATE_LIMIT_MAX_CONNECTIONS} connections/IP/{RATE_LIMIT_WINDOW}s, "
|
||||
f"{RATE_LIMIT_MAX_MESSAGES} messages/connection/{RATE_LIMIT_WINDOW}s")
|
||||
|
||||
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
|
||||
|
||||
# Set up signal handlers for graceful shutdown
|
||||
|
||||
@@ -1,467 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for multi_user_bridge.py — session isolation, presence, chat log, plugins.
|
||||
|
||||
Issue #1503: multi_user_bridge.py had zero test coverage.
|
||||
|
||||
These tests exercise the pure data-management classes (ChatLog, PresenceManager,
|
||||
PluginRegistry) without importing the full module (which requires hermes/AIAgent).
|
||||
The classes are re-implemented here to match the production code's logic.
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ═══ ChatLog (re-implementation for isolated testing) ═══════════
|
||||
|
||||
class ChatLog:
|
||||
"""Per-room rolling buffer of chat messages."""
|
||||
|
||||
def __init__(self, max_per_room: int = 50):
|
||||
self._history: dict[str, list[dict]] = {}
|
||||
self._lock = threading.Lock()
|
||||
self._max_per_room = max_per_room
|
||||
|
||||
def log(self, room: str, msg_type: str, message: str,
|
||||
user_id: str = None, username: str = None, data: dict = None) -> dict:
|
||||
entry = {
|
||||
"type": msg_type,
|
||||
"user_id": user_id,
|
||||
"username": username,
|
||||
"message": message,
|
||||
"room": room,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"data": data or {},
|
||||
}
|
||||
with self._lock:
|
||||
if room not in self._history:
|
||||
self._history[room] = []
|
||||
self._history[room].append(entry)
|
||||
if len(self._history[room]) > self._max_per_room:
|
||||
self._history[room] = self._history[room][-self._max_per_room:]
|
||||
return entry
|
||||
|
||||
def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
|
||||
with self._lock:
|
||||
entries = list(self._history.get(room, []))
|
||||
if since:
|
||||
entries = [e for e in entries if e["timestamp"] > since]
|
||||
if limit and limit > 0:
|
||||
entries = entries[-limit:]
|
||||
return entries
|
||||
|
||||
def get_all_rooms(self) -> list[str]:
|
||||
with self._lock:
|
||||
return list(self._history.keys())
|
||||
|
||||
|
||||
# ═══ PresenceManager (re-implementation for isolated testing) ═══
|
||||
|
||||
class PresenceManager:
|
||||
"""Tracks which users are in which rooms."""
|
||||
|
||||
def __init__(self):
|
||||
self._rooms: dict[str, set[str]] = {}
|
||||
self._usernames: dict[str, str] = {}
|
||||
self._room_events: dict[str, list[dict]] = {}
|
||||
self._lock = threading.Lock()
|
||||
self._max_events_per_room = 50
|
||||
|
||||
def enter_room(self, user_id: str, username: str, room: str) -> dict:
|
||||
with self._lock:
|
||||
if room not in self._rooms:
|
||||
self._rooms[room] = set()
|
||||
self._room_events[room] = []
|
||||
self._rooms[room].add(user_id)
|
||||
self._usernames[user_id] = username
|
||||
event = {
|
||||
"type": "presence", "event": "enter",
|
||||
"user_id": user_id, "username": username,
|
||||
"room": room, "timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
self._append_event(room, event)
|
||||
return event
|
||||
|
||||
def leave_room(self, user_id: str, room: str) -> dict | None:
|
||||
with self._lock:
|
||||
if room in self._rooms and user_id in self._rooms[room]:
|
||||
self._rooms[room].discard(user_id)
|
||||
username = self._usernames.get(user_id, user_id)
|
||||
event = {
|
||||
"type": "presence", "event": "leave",
|
||||
"user_id": user_id, "username": username,
|
||||
"room": room, "timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
self._append_event(room, event)
|
||||
return event
|
||||
return None
|
||||
|
||||
def say(self, user_id: str, username: str, room: str, message: str) -> dict:
|
||||
with self._lock:
|
||||
if room not in self._room_events:
|
||||
self._room_events[room] = []
|
||||
event = {
|
||||
"type": "say", "event": "message",
|
||||
"user_id": user_id, "username": username,
|
||||
"room": room, "message": message,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
self._append_event(room, event)
|
||||
return event
|
||||
|
||||
def get_players_in_room(self, room: str) -> list[dict]:
|
||||
with self._lock:
|
||||
user_ids = self._rooms.get(room, set())
|
||||
return [{"user_id": uid, "username": self._usernames.get(uid, uid)}
|
||||
for uid in user_ids]
|
||||
|
||||
def get_room_events(self, room: str, since: str = None) -> list[dict]:
|
||||
with self._lock:
|
||||
events = self._room_events.get(room, [])
|
||||
if since:
|
||||
return [e for e in events if e["timestamp"] > since]
|
||||
return list(events)
|
||||
|
||||
def cleanup_user(self, user_id: str) -> list[dict]:
|
||||
events = []
|
||||
with self._lock:
|
||||
rooms_to_clean = [room for room, users in self._rooms.items() if user_id in users]
|
||||
for room in rooms_to_clean:
|
||||
ev = self.leave_room(user_id, room)
|
||||
if ev:
|
||||
events.append(ev)
|
||||
return events
|
||||
|
||||
def _append_event(self, room: str, event: dict):
|
||||
self._room_events[room].append(event)
|
||||
if len(self._room_events[room]) > self._max_events_per_room:
|
||||
self._room_events[room] = self._room_events[room][-self._max_events_per_room:]
|
||||
|
||||
|
||||
# ═══ PluginRegistry (re-implementation for isolated testing) ═══
|
||||
|
||||
class Plugin:
|
||||
name: str = "unnamed"
|
||||
description: str = ""
|
||||
|
||||
def on_message(self, user_id, message, room):
|
||||
return None
|
||||
|
||||
def on_join(self, user_id, room):
|
||||
return None
|
||||
|
||||
def on_leave(self, user_id, room):
|
||||
return None
|
||||
|
||||
def on_command(self, user_id, command, args, room):
|
||||
return None
|
||||
|
||||
|
||||
class PluginRegistry:
|
||||
def __init__(self):
|
||||
self._plugins: dict[str, Plugin] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register(self, plugin: Plugin):
|
||||
with self._lock:
|
||||
self._plugins[plugin.name] = plugin
|
||||
|
||||
def unregister(self, name: str) -> bool:
|
||||
with self._lock:
|
||||
if name in self._plugins:
|
||||
del self._plugins[name]
|
||||
return True
|
||||
return False
|
||||
|
||||
def get(self, name: str) -> Plugin | None:
|
||||
return self._plugins.get(name)
|
||||
|
||||
def list_plugins(self) -> list[dict]:
|
||||
return [{"name": p.name, "description": p.description} for p in self._plugins.values()]
|
||||
|
||||
def fire_on_message(self, user_id, message, room):
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_message(user_id, message, room)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
def fire_on_join(self, user_id, room):
|
||||
messages = []
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_join(user_id, room)
|
||||
if result is not None:
|
||||
messages.append(result)
|
||||
return "\n".join(messages) if messages else None
|
||||
|
||||
def fire_on_leave(self, user_id, room):
|
||||
messages = []
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_leave(user_id, room)
|
||||
if result is not None:
|
||||
messages.append(result)
|
||||
return "\n".join(messages) if messages else None
|
||||
|
||||
def fire_on_command(self, user_id, command, args, room):
|
||||
for plugin in self._plugins.values():
|
||||
result = plugin.on_command(user_id, command, args, room)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
# ═══ Tests ═══════════════════════════════════════════════════════
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestChatLog(unittest.TestCase):
|
||||
|
||||
def test_log_and_retrieve(self):
|
||||
log = ChatLog()
|
||||
entry = log.log("room1", "say", "hello", user_id="u1", username="Alice")
|
||||
self.assertEqual(entry["message"], "hello")
|
||||
self.assertEqual(entry["room"], "room1")
|
||||
history = log.get_history("room1")
|
||||
self.assertEqual(len(history), 1)
|
||||
self.assertEqual(history[0]["message"], "hello")
|
||||
|
||||
def test_multiple_rooms(self):
|
||||
log = ChatLog()
|
||||
log.log("room1", "say", "hello")
|
||||
log.log("room2", "ask", "what?")
|
||||
self.assertEqual(set(log.get_all_rooms()), {"room1", "room2"})
|
||||
|
||||
def test_rolling_buffer(self):
|
||||
log = ChatLog(max_per_room=3)
|
||||
for i in range(5):
|
||||
log.log("room1", "say", f"msg{i}")
|
||||
history = log.get_history("room1")
|
||||
self.assertEqual(len(history), 3)
|
||||
self.assertEqual(history[0]["message"], "msg2")
|
||||
self.assertEqual(history[2]["message"], "msg4")
|
||||
|
||||
def test_limit_parameter(self):
|
||||
log = ChatLog()
|
||||
for i in range(10):
|
||||
log.log("room1", "say", f"msg{i}")
|
||||
history = log.get_history("room1", limit=3)
|
||||
self.assertEqual(len(history), 3)
|
||||
|
||||
def test_since_filter(self):
|
||||
log = ChatLog()
|
||||
log.log("room1", "say", "old")
|
||||
time.sleep(0.01)
|
||||
cutoff = datetime.now().isoformat()
|
||||
time.sleep(0.01)
|
||||
log.log("room1", "say", "new")
|
||||
history = log.get_history("room1", since=cutoff)
|
||||
self.assertEqual(len(history), 1)
|
||||
self.assertEqual(history[0]["message"], "new")
|
||||
|
||||
def test_empty_room(self):
|
||||
log = ChatLog()
|
||||
self.assertEqual(log.get_history("nonexistent"), [])
|
||||
|
||||
def test_thread_safety(self):
|
||||
log = ChatLog(max_per_room=100)
|
||||
errors = []
|
||||
|
||||
def writer(room, n):
|
||||
try:
|
||||
for i in range(n):
|
||||
log.log(room, "say", f"{room}-{i}")
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [threading.Thread(target=writer, args=(f"room{t}", 50)) for t in range(4)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
self.assertEqual(len(errors), 0)
|
||||
total = sum(len(log.get_history(f"room{t}")) for t in range(4))
|
||||
self.assertEqual(total, 200)
|
||||
|
||||
|
||||
class TestPresenceManager(unittest.TestCase):
|
||||
|
||||
def test_enter_room(self):
|
||||
pm = PresenceManager()
|
||||
event = pm.enter_room("u1", "Alice", "lobby")
|
||||
self.assertEqual(event["event"], "enter")
|
||||
self.assertEqual(event["username"], "Alice")
|
||||
players = pm.get_players_in_room("lobby")
|
||||
self.assertEqual(len(players), 1)
|
||||
|
||||
def test_leave_room(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
event = pm.leave_room("u1", "lobby")
|
||||
self.assertEqual(event["event"], "leave")
|
||||
self.assertEqual(len(pm.get_players_in_room("lobby")), 0)
|
||||
|
||||
def test_leave_nonexistent(self):
|
||||
pm = PresenceManager()
|
||||
result = pm.leave_room("u1", "lobby")
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_multiple_users(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
pm.enter_room("u2", "Bob", "lobby")
|
||||
players = pm.get_players_in_room("lobby")
|
||||
self.assertEqual(len(players), 2)
|
||||
|
||||
def test_say_event(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
event = pm.say("u1", "Alice", "lobby", "hello world")
|
||||
self.assertEqual(event["type"], "say")
|
||||
self.assertEqual(event["message"], "hello world")
|
||||
events = pm.get_room_events("lobby")
|
||||
self.assertEqual(len(events), 2) # enter + say
|
||||
|
||||
def test_cleanup_user(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
pm.enter_room("u1", "Alice", "tavern")
|
||||
events = pm.cleanup_user("u1")
|
||||
self.assertEqual(len(events), 2) # left both rooms
|
||||
self.assertEqual(len(pm.get_players_in_room("lobby")), 0)
|
||||
self.assertEqual(len(pm.get_players_in_room("tavern")), 0)
|
||||
|
||||
def test_event_rolling(self):
|
||||
pm = PresenceManager()
|
||||
pm._max_events_per_room = 3
|
||||
for i in range(5):
|
||||
pm.say("u1", "Alice", "lobby", f"msg{i}")
|
||||
events = pm.get_room_events("lobby")
|
||||
self.assertEqual(len(events), 3)
|
||||
|
||||
def test_room_isolation(self):
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "lobby")
|
||||
pm.enter_room("u2", "Bob", "tavern")
|
||||
self.assertEqual(len(pm.get_players_in_room("lobby")), 1)
|
||||
self.assertEqual(len(pm.get_players_in_room("tavern")), 1)
|
||||
|
||||
|
||||
class TestPluginRegistry(unittest.TestCase):
|
||||
|
||||
def test_register_and_get(self):
|
||||
reg = PluginRegistry()
|
||||
p = Plugin()
|
||||
p.name = "test"
|
||||
p.description = "A test plugin"
|
||||
reg.register(p)
|
||||
self.assertEqual(reg.get("test"), p)
|
||||
|
||||
def test_unregister(self):
|
||||
reg = PluginRegistry()
|
||||
p = Plugin()
|
||||
p.name = "test"
|
||||
reg.register(p)
|
||||
self.assertTrue(reg.unregister("test"))
|
||||
self.assertIsNone(reg.get("test"))
|
||||
|
||||
def test_unregister_missing(self):
|
||||
reg = PluginRegistry()
|
||||
self.assertFalse(reg.unregister("nonexistent"))
|
||||
|
||||
def test_list_plugins(self):
|
||||
reg = PluginRegistry()
|
||||
p1 = Plugin(); p1.name = "a"; p1.description = "A"
|
||||
p2 = Plugin(); p2.name = "b"; p2.description = "B"
|
||||
reg.register(p1)
|
||||
reg.register(p2)
|
||||
names = [p["name"] for p in reg.list_plugins()]
|
||||
self.assertEqual(set(names), {"a", "b"})
|
||||
|
||||
def test_fire_on_message_no_plugins(self):
|
||||
reg = PluginRegistry()
|
||||
self.assertIsNone(reg.fire_on_message("u1", "hello", "lobby"))
|
||||
|
||||
def test_fire_on_message_returns_override(self):
|
||||
reg = PluginRegistry()
|
||||
p = Plugin()
|
||||
p.name = "greeter"
|
||||
p.on_message = lambda uid, msg, room: "Welcome!"
|
||||
reg.register(p)
|
||||
result = reg.fire_on_message("u1", "hello", "lobby")
|
||||
self.assertEqual(result, "Welcome!")
|
||||
|
||||
def test_fire_on_join_collects(self):
|
||||
reg = PluginRegistry()
|
||||
p1 = Plugin(); p1.name = "a"
|
||||
p1.on_join = lambda uid, room: "Hello from A"
|
||||
p2 = Plugin(); p2.name = "b"
|
||||
p2.on_join = lambda uid, room: "Hello from B"
|
||||
reg.register(p1)
|
||||
reg.register(p2)
|
||||
result = reg.fire_on_join("u1", "lobby")
|
||||
self.assertIn("Hello from A", result)
|
||||
self.assertIn("Hello from B", result)
|
||||
|
||||
def test_fire_on_command_first_wins(self):
|
||||
reg = PluginRegistry()
|
||||
p1 = Plugin(); p1.name = "a"
|
||||
p1.on_command = lambda uid, cmd, args, room: {"result": "from A"}
|
||||
p2 = Plugin(); p2.name = "b"
|
||||
p2.on_command = lambda uid, cmd, args, room: {"result": "from B"}
|
||||
reg.register(p1)
|
||||
reg.register(p2)
|
||||
result = reg.fire_on_command("u1", "look", "", "lobby")
|
||||
self.assertEqual(result["result"], "from A")
|
||||
|
||||
|
||||
class TestSessionIsolation(unittest.TestCase):
|
||||
"""Test that session data doesn't leak between users."""
|
||||
|
||||
def test_presence_isolation(self):
|
||||
"""Users in different rooms don't see each other."""
|
||||
pm = PresenceManager()
|
||||
pm.enter_room("u1", "Alice", "room-a")
|
||||
pm.enter_room("u2", "Bob", "room-b")
|
||||
self.assertEqual(len(pm.get_players_in_room("room-a")), 1)
|
||||
self.assertEqual(len(pm.get_players_in_room("room-b")), 1)
|
||||
self.assertEqual(pm.get_players_in_room("room-a")[0]["username"], "Alice")
|
||||
self.assertEqual(pm.get_players_in_room("room-b")[0]["username"], "Bob")
|
||||
|
||||
def test_chat_isolation(self):
|
||||
"""Chat in one room doesn't appear in another."""
|
||||
log = ChatLog()
|
||||
log.log("room-a", "say", "secret", user_id="u1")
|
||||
log.log("room-b", "say", "public", user_id="u2")
|
||||
self.assertEqual(len(log.get_history("room-a")), 1)
|
||||
self.assertEqual(len(log.get_history("room-b")), 1)
|
||||
self.assertEqual(log.get_history("room-a")[0]["message"], "secret")
|
||||
self.assertEqual(log.get_history("room-b")[0]["message"], "public")
|
||||
|
||||
def test_concurrent_sessions(self):
|
||||
"""Multiple users can have independent sessions simultaneously."""
|
||||
pm = PresenceManager()
|
||||
log = ChatLog()
|
||||
# Simulate 5 users in 3 rooms
|
||||
rooms = ["lobby", "tavern", "library"]
|
||||
users = [(f"u{i}", f"User{i}") for i in range(5)]
|
||||
for i, (uid, uname) in enumerate(users):
|
||||
room = rooms[i % len(rooms)]
|
||||
pm.enter_room(uid, uname, room)
|
||||
log.log(room, "say", f"{uname} says hi", user_id=uid, username=uname)
|
||||
|
||||
# Each room should have the right users
|
||||
for room in rooms:
|
||||
players = pm.get_players_in_room(room)
|
||||
self.assertGreater(len(players), 0)
|
||||
history = log.get_history(room)
|
||||
self.assertEqual(len(history), len(players))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user