Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27925c9a5d | ||
|
|
417f92b7a5 | ||
|
|
de3f41e819 | ||
| bd0497b998 | |||
|
|
4ab84a59ab |
@@ -121,7 +121,6 @@ Do not tell people to static-serve the repo root and expect a world.
|
||||
### What you can run now
|
||||
|
||||
- `python3 server.py` for the local websocket bridge
|
||||
- `python3 bin/ws_load_test.py --url ws://127.0.0.1:8765 --concurrency 25 --rounds 3` for concurrent WebSocket session benchmarking with memory profiling
|
||||
- Python modules under `nexus/` for heartbeat / cognition work
|
||||
|
||||
### Browser world restoration path
|
||||
|
||||
@@ -1,224 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""WebSocket load test harness for Nexus gateway infrastructure."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import sys
|
||||
import time
|
||||
import tracemalloc
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable
|
||||
|
||||
try:
|
||||
import resource
|
||||
except ImportError: # pragma: no cover - not expected on Unix CI, but keep portable
|
||||
resource = None
|
||||
|
||||
try:
|
||||
import websockets
|
||||
except ImportError: # pragma: no cover - tests inject connector
|
||||
websockets = None
|
||||
|
||||
|
||||
def percentile(values: list[float], pct: float) -> float:
|
||||
if not values:
|
||||
return 0.0
|
||||
ordered = sorted(float(value) for value in values)
|
||||
if len(ordered) == 1:
|
||||
return round(ordered[0], 1)
|
||||
rank = (len(ordered) - 1) * (pct / 100.0)
|
||||
lower = math.floor(rank)
|
||||
upper = math.ceil(rank)
|
||||
if lower == upper:
|
||||
return round(ordered[lower], 1)
|
||||
weight = rank - lower
|
||||
interpolated = ordered[lower] + (ordered[upper] - ordered[lower]) * weight
|
||||
return round(interpolated, 1)
|
||||
|
||||
|
||||
def measure_memory() -> dict[str, int | None]:
|
||||
current_bytes, peak_bytes = tracemalloc.get_traced_memory()
|
||||
rss_bytes = None
|
||||
if resource is not None:
|
||||
try:
|
||||
rss = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
|
||||
rss_bytes = rss if sys.platform == "darwin" else rss * 1024
|
||||
except Exception:
|
||||
rss_bytes = None
|
||||
return {
|
||||
"rss_bytes": rss_bytes,
|
||||
"tracemalloc_current_bytes": int(current_bytes),
|
||||
"tracemalloc_peak_bytes": int(peak_bytes),
|
||||
}
|
||||
|
||||
|
||||
def write_report(path: str | Path, report: dict[str, Any]) -> None:
|
||||
output = Path(path)
|
||||
output.write_text(json.dumps(report, indent=2) + "\n")
|
||||
|
||||
|
||||
def _normalize_payload(payload: Any) -> str | None:
|
||||
if payload is None:
|
||||
return None
|
||||
if isinstance(payload, str):
|
||||
return payload
|
||||
return json.dumps(payload)
|
||||
|
||||
|
||||
async def _connect_once(
|
||||
url: str,
|
||||
hold_seconds: float,
|
||||
payload: str | None,
|
||||
connector: Callable[[str], Any],
|
||||
) -> dict[str, Any]:
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
async with connector(url) as websocket:
|
||||
connect_ms = (time.perf_counter() - start) * 1000
|
||||
messages_sent = 0
|
||||
if payload is not None:
|
||||
await websocket.send(payload)
|
||||
messages_sent = 1
|
||||
if hold_seconds > 0:
|
||||
await asyncio.sleep(hold_seconds)
|
||||
return {
|
||||
"success": True,
|
||||
"connect_ms": connect_ms,
|
||||
"messages_sent": messages_sent,
|
||||
}
|
||||
except Exception as exc: # pragma: no cover - exercised in live use
|
||||
return {
|
||||
"success": False,
|
||||
"connect_ms": (time.perf_counter() - start) * 1000,
|
||||
"messages_sent": 0,
|
||||
"error": str(exc),
|
||||
}
|
||||
|
||||
|
||||
async def run_load_test(
|
||||
*,
|
||||
url: str,
|
||||
concurrency: int,
|
||||
rounds: int,
|
||||
hold_seconds: float = 0.1,
|
||||
payload: Any = None,
|
||||
connector: Callable[[str], Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
if concurrency < 1:
|
||||
raise ValueError("concurrency must be >= 1")
|
||||
if rounds < 1:
|
||||
raise ValueError("rounds must be >= 1")
|
||||
|
||||
if connector is None:
|
||||
if websockets is None:
|
||||
raise RuntimeError("websockets package is required for live load testing")
|
||||
|
||||
def connector(target_url: str):
|
||||
return websockets.connect(target_url, open_timeout=10)
|
||||
|
||||
payload_text = _normalize_payload(payload)
|
||||
started_tracing = False
|
||||
if not tracemalloc.is_tracing():
|
||||
tracemalloc.start()
|
||||
started_tracing = True
|
||||
|
||||
memory_before = measure_memory()
|
||||
wall_start = time.perf_counter()
|
||||
results: list[dict[str, Any]] = []
|
||||
|
||||
for _ in range(rounds):
|
||||
tasks = [
|
||||
asyncio.create_task(
|
||||
_connect_once(
|
||||
url=url,
|
||||
hold_seconds=hold_seconds,
|
||||
payload=payload_text,
|
||||
connector=connector,
|
||||
)
|
||||
)
|
||||
for _ in range(concurrency)
|
||||
]
|
||||
results.extend(await asyncio.gather(*tasks))
|
||||
|
||||
wall_time_ms = round((time.perf_counter() - wall_start) * 1000, 1)
|
||||
memory_after = measure_memory()
|
||||
if started_tracing:
|
||||
tracemalloc.stop()
|
||||
|
||||
attempted = len(results)
|
||||
successful = sum(1 for result in results if result["success"])
|
||||
failed = attempted - successful
|
||||
latencies = [result["connect_ms"] for result in results if result["success"]]
|
||||
messages_sent = sum(result["messages_sent"] for result in results)
|
||||
errors = [result.get("error") for result in results if result.get("error")]
|
||||
|
||||
avg_connect_ms = round(sum(latencies) / len(latencies), 1) if latencies else 0.0
|
||||
min_connect_ms = round(min(latencies), 1) if latencies else 0.0
|
||||
max_connect_ms = round(max(latencies), 1) if latencies else 0.0
|
||||
|
||||
return {
|
||||
"url": url,
|
||||
"concurrency": concurrency,
|
||||
"rounds": rounds,
|
||||
"hold_seconds": hold_seconds,
|
||||
"attempted_connections": attempted,
|
||||
"successful_connections": successful,
|
||||
"failed_connections": failed,
|
||||
"messages_sent": messages_sent,
|
||||
"success_rate": round(successful / attempted, 4) if attempted else 0.0,
|
||||
"avg_connect_ms": avg_connect_ms,
|
||||
"min_connect_ms": min_connect_ms,
|
||||
"max_connect_ms": max_connect_ms,
|
||||
"p95_connect_ms": percentile(latencies, 95),
|
||||
"wall_time_ms": wall_time_ms,
|
||||
"memory_before": memory_before,
|
||||
"memory_after": memory_after,
|
||||
"memory_peak_delta_bytes": max(
|
||||
0,
|
||||
memory_after["tracemalloc_peak_bytes"] - memory_before["tracemalloc_peak_bytes"],
|
||||
),
|
||||
"errors": errors[:5],
|
||||
}
|
||||
|
||||
|
||||
def _parse_payload(raw: str | None) -> Any:
|
||||
if raw is None:
|
||||
return None
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return raw
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description="WebSocket load test harness for Nexus")
|
||||
parser.add_argument("--url", required=True, help="WebSocket URL to exercise, e.g. ws://127.0.0.1:8765")
|
||||
parser.add_argument("--concurrency", type=int, default=10, help="Concurrent connections per round")
|
||||
parser.add_argument("--rounds", type=int, default=1, help="Number of connection rounds to execute")
|
||||
parser.add_argument("--hold-seconds", type=float, default=0.1, help="How long to keep each connection open")
|
||||
parser.add_argument("--payload", help="Optional message payload to send after connect; JSON accepted")
|
||||
parser.add_argument("--output", help="Optional path to write JSON report")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
report = asyncio.run(
|
||||
run_load_test(
|
||||
url=args.url,
|
||||
concurrency=args.concurrency,
|
||||
rounds=args.rounds,
|
||||
hold_seconds=args.hold_seconds,
|
||||
payload=_parse_payload(args.payload),
|
||||
)
|
||||
)
|
||||
|
||||
if args.output:
|
||||
write_report(args.output, report)
|
||||
print(json.dumps(report, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
34
config/mission_bus_profiles.json
Normal file
34
config/mission_bus_profiles.json
Normal file
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"roles": {
|
||||
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
|
||||
"write": ["publish", "checkpoint", "handoff", "read"],
|
||||
"read": ["read"],
|
||||
"audit": ["read", "audit"]
|
||||
},
|
||||
"isolation_profiles": [
|
||||
{
|
||||
"name": "level1-directory",
|
||||
"label": "Level 1 — directory workspace",
|
||||
"level": 1,
|
||||
"mechanism": "directory_workspace",
|
||||
"description": "Single mission cell in an isolated workspace directory.",
|
||||
"supports_resume": true
|
||||
},
|
||||
{
|
||||
"name": "level2-mount-namespace",
|
||||
"label": "Level 2 — mount namespace",
|
||||
"level": 2,
|
||||
"mechanism": "mount_namespace",
|
||||
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
|
||||
"supports_resume": true
|
||||
},
|
||||
{
|
||||
"name": "level3-rootless-podman",
|
||||
"label": "Level 3 — rootless Podman",
|
||||
"level": 3,
|
||||
"mechanism": "rootless_podman",
|
||||
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
|
||||
"supports_resume": true
|
||||
}
|
||||
]
|
||||
}
|
||||
31
docs/mission-bus.md
Normal file
31
docs/mission-bus.md
Normal file
@@ -0,0 +1,31 @@
|
||||
# Mission Bus
|
||||
|
||||
The Mission Bus grounds the multi-agent teaming epic with a concrete, executable shared module.
|
||||
|
||||
## What it adds
|
||||
- one unified mission stream for messages, checkpoints, and handoffs
|
||||
- role-based permissions for `lead`, `write`, `read`, and `audit`
|
||||
- cross-agent handoff packets so Agent A can checkpoint and Agent B can resume
|
||||
- declared isolation profiles for Level 1, Level 2, and Level 3 mission cells
|
||||
|
||||
## Files
|
||||
- `nexus/mission_bus.py`
|
||||
- `config/mission_bus_profiles.json`
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from nexus.mission_bus import MissionBus, MissionRole, load_profiles
|
||||
from pathlib import Path
|
||||
|
||||
bus = MissionBus("mission-883", title="multi-agent teaming", config=load_profiles(Path("config/mission_bus_profiles.json")))
|
||||
bus.register_participant("timmy", MissionRole.LEAD)
|
||||
bus.register_participant("ezra", MissionRole.WRITE)
|
||||
checkpoint = bus.create_checkpoint("ezra", summary="checkpoint", state={"branch": "fix/883"})
|
||||
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume from here")
|
||||
packet = bus.build_resume_packet(bus.events[-1].handoff_id)
|
||||
```
|
||||
|
||||
## Scope of this slice
|
||||
This slice does not yet wire a live transport or rootless container launcher.
|
||||
It codifies the mission bus contract, role permissions, handoff packet, and isolation profile surface so later work can execute against a stable interface.
|
||||
@@ -14,6 +14,16 @@ from nexus.perception_adapter import (
|
||||
)
|
||||
from nexus.experience_store import ExperienceStore
|
||||
from nexus.trajectory_logger import TrajectoryLogger
|
||||
from nexus.mission_bus import (
|
||||
MissionBus,
|
||||
MissionRole,
|
||||
MissionParticipant,
|
||||
MissionMessage,
|
||||
MissionCheckpoint,
|
||||
MissionHandoff,
|
||||
IsolationProfile,
|
||||
load_profiles,
|
||||
)
|
||||
|
||||
try:
|
||||
from nexus.nexus_think import NexusMind
|
||||
@@ -28,5 +38,13 @@ __all__ = [
|
||||
"Action",
|
||||
"ExperienceStore",
|
||||
"TrajectoryLogger",
|
||||
"MissionBus",
|
||||
"MissionRole",
|
||||
"MissionParticipant",
|
||||
"MissionMessage",
|
||||
"MissionCheckpoint",
|
||||
"MissionHandoff",
|
||||
"IsolationProfile",
|
||||
"load_profiles",
|
||||
"NexusMind",
|
||||
]
|
||||
|
||||
358
nexus/mission_bus.py
Normal file
358
nexus/mission_bus.py
Normal file
@@ -0,0 +1,358 @@
|
||||
"""Mission bus, role permissions, cross-agent handoff, and isolation profiles.
|
||||
|
||||
Grounded implementation slice for #883.
|
||||
The bus gives a single mission cell a unified event stream, permission-checked
|
||||
roles, checkpoint + resume handoff, and declared isolation profiles for Level
|
||||
1/2/3 execution boundaries.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"roles": {
|
||||
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
|
||||
"write": ["publish", "checkpoint", "handoff", "read"],
|
||||
"read": ["read"],
|
||||
"audit": ["read", "audit"],
|
||||
},
|
||||
"isolation_profiles": [
|
||||
{
|
||||
"name": "level1-directory",
|
||||
"label": "Level 1 — directory workspace",
|
||||
"level": 1,
|
||||
"mechanism": "directory_workspace",
|
||||
"description": "Single mission cell in an isolated workspace directory.",
|
||||
"supports_resume": True,
|
||||
},
|
||||
{
|
||||
"name": "level2-mount-namespace",
|
||||
"label": "Level 2 — mount namespace",
|
||||
"level": 2,
|
||||
"mechanism": "mount_namespace",
|
||||
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
|
||||
"supports_resume": True,
|
||||
},
|
||||
{
|
||||
"name": "level3-rootless-podman",
|
||||
"label": "Level 3 — rootless Podman",
|
||||
"level": 3,
|
||||
"mechanism": "rootless_podman",
|
||||
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
|
||||
"supports_resume": True,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def utcnow_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def load_profiles(path: Path) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
return json.loads(json.dumps(DEFAULT_CONFIG))
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
data.setdefault("roles", DEFAULT_CONFIG["roles"])
|
||||
data.setdefault("isolation_profiles", DEFAULT_CONFIG["isolation_profiles"])
|
||||
return data
|
||||
|
||||
|
||||
class MissionRole(str, Enum):
|
||||
LEAD = "lead"
|
||||
WRITE = "write"
|
||||
READ = "read"
|
||||
AUDIT = "audit"
|
||||
|
||||
|
||||
@dataclass
|
||||
class IsolationProfile:
|
||||
name: str
|
||||
label: str
|
||||
level: int
|
||||
mechanism: str
|
||||
description: str = ""
|
||||
supports_resume: bool = True
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"label": self.label,
|
||||
"level": self.level,
|
||||
"mechanism": self.mechanism,
|
||||
"description": self.description,
|
||||
"supports_resume": self.supports_resume,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "IsolationProfile":
|
||||
return cls(
|
||||
name=data["name"],
|
||||
label=data["label"],
|
||||
level=int(data["level"]),
|
||||
mechanism=data["mechanism"],
|
||||
description=data.get("description", ""),
|
||||
supports_resume=bool(data.get("supports_resume", True)),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionParticipant:
|
||||
name: str
|
||||
role: MissionRole
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"role": self.role.value,
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionParticipant":
|
||||
return cls(name=data["name"], role=MissionRole(data["role"]), metadata=data.get("metadata", {}))
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionMessage:
|
||||
sender: str
|
||||
topic: str
|
||||
payload: Dict[str, Any]
|
||||
sequence: int
|
||||
timestamp: str = field(default_factory=utcnow_iso)
|
||||
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
event_type: str = field(default="message", init=False)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"event_type": self.event_type,
|
||||
"sender": self.sender,
|
||||
"topic": self.topic,
|
||||
"payload": self.payload,
|
||||
"sequence": self.sequence,
|
||||
"timestamp": self.timestamp,
|
||||
"message_id": self.message_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionMessage":
|
||||
return cls(
|
||||
sender=data["sender"],
|
||||
topic=data["topic"],
|
||||
payload=data["payload"],
|
||||
sequence=int(data["sequence"]),
|
||||
timestamp=data.get("timestamp", utcnow_iso()),
|
||||
message_id=data.get("message_id") or data.get("messageId") or str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionCheckpoint:
|
||||
sender: str
|
||||
summary: str
|
||||
state: Dict[str, Any]
|
||||
sequence: int
|
||||
artifacts: List[str] = field(default_factory=list)
|
||||
timestamp: str = field(default_factory=utcnow_iso)
|
||||
checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
event_type: str = field(default="checkpoint", init=False)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"event_type": self.event_type,
|
||||
"sender": self.sender,
|
||||
"summary": self.summary,
|
||||
"state": self.state,
|
||||
"artifacts": self.artifacts,
|
||||
"sequence": self.sequence,
|
||||
"timestamp": self.timestamp,
|
||||
"checkpoint_id": self.checkpoint_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionCheckpoint":
|
||||
return cls(
|
||||
sender=data["sender"],
|
||||
summary=data["summary"],
|
||||
state=data.get("state", {}),
|
||||
artifacts=list(data.get("artifacts", [])),
|
||||
sequence=int(data["sequence"]),
|
||||
timestamp=data.get("timestamp", utcnow_iso()),
|
||||
checkpoint_id=data.get("checkpoint_id") or data.get("checkpointId") or str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionHandoff:
|
||||
sender: str
|
||||
recipient: str
|
||||
checkpoint_id: str
|
||||
sequence: int
|
||||
note: str = ""
|
||||
timestamp: str = field(default_factory=utcnow_iso)
|
||||
handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
event_type: str = field(default="handoff", init=False)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"event_type": self.event_type,
|
||||
"sender": self.sender,
|
||||
"recipient": self.recipient,
|
||||
"checkpoint_id": self.checkpoint_id,
|
||||
"note": self.note,
|
||||
"sequence": self.sequence,
|
||||
"timestamp": self.timestamp,
|
||||
"handoff_id": self.handoff_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionHandoff":
|
||||
return cls(
|
||||
sender=data["sender"],
|
||||
recipient=data["recipient"],
|
||||
checkpoint_id=data["checkpoint_id"] if "checkpoint_id" in data else data["checkpointId"],
|
||||
note=data.get("note", ""),
|
||||
sequence=int(data["sequence"]),
|
||||
timestamp=data.get("timestamp", utcnow_iso()),
|
||||
handoff_id=data.get("handoff_id") or data.get("handoffId") or str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
MissionEvent = Union[MissionMessage, MissionCheckpoint, MissionHandoff]
|
||||
|
||||
|
||||
def event_from_dict(data: Dict[str, Any]) -> MissionEvent:
|
||||
kind = data["event_type"]
|
||||
if kind == "message":
|
||||
return MissionMessage.from_dict(data)
|
||||
if kind == "checkpoint":
|
||||
return MissionCheckpoint.from_dict(data)
|
||||
if kind == "handoff":
|
||||
return MissionHandoff.from_dict(data)
|
||||
raise ValueError(f"Unknown mission event type: {kind}")
|
||||
|
||||
|
||||
class MissionBus:
|
||||
def __init__(self, mission_id: str, title: str = "", config: Dict[str, Any] | None = None):
|
||||
self.mission_id = mission_id
|
||||
self.title = title
|
||||
self.config = config or json.loads(json.dumps(DEFAULT_CONFIG))
|
||||
self.role_permissions = {
|
||||
role: set(perms) for role, perms in self.config.get("roles", {}).items()
|
||||
}
|
||||
self.isolation_profiles = [
|
||||
IsolationProfile.from_dict(entry) for entry in self.config.get("isolation_profiles", [])
|
||||
]
|
||||
self.participants: Dict[str, MissionParticipant] = {}
|
||||
self.events: List[MissionEvent] = []
|
||||
|
||||
def register_participant(self, name: str, role: MissionRole, metadata: Dict[str, Any] | None = None) -> MissionParticipant:
|
||||
participant = MissionParticipant(name=name, role=role, metadata=metadata or {})
|
||||
self.participants[name] = participant
|
||||
return participant
|
||||
|
||||
def allowed(self, name: str, capability: str) -> bool:
|
||||
participant = self.participants.get(name)
|
||||
if participant is None:
|
||||
return False
|
||||
return capability in self.role_permissions.get(participant.role.value, set())
|
||||
|
||||
def _require(self, name: str, capability: str) -> None:
|
||||
if not self.allowed(name, capability):
|
||||
raise PermissionError(f"{name} lacks '{capability}' permission")
|
||||
|
||||
def _next_sequence(self) -> int:
|
||||
return len(self.events) + 1
|
||||
|
||||
def publish(self, sender: str, topic: str, payload: Dict[str, Any]) -> MissionMessage:
|
||||
self._require(sender, "publish")
|
||||
event = MissionMessage(sender=sender, topic=topic, payload=payload, sequence=self._next_sequence())
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def create_checkpoint(
|
||||
self,
|
||||
sender: str,
|
||||
summary: str,
|
||||
state: Dict[str, Any],
|
||||
artifacts: List[str] | None = None,
|
||||
) -> MissionCheckpoint:
|
||||
self._require(sender, "checkpoint")
|
||||
event = MissionCheckpoint(
|
||||
sender=sender,
|
||||
summary=summary,
|
||||
state=state,
|
||||
artifacts=list(artifacts or []),
|
||||
sequence=self._next_sequence(),
|
||||
)
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def _get_checkpoint(self, checkpoint_id: str) -> MissionCheckpoint:
|
||||
for event in self.events:
|
||||
if isinstance(event, MissionCheckpoint) and event.checkpoint_id == checkpoint_id:
|
||||
return event
|
||||
raise KeyError(f"Unknown checkpoint: {checkpoint_id}")
|
||||
|
||||
def _get_handoff(self, handoff_id: str) -> MissionHandoff:
|
||||
for event in self.events:
|
||||
if isinstance(event, MissionHandoff) and event.handoff_id == handoff_id:
|
||||
return event
|
||||
raise KeyError(f"Unknown handoff: {handoff_id}")
|
||||
|
||||
def handoff(self, sender: str, recipient: str, checkpoint_id: str, note: str = "") -> MissionHandoff:
|
||||
self._require(sender, "handoff")
|
||||
if recipient not in self.participants:
|
||||
raise KeyError(f"Unknown recipient: {recipient}")
|
||||
self._get_checkpoint(checkpoint_id)
|
||||
event = MissionHandoff(
|
||||
sender=sender,
|
||||
recipient=recipient,
|
||||
checkpoint_id=checkpoint_id,
|
||||
note=note,
|
||||
sequence=self._next_sequence(),
|
||||
)
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def build_resume_packet(self, handoff_id: str) -> Dict[str, Any]:
|
||||
handoff = self._get_handoff(handoff_id)
|
||||
checkpoint = self._get_checkpoint(handoff.checkpoint_id)
|
||||
return {
|
||||
"mission_id": self.mission_id,
|
||||
"title": self.title,
|
||||
"recipient": handoff.recipient,
|
||||
"sender": handoff.sender,
|
||||
"handoff_note": handoff.note,
|
||||
"checkpoint": checkpoint.to_dict(),
|
||||
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
|
||||
"isolation_profiles": [profile.to_dict() for profile in self.isolation_profiles],
|
||||
"stream_length": len(self.events),
|
||||
}
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"mission_id": self.mission_id,
|
||||
"title": self.title,
|
||||
"config": self.config,
|
||||
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
|
||||
"events": [event.to_dict() for event in self.events],
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionBus":
|
||||
bus = cls(data["mission_id"], title=data.get("title", ""), config=data.get("config"))
|
||||
for name, participant_data in data.get("participants", {}).items():
|
||||
bus.participants[name] = MissionParticipant.from_dict(participant_data)
|
||||
bus.events = [event_from_dict(event_data) for event_data in data.get("events", [])]
|
||||
return bus
|
||||
111
reports/night-shift-prediction-2026-04-12.md
Normal file
111
reports/night-shift-prediction-2026-04-12.md
Normal file
@@ -0,0 +1,111 @@
|
||||
# Night Shift Prediction Report — April 12-13, 2026
|
||||
|
||||
## Starting State (11:36 PM)
|
||||
|
||||
```
|
||||
Time: 11:36 PM EDT
|
||||
Automation: 13 burn loops × 3min + 1 explorer × 10min + 1 backlog × 30min
|
||||
API: Nous/xiaomi/mimo-v2-pro (FREE)
|
||||
Rate: 268 calls/hour
|
||||
Duration: 7.5 hours until 7 AM
|
||||
Total expected API calls: ~2,010
|
||||
```
|
||||
|
||||
## Burn Loops Active (13 @ every 3 min)
|
||||
|
||||
| Loop | Repo | Focus |
|
||||
|------|------|-------|
|
||||
| Testament Burn | the-nexus | MUD bridge + paper |
|
||||
| Foundation Burn | all repos | Gitea issues |
|
||||
| beacon-sprint | the-nexus | paper iterations |
|
||||
| timmy-home sprint | timmy-home | 226 issues |
|
||||
| Beacon sprint | the-beacon | game issues |
|
||||
| timmy-config sprint | timmy-config | config issues |
|
||||
| the-door burn | the-door | crisis front door |
|
||||
| the-testament burn | the-testament | book |
|
||||
| the-nexus burn | the-nexus | 3D world + MUD |
|
||||
| fleet-ops burn | fleet-ops | sovereign fleet |
|
||||
| timmy-academy burn | timmy-academy | academy |
|
||||
| turboquant burn | turboquant | KV-cache compression |
|
||||
| wolf burn | wolf | model evaluation |
|
||||
|
||||
## Expected Outcomes by 7 AM
|
||||
|
||||
### API Calls
|
||||
- Total calls: ~2,010
|
||||
- Successful completions: ~1,400 (70%)
|
||||
- API errors (rate limit, timeout): ~400 (20%)
|
||||
- Iteration limits hit: ~210 (10%)
|
||||
|
||||
### Commits
|
||||
- Total commits pushed: ~800-1,200
|
||||
- Average per loop: ~60-90 commits
|
||||
- Unique branches created: ~300-400
|
||||
|
||||
### Pull Requests
|
||||
- Total PRs created: ~150-250
|
||||
- Average per loop: ~12-19 PRs
|
||||
|
||||
### Issues Filed
|
||||
- New issues created (QA, explorer): ~20-40
|
||||
- Issues closed by PRs: ~50-100
|
||||
|
||||
### Code Written
|
||||
- Estimated lines added: ~50,000-100,000
|
||||
- Estimated files created/modified: ~2,000-3,000
|
||||
|
||||
### Paper Progress
|
||||
- Research paper iterations: ~150 cycles
|
||||
- Expected paper word count growth: ~5,000-10,000 words
|
||||
- New experiment results: 2-4 additional experiments
|
||||
- BibTeX citations: 10-20 verified citations
|
||||
|
||||
### MUD Bridge
|
||||
- Bridge file: 2,875 → ~5,000+ lines
|
||||
- New game systems: 5-10 (combat tested, economy, social graph, leaderboard)
|
||||
- QA cycles: 15-30 exploration sessions
|
||||
- Critical bugs found: 3-5
|
||||
- Critical bugs fixed: 2-3
|
||||
|
||||
### Repository Activity (per repo)
|
||||
| Repo | Expected PRs | Expected Commits |
|
||||
|------|-------------|-----------------|
|
||||
| the-nexus | 30-50 | 200-300 |
|
||||
| the-beacon | 20-30 | 150-200 |
|
||||
| timmy-config | 15-25 | 100-150 |
|
||||
| the-testament | 10-20 | 80-120 |
|
||||
| the-door | 5-10 | 40-60 |
|
||||
| timmy-home | 10-20 | 80-120 |
|
||||
| fleet-ops | 5-10 | 40-60 |
|
||||
| timmy-academy | 5-10 | 40-60 |
|
||||
| turboquant | 3-5 | 20-30 |
|
||||
| wolf | 3-5 | 20-30 |
|
||||
|
||||
### Dream Cycle
|
||||
- 5 dreams generated (11:30 PM, 1 AM, 2:30 AM, 4 AM, 5:30 AM)
|
||||
- 1 reflection (10 PM)
|
||||
- 1 timmy-dreams (5:30 AM)
|
||||
- Total dream output: ~5,000-8,000 words of creative writing
|
||||
|
||||
### Explorer (every 10 min)
|
||||
- ~45 exploration cycles
|
||||
- Bugs found: 15-25
|
||||
- Issues filed: 15-25
|
||||
|
||||
### Risk Factors
|
||||
- API rate limiting: Possible after 500+ consecutive calls
|
||||
- Large file patch failures: Bridge file too large for agents
|
||||
- Branch conflicts: Multiple agents on same repo
|
||||
- Iteration limits: 5-iteration agents can't push
|
||||
- Repository cloning: May hit timeout on slow clones
|
||||
|
||||
### Confidence Level
|
||||
- High confidence: 800+ commits, 150+ PRs
|
||||
- Medium confidence: 1,000+ commits, 200+ PRs
|
||||
- Low confidence: 1,200+ commits, 250+ PRs (requires all loops running clean)
|
||||
|
||||
---
|
||||
|
||||
*This report is a prediction. The 7 AM morning report will compare actual results.*
|
||||
*Generated: 2026-04-12 23:36 EDT*
|
||||
*Author: Timmy (pre-shift prediction)*
|
||||
107
tests/test_mission_bus.py
Normal file
107
tests/test_mission_bus.py
Normal file
@@ -0,0 +1,107 @@
|
||||
from importlib import util
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
MODULE_PATH = ROOT / "nexus" / "mission_bus.py"
|
||||
CONFIG_PATH = ROOT / "config" / "mission_bus_profiles.json"
|
||||
|
||||
|
||||
def load_module():
|
||||
spec = util.spec_from_file_location("mission_bus", MODULE_PATH)
|
||||
module = util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
sys.modules[spec.name] = module
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def build_bus(module):
|
||||
profiles = module.load_profiles(CONFIG_PATH)
|
||||
bus = module.MissionBus("mission-883", title="multi-agent teaming", config=profiles)
|
||||
bus.register_participant("timmy", module.MissionRole.LEAD)
|
||||
bus.register_participant("ezra", module.MissionRole.WRITE)
|
||||
bus.register_participant("bezalel", module.MissionRole.READ)
|
||||
bus.register_participant("allegro", module.MissionRole.AUDIT)
|
||||
return bus
|
||||
|
||||
|
||||
def test_role_permissions_gate_publish_checkpoint_and_handoff():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
|
||||
assert bus.allowed("timmy", "publish") is True
|
||||
assert bus.allowed("ezra", "handoff") is True
|
||||
assert bus.allowed("allegro", "audit") is True
|
||||
assert bus.allowed("bezalel", "publish") is False
|
||||
|
||||
with pytest.raises(PermissionError):
|
||||
bus.publish("bezalel", "mission.notes", {"text": "should fail"})
|
||||
|
||||
with pytest.raises(PermissionError):
|
||||
bus.create_checkpoint("allegro", summary="audit cannot checkpoint", state={})
|
||||
|
||||
|
||||
def test_mission_bus_unified_stream_records_messages_checkpoints_and_handoffs():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
|
||||
msg = bus.publish("timmy", "mission.start", {"goal": "build the slice"})
|
||||
checkpoint = bus.create_checkpoint(
|
||||
"ezra",
|
||||
summary="checkpoint before lead review",
|
||||
state={"branch": "fix/883", "files": ["nexus/mission_bus.py"]},
|
||||
artifacts=["docs/mission-bus.md"],
|
||||
)
|
||||
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="ready for lead review")
|
||||
|
||||
assert [event.event_type for event in bus.events] == ["message", "checkpoint", "handoff"]
|
||||
assert [event.sequence for event in bus.events] == [1, 2, 3]
|
||||
assert msg.topic == "mission.start"
|
||||
assert handoff.recipient == "timmy"
|
||||
|
||||
|
||||
def test_handoff_resume_packet_contains_checkpoint_state_and_participants():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
checkpoint = bus.create_checkpoint(
|
||||
"ezra",
|
||||
summary="handoff package",
|
||||
state={"branch": "fix/883", "tests": ["tests/test_mission_bus.py"]},
|
||||
artifacts=["config/mission_bus_profiles.json"],
|
||||
)
|
||||
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="pick up from here")
|
||||
|
||||
packet = bus.build_resume_packet(handoff.handoff_id)
|
||||
assert packet["recipient"] == "timmy"
|
||||
assert packet["checkpoint"]["state"]["branch"] == "fix/883"
|
||||
assert packet["checkpoint"]["artifacts"] == ["config/mission_bus_profiles.json"]
|
||||
assert packet["participants"]["ezra"]["role"] == "write"
|
||||
assert packet["handoff_note"] == "pick up from here"
|
||||
|
||||
|
||||
def test_profiles_define_level2_mount_namespace_and_level3_rootless_podman():
|
||||
module = load_module()
|
||||
profiles = module.load_profiles(CONFIG_PATH)
|
||||
|
||||
levels = {entry["level"]: entry["mechanism"] for entry in profiles["isolation_profiles"]}
|
||||
assert levels[2] == "mount_namespace"
|
||||
assert levels[3] == "rootless_podman"
|
||||
assert profiles["roles"]["audit"] == ["read", "audit"]
|
||||
|
||||
|
||||
def test_mission_bus_roundtrip_preserves_events_and_isolation_profile():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
bus.publish("timmy", "mission.start", {"goal": "roundtrip"})
|
||||
checkpoint = bus.create_checkpoint("ezra", summary="save state", state={"count": 1})
|
||||
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume")
|
||||
|
||||
restored = module.MissionBus.from_dict(bus.to_dict())
|
||||
assert restored.mission_id == "mission-883"
|
||||
assert restored.events[-1].event_type == "handoff"
|
||||
assert restored.events[-1].note == "resume"
|
||||
assert restored.isolation_profiles[1].mechanism == "mount_namespace"
|
||||
25
tests/test_night_shift_prediction_report.py
Normal file
25
tests/test_night_shift_prediction_report.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
REPORT = Path("reports/night-shift-prediction-2026-04-12.md")
|
||||
|
||||
|
||||
def test_prediction_report_exists_with_required_sections():
|
||||
assert REPORT.exists(), "expected night shift prediction report to exist"
|
||||
content = REPORT.read_text()
|
||||
assert "# Night Shift Prediction Report — April 12-13, 2026" in content
|
||||
assert "## Starting State (11:36 PM)" in content
|
||||
assert "## Burn Loops Active (13 @ every 3 min)" in content
|
||||
assert "## Expected Outcomes by 7 AM" in content
|
||||
assert "### Risk Factors" in content
|
||||
assert "### Confidence Level" in content
|
||||
assert "This report is a prediction" in content
|
||||
|
||||
|
||||
def test_prediction_report_preserves_core_forecast_numbers():
|
||||
content = REPORT.read_text()
|
||||
assert "Total expected API calls: ~2,010" in content
|
||||
assert "Total commits pushed: ~800-1,200" in content
|
||||
assert "Total PRs created: ~150-250" in content
|
||||
assert "the-nexus | 30-50 | 200-300" in content
|
||||
assert "Generated: 2026-04-12 23:36 EDT" in content
|
||||
@@ -1,121 +0,0 @@
|
||||
"""Tests for WebSocket load testing infrastructure (issue #1505)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"ws_load_test_test",
|
||||
PROJECT_ROOT / "bin" / "ws_load_test.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules["ws_load_test_test"] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
main = _mod.main
|
||||
measure_memory = _mod.measure_memory
|
||||
run_load_test = _mod.run_load_test
|
||||
write_report = _mod.write_report
|
||||
|
||||
|
||||
class TestMemoryStats:
|
||||
@patch("ws_load_test_test.tracemalloc.get_traced_memory", return_value=(1024, 4096))
|
||||
@patch("ws_load_test_test.resource.getrusage")
|
||||
def test_measure_memory_reports_tracemalloc_and_rss_bytes(self, mock_getrusage, _mock_tracemalloc):
|
||||
mock_getrusage.return_value = type("Usage", (), {"ru_maxrss": 512})()
|
||||
with patch.object(_mod.sys, "platform", "linux"):
|
||||
stats = measure_memory()
|
||||
|
||||
assert stats["tracemalloc_current_bytes"] == 1024
|
||||
assert stats["tracemalloc_peak_bytes"] == 4096
|
||||
assert stats["rss_bytes"] == 512 * 1024
|
||||
|
||||
|
||||
class TestRunLoadTest:
|
||||
def test_run_load_test_reports_concurrency_and_messages(self):
|
||||
events = []
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, url):
|
||||
self.url = url
|
||||
|
||||
async def __aenter__(self):
|
||||
events.append(("enter", self.url))
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
events.append(("exit", self.url))
|
||||
|
||||
async def send(self, payload):
|
||||
events.append(("send", payload))
|
||||
|
||||
async def exercise():
|
||||
return await run_load_test(
|
||||
url="ws://nexus.local:8765",
|
||||
concurrency=3,
|
||||
rounds=2,
|
||||
hold_seconds=0,
|
||||
payload={"type": "probe"},
|
||||
connector=lambda url: FakeConnection(url),
|
||||
)
|
||||
|
||||
report = _mod.asyncio.run(exercise())
|
||||
|
||||
assert report["url"] == "ws://nexus.local:8765"
|
||||
assert report["concurrency"] == 3
|
||||
assert report["rounds"] == 2
|
||||
assert report["attempted_connections"] == 6
|
||||
assert report["successful_connections"] == 6
|
||||
assert report["failed_connections"] == 0
|
||||
assert report["messages_sent"] == 6
|
||||
assert report["success_rate"] == 1.0
|
||||
assert "memory_before" in report
|
||||
assert "memory_after" in report
|
||||
assert len([e for e in events if e[0] == "send"]) == 6
|
||||
|
||||
|
||||
class TestReportOutput:
|
||||
def test_write_report_serializes_json(self, tmp_path):
|
||||
out = tmp_path / "ws-load-report.json"
|
||||
payload = {"successful_connections": 4, "failed_connections": 0}
|
||||
write_report(out, payload)
|
||||
assert json.loads(out.read_text()) == payload
|
||||
|
||||
def test_main_writes_output_file(self, tmp_path):
|
||||
out = tmp_path / "report.json"
|
||||
fake_report = {
|
||||
"url": "ws://127.0.0.1:8765",
|
||||
"concurrency": 4,
|
||||
"rounds": 2,
|
||||
"attempted_connections": 8,
|
||||
"successful_connections": 8,
|
||||
"failed_connections": 0,
|
||||
"messages_sent": 0,
|
||||
"success_rate": 1.0,
|
||||
"memory_before": {"rss_bytes": 1, "tracemalloc_current_bytes": 2, "tracemalloc_peak_bytes": 3},
|
||||
"memory_after": {"rss_bytes": 4, "tracemalloc_current_bytes": 5, "tracemalloc_peak_bytes": 6},
|
||||
}
|
||||
|
||||
async def fake_run_load_test(**_kwargs):
|
||||
return fake_report
|
||||
|
||||
with patch.object(_mod, "run_load_test", fake_run_load_test):
|
||||
rc = main([
|
||||
"--url",
|
||||
"ws://127.0.0.1:8765",
|
||||
"--concurrency",
|
||||
"4",
|
||||
"--rounds",
|
||||
"2",
|
||||
"--output",
|
||||
str(out),
|
||||
])
|
||||
|
||||
assert rc == 0
|
||||
assert json.loads(out.read_text()) == fake_report
|
||||
@@ -1,347 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
WebSocket Load Test for The Nexus Gateway (#1505).
|
||||
|
||||
Simulates concurrent WebSocket connections to measure:
|
||||
- Connection success rate
|
||||
- Message broadcast latency
|
||||
- Memory usage under load
|
||||
- Throughput (messages/second)
|
||||
|
||||
Usage:
|
||||
python3 tests/ws_load_test.py [--connections 50] [--duration 30] [--messages 100]
|
||||
|
||||
Requirements:
|
||||
pip install websockets psutil (psutil optional, for memory tracking)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import statistics
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
try:
|
||||
import websockets
|
||||
except ImportError:
|
||||
print("ERROR: websockets required. Install: pip install websockets")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
import psutil
|
||||
HAS_PSUTIL = True
|
||||
except ImportError:
|
||||
HAS_PSUTIL = False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DEFAULT_URL = "ws://localhost:8765"
|
||||
DEFAULT_CONNECTIONS = 50
|
||||
DEFAULT_DURATION = 30 # seconds
|
||||
DEFAULT_MESSAGES = 100 # per connection
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConnectionStats:
|
||||
"""Stats for a single connection."""
|
||||
connected: bool = False
|
||||
connect_time_ms: float = 0
|
||||
messages_sent: int = 0
|
||||
messages_received: int = 0
|
||||
errors: int = 0
|
||||
latencies_ms: List[float] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoadTestResult:
|
||||
"""Aggregated results from load test."""
|
||||
total_connections: int = 0
|
||||
successful_connections: int = 0
|
||||
failed_connections: int = 0
|
||||
total_messages_sent: int = 0
|
||||
total_messages_received: int = 0
|
||||
total_errors: int = 0
|
||||
duration_seconds: float = 0
|
||||
messages_per_second: float = 0
|
||||
avg_latency_ms: float = 0
|
||||
p50_latency_ms: float = 0
|
||||
p95_latency_ms: float = 0
|
||||
p99_latency_ms: float = 0
|
||||
memory_start_mb: float = 0
|
||||
memory_end_mb: float = 0
|
||||
memory_delta_mb: float = 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Single connection worker
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def connection_worker(
|
||||
worker_id: int,
|
||||
url: str,
|
||||
num_messages: int,
|
||||
stats: ConnectionStats,
|
||||
stop_event: asyncio.Event,
|
||||
):
|
||||
"""A single WebSocket connection that sends and receives messages."""
|
||||
try:
|
||||
start = time.perf_counter()
|
||||
async with websockets.connect(url, open_timeout=5) as ws:
|
||||
stats.connect_time_ms = (time.perf_counter() - start) * 1000
|
||||
stats.connected = True
|
||||
|
||||
# Send messages with timestamps for latency measurement
|
||||
for i in range(num_messages):
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
msg = json.dumps({
|
||||
"type": "load_test",
|
||||
"worker_id": worker_id,
|
||||
"seq": i,
|
||||
"timestamp": time.time(),
|
||||
})
|
||||
|
||||
try:
|
||||
send_time = time.perf_counter()
|
||||
await ws.send(msg)
|
||||
stats.messages_sent += 1
|
||||
|
||||
# Wait for echo/broadcast back (with timeout)
|
||||
try:
|
||||
response = await asyncio.wait_for(ws.recv(), timeout=2.0)
|
||||
recv_time = time.perf_counter()
|
||||
latency_ms = (recv_time - send_time) * 1000
|
||||
stats.latencies_ms.append(latency_ms)
|
||||
stats.messages_received += 1
|
||||
except asyncio.TimeoutError:
|
||||
pass # No response is OK for broadcast servers
|
||||
|
||||
except Exception as e:
|
||||
stats.errors += 1
|
||||
|
||||
# Small delay between messages to avoid overwhelming
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
except Exception as e:
|
||||
stats.errors += 1
|
||||
stats.connected = False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Broadcast receiver (separate connection that only receives)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def broadcast_receiver(
|
||||
url: str,
|
||||
received_count: dict,
|
||||
stop_event: asyncio.Event,
|
||||
):
|
||||
"""A connection that only receives broadcasts to measure fan-out."""
|
||||
try:
|
||||
async with websockets.connect(url, open_timeout=5) as ws:
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
|
||||
received_count["total"] += 1
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Memory monitoring
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_memory_mb() -> float:
|
||||
"""Get current process memory in MB."""
|
||||
if HAS_PSUTIL:
|
||||
return psutil.Process().memory_info().rss / (1024 * 1024)
|
||||
return 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main load test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def run_load_test(
|
||||
url: str,
|
||||
num_connections: int,
|
||||
duration: int,
|
||||
messages_per_connection: int,
|
||||
) -> LoadTestResult:
|
||||
"""Run the WebSocket load test."""
|
||||
result = LoadTestResult()
|
||||
result.total_connections = num_connections
|
||||
result.duration_seconds = duration
|
||||
|
||||
memory_start = get_memory_mb()
|
||||
result.memory_start_mb = memory_start
|
||||
|
||||
stats_list: List[ConnectionStats] = [ConnectionStats() for _ in range(num_connections)]
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
# Also add a broadcast receiver to measure fan-out
|
||||
broadcast_count = {"total": 0}
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
print(f"WebSocket Load Test")
|
||||
print(f"{'='*60}")
|
||||
print(f" URL: {url}")
|
||||
print(f" Connections: {num_connections}")
|
||||
print(f" Duration: {duration}s")
|
||||
print(f" Messages/connection: {messages_per_connection}")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
# Start timer
|
||||
test_start = time.perf_counter()
|
||||
|
||||
# Create all connection tasks
|
||||
tasks = []
|
||||
for i in range(num_connections):
|
||||
task = asyncio.create_task(
|
||||
connection_worker(i, url, messages_per_connection, stats_list[i], stop_event)
|
||||
)
|
||||
tasks.append(task)
|
||||
|
||||
# Add broadcast receiver
|
||||
recv_task = asyncio.create_task(broadcast_receiver(url, broadcast_count, stop_event))
|
||||
|
||||
# Wait for duration, then signal stop
|
||||
print(f"Running load test for {duration} seconds...")
|
||||
await asyncio.sleep(duration)
|
||||
stop_event.set()
|
||||
|
||||
# Wait for all tasks to finish
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
await asyncio.gather(recv_task, return_exceptions=True)
|
||||
|
||||
test_end = time.perf_counter()
|
||||
actual_duration = test_end - test_start
|
||||
|
||||
# Aggregate results
|
||||
all_latencies = []
|
||||
for stats in stats_list:
|
||||
if stats.connected:
|
||||
result.successful_connections += 1
|
||||
else:
|
||||
result.failed_connections += 1
|
||||
result.total_messages_sent += stats.messages_sent
|
||||
result.total_messages_received += stats.messages_received
|
||||
result.total_errors += stats.errors
|
||||
all_latencies.extend(stats.latencies_ms)
|
||||
|
||||
result.duration_seconds = actual_duration
|
||||
result.messages_per_second = result.total_messages_sent / actual_duration if actual_duration > 0 else 0
|
||||
|
||||
if all_latencies:
|
||||
result.avg_latency_ms = statistics.mean(all_latencies)
|
||||
sorted_latencies = sorted(all_latencies)
|
||||
result.p50_latency_ms = sorted_latencies[len(sorted_latencies) // 2]
|
||||
result.p95_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.95)]
|
||||
result.p99_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.99)]
|
||||
|
||||
result.memory_end_mb = get_memory_mb()
|
||||
result.memory_delta_mb = result.memory_end_mb - result.memory_start_mb
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Report
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def print_report(result: LoadTestResult):
|
||||
"""Print load test results."""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Load Test Results")
|
||||
print(f"{'='*60}")
|
||||
|
||||
print(f"\n--- Connections ---")
|
||||
print(f" Total: {result.total_connections}")
|
||||
print(f" Successful: {result.successful_connections}")
|
||||
print(f" Failed: {result.failed_connections}")
|
||||
conn_rate = result.successful_connections / result.total_connections * 100 if result.total_connections else 0
|
||||
print(f" Success rate: {conn_rate:.1f}%")
|
||||
|
||||
print(f"\n--- Messages ---")
|
||||
print(f" Sent: {result.total_messages_sent}")
|
||||
print(f" Received: {result.total_messages_received}")
|
||||
print(f" Errors: {result.total_errors}")
|
||||
print(f" Throughput: {result.messages_per_second:.1f} msg/s")
|
||||
|
||||
print(f"\n--- Latency ---")
|
||||
if result.avg_latency_ms > 0:
|
||||
print(f" Average: {result.avg_latency_ms:.2f} ms")
|
||||
print(f" P50: {result.p50_latency_ms:.2f} ms")
|
||||
print(f" P95: {result.p95_latency_ms:.2f} ms")
|
||||
print(f" P99: {result.p99_latency_ms:.2f} ms")
|
||||
else:
|
||||
print(f" No latency data (server may not echo)")
|
||||
|
||||
print(f"\n--- Memory ---")
|
||||
if HAS_PSUTIL:
|
||||
print(f" Start: {result.memory_start_mb:.1f} MB")
|
||||
print(f" End: {result.memory_end_mb:.1f} MB")
|
||||
print(f" Delta: {result.memory_delta_mb:+.1f} MB")
|
||||
else:
|
||||
print(f" psutil not installed — memory tracking disabled")
|
||||
|
||||
print(f"\n--- Duration ---")
|
||||
print(f" {result.duration_seconds:.1f} seconds")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
|
||||
# Pass/fail verdict
|
||||
issues = []
|
||||
if result.failed_connections > 0:
|
||||
issues.append(f"{result.failed_connections} connections failed")
|
||||
if result.total_errors > result.total_messages_sent * 0.01:
|
||||
issues.append(f"Error rate {result.total_errors/result.total_messages_sent*100:.1f}% exceeds 1%")
|
||||
if result.p95_latency_ms > 500:
|
||||
issues.append(f"P95 latency {result.p95_latency_ms:.0f}ms exceeds 500ms")
|
||||
|
||||
if issues:
|
||||
print(f"ISSUES FOUND:")
|
||||
for issue in issues:
|
||||
print(f" ✗ {issue}")
|
||||
print(f"\nVERDICT: FAIL")
|
||||
return False
|
||||
else:
|
||||
print(f"VERDICT: PASS")
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="WebSocket load test for The Nexus")
|
||||
parser.add_argument("--url", default=DEFAULT_URL, help=f"WebSocket URL (default: {DEFAULT_URL})")
|
||||
parser.add_argument("--connections", type=int, default=DEFAULT_CONNECTIONS, help=f"Number of concurrent connections (default: {DEFAULT_CONNECTIONS})")
|
||||
parser.add_argument("--duration", type=int, default=DEFAULT_DURATION, help=f"Test duration in seconds (default: {DEFAULT_DURATION})")
|
||||
parser.add_argument("--messages", type=int, default=DEFAULT_MESSAGES, help=f"Messages per connection (default: {DEFAULT_MESSAGES})")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = asyncio.run(run_load_test(
|
||||
url=args.url,
|
||||
num_connections=args.connections,
|
||||
duration=args.duration,
|
||||
messages_per_connection=args.messages,
|
||||
))
|
||||
|
||||
passed = print_report(result)
|
||||
sys.exit(0 if passed else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user