Compare commits
1 Commits
fix/1601-m
...
fix/1505-w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68a1098e33 |
236
tests/ws_load_test.py
Normal file
236
tests/ws_load_test.py
Normal file
@@ -0,0 +1,236 @@
|
||||
#!/usr/bin/env python3
|
||||
"""WebSocket Load Test — Measure concurrent connection capacity.
|
||||
|
||||
Simulates N concurrent WebSocket connections to the Nexus gateway
|
||||
and measures latency, throughput, and memory under load.
|
||||
|
||||
Usage:
|
||||
python3 tests/ws_load_test.py --url ws://localhost:8080 --connections 50
|
||||
python3 tests/ws_load_test.py --url ws://localhost:8080 --connections 100 --duration 30
|
||||
|
||||
Requirements: websockets (pip install websockets)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import websockets
|
||||
except ImportError:
|
||||
print("ERROR: websockets not installed. Run: pip install websockets")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConnectionStats:
|
||||
"""Stats for a single WebSocket connection."""
|
||||
connected: bool = False
|
||||
messages_sent: int = 0
|
||||
messages_received: int = 0
|
||||
errors: int = 0
|
||||
latencies: list = field(default_factory=list)
|
||||
connect_time: float = 0.0
|
||||
disconnect_time: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoadTestResults:
|
||||
"""Aggregate results for the load test."""
|
||||
total_connections: int = 0
|
||||
successful_connections: int = 0
|
||||
failed_connections: int = 0
|
||||
total_messages_sent: int = 0
|
||||
total_messages_received: int = 0
|
||||
total_errors: int = 0
|
||||
latencies: list = field(default_factory=list)
|
||||
duration: float = 0.0
|
||||
peak_memory_mb: float = 0.0
|
||||
|
||||
|
||||
async def connect_and_test(
|
||||
url: str,
|
||||
client_id: int,
|
||||
duration: int,
|
||||
message_interval: float,
|
||||
stats: ConnectionStats,
|
||||
results: LoadTestResults,
|
||||
):
|
||||
"""Single client: connect, send messages, measure responses."""
|
||||
start = time.time()
|
||||
try:
|
||||
async with websockets.connect(url, open_timeout=10) as ws:
|
||||
stats.connected = True
|
||||
stats.connect_time = time.time() - start
|
||||
results.successful_connections += 1
|
||||
|
||||
# Send a test message
|
||||
test_msg = json.dumps({
|
||||
"type": "ping",
|
||||
"client_id": client_id,
|
||||
"timestamp": time.time(),
|
||||
})
|
||||
|
||||
end_time = time.time() + duration
|
||||
while time.time() < end_time:
|
||||
try:
|
||||
send_time = time.time()
|
||||
await ws.send(test_msg)
|
||||
stats.messages_sent += 1
|
||||
results.total_messages_sent += 1
|
||||
|
||||
# Wait for response
|
||||
response = await asyncio.wait_for(ws.recv(), timeout=5.0)
|
||||
recv_time = time.time()
|
||||
latency = (recv_time - send_time) * 1000 # ms
|
||||
stats.latencies.append(latency)
|
||||
results.latencies.append(latency)
|
||||
stats.messages_received += 1
|
||||
results.total_messages_received += 1
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
stats.errors += 1
|
||||
results.total_errors += 1
|
||||
except Exception as e:
|
||||
stats.errors += 1
|
||||
results.total_errors += 1
|
||||
|
||||
await asyncio.sleep(message_interval)
|
||||
|
||||
except Exception as e:
|
||||
stats.connected = False
|
||||
stats.errors += 1
|
||||
results.failed_connections += 1
|
||||
results.total_errors += 1
|
||||
|
||||
stats.disconnect_time = time.time()
|
||||
|
||||
|
||||
def get_memory_mb() -> float:
|
||||
"""Get current process memory in MB."""
|
||||
try:
|
||||
import psutil
|
||||
return psutil.Process().memory_info().rss / 1024 / 1024
|
||||
except ImportError:
|
||||
return 0.0
|
||||
|
||||
|
||||
async def run_load_test(
|
||||
url: str,
|
||||
num_connections: int,
|
||||
duration: int,
|
||||
message_interval: float,
|
||||
) -> LoadTestResults:
|
||||
"""Run the load test with N concurrent connections."""
|
||||
results = LoadTestResults(total_connections=num_connections)
|
||||
stats_list = [ConnectionStats() for _ in range(num_connections)]
|
||||
|
||||
print(f"Starting load test: {num_connections} connections to {url}")
|
||||
print(f"Duration: {duration}s, Message interval: {message_interval}s")
|
||||
print()
|
||||
|
||||
start_time = time.time()
|
||||
start_memory = get_memory_mb()
|
||||
|
||||
# Launch all connections concurrently
|
||||
tasks = [
|
||||
connect_and_test(
|
||||
url=url,
|
||||
client_id=i,
|
||||
duration=duration,
|
||||
message_interval=message_interval,
|
||||
stats=stats_list[i],
|
||||
results=results,
|
||||
)
|
||||
for i in range(num_connections)
|
||||
]
|
||||
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
end_time = time.time()
|
||||
end_memory = get_memory_mb()
|
||||
|
||||
results.duration = end_time - start_time
|
||||
results.peak_memory_mb = max(start_memory, end_memory)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def print_results(results: LoadTestResults):
|
||||
"""Print load test results."""
|
||||
print()
|
||||
print("=" * 60)
|
||||
print("WEBSOCKET LOAD TEST RESULTS")
|
||||
print("=" * 60)
|
||||
print(f"Connections: {results.total_connections}")
|
||||
print(f"Successful: {results.successful_connections}")
|
||||
print(f"Failed: {results.failed_connections}")
|
||||
print(f"Duration: {results.duration:.1f}s")
|
||||
print()
|
||||
print(f"Messages sent: {results.total_messages_sent}")
|
||||
print(f"Messages recv: {results.total_messages_received}")
|
||||
print(f"Errors: {results.total_errors}")
|
||||
print(f"Throughput: {results.total_messages_sent / max(results.duration, 1):.1f} msg/s")
|
||||
print()
|
||||
|
||||
if results.latencies:
|
||||
results.latencies.sort()
|
||||
n = len(results.latencies)
|
||||
print(f"Latency (ms):")
|
||||
print(f" p50: {results.latencies[n // 2]:.1f}")
|
||||
print(f" p90: {results.latencies[int(n * 0.9)]:.1f}")
|
||||
print(f" p95: {results.latencies[int(n * 0.95)]:.1f}")
|
||||
print(f" p99: {results.latencies[min(int(n * 0.99), n-1)]:.1f}")
|
||||
print(f" max: {results.latencies[-1]:.1f}")
|
||||
print(f" mean: {sum(results.latencies) / n:.1f}")
|
||||
|
||||
print()
|
||||
print(f"Memory delta: {results.peak_memory_mb:.1f} MB")
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="WebSocket load test")
|
||||
parser.add_argument("--url", default="ws://localhost:8080", help="WebSocket URL")
|
||||
parser.add_argument("--connections", type=int, default=10, help="Number of concurrent connections")
|
||||
parser.add_argument("--duration", type=int, default=10, help="Test duration in seconds")
|
||||
parser.add_argument("--interval", type=float, default=0.5, help="Message interval in seconds")
|
||||
parser.add_argument("--output", help="Save results to JSON file")
|
||||
args = parser.parse_args()
|
||||
|
||||
results = asyncio.run(run_load_test(
|
||||
url=args.url,
|
||||
num_connections=args.connections,
|
||||
duration=args.duration,
|
||||
message_interval=args.interval,
|
||||
))
|
||||
|
||||
print_results(results)
|
||||
|
||||
if args.output:
|
||||
data = {
|
||||
"url": args.url,
|
||||
"connections": args.connections,
|
||||
"duration": args.duration,
|
||||
"interval": args.interval,
|
||||
"total_connections": results.total_connections,
|
||||
"successful": results.successful_connections,
|
||||
"failed": results.failed_connections,
|
||||
"messages_sent": results.total_messages_sent,
|
||||
"messages_received": results.total_messages_received,
|
||||
"errors": results.total_errors,
|
||||
"duration_seconds": results.duration,
|
||||
"memory_mb": results.peak_memory_mb,
|
||||
}
|
||||
with open(args.output, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
print(f"\nResults saved to {args.output}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user