Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 243aa90ebc | |||
| a634cd1201 | |||
| 33611660b5 | |||
|
|
a70c18f20f |
@@ -121,6 +121,7 @@ Do not tell people to static-serve the repo root and expect a world.
|
||||
### What you can run now
|
||||
|
||||
- `python3 server.py` for the local websocket bridge
|
||||
- `python3 bin/ws_load_test.py --url ws://127.0.0.1:8765 --concurrency 25 --rounds 3` for concurrent WebSocket session benchmarking with memory profiling
|
||||
- Python modules under `nexus/` for heartbeat / cognition work
|
||||
|
||||
### Browser world restoration path
|
||||
|
||||
224
bin/ws_load_test.py
Normal file
224
bin/ws_load_test.py
Normal file
@@ -0,0 +1,224 @@
|
||||
#!/usr/bin/env python3
|
||||
"""WebSocket load test harness for Nexus gateway infrastructure."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import sys
|
||||
import time
|
||||
import tracemalloc
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable
|
||||
|
||||
try:
|
||||
import resource
|
||||
except ImportError: # pragma: no cover - not expected on Unix CI, but keep portable
|
||||
resource = None
|
||||
|
||||
try:
|
||||
import websockets
|
||||
except ImportError: # pragma: no cover - tests inject connector
|
||||
websockets = None
|
||||
|
||||
|
||||
def percentile(values: list[float], pct: float) -> float:
|
||||
if not values:
|
||||
return 0.0
|
||||
ordered = sorted(float(value) for value in values)
|
||||
if len(ordered) == 1:
|
||||
return round(ordered[0], 1)
|
||||
rank = (len(ordered) - 1) * (pct / 100.0)
|
||||
lower = math.floor(rank)
|
||||
upper = math.ceil(rank)
|
||||
if lower == upper:
|
||||
return round(ordered[lower], 1)
|
||||
weight = rank - lower
|
||||
interpolated = ordered[lower] + (ordered[upper] - ordered[lower]) * weight
|
||||
return round(interpolated, 1)
|
||||
|
||||
|
||||
def measure_memory() -> dict[str, int | None]:
|
||||
current_bytes, peak_bytes = tracemalloc.get_traced_memory()
|
||||
rss_bytes = None
|
||||
if resource is not None:
|
||||
try:
|
||||
rss = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
|
||||
rss_bytes = rss if sys.platform == "darwin" else rss * 1024
|
||||
except Exception:
|
||||
rss_bytes = None
|
||||
return {
|
||||
"rss_bytes": rss_bytes,
|
||||
"tracemalloc_current_bytes": int(current_bytes),
|
||||
"tracemalloc_peak_bytes": int(peak_bytes),
|
||||
}
|
||||
|
||||
|
||||
def write_report(path: str | Path, report: dict[str, Any]) -> None:
|
||||
output = Path(path)
|
||||
output.write_text(json.dumps(report, indent=2) + "\n")
|
||||
|
||||
|
||||
def _normalize_payload(payload: Any) -> str | None:
|
||||
if payload is None:
|
||||
return None
|
||||
if isinstance(payload, str):
|
||||
return payload
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
async def _connect_once(
|
||||
url: str,
|
||||
hold_seconds: float,
|
||||
payload: str | None,
|
||||
connector: Callable[[str], Any],
|
||||
) -> dict[str, Any]:
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
async with connector(url) as websocket:
|
||||
connect_ms = (time.perf_counter() - start) * 1000
|
||||
messages_sent = 0
|
||||
if payload is not None:
|
||||
await websocket.send(payload)
|
||||
messages_sent = 1
|
||||
if hold_seconds > 0:
|
||||
await asyncio.sleep(hold_seconds)
|
||||
return {
|
||||
"success": True,
|
||||
"connect_ms": connect_ms,
|
||||
"messages_sent": messages_sent,
|
||||
}
|
||||
except Exception as exc: # pragma: no cover - exercised in live use
|
||||
return {
|
||||
"success": False,
|
||||
"connect_ms": (time.perf_counter() - start) * 1000,
|
||||
"messages_sent": 0,
|
||||
"error": str(exc),
|
||||
}
|
||||
|
||||
|
||||
async def run_load_test(
|
||||
*,
|
||||
url: str,
|
||||
concurrency: int,
|
||||
rounds: int,
|
||||
hold_seconds: float = 0.1,
|
||||
payload: Any = None,
|
||||
connector: Callable[[str], Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
if concurrency < 1:
|
||||
raise ValueError("concurrency must be >= 1")
|
||||
if rounds < 1:
|
||||
raise ValueError("rounds must be >= 1")
|
||||
|
||||
if connector is None:
|
||||
if websockets is None:
|
||||
raise RuntimeError("websockets package is required for live load testing")
|
||||
|
||||
def connector(target_url: str):
|
||||
return websockets.connect(target_url, open_timeout=10)
|
||||
|
||||
payload_text = _normalize_payload(payload)
|
||||
started_tracing = False
|
||||
if not tracemalloc.is_tracing():
|
||||
tracemalloc.start()
|
||||
started_tracing = True
|
||||
|
||||
memory_before = measure_memory()
|
||||
wall_start = time.perf_counter()
|
||||
results: list[dict[str, Any]] = []
|
||||
|
||||
for _ in range(rounds):
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
_connect_once(
|
||||
url=url,
|
||||
hold_seconds=hold_seconds,
|
||||
payload=payload_text,
|
||||
connector=connector,
|
||||
)
|
||||
)
|
||||
for _ in range(concurrency)
|
||||
]
|
||||
results.extend(await asyncio.gather(*tasks))
|
||||
|
||||
wall_time_ms = round((time.perf_counter() - wall_start) * 1000, 1)
|
||||
memory_after = measure_memory()
|
||||
if started_tracing:
|
||||
tracemalloc.stop()
|
||||
|
||||
attempted = len(results)
|
||||
successful = sum(1 for result in results if result["success"])
|
||||
failed = attempted - successful
|
||||
latencies = [result["connect_ms"] for result in results if result["success"]]
|
||||
messages_sent = sum(result["messages_sent"] for result in results)
|
||||
errors = [result.get("error") for result in results if result.get("error")]
|
||||
|
||||
avg_connect_ms = round(sum(latencies) / len(latencies), 1) if latencies else 0.0
|
||||
min_connect_ms = round(min(latencies), 1) if latencies else 0.0
|
||||
max_connect_ms = round(max(latencies), 1) if latencies else 0.0
|
||||
|
||||
return {
|
||||
"url": url,
|
||||
"concurrency": concurrency,
|
||||
"rounds": rounds,
|
||||
"hold_seconds": hold_seconds,
|
||||
"attempted_connections": attempted,
|
||||
"successful_connections": successful,
|
||||
"failed_connections": failed,
|
||||
"messages_sent": messages_sent,
|
||||
"success_rate": round(successful / attempted, 4) if attempted else 0.0,
|
||||
"avg_connect_ms": avg_connect_ms,
|
||||
"min_connect_ms": min_connect_ms,
|
||||
"max_connect_ms": max_connect_ms,
|
||||
"p95_connect_ms": percentile(latencies, 95),
|
||||
"wall_time_ms": wall_time_ms,
|
||||
"memory_before": memory_before,
|
||||
"memory_after": memory_after,
|
||||
"memory_peak_delta_bytes": max(
|
||||
0,
|
||||
memory_after["tracemalloc_peak_bytes"] - memory_before["tracemalloc_peak_bytes"],
|
||||
),
|
||||
"errors": errors[:5],
|
||||
}
|
||||
|
||||
|
||||
def _parse_payload(raw: str | None) -> Any:
|
||||
if raw is None:
|
||||
return None
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return raw
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description="WebSocket load test harness for Nexus")
|
||||
parser.add_argument("--url", required=True, help="WebSocket URL to exercise, e.g. ws://127.0.0.1:8765")
|
||||
parser.add_argument("--concurrency", type=int, default=10, help="Concurrent connections per round")
|
||||
parser.add_argument("--rounds", type=int, default=1, help="Number of connection rounds to execute")
|
||||
parser.add_argument("--hold-seconds", type=float, default=0.1, help="How long to keep each connection open")
|
||||
parser.add_argument("--payload", help="Optional message payload to send after connect; JSON accepted")
|
||||
parser.add_argument("--output", help="Optional path to write JSON report")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
report = asyncio.run(
|
||||
run_load_test(
|
||||
url=args.url,
|
||||
concurrency=args.concurrency,
|
||||
rounds=args.rounds,
|
||||
hold_seconds=args.hold_seconds,
|
||||
payload=_parse_payload(args.payload),
|
||||
)
|
||||
)
|
||||
|
||||
if args.output:
|
||||
write_report(args.output, report)
|
||||
print(json.dumps(report, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
121
tests/test_ws_load_test.py
Normal file
121
tests/test_ws_load_test.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Tests for WebSocket load testing infrastructure (issue #1505)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"ws_load_test_test",
|
||||
PROJECT_ROOT / "bin" / "ws_load_test.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules["ws_load_test_test"] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
main = _mod.main
|
||||
measure_memory = _mod.measure_memory
|
||||
run_load_test = _mod.run_load_test
|
||||
write_report = _mod.write_report
|
||||
|
||||
|
||||
class TestMemoryStats:
|
||||
@patch("ws_load_test_test.tracemalloc.get_traced_memory", return_value=(1024, 4096))
|
||||
@patch("ws_load_test_test.resource.getrusage")
|
||||
def test_measure_memory_reports_tracemalloc_and_rss_bytes(self, mock_getrusage, _mock_tracemalloc):
|
||||
mock_getrusage.return_value = type("Usage", (), {"ru_maxrss": 512})()
|
||||
with patch.object(_mod.sys, "platform", "linux"):
|
||||
stats = measure_memory()
|
||||
|
||||
assert stats["tracemalloc_current_bytes"] == 1024
|
||||
assert stats["tracemalloc_peak_bytes"] == 4096
|
||||
assert stats["rss_bytes"] == 512 * 1024
|
||||
|
||||
|
||||
class TestRunLoadTest:
|
||||
def test_run_load_test_reports_concurrency_and_messages(self):
|
||||
events = []
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, url):
|
||||
self.url = url
|
||||
|
||||
async def __aenter__(self):
|
||||
events.append(("enter", self.url))
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
events.append(("exit", self.url))
|
||||
|
||||
async def send(self, payload):
|
||||
events.append(("send", payload))
|
||||
|
||||
async def exercise():
|
||||
return await run_load_test(
|
||||
url="ws://nexus.local:8765",
|
||||
concurrency=3,
|
||||
rounds=2,
|
||||
hold_seconds=0,
|
||||
payload={"type": "probe"},
|
||||
connector=lambda url: FakeConnection(url),
|
||||
)
|
||||
|
||||
report = _mod.asyncio.run(exercise())
|
||||
|
||||
assert report["url"] == "ws://nexus.local:8765"
|
||||
assert report["concurrency"] == 3
|
||||
assert report["rounds"] == 2
|
||||
assert report["attempted_connections"] == 6
|
||||
assert report["successful_connections"] == 6
|
||||
assert report["failed_connections"] == 0
|
||||
assert report["messages_sent"] == 6
|
||||
assert report["success_rate"] == 1.0
|
||||
assert "memory_before" in report
|
||||
assert "memory_after" in report
|
||||
assert len([e for e in events if e[0] == "send"]) == 6
|
||||
|
||||
|
||||
class TestReportOutput:
|
||||
def test_write_report_serializes_json(self, tmp_path):
|
||||
out = tmp_path / "ws-load-report.json"
|
||||
payload = {"successful_connections": 4, "failed_connections": 0}
|
||||
write_report(out, payload)
|
||||
assert json.loads(out.read_text()) == payload
|
||||
|
||||
def test_main_writes_output_file(self, tmp_path):
|
||||
out = tmp_path / "report.json"
|
||||
fake_report = {
|
||||
"url": "ws://127.0.0.1:8765",
|
||||
"concurrency": 4,
|
||||
"rounds": 2,
|
||||
"attempted_connections": 8,
|
||||
"successful_connections": 8,
|
||||
"failed_connections": 0,
|
||||
"messages_sent": 0,
|
||||
"success_rate": 1.0,
|
||||
"memory_before": {"rss_bytes": 1, "tracemalloc_current_bytes": 2, "tracemalloc_peak_bytes": 3},
|
||||
"memory_after": {"rss_bytes": 4, "tracemalloc_current_bytes": 5, "tracemalloc_peak_bytes": 6},
|
||||
}
|
||||
|
||||
async def fake_run_load_test(**_kwargs):
|
||||
return fake_report
|
||||
|
||||
with patch.object(_mod, "run_load_test", fake_run_load_test):
|
||||
rc = main([
|
||||
"--url",
|
||||
"ws://127.0.0.1:8765",
|
||||
"--concurrency",
|
||||
"4",
|
||||
"--rounds",
|
||||
"2",
|
||||
"--output",
|
||||
str(out),
|
||||
])
|
||||
|
||||
assert rc == 0
|
||||
assert json.loads(out.read_text()) == fake_report
|
||||
347
tests/ws_load_test.py
Executable file
347
tests/ws_load_test.py
Executable file
@@ -0,0 +1,347 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
WebSocket Load Test for The Nexus Gateway (#1505).
|
||||
|
||||
Simulates concurrent WebSocket connections to measure:
|
||||
- Connection success rate
|
||||
- Message broadcast latency
|
||||
- Memory usage under load
|
||||
- Throughput (messages/second)
|
||||
|
||||
Usage:
|
||||
python3 tests/ws_load_test.py [--connections 50] [--duration 30] [--messages 100]
|
||||
|
||||
Requirements:
|
||||
pip install websockets psutil (psutil optional, for memory tracking)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import statistics
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
try:
|
||||
import websockets
|
||||
except ImportError:
|
||||
print("ERROR: websockets required. Install: pip install websockets")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
import psutil
|
||||
HAS_PSUTIL = True
|
||||
except ImportError:
|
||||
HAS_PSUTIL = False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DEFAULT_URL = "ws://localhost:8765"
|
||||
DEFAULT_CONNECTIONS = 50
|
||||
DEFAULT_DURATION = 30 # seconds
|
||||
DEFAULT_MESSAGES = 100 # per connection
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConnectionStats:
|
||||
"""Stats for a single connection."""
|
||||
connected: bool = False
|
||||
connect_time_ms: float = 0
|
||||
messages_sent: int = 0
|
||||
messages_received: int = 0
|
||||
errors: int = 0
|
||||
latencies_ms: List[float] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoadTestResult:
|
||||
"""Aggregated results from load test."""
|
||||
total_connections: int = 0
|
||||
successful_connections: int = 0
|
||||
failed_connections: int = 0
|
||||
total_messages_sent: int = 0
|
||||
total_messages_received: int = 0
|
||||
total_errors: int = 0
|
||||
duration_seconds: float = 0
|
||||
messages_per_second: float = 0
|
||||
avg_latency_ms: float = 0
|
||||
p50_latency_ms: float = 0
|
||||
p95_latency_ms: float = 0
|
||||
p99_latency_ms: float = 0
|
||||
memory_start_mb: float = 0
|
||||
memory_end_mb: float = 0
|
||||
memory_delta_mb: float = 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Single connection worker
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def connection_worker(
|
||||
worker_id: int,
|
||||
url: str,
|
||||
num_messages: int,
|
||||
stats: ConnectionStats,
|
||||
stop_event: asyncio.Event,
|
||||
):
|
||||
"""A single WebSocket connection that sends and receives messages."""
|
||||
try:
|
||||
start = time.perf_counter()
|
||||
async with websockets.connect(url, open_timeout=5) as ws:
|
||||
stats.connect_time_ms = (time.perf_counter() - start) * 1000
|
||||
stats.connected = True
|
||||
|
||||
# Send messages with timestamps for latency measurement
|
||||
for i in range(num_messages):
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
msg = json.dumps({
|
||||
"type": "load_test",
|
||||
"worker_id": worker_id,
|
||||
"seq": i,
|
||||
"timestamp": time.time(),
|
||||
})
|
||||
|
||||
try:
|
||||
send_time = time.perf_counter()
|
||||
await ws.send(msg)
|
||||
stats.messages_sent += 1
|
||||
|
||||
# Wait for echo/broadcast back (with timeout)
|
||||
try:
|
||||
response = await asyncio.wait_for(ws.recv(), timeout=2.0)
|
||||
recv_time = time.perf_counter()
|
||||
latency_ms = (recv_time - send_time) * 1000
|
||||
stats.latencies_ms.append(latency_ms)
|
||||
stats.messages_received += 1
|
||||
except asyncio.TimeoutError:
|
||||
pass # No response is OK for broadcast servers
|
||||
|
||||
except Exception as e:
|
||||
stats.errors += 1
|
||||
|
||||
# Small delay between messages to avoid overwhelming
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
except Exception as e:
|
||||
stats.errors += 1
|
||||
stats.connected = False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Broadcast receiver (separate connection that only receives)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def broadcast_receiver(
|
||||
url: str,
|
||||
received_count: dict,
|
||||
stop_event: asyncio.Event,
|
||||
):
|
||||
"""A connection that only receives broadcasts to measure fan-out."""
|
||||
try:
|
||||
async with websockets.connect(url, open_timeout=5) as ws:
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
|
||||
received_count["total"] += 1
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Memory monitoring
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_memory_mb() -> float:
|
||||
"""Get current process memory in MB."""
|
||||
if HAS_PSUTIL:
|
||||
return psutil.Process().memory_info().rss / (1024 * 1024)
|
||||
return 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main load test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def run_load_test(
|
||||
url: str,
|
||||
num_connections: int,
|
||||
duration: int,
|
||||
messages_per_connection: int,
|
||||
) -> LoadTestResult:
|
||||
"""Run the WebSocket load test."""
|
||||
result = LoadTestResult()
|
||||
result.total_connections = num_connections
|
||||
result.duration_seconds = duration
|
||||
|
||||
memory_start = get_memory_mb()
|
||||
result.memory_start_mb = memory_start
|
||||
|
||||
stats_list: List[ConnectionStats] = [ConnectionStats() for _ in range(num_connections)]
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
# Also add a broadcast receiver to measure fan-out
|
||||
broadcast_count = {"total": 0}
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f"WebSocket Load Test")
|
||||
print(f"{'='*60}")
|
||||
print(f" URL: {url}")
|
||||
print(f" Connections: {num_connections}")
|
||||
print(f" Duration: {duration}s")
|
||||
print(f" Messages/connection: {messages_per_connection}")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
# Start timer
|
||||
test_start = time.perf_counter()
|
||||
|
||||
# Create all connection tasks
|
||||
tasks = []
|
||||
for i in range(num_connections):
|
||||
task = asyncio.create_task(
|
||||
connection_worker(i, url, messages_per_connection, stats_list[i], stop_event)
|
||||
)
|
||||
tasks.append(task)
|
||||
|
||||
# Add broadcast receiver
|
||||
recv_task = asyncio.create_task(broadcast_receiver(url, broadcast_count, stop_event))
|
||||
|
||||
# Wait for duration, then signal stop
|
||||
print(f"Running load test for {duration} seconds...")
|
||||
await asyncio.sleep(duration)
|
||||
stop_event.set()
|
||||
|
||||
# Wait for all tasks to finish
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
await asyncio.gather(recv_task, return_exceptions=True)
|
||||
|
||||
test_end = time.perf_counter()
|
||||
actual_duration = test_end - test_start
|
||||
|
||||
# Aggregate results
|
||||
all_latencies = []
|
||||
for stats in stats_list:
|
||||
if stats.connected:
|
||||
result.successful_connections += 1
|
||||
else:
|
||||
result.failed_connections += 1
|
||||
result.total_messages_sent += stats.messages_sent
|
||||
result.total_messages_received += stats.messages_received
|
||||
result.total_errors += stats.errors
|
||||
all_latencies.extend(stats.latencies_ms)
|
||||
|
||||
result.duration_seconds = actual_duration
|
||||
result.messages_per_second = result.total_messages_sent / actual_duration if actual_duration > 0 else 0
|
||||
|
||||
if all_latencies:
|
||||
result.avg_latency_ms = statistics.mean(all_latencies)
|
||||
sorted_latencies = sorted(all_latencies)
|
||||
result.p50_latency_ms = sorted_latencies[len(sorted_latencies) // 2]
|
||||
result.p95_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.95)]
|
||||
result.p99_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.99)]
|
||||
|
||||
result.memory_end_mb = get_memory_mb()
|
||||
result.memory_delta_mb = result.memory_end_mb - result.memory_start_mb
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Report
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def print_report(result: LoadTestResult):
|
||||
"""Print load test results."""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Load Test Results")
|
||||
print(f"{'='*60}")
|
||||
|
||||
print(f"\n--- Connections ---")
|
||||
print(f" Total: {result.total_connections}")
|
||||
print(f" Successful: {result.successful_connections}")
|
||||
print(f" Failed: {result.failed_connections}")
|
||||
conn_rate = result.successful_connections / result.total_connections * 100 if result.total_connections else 0
|
||||
print(f" Success rate: {conn_rate:.1f}%")
|
||||
|
||||
print(f"\n--- Messages ---")
|
||||
print(f" Sent: {result.total_messages_sent}")
|
||||
print(f" Received: {result.total_messages_received}")
|
||||
print(f" Errors: {result.total_errors}")
|
||||
print(f" Throughput: {result.messages_per_second:.1f} msg/s")
|
||||
|
||||
print(f"\n--- Latency ---")
|
||||
if result.avg_latency_ms > 0:
|
||||
print(f" Average: {result.avg_latency_ms:.2f} ms")
|
||||
print(f" P50: {result.p50_latency_ms:.2f} ms")
|
||||
print(f" P95: {result.p95_latency_ms:.2f} ms")
|
||||
print(f" P99: {result.p99_latency_ms:.2f} ms")
|
||||
else:
|
||||
print(f" No latency data (server may not echo)")
|
||||
|
||||
print(f"\n--- Memory ---")
|
||||
if HAS_PSUTIL:
|
||||
print(f" Start: {result.memory_start_mb:.1f} MB")
|
||||
print(f" End: {result.memory_end_mb:.1f} MB")
|
||||
print(f" Delta: {result.memory_delta_mb:+.1f} MB")
|
||||
else:
|
||||
print(f" psutil not installed — memory tracking disabled")
|
||||
|
||||
print(f"\n--- Duration ---")
|
||||
print(f" {result.duration_seconds:.1f} seconds")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
|
||||
# Pass/fail verdict
|
||||
issues = []
|
||||
if result.failed_connections > 0:
|
||||
issues.append(f"{result.failed_connections} connections failed")
|
||||
if result.total_errors > result.total_messages_sent * 0.01:
|
||||
issues.append(f"Error rate {result.total_errors/result.total_messages_sent*100:.1f}% exceeds 1%")
|
||||
if result.p95_latency_ms > 500:
|
||||
issues.append(f"P95 latency {result.p95_latency_ms:.0f}ms exceeds 500ms")
|
||||
|
||||
if issues:
|
||||
print(f"ISSUES FOUND:")
|
||||
for issue in issues:
|
||||
print(f" ✗ {issue}")
|
||||
print(f"\nVERDICT: FAIL")
|
||||
return False
|
||||
else:
|
||||
print(f"VERDICT: PASS")
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="WebSocket load test for The Nexus")
|
||||
parser.add_argument("--url", default=DEFAULT_URL, help=f"WebSocket URL (default: {DEFAULT_URL})")
|
||||
parser.add_argument("--connections", type=int, default=DEFAULT_CONNECTIONS, help=f"Number of concurrent connections (default: {DEFAULT_CONNECTIONS})")
|
||||
parser.add_argument("--duration", type=int, default=DEFAULT_DURATION, help=f"Test duration in seconds (default: {DEFAULT_DURATION})")
|
||||
parser.add_argument("--messages", type=int, default=DEFAULT_MESSAGES, help=f"Messages per connection (default: {DEFAULT_MESSAGES})")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = asyncio.run(run_load_test(
|
||||
url=args.url,
|
||||
num_connections=args.connections,
|
||||
duration=args.duration,
|
||||
messages_per_connection=args.messages,
|
||||
))
|
||||
|
||||
passed = print_report(result)
|
||||
sys.exit(0 if passed else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user