- `say <message>` now queues `room_broadcast` events for the other sessions in the speaker's room
- New GET /bridge/room_events/{user_id} endpoint (drain-on-read)
- WS connections receive real-time room broadcasts
- 5 new tests: broadcast, no-echo, room isolation, drain, 404
- Total tests: 27 (all passing)
367 lines
13 KiB
Python
367 lines
13 KiB
Python
"""Tests for the multi-user AI bridge — session isolation, crisis detection, HTTP endpoints."""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from nexus.multi_user_bridge import (
|
|
CRISIS_988_MESSAGE,
|
|
CrisisState,
|
|
MultiUserBridge,
|
|
SessionManager,
|
|
UserSession,
|
|
)
|
|
|
|
|
|
# ── Session Isolation ─────────────────────────────────────────
|
|
|
|
class TestSessionIsolation:
    """Check that ``SessionManager`` and ``UserSession`` keep per-user state apart."""

    def test_separate_users_have_independent_history(self):
        """A message added to one user's session must not appear in another's."""
        manager = SessionManager()
        alice = manager.get_or_create("alice", "Alice", "Tower")
        bob = manager.get_or_create("bob", "Bob", "Tower")

        alice.add_message("user", "hello from alice")
        bob.add_message("user", "hello from bob")

        assert len(alice.message_history) == 1
        assert len(bob.message_history) == 1
        assert alice.message_history[0]["content"] == "hello from alice"
        assert bob.message_history[0]["content"] == "hello from bob"

    def test_same_user_reuses_session(self):
        """Asking twice for the same user id hands back the very same object."""
        manager = SessionManager()

        first = manager.get_or_create("alice", "Alice", "Tower")
        first.add_message("user", "msg1")

        second = manager.get_or_create("alice", "Alice", "Tower")
        second.add_message("user", "msg2")

        assert first is second
        assert len(first.message_history) == 2

    def test_room_transitions_track_occupants(self):
        """Occupant lists follow a user when they are re-created in a new room."""
        manager = SessionManager()
        for user_id, display_name in (("alice", "Alice"), ("bob", "Bob")):
            manager.get_or_create(user_id, display_name, "Tower")

        # Order of occupants is not asserted while both share the room.
        assert set(manager.get_room_occupants("Tower")) == {"alice", "bob"}

        # Alice relocates to the Chapel; Bob stays behind.
        manager.get_or_create("alice", "Alice", "Chapel")

        assert manager.get_room_occupants("Tower") == ["bob"]
        assert manager.get_room_occupants("Chapel") == ["alice"]

    def test_max_sessions_evicts_oldest(self):
        """With ``max_sessions=2`` a third user pushes out the first one created."""
        manager = SessionManager(max_sessions=2)

        manager.get_or_create("a", "A", "Tower")
        for user_id, display_name in (("b", "B"), ("c", "C")):
            # Short pause so each session is created at a distinct time.
            time.sleep(0.01)
            manager.get_or_create(user_id, display_name, "Tower")

        assert manager.get("a") is None  # evicted
        assert manager.get("b") is not None
        assert manager.get("c") is not None
        assert manager.active_count == 2

    def test_history_window(self):
        """``get_history(window=5)`` returns only the five most recent messages."""
        session = UserSession(user_id="test", username="Test")
        for index in range(30):
            session.add_message("user", f"msg{index}")

        assert len(session.message_history) == 30

        tail = session.get_history(window=5)
        assert len(tail) == 5
        assert tail[-1]["content"] == "msg29"

    def test_session_to_dict(self):
        """``to_dict`` exposes identity, room and counters for a session."""
        session = UserSession(user_id="alice", username="Alice", room="Chapel")
        session.add_message("user", "hello")

        snapshot = session.to_dict()
        expected_fields = {
            "user_id": "alice",
            "username": "Alice",
            "room": "Chapel",
            "message_count": 1,
            "command_count": 1,
        }
        for field, value in expected_fields.items():
            assert snapshot[field] == value
|
|
|
|
|
|
# ── Crisis Detection ──────────────────────────────────────────
|
|
|
|
class TestCrisisDetection:
    """Check the turn counting and delivery rules of ``CrisisState.check``."""

    def test_no_crisis_on_normal_messages(self):
        """Ordinary chatter never makes ``check`` return True."""
        state = CrisisState()
        for text in ("hello world", "how are you"):
            assert state.check(text) is False

    def test_crisis_triggers_after_3_turns(self):
        """Only the third flagged message in a row yields True (deliver 988)."""
        state = CrisisState()
        # Expected results for turn 1, turn 2 and turn 3 respectively.
        for expected in (False, False, True):
            assert state.check("I want to die") is expected

    def test_crisis_resets_on_normal_message(self):
        """A normal message between flagged ones puts the turn counter back to 0."""
        state = CrisisState()
        state.check("I want to die")  # turn 1
        state.check("actually never mind")  # resets

        assert state.turn_count == 0
        assert state.check("I want to die") is False  # turn 1 again

    def test_crisis_delivers_once_per_window(self):
        """After delivery, a further flagged message does not deliver again."""
        state = CrisisState()
        flagged = "I want to die"
        for _ in range(2):
            state.check(flagged)

        assert state.check(flagged) is True  # delivered
        assert state.check(flagged) is False  # already delivered

    def test_crisis_pattern_variations(self):
        """A differently worded flagged phrase follows the same three-turn rule."""
        state = CrisisState()
        # flagged turn 1, turn 2, then turn 3 delivers
        for expected in (False, False, True):
            assert state.check("I want to kill myself") is expected

    def test_crisis_expired_window_redelivers(self):
        """Once the (shortened) window has elapsed, delivery can happen again."""
        state = CrisisState()
        # Shrink the window on this instance so the test only has to wait briefly.
        state.CRISIS_WINDOW_SECONDS = 0.1

        flagged = "I want to die"
        for _ in range(2):
            state.check(flagged)
        assert state.check(flagged) is True

        time.sleep(0.15)

        # New window — should redeliver after 1 turn since window expired
        assert state.check(flagged) is True

    def test_self_harm_pattern(self):
        """"self-harm" / "self harm" wording is flagged and delivers on turn 3."""
        state = CrisisState()
        # Original author's note: "self-harming" doesn't match (trailing "ing"),
        # whereas "self-harm" does.
        turns = (
            ("I've been doing self-harm", False),  # turn 1
            ("self harm is getting worse", False),  # turn 2
            ("I can't stop self-harm", True),  # turn 3
        )
        for text, expected in turns:
            assert state.check(text) is expected
|
|
|
|
|
|
# ── HTTP Endpoint Tests (requires aiohttp test client) ────────
|
|
|
|
# NOTE(review): an async generator under plain ``@pytest.fixture`` relies on the
# project's pytest-asyncio mode to be driven correctly — confirm configuration.
@pytest.fixture
async def bridge_app():
    """Yield an ``(app, bridge)`` pair built from a fresh ``MultiUserBridge``."""
    bridge_instance = MultiUserBridge()
    web_app = bridge_instance.create_app()
    yield web_app, bridge_instance
|
|
|
|
|
|
@pytest.fixture
async def client(bridge_app):
    """Yield ``(test_client, bridge)`` with the client bound to the bridge app.

    The aiohttp test client is opened as an async context manager around the
    ``yield``, so it is closed again when the fixture is torn down.
    """
    # Imported here rather than at module level, matching the original layout.
    from aiohttp.test_utils import TestClient, TestServer

    app, bridge = bridge_app
    server = TestServer(app)
    async with TestClient(server) as http_client:
        yield http_client, bridge
|
|
|
|
|
|
class TestHTTPEndpoints:
    """Exercise the bridge's HTTP routes through the aiohttp test client.

    Every test receives the ``client`` fixture, which is a
    ``(test_client, bridge)`` tuple.
    """

    @pytest.mark.asyncio
    async def test_health_endpoint(self, client):
        """``GET /bridge/health`` reports ``ok`` and zero sessions on a fresh bridge."""
        c, _ = client
        resp = await c.get("/bridge/health")
        data = await resp.json()
        assert data["status"] == "ok"
        assert data["active_sessions"] == 0

    @pytest.mark.asyncio
    async def test_chat_creates_session(self, client):
        """A first chat message echoes the user/room and records two messages."""
        c, _ = client
        resp = await c.post("/bridge/chat", json={
            "user_id": "alice",
            "username": "Alice",
            "message": "hello",
            "room": "Tower",
        })
        data = await resp.json()
        assert "response" in data
        assert data["user_id"] == "alice"
        assert data["room"] == "Tower"
        assert data["session_messages"] == 2  # user + assistant

    @pytest.mark.asyncio
    async def test_chat_missing_user_id(self, client):
        """A chat payload without ``user_id`` is rejected with HTTP 400."""
        c, _ = client
        resp = await c.post("/bridge/chat", json={"message": "hello"})
        assert resp.status == 400

    @pytest.mark.asyncio
    async def test_chat_missing_message(self, client):
        """A chat payload without ``message`` is rejected with HTTP 400."""
        c, _ = client
        resp = await c.post("/bridge/chat", json={"user_id": "alice"})
        assert resp.status == 400

    @pytest.mark.asyncio
    async def test_sessions_list(self, client):
        """``GET /bridge/sessions`` lists every user that has chatted."""
        c, _ = client
        await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "hi", "room": "Tower"
        })
        await c.post("/bridge/chat", json={
            "user_id": "bob", "message": "hey", "room": "Chapel"
        })

        resp = await c.get("/bridge/sessions")
        data = await resp.json()
        assert data["total"] == 2
        user_ids = {s["user_id"] for s in data["sessions"]}
        assert user_ids == {"alice", "bob"}

    @pytest.mark.asyncio
    async def test_look_command_returns_occupants(self, client):
        """``look`` mentions the other occupant in the text or the occupant list."""
        c, _ = client
        await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "hi", "room": "Tower"
        })
        await c.post("/bridge/chat", json={
            "user_id": "bob", "message": "hey", "room": "Tower"
        })

        resp = await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "look", "room": "Tower"
        })
        data = await resp.json()
        assert "bob" in data["response"].lower() or "bob" in str(data.get("room_occupants", []))

    @pytest.mark.asyncio
    async def test_room_occupants_tracked(self, client):
        """The chat response's ``room_occupants`` holds everyone in the room."""
        c, _ = client
        await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "hi", "room": "Tower"
        })
        await c.post("/bridge/chat", json={
            "user_id": "bob", "message": "hey", "room": "Tower"
        })

        resp = await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "look", "room": "Tower"
        })
        data = await resp.json()
        assert set(data["room_occupants"]) == {"alice", "bob"}

    @pytest.mark.asyncio
    async def test_crisis_detection_returns_flag(self, client):
        """The third flagged message over HTTP sets the flag and mentions 988."""
        c, _ = client
        # Only the response to the final (third) message is inspected.
        for _ in range(3):
            resp = await c.post("/bridge/chat", json={
                "user_id": "user1",
                "message": "I want to die",
            })

        data = await resp.json()
        assert data["crisis_detected"] is True
        assert "988" in data["response"]

    @pytest.mark.asyncio
    async def test_concurrent_users_independent_responses(self, client):
        """Two users chatting at the same time each get their own response."""
        c, _ = client

        # Issue both requests together so they are actually in flight
        # concurrently; gather returns results in argument order.
        r1, r2 = await asyncio.gather(
            c.post("/bridge/chat", json={
                "user_id": "alice", "message": "I love cats"
            }),
            c.post("/bridge/chat", json={
                "user_id": "bob", "message": "I love dogs"
            }),
        )

        d1 = await r1.json()
        d2 = await r2.json()

        # Each user's response references their own message.
        # NOTE(review): the ``or`` makes these pass on user_id alone; tighten
        # once the response text produced in tests is known.
        assert "cats" in d1["response"].lower() or d1["user_id"] == "alice"
        assert "dogs" in d2["response"].lower() or d2["user_id"] == "bob"
        assert d1["user_id"] != d2["user_id"]
|
|
|
|
|
|
# ── Room Broadcast Tests ─────────────────────────────────────
|
|
|
|
class TestRoomBroadcast:
    """Check ``say`` broadcasts and the ``/bridge/room_events/{user_id}`` route."""

    @staticmethod
    async def _chat(http, **fields):
        """POST *fields* as the JSON body of ``/bridge/chat``; return the response."""
        return await http.post("/bridge/chat", json=fields)

    @staticmethod
    async def _get_json(http, path):
        """GET *path* and return the decoded JSON body."""
        reply = await http.get(path)
        return await reply.json()

    @pytest.mark.asyncio
    async def test_say_broadcasts_to_room_occupants(self, client):
        """A ``say`` from Alice shows up in Bob's pending room events."""
        http, _ = client
        # Position both users in the same room
        await self._chat(http, user_id="alice", username="Alice", message="hi", room="Tower")
        await self._chat(http, user_id="bob", username="Bob", message="hi", room="Tower")
        # Alice says something
        await self._chat(
            http, user_id="alice", username="Alice", message="say Hello everyone!", room="Tower"
        )

        # Bob should have a pending room event
        payload = await self._get_json(http, "/bridge/room_events/bob")
        assert payload["count"] >= 1
        assert any("Alice" in event.get("message", "") for event in payload["events"])

    @pytest.mark.asyncio
    async def test_say_does_not_echo_to_speaker(self, client):
        """The speaker's own queue holds no event attributed to the speaker."""
        http, _ = client
        await self._chat(http, user_id="alice", message="hi", room="Tower")
        await self._chat(http, user_id="bob", message="hi", room="Tower")
        await self._chat(http, user_id="alice", message="say Hello!", room="Tower")

        # Alice should NOT have room events from herself
        payload = await self._get_json(http, "/bridge/room_events/alice")
        own_events = [
            event for event in payload["events"] if event.get("from_user") == "alice"
        ]
        assert len(own_events) == 0

    @pytest.mark.asyncio
    async def test_say_no_broadcast_to_different_room(self, client):
        """A ``say`` in the Tower leaves a Chapel occupant's queue empty."""
        http, _ = client
        await self._chat(http, user_id="alice", message="hi", room="Tower")
        await self._chat(http, user_id="bob", message="hi", room="Chapel")
        await self._chat(http, user_id="alice", message="say Hello!", room="Tower")

        # Bob is in Chapel, shouldn't get Tower broadcasts
        payload = await self._get_json(http, "/bridge/room_events/bob")
        assert payload["count"] == 0

    @pytest.mark.asyncio
    async def test_room_events_drain_after_read(self, client):
        """Reading room events empties the queue, so a second read finds none."""
        http, _ = client
        await self._chat(http, user_id="alice", message="hi", room="Tower")
        await self._chat(http, user_id="bob", message="hi", room="Tower")
        await self._chat(http, user_id="alice", message="say First!", room="Tower")

        # First read drains
        first_read = await self._get_json(http, "/bridge/room_events/bob")
        assert first_read["count"] >= 1

        # Second read is empty
        second_read = await self._get_json(http, "/bridge/room_events/bob")
        assert second_read["count"] == 0

    @pytest.mark.asyncio
    async def test_room_events_404_for_unknown_user(self, client):
        """Requesting events for a user with no session returns HTTP 404."""
        http, _ = client
        reply = await http.get("/bridge/room_events/nonexistent")
        assert reply.status == 404
|