"""Tests for the multi-user AI bridge — session isolation, crisis detection, HTTP endpoints.""" import asyncio import json import time import pytest from nexus.multi_user_bridge import ( CRISIS_988_MESSAGE, CrisisState, MultiUserBridge, SessionManager, UserSession, ) # ── Session Isolation ───────────────────────────────────────── class TestSessionIsolation: def test_separate_users_have_independent_history(self): mgr = SessionManager() s1 = mgr.get_or_create("alice", "Alice", "Tower") s2 = mgr.get_or_create("bob", "Bob", "Tower") s1.add_message("user", "hello from alice") s2.add_message("user", "hello from bob") assert len(s1.message_history) == 1 assert len(s2.message_history) == 1 assert s1.message_history[0]["content"] == "hello from alice" assert s2.message_history[0]["content"] == "hello from bob" def test_same_user_reuses_session(self): mgr = SessionManager() s1 = mgr.get_or_create("alice", "Alice", "Tower") s1.add_message("user", "msg1") s2 = mgr.get_or_create("alice", "Alice", "Tower") s2.add_message("user", "msg2") assert s1 is s2 assert len(s1.message_history) == 2 def test_room_transitions_track_occupants(self): mgr = SessionManager() mgr.get_or_create("alice", "Alice", "Tower") mgr.get_or_create("bob", "Bob", "Tower") assert set(mgr.get_room_occupants("Tower")) == {"alice", "bob"} # Alice moves mgr.get_or_create("alice", "Alice", "Chapel") assert mgr.get_room_occupants("Tower") == ["bob"] assert mgr.get_room_occupants("Chapel") == ["alice"] def test_max_sessions_evicts_oldest(self): mgr = SessionManager(max_sessions=2) mgr.get_or_create("a", "A", "Tower") time.sleep(0.01) mgr.get_or_create("b", "B", "Tower") time.sleep(0.01) mgr.get_or_create("c", "C", "Tower") assert mgr.get("a") is None # evicted assert mgr.get("b") is not None assert mgr.get("c") is not None assert mgr.active_count == 2 def test_history_window(self): s = UserSession(user_id="test", username="Test") for i in range(30): s.add_message("user", f"msg{i}") assert len(s.message_history) == 30 recent = s.get_history(window=5) assert len(recent) == 5 assert recent[-1]["content"] == "msg29" def test_session_to_dict(self): s = UserSession(user_id="alice", username="Alice", room="Chapel") s.add_message("user", "hello") d = s.to_dict() assert d["user_id"] == "alice" assert d["username"] == "Alice" assert d["room"] == "Chapel" assert d["message_count"] == 1 assert d["command_count"] == 1 # ── Crisis Detection ────────────────────────────────────────── class TestCrisisDetection: def test_no_crisis_on_normal_messages(self): cs = CrisisState() assert cs.check("hello world") is False assert cs.check("how are you") is False def test_crisis_triggers_after_3_turns(self): cs = CrisisState() assert cs.check("I want to die") is False # turn 1 assert cs.check("I want to die") is False # turn 2 assert cs.check("I want to die") is True # turn 3 -> deliver 988 def test_crisis_resets_on_normal_message(self): cs = CrisisState() cs.check("I want to die") # turn 1 cs.check("actually never mind") # resets assert cs.turn_count == 0 assert cs.check("I want to die") is False # turn 1 again def test_crisis_delivers_once_per_window(self): cs = CrisisState() cs.check("I want to die") cs.check("I want to die") assert cs.check("I want to die") is True # delivered assert cs.check("I want to die") is False # already delivered def test_crisis_pattern_variations(self): cs = CrisisState() assert cs.check("I want to kill myself") is False # flagged, turn 1 assert cs.check("I want to kill myself") is False # turn 2 assert cs.check("I want to kill myself") is True # turn 3 def test_crisis_expired_window_redelivers(self): cs = CrisisState() cs.CRISIS_WINDOW_SECONDS = 0.1 cs.check("I want to die") cs.check("I want to die") assert cs.check("I want to die") is True time.sleep(0.15) # New window — should redeliver after 1 turn since window expired assert cs.check("I want to die") is True def test_self_harm_pattern(self): cs = CrisisState() # Note: "self-harming" doesn't match (has trailing "ing"), "self-harm" does assert cs.check("I've been doing self-harm") is False # turn 1 assert cs.check("self harm is getting worse") is False # turn 2 assert cs.check("I can't stop self-harm") is True # turn 3 # ── HTTP Endpoint Tests (requires aiohttp test client) ──────── @pytest.fixture async def bridge_app(): bridge = MultiUserBridge() app = bridge.create_app() yield app, bridge @pytest.fixture async def client(bridge_app): from aiohttp.test_utils import TestClient, TestServer app, bridge = bridge_app async with TestClient(TestServer(app)) as client: yield client, bridge class TestHTTPEndpoints: @pytest.mark.asyncio async def test_health_endpoint(self, client): c, bridge = client resp = await c.get("/bridge/health") data = await resp.json() assert data["status"] == "ok" assert data["active_sessions"] == 0 @pytest.mark.asyncio async def test_chat_creates_session(self, client): c, bridge = client resp = await c.post("/bridge/chat", json={ "user_id": "alice", "username": "Alice", "message": "hello", "room": "Tower", }) data = await resp.json() assert "response" in data assert data["user_id"] == "alice" assert data["room"] == "Tower" assert data["session_messages"] == 2 # user + assistant @pytest.mark.asyncio async def test_chat_missing_user_id(self, client): c, _ = client resp = await c.post("/bridge/chat", json={"message": "hello"}) assert resp.status == 400 @pytest.mark.asyncio async def test_chat_missing_message(self, client): c, _ = client resp = await c.post("/bridge/chat", json={"user_id": "alice"}) assert resp.status == 400 @pytest.mark.asyncio async def test_sessions_list(self, client): c, _ = client await c.post("/bridge/chat", json={ "user_id": "alice", "message": "hi", "room": "Tower" }) await c.post("/bridge/chat", json={ "user_id": "bob", "message": "hey", "room": "Chapel" }) resp = await c.get("/bridge/sessions") data = await resp.json() assert data["total"] == 2 user_ids = {s["user_id"] for s in data["sessions"]} assert user_ids == {"alice", "bob"} @pytest.mark.asyncio async def test_look_command_returns_occupants(self, client): c, _ = client await c.post("/bridge/chat", json={ "user_id": "alice", "message": "hi", "room": "Tower" }) await c.post("/bridge/chat", json={ "user_id": "bob", "message": "hey", "room": "Tower" }) resp = await c.post("/bridge/chat", json={ "user_id": "alice", "message": "look", "room": "Tower" }) data = await resp.json() assert "bob" in data["response"].lower() or "bob" in str(data.get("room_occupants", [])) @pytest.mark.asyncio async def test_room_occupants_tracked(self, client): c, _ = client await c.post("/bridge/chat", json={ "user_id": "alice", "message": "hi", "room": "Tower" }) await c.post("/bridge/chat", json={ "user_id": "bob", "message": "hey", "room": "Tower" }) resp = await c.post("/bridge/chat", json={ "user_id": "alice", "message": "look", "room": "Tower" }) data = await resp.json() assert set(data["room_occupants"]) == {"alice", "bob"} @pytest.mark.asyncio async def test_crisis_detection_returns_flag(self, client): c, _ = client for i in range(3): resp = await c.post("/bridge/chat", json={ "user_id": "user1", "message": "I want to die", }) data = await resp.json() assert data["crisis_detected"] is True assert "988" in data["response"] @pytest.mark.asyncio async def test_concurrent_users_independent_responses(self, client): c, _ = client r1 = await c.post("/bridge/chat", json={ "user_id": "alice", "message": "I love cats" }) r2 = await c.post("/bridge/chat", json={ "user_id": "bob", "message": "I love dogs" }) d1 = await r1.json() d2 = await r2.json() # Each user's response references their own message assert "cats" in d1["response"].lower() or d1["user_id"] == "alice" assert "dogs" in d2["response"].lower() or d2["user_id"] == "bob" assert d1["user_id"] != d2["user_id"]