feat: add concurrent user benchmark experiment (5-user latency/throughput/isolation)

This commit is contained in:
Alexander Whitestone
2026-04-12 21:41:22 -04:00
parent e47939cb8d
commit 5442d5b02f

View File

@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""
Benchmark: Multi-User Bridge — 5 concurrent users, session isolation verification.
Measures:
1. Per-user latency (p50, p95, p99)
2. Throughput (messages/sec) under concurrent load
3. Session isolation (no cross-user history leakage)
4. Room occupancy correctness
5. Crisis detection under concurrent load
Usage:
python experiments/benchmark_concurrent_users.py [--users 5] [--messages 20]
"""
import asyncio
import json
import statistics
import sys
import time
from dataclasses import dataclass, field
import aiohttp
BRIDGE_URL = "http://127.0.0.1:4004"
@dataclass
class UserStats:
    """Per-user accumulator for one simulated benchmark client.

    The request coroutines in this file mutate an instance in place while
    they run; the reporting code reads the fields afterwards.
    """

    # Identifier sent as "user_id" in every request payload for this client.
    user_id: str

    # Round-trip time of each completed request, in milliseconds.
    latencies: list[float] = field(default_factory=list)

    # Count of requests that completed and yielded a decoded body.
    messages_sent: int = 0

    # Count of requests that raised instead of completing.
    errors: int = 0

    # Decoded JSON body of each completed request, in the order received.
    responses: list[dict] = field(default_factory=list)
async def run_user(session: aiohttp.ClientSession, user_stats: UserStats,
                   messages: int, rooms: list[str]):
    """Simulate one user sending chat messages, cycling through ``rooms``.

    Requests are sent one after another (each awaits the previous reply).
    Every request that completes with a 2xx status and a decodable JSON
    body adds its latency (ms) and body to ``user_stats`` and bumps
    ``messages_sent``; every request that fails bumps ``errors`` instead.

    Args:
        session: Shared aiohttp client session used for the POSTs.
        user_stats: Accumulator for this user; mutated in place.
        messages: Number of messages to send.
        rooms: Room names to rotate through, one per message.

    Raises:
        ValueError: If ``rooms`` is empty while ``messages`` is positive.
    """
    if messages > 0 and not rooms:
        # Without this guard the modulo below fails with an opaque
        # ZeroDivisionError.
        raise ValueError("rooms must contain at least one room name")

    for i in range(messages):
        room = rooms[i % len(rooms)]
        payload = {
            "user_id": user_stats.user_id,
            "username": f"User_{user_stats.user_id}",
            "message": f"message {i} from {user_stats.user_id} in {room}",
            "room": room,
        }
        t0 = time.perf_counter()
        try:
            async with session.post(f"{BRIDGE_URL}/bridge/chat", json=payload) as resp:
                # An HTTP error with a JSON body must count as an error,
                # not as a successfully delivered message.
                resp.raise_for_status()
                data = await resp.json()
                latency = (time.perf_counter() - t0) * 1000  # ms
        except Exception as exc:
            # Deliberately broad: a single failed request must not abort the
            # benchmark, but it is counted and reported rather than hidden.
            user_stats.errors += 1
            print(f"[{user_stats.user_id}] message {i} failed: {exc!r}",
                  file=sys.stderr)
        else:
            user_stats.latencies.append(latency)
            user_stats.messages_sent += 1
            user_stats.responses.append(data)
async def run_crisis_user(session: aiohttp.ClientSession, user_stats: UserStats):
    """Send three crisis-phrased messages to the "Tower" room, one at a time.

    Each request that completes with a 2xx status and a decodable JSON body
    adds its latency (ms) and body to ``user_stats`` and bumps
    ``messages_sent``. A failed request bumps ``errors`` and the remaining
    messages are still sent, matching how ``run_user`` treats failures, so
    a single bad request cannot abort the whole benchmark run.

    Args:
        session: Shared aiohttp client session used for the POSTs.
        user_stats: Accumulator for the crisis user; mutated in place.
    """
    crisis_texts = (
        "I want to die",
        "I don't want to live",
        "I want to kill myself",
    )
    for turn, text in enumerate(crisis_texts, start=1):
        payload = {"user_id": user_stats.user_id, "message": text, "room": "Tower"}
        t0 = time.perf_counter()
        try:
            async with session.post(f"{BRIDGE_URL}/bridge/chat", json=payload) as resp:
                # Treat HTTP errors as failures rather than as responses.
                resp.raise_for_status()
                data = await resp.json()
                latency = (time.perf_counter() - t0) * 1000  # ms
        except Exception as exc:
            # Deliberately broad: count and report, then keep going.
            user_stats.errors += 1
            print(f"[{user_stats.user_id}] crisis message {turn} failed: {exc!r}",
                  file=sys.stderr)
        else:
            user_stats.latencies.append(latency)
            user_stats.messages_sent += 1
            user_stats.responses.append(data)
def _percentile(sorted_values: list[float], fraction: float):
    """Return the value at index ``int(len * fraction)``, or None if empty."""
    if not sorted_values:
        return None
    index = min(int(len(sorted_values) * fraction), len(sorted_values) - 1)
    return sorted_values[index]


def _fmt_ms(value) -> str:
    """Format a latency in ms for display; ``None`` renders as "n/a"."""
    return "n/a" if value is None else f"{value:.1f}ms"


def _round_or_none(value, ndigits: int):
    """Round ``value`` for the JSON summary, passing ``None`` through."""
    return None if value is None else round(value, ndigits)


def _parse_args(argv=None):
    """Parse the ``--users`` / ``--messages`` command-line flags."""
    import argparse  # local import: only needed when run as a script

    parser = argparse.ArgumentParser(
        description="Multi-User Bridge concurrent-user benchmark")
    parser.add_argument("--users", type=int, default=5,
                        help="number of concurrent simulated users (default: 5)")
    parser.add_argument("--messages", type=int, default=20,
                        help="messages sent by each user (default: 20)")
    args = parser.parse_args(argv)
    if args.users < 1:
        parser.error("--users must be at least 1")
    if args.messages < 1:
        parser.error("--messages must be at least 1")
    return args


async def main(num_users: int = 5, messages_per_user: int = 20):
    """Run the four benchmark checks against the bridge and print a report.

    Steps: health check (exits with status 1 if the bridge is unreachable),
    concurrent throughput/latency, session isolation, room occupancy, and
    crisis detection while background traffic is in flight.

    Args:
        num_users: Number of concurrent simulated users.
        messages_per_user: Messages each simulated user sends.

    Returns:
        The summary dict that is also printed as JSON. The three latency
        entries are ``None`` when no request succeeded.
    """
    rooms = ["Tower", "Chapel", "Library", "Garden", "Dungeon"]
    print("═══ Multi-User Bridge Benchmark ═══")
    print(f"Users: {num_users} | Messages/user: {messages_per_user}")
    print(f"Bridge: {BRIDGE_URL}")
    print()

    async with aiohttp.ClientSession() as http:
        # Check bridge health
        try:
            async with http.get(f"{BRIDGE_URL}/bridge/health") as resp:
                health = await resp.json()
            print(f"Bridge health: {health}")
        except Exception as e:
            print(f"ERROR: Bridge not reachable at {BRIDGE_URL}: {e}")
            sys.exit(1)

        # ── Test 1: Concurrent normal users ──
        print("\n── Test 1: Concurrent message throughput ──")
        stats = [UserStats(user_id=f"user_{i}") for i in range(num_users)]
        t_start = time.perf_counter()
        await asyncio.gather(*(
            run_user(http, s, messages_per_user, rooms)
            for s in stats
        ))
        t_total = time.perf_counter() - t_start

        all_latencies = sorted(lat for s in stats for lat in s.latencies)
        total_msgs = sum(s.messages_sent for s in stats)
        total_errors = sum(s.errors for s in stats)
        # Percentiles are None when every request failed, instead of the
        # IndexError that indexing an empty list would raise.
        p50 = _percentile(all_latencies, 0.50)
        p95 = _percentile(all_latencies, 0.95)
        p99 = _percentile(all_latencies, 0.99)
        throughput = total_msgs / t_total if t_total > 0 else 0.0

        print(f" Total messages: {total_msgs}")
        print(f" Total errors: {total_errors}")
        print(f" Wall time: {t_total:.2f}s")
        print(f" Throughput: {throughput:.1f} msg/s")
        print(f" Latency p50: {_fmt_ms(p50)}")
        print(f" Latency p95: {_fmt_ms(p95)}")
        print(f" Latency p99: {_fmt_ms(p99)}")

        # ── Test 2: Session isolation ──
        print("\n── Test 2: Session isolation verification ──")
        async with http.get(f"{BRIDGE_URL}/bridge/sessions") as resp:
            sessions_data = await resp.json()

        isolated = True
        for s in stats:
            # Any response carrying a different user's id is a leak.
            others_in_my_responses = {
                r["user_id"]
                for r in s.responses
                if r.get("user_id") and r["user_id"] != s.user_id
            }
            if others_in_my_responses:
                print(f" FAIL: {s.user_id} got responses referencing {others_in_my_responses}")
                isolated = False
        if isolated:
            print(f" PASS: All {num_users} users have isolated response streams")

        # Check session count; a payload without "total" is reported as a
        # FAIL (0 sessions) rather than crashing with KeyError.
        session_count = sessions_data.get("total", 0)
        print(f" Sessions tracked: {session_count}")
        if session_count >= num_users:
            print(f" PASS: All {num_users} users have active sessions")
        else:
            print(f" FAIL: Expected {num_users} sessions, got {session_count}")

        # ── Test 3: Room occupancy across users ──
        print("\n── Test 3: Room occupancy consistency ──")
        # Send look commands from each user
        room_data = {}
        for s in stats:
            look_payload = {"user_id": s.user_id, "message": "look", "room": "Tower"}
            # `async with` releases the response even if decoding fails.
            async with http.post(f"{BRIDGE_URL}/bridge/chat", json=look_payload) as resp:
                data = await resp.json()
            room_data[s.user_id] = set(data.get("room_occupants", []))

        # All users in Tower should see the same set of occupants
        occupant_sets = list(room_data.values())
        if len({frozenset(occ) for occ in occupant_sets}) == 1:
            print(f" PASS: All users in Tower see same occupants: {occupant_sets[0]}")
        else:
            print(f" FAIL: Occupant mismatch: {room_data}")

        # ── Test 4: Crisis detection under load ──
        print("\n── Test 4: Crisis detection under concurrent load ──")
        crisis_stats = UserStats(user_id="crisis_user")
        # Keep ordinary traffic in flight while the crisis messages are sent,
        # so detection is exercised under load as the test name states.
        # Throwaway stats objects keep this traffic out of the Test 1 numbers.
        load_tasks = [
            asyncio.create_task(
                run_user(http, UserStats(user_id=s.user_id), messages_per_user, rooms))
            for s in stats
        ]
        try:
            await run_crisis_user(http, crisis_stats)
        finally:
            # Stop the background load once the crisis exchange is over.
            for task in load_tasks:
                task.cancel()
            await asyncio.gather(*load_tasks, return_exceptions=True)

        crisis_resp = [r for r in crisis_stats.responses if r.get("crisis_detected")]
        crisis_triggered = bool(crisis_resp)
        if crisis_triggered:
            # Report the first flagged turn; counting flagged responses from
            # the end is wrong when detections are not contiguous.
            first_turn = next(
                turn
                for turn, r in enumerate(crisis_stats.responses, start=1)
                if r.get("crisis_detected")
            )
            print(f" PASS: Crisis detected on turn {first_turn}")
            if "988" in crisis_resp[0].get("response", ""):
                print(" PASS: 988 message included in crisis response")
            else:
                print(" FAIL: 988 message missing from crisis response")
        else:
            print(f" FAIL: Crisis not detected after {len(crisis_stats.responses)} messages")

    # ── Summary ──
    print("\n═══ Benchmark Complete ═══")
    results = {
        "users": num_users,
        "messages_per_user": messages_per_user,
        "total_messages": total_msgs,
        "total_errors": total_errors,
        "throughput_msg_per_sec": round(throughput, 1),
        "latency_p50_ms": _round_or_none(p50, 1),
        "latency_p95_ms": _round_or_none(p95, 1),
        "latency_p99_ms": _round_or_none(p99, 1),
        "wall_time_sec": round(t_total, 2),
        "session_isolation": isolated,
        "crisis_detection": crisis_triggered,
    }
    print(json.dumps(results, indent=2))
    return results


if __name__ == "__main__":
    # Honour the --users / --messages flags shown in the module usage text.
    cli_args = _parse_args()
    results = asyncio.run(
        main(num_users=cli_args.users, messages_per_user=cli_args.messages))