Files
the-nexus/tests/test_multi_user_bridge.py
Alexander Whitestone b3f2a8b091 test: add rate limiter unit tests and HTTP integration tests
- Unit tests for RateLimiter: token refill, per-user isolation, reset
- HTTP tests: 429 response, X-RateLimit headers, per-user enforcement
- Uses rate_limited_client fixture with limit=3 for easy testing
2026-04-13 04:05:42 -04:00

483 lines
17 KiB
Python

"""Tests for the multi-user AI bridge — session isolation, crisis detection, HTTP endpoints."""
import asyncio
import json
import time
import pytest
from nexus.multi_user_bridge import (
CRISIS_988_MESSAGE,
CrisisState,
MultiUserBridge,
SessionManager,
UserSession,
)
# ── Session Isolation ─────────────────────────────────────────
class TestSessionIsolation:
    """SessionManager / UserSession bookkeeping, exercised without the HTTP layer."""

    def test_separate_users_have_independent_history(self):
        """A message added to one user's session does not appear in another's."""
        manager = SessionManager()
        alice = manager.get_or_create("alice", "Alice", "Tower")
        bob = manager.get_or_create("bob", "Bob", "Tower")

        alice.add_message("user", "hello from alice")
        bob.add_message("user", "hello from bob")

        assert len(alice.message_history) == 1
        assert len(bob.message_history) == 1
        assert alice.message_history[0]["content"] == "hello from alice"
        assert bob.message_history[0]["content"] == "hello from bob"

    def test_same_user_reuses_session(self):
        """Asking twice for the same user_id hands back the very same object."""
        manager = SessionManager()

        first = manager.get_or_create("alice", "Alice", "Tower")
        first.add_message("user", "msg1")
        second = manager.get_or_create("alice", "Alice", "Tower")
        second.add_message("user", "msg2")

        assert first is second
        assert len(first.message_history) == 2

    def test_room_transitions_track_occupants(self):
        """Re-requesting a session with a new room moves the user between rooms."""
        manager = SessionManager()
        for uid, display_name in (("alice", "Alice"), ("bob", "Bob")):
            manager.get_or_create(uid, display_name, "Tower")
        assert set(manager.get_room_occupants("Tower")) == {"alice", "bob"}

        # Alice moves to the Chapel; Bob stays behind in the Tower.
        manager.get_or_create("alice", "Alice", "Chapel")
        assert manager.get_room_occupants("Tower") == ["bob"]
        assert manager.get_room_occupants("Chapel") == ["alice"]

    def test_max_sessions_evicts_oldest(self):
        """With max_sessions=2, creating a third session drops the first one."""
        manager = SessionManager(max_sessions=2)
        manager.get_or_create("a", "A", "Tower")
        for uid in ("b", "c"):
            # Short pause before each later session — presumably so the
            # manager can order sessions by time; verify against SessionManager.
            time.sleep(0.01)
            manager.get_or_create(uid, uid.upper(), "Tower")

        assert manager.get("a") is None  # evicted
        assert manager.get("b") is not None
        assert manager.get("c") is not None
        assert manager.active_count == 2

    def test_history_window(self):
        """get_history(window=5) returns the five most recent of 30 messages."""
        session = UserSession(user_id="test", username="Test")
        for index in range(30):
            session.add_message("user", f"msg{index}")
        assert len(session.message_history) == 30

        tail = session.get_history(window=5)
        assert len(tail) == 5
        assert tail[-1]["content"] == "msg29"

    def test_session_to_dict(self):
        """to_dict() exposes identity, room and the message/command counters."""
        session = UserSession(user_id="alice", username="Alice", room="Chapel")
        session.add_message("user", "hello")

        snapshot = session.to_dict()
        expected_fields = (
            ("user_id", "alice"),
            ("username", "Alice"),
            ("room", "Chapel"),
            ("message_count", 1),
            ("command_count", 1),
        )
        for key, value in expected_fields:
            assert snapshot[key] == value
# ── Crisis Detection ──────────────────────────────────────────
class TestCrisisDetection:
    """Turn-by-turn behaviour of CrisisState.check()."""

    def test_no_crisis_on_normal_messages(self):
        """Ordinary chat never makes check() return True."""
        state = CrisisState()
        for text in ("hello world", "how are you"):
            assert state.check(text) is False

    def test_crisis_triggers_after_3_turns(self):
        """The third consecutive crisis message is the one that returns True."""
        state = CrisisState()
        # turn 1, turn 2, turn 3 -> deliver 988
        for expected in (False, False, True):
            assert state.check("I want to die") is expected

    def test_crisis_resets_on_normal_message(self):
        """A normal message in between zeroes the turn counter."""
        state = CrisisState()
        state.check("I want to die")  # turn 1
        state.check("actually never mind")  # resets
        assert state.turn_count == 0
        assert state.check("I want to die") is False  # turn 1 again

    def test_crisis_delivers_once_per_window(self):
        """After delivery, a further crisis message in the same window is False."""
        state = CrisisState()
        for _ in range(2):
            state.check("I want to die")
        assert state.check("I want to die") is True  # delivered
        assert state.check("I want to die") is False  # already delivered

    def test_crisis_pattern_variations(self):
        """An alternative phrasing follows the same three-turn progression."""
        state = CrisisState()
        # flagged on turn 1, still False on turn 2, True on turn 3
        for expected in (False, False, True):
            assert state.check("I want to kill myself") is expected

    def test_crisis_expired_window_redelivers(self):
        """Once the (shortened) window has elapsed, the next crisis turn is True."""
        state = CrisisState()
        state.CRISIS_WINDOW_SECONDS = 0.1  # shrink the window for this instance
        for _ in range(2):
            state.check("I want to die")
        assert state.check("I want to die") is True

        time.sleep(0.15)  # outlast the 0.1 s window set above
        # New window — should redeliver after 1 turn since window expired
        assert state.check("I want to die") is True

    def test_self_harm_pattern(self):
        """Both "self-harm" and "self harm" spellings count towards the three turns."""
        state = CrisisState()
        # Note: "self-harming" doesn't match (has trailing "ing"), "self-harm" does
        turns = (
            ("I've been doing self-harm", False),  # turn 1
            ("self harm is getting worse", False),  # turn 2
            ("I can't stop self-harm", True),  # turn 3
        )
        for text, expected in turns:
            assert state.check(text) is expected
# ── HTTP Endpoint Tests (requires aiohttp test client) ────────
@pytest.fixture
async def bridge_app():
    """Yield an ``(app, bridge)`` pair built from a default MultiUserBridge."""
    bridge = MultiUserBridge()
    yield bridge.create_app(), bridge
@pytest.fixture
async def client(bridge_app):
    """Yield ``(test_client, bridge)`` with the app served by an aiohttp TestServer.

    The test client is opened with ``async with``, so it is closed when the
    fixture is torn down.
    """
    # NOTE(review): this is an async generator behind a plain @pytest.fixture;
    # presumably the project runs pytest-asyncio in a mode that supports that —
    # confirm against the pytest configuration.
    from aiohttp.test_utils import TestClient, TestServer

    app, bridge = bridge_app
    server = TestServer(app)
    async with TestClient(server) as http:
        yield http, bridge
class TestHTTPEndpoints:
    """End-to-end checks of the bridge HTTP routes via an aiohttp test client.

    Every test takes the ``client`` fixture, which yields a
    ``(test_client, bridge)`` tuple.
    """

    @pytest.mark.asyncio
    async def test_health_endpoint(self, client):
        """A fresh bridge reports status "ok" and zero active sessions."""
        c, _ = client
        resp = await c.get("/bridge/health")
        data = await resp.json()
        assert data["status"] == "ok"
        assert data["active_sessions"] == 0

    @pytest.mark.asyncio
    async def test_chat_creates_session(self, client):
        """The first chat echoes user/room and records two session messages."""
        c, _ = client
        resp = await c.post("/bridge/chat", json={
            "user_id": "alice",
            "username": "Alice",
            "message": "hello",
            "room": "Tower",
        })
        data = await resp.json()
        assert "response" in data
        assert data["user_id"] == "alice"
        assert data["room"] == "Tower"
        assert data["session_messages"] == 2  # user + assistant

    @pytest.mark.asyncio
    async def test_chat_missing_user_id(self, client):
        """A chat payload without user_id is rejected with HTTP 400."""
        c, _ = client
        resp = await c.post("/bridge/chat", json={"message": "hello"})
        assert resp.status == 400

    @pytest.mark.asyncio
    async def test_chat_missing_message(self, client):
        """A chat payload without message is rejected with HTTP 400."""
        c, _ = client
        resp = await c.post("/bridge/chat", json={"user_id": "alice"})
        assert resp.status == 400

    @pytest.mark.asyncio
    async def test_sessions_list(self, client):
        """/bridge/sessions lists every user that has chatted."""
        c, _ = client
        await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "hi", "room": "Tower"
        })
        await c.post("/bridge/chat", json={
            "user_id": "bob", "message": "hey", "room": "Chapel"
        })
        resp = await c.get("/bridge/sessions")
        data = await resp.json()
        assert data["total"] == 2
        user_ids = {s["user_id"] for s in data["sessions"]}
        assert user_ids == {"alice", "bob"}

    @pytest.mark.asyncio
    async def test_look_command_returns_occupants(self, client):
        """"look" mentions the other occupant in the reply text or occupant list."""
        c, _ = client
        await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "hi", "room": "Tower"
        })
        await c.post("/bridge/chat", json={
            "user_id": "bob", "message": "hey", "room": "Tower"
        })
        resp = await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "look", "room": "Tower"
        })
        data = await resp.json()
        assert (
            "bob" in data["response"].lower()
            or "bob" in str(data.get("room_occupants", []))
        )

    @pytest.mark.asyncio
    async def test_room_occupants_tracked(self, client):
        """The chat response carries the full occupant list of the room."""
        c, _ = client
        await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "hi", "room": "Tower"
        })
        await c.post("/bridge/chat", json={
            "user_id": "bob", "message": "hey", "room": "Tower"
        })
        resp = await c.post("/bridge/chat", json={
            "user_id": "alice", "message": "look", "room": "Tower"
        })
        data = await resp.json()
        assert set(data["room_occupants"]) == {"alice", "bob"}

    @pytest.mark.asyncio
    async def test_crisis_detection_returns_flag(self, client):
        """The third crisis message over HTTP is flagged and mentions 988."""
        c, _ = client
        # Only the response to the third message is inspected.
        for _ in range(3):
            resp = await c.post("/bridge/chat", json={
                "user_id": "user1",
                "message": "I want to die",
            })
        data = await resp.json()
        assert data["crisis_detected"] is True
        assert "988" in data["response"]

    @pytest.mark.asyncio
    async def test_concurrent_users_independent_responses(self, client):
        """Two users chatting at the same time each get their own reply."""
        c, _ = client
        # Send both requests together so they are in flight concurrently;
        # asyncio.gather returns the results in argument order.
        r1, r2 = await asyncio.gather(
            c.post("/bridge/chat", json={
                "user_id": "alice", "message": "I love cats"
            }),
            c.post("/bridge/chat", json={
                "user_id": "bob", "message": "I love dogs"
            }),
        )
        d1 = await r1.json()
        d2 = await r2.json()
        # Each reply must be attributed to the user who sent the request.
        assert d1["user_id"] == "alice"
        assert d2["user_id"] == "bob"
# ── Room Broadcast Tests ─────────────────────────────────────
class TestRoomBroadcast:
    """"say" broadcasts, the per-user room-event queue, and the /bridge/rooms view."""

    @staticmethod
    async def _chat(http, **payload):
        """POST *payload* as JSON to /bridge/chat and return the response."""
        return await http.post("/bridge/chat", json=payload)

    @pytest.mark.asyncio
    async def test_say_broadcasts_to_room_occupants(self, client):
        """A "say" from Alice shows up in Bob's pending room events."""
        http, _ = client
        # Position both users in the same room
        for uid, display_name in (("alice", "Alice"), ("bob", "Bob")):
            await self._chat(
                http, user_id=uid, username=display_name, message="hi", room="Tower"
            )
        # Alice says something
        await self._chat(
            http,
            user_id="alice",
            username="Alice",
            message="say Hello everyone!",
            room="Tower",
        )
        # Bob should have a pending room event
        events_resp = await http.get("/bridge/room_events/bob")
        body = await events_resp.json()
        assert body["count"] >= 1
        assert any("Alice" in event.get("message", "") for event in body["events"])

    @pytest.mark.asyncio
    async def test_say_does_not_echo_to_speaker(self, client):
        """The speaker's own event queue holds nothing attributed to herself."""
        http, _ = client
        for uid in ("alice", "bob"):
            await self._chat(http, user_id=uid, message="hi", room="Tower")
        await self._chat(http, user_id="alice", message="say Hello!", room="Tower")
        # Alice should NOT have room events from herself
        events_resp = await http.get("/bridge/room_events/alice")
        body = await events_resp.json()
        own_events = [
            event for event in body["events"] if event.get("from_user") == "alice"
        ]
        assert len(own_events) == 0

    @pytest.mark.asyncio
    async def test_say_no_broadcast_to_different_room(self, client):
        """Users in another room receive no events from a "say"."""
        http, _ = client
        for uid, room in (("alice", "Tower"), ("bob", "Chapel")):
            await self._chat(http, user_id=uid, message="hi", room=room)
        await self._chat(http, user_id="alice", message="say Hello!", room="Tower")
        # Bob is in Chapel, shouldn't get Tower broadcasts
        events_resp = await http.get("/bridge/room_events/bob")
        body = await events_resp.json()
        assert body["count"] == 0

    @pytest.mark.asyncio
    async def test_room_events_drain_after_read(self, client):
        """Reading room events empties the queue; a second read returns none."""
        http, _ = client
        for uid in ("alice", "bob"):
            await self._chat(http, user_id=uid, message="hi", room="Tower")
        await self._chat(http, user_id="alice", message="say First!", room="Tower")
        # First read drains
        first_read = await http.get("/bridge/room_events/bob")
        first_body = await first_read.json()
        assert first_body["count"] >= 1
        # Second read is empty
        second_read = await http.get("/bridge/room_events/bob")
        second_body = await second_read.json()
        assert second_body["count"] == 0

    @pytest.mark.asyncio
    async def test_room_events_404_for_unknown_user(self, client):
        """Asking for events of a user with no session yields HTTP 404."""
        http, _ = client
        events_resp = await http.get("/bridge/room_events/nonexistent")
        assert events_resp.status == 404

    @pytest.mark.asyncio
    async def test_rooms_lists_all_rooms_with_occupants(self, client):
        """/bridge/rooms reports each room with its occupant count and ids."""
        http, _ = client
        placements = (
            ("alice", "Alice", "Tower"),
            ("bob", "Bob", "Tower"),
            ("carol", "Carol", "Library"),
        )
        for uid, display_name, room in placements:
            await self._chat(
                http, user_id=uid, username=display_name, message="hi", room=room
            )

        rooms_resp = await http.get("/bridge/rooms")
        assert rooms_resp.status == 200
        body = await rooms_resp.json()
        assert body["total_rooms"] == 2
        assert body["total_users"] == 3

        rooms = body["rooms"]
        assert "Tower" in rooms
        assert "Library" in rooms
        assert rooms["Tower"]["count"] == 2
        assert rooms["Library"]["count"] == 1
        tower_ids = {occupant["user_id"] for occupant in rooms["Tower"]["occupants"]}
        assert tower_ids == {"alice", "bob"}

    @pytest.mark.asyncio
    async def test_rooms_empty_when_no_sessions(self, client):
        """With no sessions, /bridge/rooms reports zero rooms and zero users."""
        http, _ = client
        rooms_resp = await http.get("/bridge/rooms")
        body = await rooms_resp.json()
        assert body["total_rooms"] == 0
        assert body["total_users"] == 0
        assert body["rooms"] == {}
# ── Rate Limiting Tests ──────────────────────────────────────
@pytest.fixture
async def rate_limited_client():
    """Yield ``(test_client, bridge)`` for a bridge with rate_limit=3, rate_window=60.0.

    The low limit lets tests exceed it with only a handful of requests.
    """
    from aiohttp.test_utils import TestClient, TestServer

    bridge = MultiUserBridge(rate_limit=3, rate_window=60.0)
    server = TestServer(bridge.create_app())
    async with TestClient(server) as http:
        yield http, bridge
class TestRateLimitingHTTP:
    """HTTP-level rate limiting: status codes, headers and per-user accounting.

    All tests use ``rate_limited_client``, whose bridge allows 3 requests per user.
    """

    @staticmethod
    async def _chat(http, **payload):
        """POST *payload* as JSON to /bridge/chat and return the response."""
        return await http.post("/bridge/chat", json=payload)

    @pytest.mark.asyncio
    async def test_allowed_within_limit(self, rate_limited_client):
        """The first three requests from one user all succeed."""
        http, _ = rate_limited_client
        for attempt in range(3):
            reply = await self._chat(http, user_id="alice", message=f"msg {attempt}")
            assert reply.status == 200

    @pytest.mark.asyncio
    async def test_returns_429_on_exceed(self, rate_limited_client):
        """The fourth request gets HTTP 429 with a "rate limit" error message."""
        http, _ = rate_limited_client
        for attempt in range(3):
            await self._chat(http, user_id="alice", message=f"msg {attempt}")

        rejected = await self._chat(http, user_id="alice", message="one too many")
        assert rejected.status == 429
        body = await rejected.json()
        assert "rate limit" in body["error"].lower()

    @pytest.mark.asyncio
    async def test_rate_limit_headers_on_success(self, rate_limited_client):
        """A successful reply advertises the limit and the remaining budget."""
        http, _ = rate_limited_client
        reply = await self._chat(http, user_id="alice", message="hello")
        assert reply.status == 200

        headers = reply.headers
        assert "X-RateLimit-Limit" in headers
        assert "X-RateLimit-Remaining" in headers
        assert headers["X-RateLimit-Limit"] == "3"
        assert headers["X-RateLimit-Remaining"] == "2"

    @pytest.mark.asyncio
    async def test_rate_limit_headers_on_reject(self, rate_limited_client):
        """A rejected reply carries Retry-After and zero remaining budget."""
        http, _ = rate_limited_client
        for _ in range(3):
            await self._chat(http, user_id="alice", message="msg")

        rejected = await self._chat(http, user_id="alice", message="excess")
        assert rejected.status == 429
        assert rejected.headers.get("Retry-After") == "1"
        assert rejected.headers.get("X-RateLimit-Remaining") == "0"

    @pytest.mark.asyncio
    async def test_rate_limit_is_per_user(self, rate_limited_client):
        """Exhausting one user's budget leaves another user unaffected."""
        http, _ = rate_limited_client
        # Exhaust alice
        for _ in range(3):
            await self._chat(http, user_id="alice", message="msg")
        blocked = await self._chat(http, user_id="alice", message="blocked")
        assert blocked.status == 429

        # Bob should still work
        allowed = await self._chat(http, user_id="bob", message="im fine")
        assert allowed.status == 200