Compare commits

...

4 Commits

Author SHA1 Message Date
243aa90ebc docs: add websocket load test usage (#1505) 2026-04-15 09:18:30 +00:00
a634cd1201 feat: add websocket load test harness (#1505) 2026-04-15 09:18:28 +00:00
33611660b5 wip: add websocket load test regression tests 2026-04-15 09:17:46 +00:00
Alexander Whitestone
a70c18f20f test: WebSocket load testing infrastructure (#1505)
Some checks failed
CI / test (pull_request) Failing after 1m31s
CI / validate (pull_request) Failing after 1m36s
Review Approval Gate / verify-review (pull_request) Successful in 10s
Add tests/ws_load_test.py — concurrent connection load test.

Measures:
- Connection success rate
- Message broadcast latency (P50/P95/P99)
- Throughput (msg/s)
- Memory usage delta
- Fan-out via broadcast receiver

Usage:
  python3 tests/ws_load_test.py --connections 50 --duration 30
  python3 tests/ws_load_test.py --connections 200 --duration 60 --messages 200

Pass/fail verdict based on:
- Connection success rate (any failed connection fails the run)
- Error rate < 1%
- P95 latency < 500ms

Closes #1505
2026-04-14 22:45:14 -04:00
4 changed files with 693 additions and 0 deletions

View File

@@ -121,6 +121,7 @@ Do not tell people to static-serve the repo root and expect a world.
### What you can run now
- `python3 server.py` for the local websocket bridge
- `python3 bin/ws_load_test.py --url ws://127.0.0.1:8765 --concurrency 25 --rounds 3` for concurrent WebSocket session benchmarking with memory profiling
- Python modules under `nexus/` for heartbeat / cognition work
### Browser world restoration path

224
bin/ws_load_test.py Normal file
View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""WebSocket load test harness for Nexus gateway infrastructure."""
from __future__ import annotations
import argparse
import asyncio
import json
import math
import sys
import time
import tracemalloc
from pathlib import Path
from typing import Any, Callable
try:
import resource
except ImportError: # pragma: no cover - not expected on Unix CI, but keep portable
resource = None
try:
import websockets
except ImportError: # pragma: no cover - tests inject connector
websockets = None
def percentile(values: list[float], pct: float) -> float:
    """Return the ``pct``-th percentile of ``values``, rounded to one decimal.

    The sample is sorted and the result is linearly interpolated between the
    two closest ranks.

    Args:
        values: Samples to summarize; may be empty and need not be sorted.
        pct: Percentile to compute, in the inclusive range 0-100.

    Returns:
        ``0.0`` for an empty sample, otherwise the interpolated percentile
        rounded to one decimal place.

    Raises:
        ValueError: If ``pct`` is outside 0-100. A negative value would
            otherwise index from the end of the list and silently return a
            wrong number; a value above 100 would raise a bare IndexError.
    """
    if not 0 <= pct <= 100:
        raise ValueError("pct must be between 0 and 100")
    if not values:
        return 0.0
    ordered = sorted(float(value) for value in values)
    rank = (len(ordered) - 1) * (pct / 100.0)
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        # Rank landed exactly on a sample (always true for a single value).
        return round(ordered[lower], 1)
    weight = rank - lower
    interpolated = ordered[lower] + (ordered[upper] - ordered[lower]) * weight
    return round(interpolated, 1)
def measure_memory() -> dict[str, int | None]:
    """Snapshot tracemalloc counters and the process's ``ru_maxrss`` value.

    Returns:
        A dict with ``tracemalloc_current_bytes`` and
        ``tracemalloc_peak_bytes`` as reported by
        ``tracemalloc.get_traced_memory()``, plus ``rss_bytes`` derived from
        ``resource.getrusage(...).ru_maxrss`` (the maximum resident set
        size). ``rss_bytes`` is ``None`` when the ``resource`` module is
        unavailable or the lookup fails.
    """
    traced_now, traced_peak = tracemalloc.get_traced_memory()

    rss_bytes = None
    if resource is not None:
        try:
            usage = resource.getrusage(resource.RUSAGE_SELF)
            max_rss = int(usage.ru_maxrss)
            # The value is used as-is on macOS and multiplied by 1024
            # (i.e. treated as KiB) on every other platform.
            rss_bytes = max_rss if sys.platform == "darwin" else max_rss * 1024
        except Exception:
            rss_bytes = None

    snapshot = {"rss_bytes": rss_bytes}
    snapshot["tracemalloc_current_bytes"] = int(traced_now)
    snapshot["tracemalloc_peak_bytes"] = int(traced_peak)
    return snapshot
def write_report(path: str | Path, report: dict[str, Any]) -> None:
    """Write ``report`` to ``path`` as indented JSON followed by a newline.

    Args:
        path: Destination file. Missing parent directories are created so a
            path such as ``reports/run.json`` works on a fresh checkout.
        report: JSON-serializable mapping to persist.

    Raises:
        TypeError: If ``report`` contains values ``json.dumps`` cannot encode.
        OSError: If the file or its parent directories cannot be written.
    """
    output = Path(path)
    output.parent.mkdir(parents=True, exist_ok=True)
    # Pin the encoding so the file does not depend on the platform default.
    output.write_text(json.dumps(report, indent=2) + "\n", encoding="utf-8")
def _normalize_payload(payload: Any) -> str | None:
    """Turn a payload into the text that will be sent over the socket.

    ``None`` and strings are returned untouched; any other value is
    JSON-encoded with ``json.dumps``.
    """
    already_text = payload is None or isinstance(payload, str)
    return payload if already_text else json.dumps(payload)
async def _connect_once(
    url: str,
    hold_seconds: float,
    payload: str | None,
    connector: Callable[[str], Any],
) -> dict[str, Any]:
    """Open one connection, optionally send ``payload``, hold it, then close.

    Args:
        url: Address handed to ``connector``.
        hold_seconds: How long to keep the connection open after the optional
            send; values <= 0 skip the wait.
        payload: Text to send once connected, or ``None`` to send nothing.
        connector: Callable returning an async context manager whose entered
            value exposes an awaitable ``send``.

    Returns:
        A dict with ``success``, ``connect_ms`` (elapsed time until the
        connection opened, or until the failure) and ``messages_sent``.
        Failures additionally carry a non-empty ``error`` string.
    """
    start = time.perf_counter()
    try:
        async with connector(url) as websocket:
            connect_ms = (time.perf_counter() - start) * 1000
            messages_sent = 0
            if payload is not None:
                await websocket.send(payload)
                messages_sent = 1
            if hold_seconds > 0:
                await asyncio.sleep(hold_seconds)
            return {
                "success": True,
                "connect_ms": connect_ms,
                "messages_sent": messages_sent,
            }
    except Exception as exc:  # pragma: no cover - exercised in live use
        return {
            "success": False,
            "connect_ms": (time.perf_counter() - start) * 1000,
            "messages_sent": 0,
            # Some exceptions (e.g. a bare TimeoutError) stringify to "";
            # fall back to the class name so the failure is not filtered out
            # of the report as an empty error.
            "error": str(exc) or type(exc).__name__,
        }
async def run_load_test(
    *,
    url: str,
    concurrency: int,
    rounds: int,
    hold_seconds: float = 0.1,
    payload: Any = None,
    connector: Callable[[str], Any] | None = None,
) -> dict[str, Any]:
    """Run ``rounds`` batches of ``concurrency`` simultaneous connections.

    Args:
        url: WebSocket URL to connect to.
        concurrency: Connections opened at the same time in each round.
        rounds: Number of rounds, executed one after another.
        hold_seconds: How long each connection stays open.
        payload: Optional message sent once per connection; non-string values
            are JSON-encoded first.
        connector: Factory returning an async context manager for a
            connection. Defaults to ``websockets.connect`` with a 10 second
            open timeout.

    Returns:
        A report dict with the run parameters, connection counts, success
        rate, connect-latency statistics in milliseconds, wall time, memory
        snapshots taken before and after the run, and up to five error
        strings.

    Raises:
        ValueError: If ``concurrency`` or ``rounds`` is below 1.
        RuntimeError: If no ``connector`` is given and ``websockets`` is not
            installed.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    if rounds < 1:
        raise ValueError("rounds must be >= 1")
    if connector is None:
        if websockets is None:
            raise RuntimeError("websockets package is required for live load testing")

        def connector(target_url: str):
            return websockets.connect(target_url, open_timeout=10)

    payload_text = _normalize_payload(payload)

    # Only stop tracing afterwards if this call is the one that started it.
    started_tracing = not tracemalloc.is_tracing()
    if started_tracing:
        tracemalloc.start()
    try:
        memory_before = measure_memory()
        wall_start = time.perf_counter()
        results: list[dict[str, Any]] = []
        for _ in range(rounds):
            tasks = [
                asyncio.create_task(
                    _connect_once(
                        url=url,
                        hold_seconds=hold_seconds,
                        payload=payload_text,
                        connector=connector,
                    )
                )
                for _ in range(concurrency)
            ]
            results.extend(await asyncio.gather(*tasks))
        wall_time_ms = round((time.perf_counter() - wall_start) * 1000, 1)
        memory_after = measure_memory()
    finally:
        # Runs on errors and cancellation too, so tracing is never left
        # enabled for the rest of the process.
        if started_tracing:
            tracemalloc.stop()

    attempted = len(results)
    successful = sum(1 for result in results if result["success"])
    latencies = [result["connect_ms"] for result in results if result["success"]]
    errors = [result["error"] for result in results if result.get("error")]

    return {
        "url": url,
        "concurrency": concurrency,
        "rounds": rounds,
        "hold_seconds": hold_seconds,
        "attempted_connections": attempted,
        "successful_connections": successful,
        "failed_connections": attempted - successful,
        "messages_sent": sum(result["messages_sent"] for result in results),
        "success_rate": round(successful / attempted, 4) if attempted else 0.0,
        "avg_connect_ms": round(sum(latencies) / len(latencies), 1) if latencies else 0.0,
        "min_connect_ms": round(min(latencies), 1) if latencies else 0.0,
        "max_connect_ms": round(max(latencies), 1) if latencies else 0.0,
        "p95_connect_ms": percentile(latencies, 95),
        "wall_time_ms": wall_time_ms,
        "memory_before": memory_before,
        "memory_after": memory_after,
        "memory_peak_delta_bytes": max(
            0,
            memory_after["tracemalloc_peak_bytes"] - memory_before["tracemalloc_peak_bytes"],
        ),
        "errors": errors[:5],
    }
def _parse_payload(raw: str | None) -> Any:
    """Decode a raw payload string.

    Returns ``None`` for ``None``, the decoded value when ``raw`` is valid
    JSON, and ``raw`` itself otherwise.
    """
    if raw is None:
        return None
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        # Not JSON: treat the argument as a literal text message.
        return raw
    return decoded
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: run the load test and emit the JSON report.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Always ``0``. The report is printed to stdout and, when ``--output``
        is given, also written to that path.
    """
    parser = argparse.ArgumentParser(description="WebSocket load test harness for Nexus")
    option_specs = (
        ("--url", {"required": True, "help": "WebSocket URL to exercise, e.g. ws://127.0.0.1:8765"}),
        ("--concurrency", {"type": int, "default": 10, "help": "Concurrent connections per round"}),
        ("--rounds", {"type": int, "default": 1, "help": "Number of connection rounds to execute"}),
        ("--hold-seconds", {"type": float, "default": 0.1, "help": "How long to keep each connection open"}),
        ("--payload", {"help": "Optional message payload to send after connect; JSON accepted"}),
        ("--output", {"help": "Optional path to write JSON report"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    args = parser.parse_args(argv)

    pending_run = run_load_test(
        url=args.url,
        concurrency=args.concurrency,
        rounds=args.rounds,
        hold_seconds=args.hold_seconds,
        payload=_parse_payload(args.payload),
    )
    report = asyncio.run(pending_run)

    if args.output:
        write_report(args.output, report)
    print(json.dumps(report, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())

121
tests/test_ws_load_test.py Normal file
View File

@@ -0,0 +1,121 @@
"""Tests for WebSocket load testing infrastructure (issue #1505)."""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
# Repository root: two levels up from this test file.
PROJECT_ROOT = Path(__file__).parent.parent
# Load bin/ws_load_test.py directly from its file path under the module name
# "ws_load_test_test".
_spec = importlib.util.spec_from_file_location(
"ws_load_test_test",
PROJECT_ROOT / "bin" / "ws_load_test.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register the module before executing it so that string patch targets such
# as "ws_load_test_test.resource.getrusage" can be resolved by name.
sys.modules["ws_load_test_test"] = _mod
_spec.loader.exec_module(_mod)
# Convenience aliases for the functions under test.
main = _mod.main
measure_memory = _mod.measure_memory
run_load_test = _mod.run_load_test
write_report = _mod.write_report
class TestMemoryStats:
    """Checks for ``measure_memory``."""

    def test_measure_memory_reports_tracemalloc_and_rss_bytes(self):
        """With ``sys.platform`` set to "linux", ``ru_maxrss`` is scaled by 1024."""
        fake_usage = type("Usage", (), {"ru_maxrss": 512})()
        traced = patch(
            "ws_load_test_test.tracemalloc.get_traced_memory",
            return_value=(1024, 4096),
        )
        rusage = patch("ws_load_test_test.resource.getrusage", return_value=fake_usage)
        platform = patch.object(_mod.sys, "platform", "linux")

        with traced, rusage, platform:
            stats = measure_memory()

        assert stats["tracemalloc_current_bytes"] == 1024
        assert stats["tracemalloc_peak_bytes"] == 4096
        assert stats["rss_bytes"] == 512 * 1024
class TestRunLoadTest:
    """Checks for ``run_load_test`` using an in-memory fake connection."""

    def test_run_load_test_reports_concurrency_and_messages(self):
        """Three connections over two rounds yield six attempts and six sends."""
        events = []

        class FakeConnection:
            """Async context manager that records enter/exit/send calls."""

            def __init__(self, url):
                self.url = url

            async def __aenter__(self):
                events.append(("enter", self.url))
                return self

            async def __aexit__(self, exc_type, exc, tb):
                events.append(("exit", self.url))

            async def send(self, payload):
                events.append(("send", payload))

        report = _mod.asyncio.run(
            run_load_test(
                url="ws://nexus.local:8765",
                concurrency=3,
                rounds=2,
                hold_seconds=0,
                payload={"type": "probe"},
                connector=FakeConnection,
            )
        )

        expected = {
            "url": "ws://nexus.local:8765",
            "concurrency": 3,
            "rounds": 2,
            "attempted_connections": 6,
            "successful_connections": 6,
            "failed_connections": 0,
            "messages_sent": 6,
            "success_rate": 1.0,
        }
        for key, value in expected.items():
            assert report[key] == value
        assert "memory_before" in report
        assert "memory_after" in report
        assert sum(1 for kind, _ in events if kind == "send") == 6
class TestReportOutput:
    """Checks for ``write_report`` and the ``--output`` handling in ``main``."""

    def test_write_report_serializes_json(self, tmp_path):
        """The written file parses back to the original mapping."""
        payload = {"successful_connections": 4, "failed_connections": 0}
        out = tmp_path / "ws-load-report.json"
        write_report(out, payload)
        assert json.loads(out.read_text()) == payload

    def test_main_writes_output_file(self, tmp_path):
        """``main`` returns 0 and stores the report produced by the run."""
        out = tmp_path / "report.json"
        fake_report = {
            "url": "ws://127.0.0.1:8765",
            "concurrency": 4,
            "rounds": 2,
            "attempted_connections": 8,
            "successful_connections": 8,
            "failed_connections": 0,
            "messages_sent": 0,
            "success_rate": 1.0,
            "memory_before": {"rss_bytes": 1, "tracemalloc_current_bytes": 2, "tracemalloc_peak_bytes": 3},
            "memory_after": {"rss_bytes": 4, "tracemalloc_current_bytes": 5, "tracemalloc_peak_bytes": 6},
        }

        async def fake_run_load_test(**_kwargs):
            return fake_report

        cli_args = [
            "--url", "ws://127.0.0.1:8765",
            "--concurrency", "4",
            "--rounds", "2",
            "--output", str(out),
        ]
        with patch.object(_mod, "run_load_test", fake_run_load_test):
            rc = main(cli_args)

        assert rc == 0
        assert json.loads(out.read_text()) == fake_report

347
tests/ws_load_test.py Executable file
View File

@@ -0,0 +1,347 @@
#!/usr/bin/env python3
"""
WebSocket Load Test for The Nexus Gateway (#1505).
Simulates concurrent WebSocket connections to measure:
- Connection success rate
- Message broadcast latency
- Memory usage under load
- Throughput (messages/second)
Usage:
python3 tests/ws_load_test.py [--connections 50] [--duration 30] [--messages 100]
Requirements:
pip install websockets psutil (psutil optional, for memory tracking)
"""
import argparse
import asyncio
import json
import os
import statistics
import sys
import time
from dataclasses import dataclass, field
from typing import List, Optional
try:
import websockets
except ImportError:
print("ERROR: websockets required. Install: pip install websockets")
sys.exit(1)
try:
import psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Defaults for the command-line options --url, --connections, --duration and
# --messages respectively.
DEFAULT_URL = "ws://localhost:8765"
DEFAULT_CONNECTIONS = 50
DEFAULT_DURATION = 30 # seconds
DEFAULT_MESSAGES = 100 # per connection
@dataclass
class ConnectionStats:
    """Counters collected by one connection worker."""

    # Set to True once the connection has opened; reset to False if the
    # worker ends with an exception.
    connected: bool = False
    # Time taken to open the connection, in milliseconds.
    connect_time_ms: float = 0
    # Messages successfully handed to send().
    messages_sent: int = 0
    # Replies received within the per-message timeout.
    messages_received: int = 0
    # Exceptions raised while connecting, sending or receiving.
    errors: int = 0
    # One latency sample (milliseconds) per received reply.
    latencies_ms: List[float] = field(default_factory=list)
@dataclass
class LoadTestResult:
    """Totals and summary statistics for one complete load-test run."""

    # --- connections ---
    total_connections: int = 0
    successful_connections: int = 0
    failed_connections: int = 0

    # --- messages (summed over all connections) ---
    total_messages_sent: int = 0
    total_messages_received: int = 0
    total_errors: int = 0

    # --- timing and throughput ---
    duration_seconds: float = 0
    messages_per_second: float = 0

    # --- latency over all samples, in milliseconds ---
    avg_latency_ms: float = 0
    p50_latency_ms: float = 0
    p95_latency_ms: float = 0
    p99_latency_ms: float = 0

    # --- memory of the test process, in MB ---
    memory_start_mb: float = 0
    memory_end_mb: float = 0
    memory_delta_mb: float = 0
# ---------------------------------------------------------------------------
# Single connection worker
# ---------------------------------------------------------------------------
def _reply_latency_ms(reply, fallback_ms: float) -> float:
    """Return the latency of a received frame in milliseconds.

    When ``reply`` is a JSON ``load_test`` message carrying a numeric
    ``timestamp`` (the wall-clock send time written by a worker), latency is
    measured from that timestamp. Any other frame yields ``fallback_ms``.
    """
    try:
        data = json.loads(reply)
    except (TypeError, ValueError):
        return fallback_ms
    if isinstance(data, dict) and data.get("type") == "load_test":
        sent_at = data.get("timestamp")
        if isinstance(sent_at, (int, float)) and not isinstance(sent_at, bool):
            return max(0.0, (time.time() - sent_at) * 1000)
    return fallback_ms


async def connection_worker(
    worker_id: int,
    url: str,
    num_messages: int,
    stats: ConnectionStats,
    stop_event: asyncio.Event,
):
    """Open one WebSocket connection, then send messages and record replies.

    Args:
        worker_id: Identifier embedded in every message this worker sends.
        url: WebSocket URL to connect to (5 second open timeout).
        num_messages: Maximum number of messages to send.
        stats: Mutated in place with connection state, counters and latency
            samples.
        stop_event: Checked before each message; once set the worker stops.

    Each message is followed by one receive with a 2 second timeout; a
    timeout is not counted as an error. Exceptions are counted in
    ``stats.errors`` and never propagate to the caller.
    """
    try:
        start = time.perf_counter()
        async with websockets.connect(url, open_timeout=5) as ws:
            stats.connect_time_ms = (time.perf_counter() - start) * 1000
            stats.connected = True

            for seq in range(num_messages):
                if stop_event.is_set():
                    break

                msg = json.dumps({
                    "type": "load_test",
                    "worker_id": worker_id,
                    "seq": seq,
                    "timestamp": time.time(),
                })

                try:
                    send_time = time.perf_counter()
                    await ws.send(msg)
                    stats.messages_sent += 1

                    try:
                        response = await asyncio.wait_for(ws.recv(), timeout=2.0)
                    except asyncio.TimeoutError:
                        pass  # No response is OK for broadcast servers
                    else:
                        # The frame just received may be another worker's
                        # broadcast that was already queued, so send-to-recv
                        # time alone is not a latency. Prefer the timestamp
                        # carried inside the frame.
                        # NOTE(review): assumes the server relays the JSON
                        # payload unchanged - confirm against the gateway.
                        fallback_ms = (time.perf_counter() - send_time) * 1000
                        stats.latencies_ms.append(_reply_latency_ms(response, fallback_ms))
                        stats.messages_received += 1
                except Exception:
                    stats.errors += 1

                # Small delay between messages to avoid overwhelming
                await asyncio.sleep(0.01)
    except Exception:
        stats.errors += 1
        stats.connected = False
# ---------------------------------------------------------------------------
# Broadcast receiver (separate connection that only receives)
# ---------------------------------------------------------------------------
async def broadcast_receiver(
    url: str,
    received_count: dict,
    stop_event: asyncio.Event,
):
    """Hold a receive-only connection and count every incoming message.

    Args:
        url: WebSocket URL to connect to (5 second open timeout).
        received_count: Mutable mapping whose ``"total"`` entry is incremented
            for each message received.
        stop_event: Polled between receives (each waits at most 1 second);
            once set the receiver closes the connection and returns.

    Failures are reported on stderr instead of being raised, so a broken
    receiver never aborts the load test but is no longer silent either.
    """
    try:
        async with websockets.connect(url, open_timeout=5) as ws:
            while not stop_event.is_set():
                try:
                    await asyncio.wait_for(ws.recv(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue
                received_count["total"] += 1
    except Exception as exc:
        print(f"WARNING: broadcast receiver stopped early: {exc!r}", file=sys.stderr)
# ---------------------------------------------------------------------------
# Memory monitoring
# ---------------------------------------------------------------------------
def get_memory_mb() -> float:
    """Return this process's RSS in MB, or 0.0 when psutil is not installed."""
    if not HAS_PSUTIL:
        return 0.0
    rss_bytes = psutil.Process().memory_info().rss
    return rss_bytes / (1024 * 1024)
# ---------------------------------------------------------------------------
# Main load test
# ---------------------------------------------------------------------------
async def run_load_test(
    url: str,
    num_connections: int,
    duration: int,
    messages_per_connection: int,
) -> LoadTestResult:
    """Run the WebSocket load test.

    Starts ``num_connections`` workers plus one receive-only connection,
    waits until every worker has finished or ``duration`` seconds have
    passed (whichever comes first), then aggregates the per-connection stats.

    Args:
        url: WebSocket URL to exercise.
        num_connections: Number of concurrent worker connections.
        duration: Upper bound on the run time, in seconds.
        messages_per_connection: Messages each worker tries to send.

    Returns:
        A ``LoadTestResult`` with connection counts, message totals,
        throughput, latency statistics and memory figures.
    """
    result = LoadTestResult()
    result.total_connections = num_connections
    result.memory_start_mb = get_memory_mb()

    stats_list: List[ConnectionStats] = [ConnectionStats() for _ in range(num_connections)]
    stop_event = asyncio.Event()

    # Counter filled by the broadcast receiver to measure fan-out
    broadcast_count = {"total": 0}

    banner = "=" * 60
    print(f"\n{banner}")
    print("WebSocket Load Test")
    print(banner)
    print(f" URL: {url}")
    print(f" Connections: {num_connections}")
    print(f" Duration: {duration}s")
    print(f" Messages/connection: {messages_per_connection}")
    print(f"{banner}\n")

    test_start = time.perf_counter()

    tasks = [
        asyncio.create_task(
            connection_worker(index, url, messages_per_connection, stats, stop_event)
        )
        for index, stats in enumerate(stats_list)
    ]
    recv_task = asyncio.create_task(broadcast_receiver(url, broadcast_count, stop_event))

    print(f"Running load test for up to {duration} seconds...")
    if tasks:
        # Stop as soon as every worker is done instead of always sleeping the
        # full duration; idle time would otherwise dilute the throughput.
        await asyncio.wait(tasks, timeout=duration)
    stop_event.set()

    # Workers still running see the stop event and wind down on their own.
    await asyncio.gather(*tasks, return_exceptions=True)
    actual_duration = time.perf_counter() - test_start
    await asyncio.gather(recv_task, return_exceptions=True)

    all_latencies = []
    for stats in stats_list:
        if stats.connected:
            result.successful_connections += 1
        else:
            result.failed_connections += 1
        result.total_messages_sent += stats.messages_sent
        result.total_messages_received += stats.messages_received
        result.total_errors += stats.errors
        all_latencies.extend(stats.latencies_ms)

    result.duration_seconds = actual_duration
    result.messages_per_second = result.total_messages_sent / actual_duration if actual_duration > 0 else 0

    if all_latencies:
        result.avg_latency_ms = statistics.mean(all_latencies)
        sorted_latencies = sorted(all_latencies)
        count = len(sorted_latencies)
        result.p50_latency_ms = sorted_latencies[count // 2]
        result.p95_latency_ms = sorted_latencies[int(count * 0.95)]
        result.p99_latency_ms = sorted_latencies[int(count * 0.99)]

    result.memory_end_mb = get_memory_mb()
    result.memory_delta_mb = result.memory_end_mb - result.memory_start_mb

    # The fan-out count was collected but never reported before.
    print(f"Broadcast receiver observed {broadcast_count['total']} messages")

    return result
# ---------------------------------------------------------------------------
# Report
# ---------------------------------------------------------------------------
def print_report(result: LoadTestResult) -> bool:
    """Print the load test results and a pass/fail verdict.

    The run fails when any connection failed, when errors exceed 1% of the
    messages sent, or when P95 latency is above 500 ms.

    Args:
        result: Aggregated figures to display.

    Returns:
        ``True`` for a PASS verdict, ``False`` for FAIL.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print("Load Test Results")
    print(banner)

    print("\n--- Connections ---")
    print(f" Total: {result.total_connections}")
    print(f" Successful: {result.successful_connections}")
    print(f" Failed: {result.failed_connections}")
    conn_rate = result.successful_connections / result.total_connections * 100 if result.total_connections else 0
    print(f" Success rate: {conn_rate:.1f}%")

    print("\n--- Messages ---")
    print(f" Sent: {result.total_messages_sent}")
    print(f" Received: {result.total_messages_received}")
    print(f" Errors: {result.total_errors}")
    print(f" Throughput: {result.messages_per_second:.1f} msg/s")

    print("\n--- Latency ---")
    if result.avg_latency_ms > 0:
        print(f" Average: {result.avg_latency_ms:.2f} ms")
        print(f" P50: {result.p50_latency_ms:.2f} ms")
        print(f" P95: {result.p95_latency_ms:.2f} ms")
        print(f" P99: {result.p99_latency_ms:.2f} ms")
    else:
        print(" No latency data (server may not echo)")

    print("\n--- Memory ---")
    if HAS_PSUTIL:
        print(f" Start: {result.memory_start_mb:.1f} MB")
        print(f" End: {result.memory_end_mb:.1f} MB")
        print(f" Delta: {result.memory_delta_mb:+.1f} MB")
    else:
        print(" psutil not installed — memory tracking disabled")

    print("\n--- Duration ---")
    print(f" {result.duration_seconds:.1f} seconds")
    print(f"\n{banner}")

    # Pass/fail verdict
    issues = []
    if result.failed_connections > 0:
        issues.append(f"{result.failed_connections} connections failed")
    if result.total_errors > result.total_messages_sent * 0.01:
        if result.total_messages_sent:
            error_pct = result.total_errors / result.total_messages_sent * 100
            issues.append(f"Error rate {error_pct:.1f}% exceeds 1%")
        else:
            # Errors with nothing sent (e.g. every connection failed): there
            # is no rate to compute, and dividing would raise
            # ZeroDivisionError in the middle of the report.
            issues.append(f"{result.total_errors} errors with no messages sent")
    if result.p95_latency_ms > 500:
        issues.append(f"P95 latency {result.p95_latency_ms:.0f}ms exceeds 500ms")

    if issues:
        print("ISSUES FOUND:")
        for issue in issues:
            print(f"{issue}")
        print("\nVERDICT: FAIL")
        return False

    print("VERDICT: PASS")
    return True
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(argv: Optional[List[str]] = None):
    """Parse command-line options, run the load test and exit with its verdict.

    Args:
        argv: Argument list to parse; ``None`` (the default) lets argparse
            read ``sys.argv``, so existing ``main()`` callers are unaffected.

    Exits the process with status 0 when the report verdict is PASS and 1
    otherwise.
    """
    parser = argparse.ArgumentParser(description="WebSocket load test for The Nexus")
    parser.add_argument("--url", default=DEFAULT_URL, help=f"WebSocket URL (default: {DEFAULT_URL})")
    parser.add_argument("--connections", type=int, default=DEFAULT_CONNECTIONS, help=f"Number of concurrent connections (default: {DEFAULT_CONNECTIONS})")
    parser.add_argument("--duration", type=int, default=DEFAULT_DURATION, help=f"Test duration in seconds (default: {DEFAULT_DURATION})")
    parser.add_argument("--messages", type=int, default=DEFAULT_MESSAGES, help=f"Messages per connection (default: {DEFAULT_MESSAGES})")
    args = parser.parse_args(argv)

    result = asyncio.run(run_load_test(
        url=args.url,
        num_connections=args.connections,
        duration=args.duration,
        messages_per_connection=args.messages,
    ))

    passed = print_report(result)
    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()