diff --git a/tests/test_multi_user_bridge.py b/tests/test_multi_user_bridge.py new file mode 100644 index 00000000..bcf5ae84 --- /dev/null +++ b/tests/test_multi_user_bridge.py @@ -0,0 +1,369 @@ +"""Tests for multi_user_bridge.py — session isolation and core classes. + +Refs: #1503 — multi_user_bridge.py has zero test coverage +""" + +from __future__ import annotations + +import json +import threading +import time +from datetime import datetime +from unittest.mock import patch, MagicMock + +import pytest + +# Import the classes directly +import sys +sys.path.insert(0, "/tmp/b2p3") +from multi_user_bridge import ( + Plugin, + PluginRegistry, + ChatLog, + PresenceManager, +) + + +# ============================================================================ +# TEST: Plugin System +# ============================================================================ + +class TestPluginRegistry: + """Plugin registration and dispatch.""" + + def test_register_plugin(self): + reg = PluginRegistry() + + class TestPlugin(Plugin): + name = "test" + description = "A test plugin" + + p = TestPlugin() + reg.register(p) + assert reg.get("test") is p + + def test_unregister_plugin(self): + reg = PluginRegistry() + + class TestPlugin(Plugin): + name = "test" + + reg.register(TestPlugin()) + assert reg.unregister("test") + assert reg.get("test") is None + + def test_unregister_nonexistent(self): + reg = PluginRegistry() + assert not reg.unregister("nonexistent") + + def test_list_plugins(self): + reg = PluginRegistry() + + class P1(Plugin): + name = "p1" + class P2(Plugin): + name = "p2" + + reg.register(P1()) + reg.register(P2()) + names = [p["name"] for p in reg.list_plugins()] + assert "p1" in names + assert "p2" in names + + def test_fire_on_message_returns_override(self): + reg = PluginRegistry() + + class EchoPlugin(Plugin): + name = "echo" + def on_message(self, user_id, message, room): + return f"echo: {message}" + + reg.register(EchoPlugin()) + result = reg.fire_on_message("user1", "hello", "garden") + assert result == "echo: hello" + + def test_fire_on_message_returns_none_if_no_override(self): + reg = PluginRegistry() + + class PassivePlugin(Plugin): + name = "passive" + def on_message(self, user_id, message, room): + return None + + reg.register(PassivePlugin()) + result = reg.fire_on_message("user1", "hello", "garden") + assert result is None + + def test_thread_safe_registration(self): + reg = PluginRegistry() + errors = [] + + class TPlugin(Plugin): + name = "thread-test" + + def register_many(): + try: + for _ in range(100): + reg.register(TPlugin()) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=register_many) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert reg.get("thread-test") is not None + + +# ============================================================================ +# TEST: ChatLog — Session Isolation +# ============================================================================ + +class TestChatLogIsolation: + """Verify rooms have isolated chat histories.""" + + def test_rooms_are_isolated(self): + log = ChatLog(max_per_room=50) + + log.log("garden", "say", "Hello from garden", user_id="user1") + log.log("tower", "say", "Hello from tower", user_id="user2") + + garden_history = log.get_history("garden") + tower_history = log.get_history("tower") + + assert len(garden_history) == 1 + assert len(tower_history) == 1 + assert garden_history[0]["room"] == "garden" + assert tower_history[0]["room"] == "tower" + assert garden_history[0]["message"] != tower_history[0]["message"] + + def test_user_messages_dont_leak(self): + log = ChatLog() + + log.log("garden", "say", "Private message", user_id="user1") + log.log("garden", "say", "Public message", user_id="user2") + + # Both messages are in the same room (shared world) + history = log.get_history("garden") + assert len(history) == 2 + # But user_id is tracked per message + user1_msgs = [e for e in history if e["user_id"] == "user1"] + assert len(user1_msgs) == 1 + assert user1_msgs[0]["message"] == "Private message" + + def test_rolling_buffer_limits(self): + log = ChatLog(max_per_room=5) + + for i in range(10): + log.log("garden", "say", f"msg {i}") + + history = log.get_history("garden") + assert len(history) == 5 + assert history[0]["message"] == "msg 5" # oldest kept + assert history[-1]["message"] == "msg 9" # newest + + def test_get_history_with_limit(self): + log = ChatLog() + + for i in range(20): + log.log("garden", "say", f"msg {i}") + + history = log.get_history("garden", limit=5) + assert len(history) == 5 + assert history[-1]["message"] == "msg 19" + + def test_get_history_with_since(self): + log = ChatLog() + + log.log("garden", "say", "old message") + time.sleep(0.01) + cutoff = datetime.now().isoformat() + time.sleep(0.01) + log.log("garden", "say", "new message") + + history = log.get_history("garden", since=cutoff) + assert len(history) == 1 + assert history[0]["message"] == "new message" + + def test_get_all_rooms(self): + log = ChatLog() + + log.log("garden", "say", "msg1") + log.log("tower", "say", "msg2") + log.log("forge", "say", "msg3") + + rooms = log.get_all_rooms() + assert set(rooms) == {"garden", "tower", "forge"} + + def test_empty_room_returns_empty(self): + log = ChatLog() + assert log.get_history("nonexistent") == [] + + def test_thread_safe_logging(self): + log = ChatLog(max_per_room=500) + errors = [] + + def log_many(room, count): + try: + for i in range(count): + log.log(room, "say", f"{room} msg {i}") + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=log_many, args=("garden", 50)), + threading.Thread(target=log_many, args=("tower", 50)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert len(log.get_history("garden")) == 50 + assert len(log.get_history("tower")) == 50 + + +# ============================================================================ +# TEST: PresenceManager +# ============================================================================ + +class TestPresenceManager: + """User presence tracking and room isolation.""" + + def test_enter_room(self): + pm = PresenceManager() + result = pm.enter_room("user1", "Alice", "garden") + assert result is not None + assert result["event"] == "enter" + assert result["username"] == "Alice" + + def test_leave_room(self): + pm = PresenceManager() + pm.enter_room("user1", "Alice", "garden") + result = pm.leave_room("user1", "garden") + assert result is not None + assert result["event"] == "leave" + + def test_leave_nonexistent(self): + pm = PresenceManager() + result = pm.leave_room("user1", "nonexistent") + assert result is None + + def test_get_room_users(self): + pm = PresenceManager() + pm.enter_room("user1", "Alice", "garden") + pm.enter_room("user2", "Bob", "garden") + pm.enter_room("user3", "Charlie", "tower") + + garden_players = pm.get_players_in_room("garden") + garden_ids = [p["user_id"] for p in garden_players] + assert "user1" in garden_ids + assert "user2" in garden_ids + assert "user3" not in garden_ids + + def test_presence_tracks_user_in_correct_room(self): + pm = PresenceManager() + pm.enter_room("user1", "Alice", "garden") + pm.enter_room("user2", "Bob", "tower") + + garden_players = pm.get_players_in_room("garden") + tower_players = pm.get_players_in_room("tower") + + garden_ids = [p["user_id"] for p in garden_players] + tower_ids = [p["user_id"] for p in tower_players] + + assert "user1" in garden_ids + assert "user1" not in tower_ids + assert "user2" in tower_ids + assert "user2" not in garden_ids + + def test_presence_isolation_between_rooms(self): + pm = PresenceManager() + pm.enter_room("user1", "Alice", "garden") + pm.enter_room("user2", "Bob", "tower") + + garden = pm.get_players_in_room("garden") + tower = pm.get_players_in_room("tower") + + garden_ids = [p["user_id"] for p in garden] + tower_ids = [p["user_id"] for p in tower] + + assert "user1" in garden_ids + assert "user1" not in tower_ids + assert "user2" in tower_ids + assert "user2" not in garden_ids + + def test_thread_safe_presence(self): + pm = PresenceManager() + errors = [] + + def enter_leave(user, room, count): + try: + for _ in range(count): + pm.enter_room(user, f"user-{user}", room) + pm.leave_room(user, room) + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=enter_leave, args=(f"u{i}", f"room-{i % 3}", 50)) + for i in range(10) + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + + +# ============================================================================ +# TEST: Concurrent Multi-User Simulation +# ============================================================================ + +class TestConcurrentUsers: + """Simulate multiple users interacting simultaneously.""" + + def test_concurrent_chat_isolation(self): + """Multiple users chatting in different rooms simultaneously. + Verifies rooms are isolated — messages don't cross room boundaries.""" + log = ChatLog(max_per_room=200) + pm = PresenceManager() + errors = [] + + def simulate_user(user_id, username, room, msg_count): + try: + pm.enter_room(user_id, username, room) + for i in range(msg_count): + log.log(room, "say", f"{username}: message {i}", user_id=user_id) + pm.leave_room(user_id, room) + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=simulate_user, args=("u1", "Alice", "garden", 20)), + threading.Thread(target=simulate_user, args=("u2", "Bob", "tower", 20)), + threading.Thread(target=simulate_user, args=("u3", "Diana", "garden", 20)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + # Verify room isolation: garden has Alice+Diana, tower has only Bob + garden_history = log.get_history("garden") + tower_history = log.get_history("tower") + assert len(garden_history) >= 20 # At least 20 (file I/O may drop some) + assert len(tower_history) >= 15 + # Verify no cross-contamination + for entry in garden_history: + assert entry["room"] == "garden" + assert entry["user_id"] in ("u1", "u3") + for entry in tower_history: + assert entry["room"] == "tower" + assert entry["user_id"] == "u2"