#!/usr/bin/env python3
"""
Benchmark: Multi-User Bridge — concurrent users, session isolation verification.

Measures:
  1. Request latency across all users (p50, p95, p99)
  2. Throughput (messages/sec) under concurrent load
  3. Session isolation (no cross-user history leakage)
  4. Room occupancy correctness
  5. Crisis detection (exercised after the concurrent load phase)

Usage:
    python experiments/benchmark_concurrent_users.py [--users 5] [--messages 20]
"""

import argparse
import asyncio
import json
import statistics
import sys
import time
from dataclasses import dataclass, field
from typing import Optional

import aiohttp

BRIDGE_URL = "http://127.0.0.1:4004"


@dataclass
class UserStats:
    """Per-user accumulator filled in while the benchmark runs."""

    user_id: str
    # Round-trip time in milliseconds of every successful chat request.
    latencies: list[float] = field(default_factory=list)
    # Number of chat requests that completed and returned parseable JSON.
    messages_sent: int = 0
    # Number of chat requests that raised (connection error, bad JSON, ...).
    errors: int = 0
    # Decoded JSON body of every successful chat request, in send order.
    responses: list[dict] = field(default_factory=list)


async def _timed_chat(session: aiohttp.ClientSession, user_stats: UserStats,
                      payload: dict) -> None:
    """POST one payload to ``/bridge/chat`` and record the outcome.

    On success the latency (ms), the decoded JSON body and the sent counter
    are recorded on ``user_stats``; on any failure only ``errors`` is bumped.
    """
    t0 = time.perf_counter()
    try:
        async with session.post(f"{BRIDGE_URL}/bridge/chat", json=payload) as resp:
            data = await resp.json()
            latency = (time.perf_counter() - t0) * 1000  # ms
    except Exception:
        # Deliberately broad: a benchmark should count every failed request
        # (connection reset, timeout, non-JSON body) instead of aborting.
        user_stats.errors += 1
        return
    user_stats.latencies.append(latency)
    user_stats.messages_sent += 1
    user_stats.responses.append(data)


async def run_user(session: aiohttp.ClientSession, user_stats: UserStats,
                   messages: int, rooms: list[str]):
    """Simulate one user sending ``messages`` chat messages, cycling through ``rooms``."""
    for i in range(messages):
        room = rooms[i % len(rooms)]
        payload = {
            "user_id": user_stats.user_id,
            "username": f"User_{user_stats.user_id}",
            "message": f"message {i} from {user_stats.user_id} in {room}",
            "room": room,
        }
        await _timed_chat(session, user_stats, payload)


async def run_crisis_user(session: aiohttp.ClientSession, user_stats: UserStats):
    """Send three fixed crisis messages to the "Tower" room, one after another.

    Failed requests are counted in ``user_stats.errors`` (same policy as
    ``run_user``) rather than aborting the benchmark.
    """
    crisis_texts = [
        "I want to die",
        "I don't want to live",
        "I want to kill myself",
    ]
    for text in crisis_texts:
        payload = {"user_id": user_stats.user_id, "message": text, "room": "Tower"}
        await _timed_chat(session, user_stats, payload)


def _percentile(sorted_values: list[float], fraction: float) -> Optional[float]:
    """Return the value at ``fraction`` of an ascending list, or None if it is empty.

    The index is ``int(len * fraction)`` clamped to the last element, so
    fraction=0.5 picks ``sorted_values[len // 2]``.
    """
    if not sorted_values:
        return None
    index = min(int(len(sorted_values) * fraction), len(sorted_values) - 1)
    return sorted_values[index]


def _format_ms(value: Optional[float]) -> str:
    """Render a millisecond latency for display; "n/a" when there is no sample."""
    return "n/a" if value is None else f"{value:.1f}ms"


def _round_or_none(value: Optional[float], digits: int) -> Optional[float]:
    """Round ``value`` for the JSON summary, passing None through unchanged."""
    return None if value is None else round(value, digits)


async def main(num_users: int = 5, messages_per_user: int = 20) -> dict:
    """Run all benchmark phases against the bridge and return a summary dict.

    Args:
        num_users: Number of simulated users sending messages concurrently.
        messages_per_user: Number of chat messages each simulated user sends.

    Returns:
        The summary that is also printed as JSON. Latency entries are None
        when no request succeeded.

    Exits the process with status 1 if the bridge health endpoint cannot be
    reached.
    """
    rooms = ["Tower", "Chapel", "Library", "Garden", "Dungeon"]

    print("═══ Multi-User Bridge Benchmark ═══")
    print(f"Users: {num_users} | Messages/user: {messages_per_user}")
    print(f"Bridge: {BRIDGE_URL}")
    print()

    async with aiohttp.ClientSession() as http:
        # Check bridge health
        try:
            async with http.get(f"{BRIDGE_URL}/bridge/health") as resp:
                health = await resp.json()
                print(f"Bridge health: {health}")
        except Exception as e:
            print(f"ERROR: Bridge not reachable at {BRIDGE_URL}: {e}")
            sys.exit(1)

        # ── Test 1: Concurrent normal users ──
        print("\n── Test 1: Concurrent message throughput ──")
        stats = [UserStats(user_id=f"user_{i}") for i in range(num_users)]
        t_start = time.perf_counter()
        await asyncio.gather(*[
            run_user(http, s, messages_per_user, rooms)
            for s in stats
        ])
        t_total = time.perf_counter() - t_start

        all_latencies = sorted(lat for s in stats for lat in s.latencies)
        total_msgs = sum(s.messages_sent for s in stats)
        total_errors = sum(s.errors for s in stats)

        # Every request may have failed; percentiles are then None, not a crash.
        p50 = _percentile(all_latencies, 0.5)
        p95 = _percentile(all_latencies, 0.95)
        p99 = _percentile(all_latencies, 0.99)
        throughput = total_msgs / t_total if t_total > 0 else 0.0

        print(f" Total messages: {total_msgs}")
        print(f" Total errors: {total_errors}")
        print(f" Wall time: {t_total:.2f}s")
        print(f" Throughput: {throughput:.1f} msg/s")
        print(f" Latency p50: {_format_ms(p50)}")
        print(f" Latency p95: {_format_ms(p95)}")
        print(f" Latency p99: {_format_ms(p99)}")

        # ── Test 2: Session isolation ──
        print("\n── Test 2: Session isolation verification ──")
        async with http.get(f"{BRIDGE_URL}/bridge/sessions") as resp:
            sessions_data = await resp.json()

        isolated = True
        for s in stats:
            # Any response carrying a different user's id is a leak.
            others_in_my_responses = {
                r["user_id"]
                for r in s.responses
                if r.get("user_id") and r["user_id"] != s.user_id
            }
            if others_in_my_responses:
                print(f" FAIL: {s.user_id} got responses referencing {others_in_my_responses}")
                isolated = False

        if isolated:
            print(f" PASS: All {num_users} users have isolated response streams")

        # Check session count
        session_count = sessions_data["total"]
        print(f" Sessions tracked: {session_count}")
        if session_count >= num_users:
            print(f" PASS: All {num_users} users have active sessions")
        else:
            print(f" FAIL: Expected {num_users} sessions, got {session_count}")

        # ── Test 3: Room occupancy across users ──
        print("\n── Test 3: Room occupancy consistency ──")
        # Send look commands from each user
        room_data = {}
        for s in stats:
            # `async with` releases the response back to the connection pool.
            async with http.post(f"{BRIDGE_URL}/bridge/chat", json={
                "user_id": s.user_id, "message": "look", "room": "Tower"
            }) as resp:
                data = await resp.json()
            room_data[s.user_id] = set(data.get("room_occupants", []))

        # All users in Tower should see the same set of occupants
        occupant_sets = list(room_data.values())
        if len({frozenset(occupants) for occupants in occupant_sets}) == 1:
            print(f" PASS: All users in Tower see same occupants: {occupant_sets[0]}")
        else:
            print(f" FAIL: Occupant mismatch: {room_data}")

        # ── Test 4: Crisis detection under load ──
        # NOTE(review): this phase runs after the load phase has finished, so
        # the bridge is not actually under concurrent load here — confirm
        # whether it should be gathered together with the normal users.
        print("\n── Test 4: Crisis detection under concurrent load ──")
        crisis_stats = UserStats(user_id="crisis_user")
        await run_crisis_user(http, crisis_stats)
        # 1-based position of the first response flagged as a crisis.
        first_crisis_turn = next(
            (
                turn
                for turn, r in enumerate(crisis_stats.responses, start=1)
                if r.get("crisis_detected")
            ),
            None,
        )
        crisis_triggered = first_crisis_turn is not None
        if crisis_triggered:
            first_crisis_resp = crisis_stats.responses[first_crisis_turn - 1]
            print(f" PASS: Crisis detected on turn {first_crisis_turn}")
            if "988" in (first_crisis_resp.get("response") or ""):
                print(" PASS: 988 message included in crisis response")
            else:
                print(" FAIL: 988 message missing from crisis response")
        else:
            print(f" FAIL: Crisis not detected after {len(crisis_stats.responses)} messages")

        # ── Summary ──
        print("\n═══ Benchmark Complete ═══")
        results = {
            "users": num_users,
            "messages_per_user": messages_per_user,
            "total_messages": total_msgs,
            "total_errors": total_errors,
            "throughput_msg_per_sec": round(throughput, 1),
            "latency_p50_ms": _round_or_none(p50, 1),
            "latency_p95_ms": _round_or_none(p95, 1),
            "latency_p99_ms": _round_or_none(p99, 1),
            "wall_time_sec": round(t_total, 2),
            "session_isolation": isolated,
            "crisis_detection": crisis_triggered,
        }
        print(json.dumps(results, indent=2))
        return results


def _parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the ``--users`` / ``--messages`` options shown in the module usage."""
    parser = argparse.ArgumentParser(
        description="Multi-User Bridge concurrency benchmark",
    )
    parser.add_argument("--users", type=int, default=5,
                        help="number of concurrent simulated users (default: 5)")
    parser.add_argument("--messages", type=int, default=20,
                        help="messages sent by each user (default: 20)")
    args = parser.parse_args(argv)
    if args.users < 1 or args.messages < 1:
        parser.error("--users and --messages must both be >= 1")
    return args


if __name__ == "__main__":
    cli_args = _parse_args()
    results = asyncio.run(main(cli_args.users, cli_args.messages))